from enum import Enum

from nmigen import *
from nmigen.hdl.rec import *
from nmigen.asserts import *
from nmigen.cli import main

from pipe_stage import PipeStage

# NEXT TASKS (IMMEDIATE)
# - get the testbenches working and model checking done
# - formal proof (k-induction and/or ABC-PDR)
# - figure out how the Altera gearbox design works
#   (number of pipeline stages and what each does)


class GearboxBusLayout(Layout):
    """Record layout shared by every gearbox in this file.

    Fields:
        data_in:              ``upstream_width`` bits, from the source.
        data_out:             ``downstream_width`` bits, to the destination.
        upstream_valid_in:    1 bit, from the source.
        upstream_ready_out:   1 bit, to the source.
        downstream_ready_in:  1 bit, from the destination.
        downstream_valid_out: 1 bit, to the destination.
    """

    def __init__(self, *, upstream_width, downstream_width):
        # The bare * forces keyword arguments so the two widths cannot be
        # swapped by accident at the call site.
        super().__init__([
            # DATA
            ("data_in", unsigned(upstream_width)),        # FROM SOURCE
            ("data_out", unsigned(downstream_width)),     # TO DEST
            # CONTROL
            ("upstream_valid_in", 1),                     # FROM SOURCE
            ("upstream_ready_out", 1),                    # TO SOURCE
            ("downstream_ready_in", 1),                   # FROM DEST
            ("downstream_valid_out", 1),                  # TO DEST
        ])


class GearboxBus(Record):
    """Record instance built from :class:`GearboxBusLayout`."""

    def __init__(self, *, upstream_width, downstream_width):
        super().__init__(GearboxBusLayout(upstream_width=upstream_width,
                                          downstream_width=downstream_width))


class ArbitraryGearbox(Elaboratable):
    """Width-converting gearbox built around a single shift register.

    Words of ``upstream_width`` bits are inserted above the bits already
    held; words of ``downstream_width`` bits are taken from the low end.
    ``valid_bit_count`` tracks how many low bits of the register are valid.
    """

    def __init__(self, *, upstream_width, downstream_width):
        self.upstream_width = upstream_width
        self.downstream_width = downstream_width
        self.bus = GearboxBus(upstream_width=upstream_width,
                              downstream_width=downstream_width)

    def elaborate(self, platform):
        m = Module()
        bus = self.bus

        upstream_width = self.upstream_width
        downstream_width = self.downstream_width

        shift_reg_len = upstream_width + downstream_width
        shift_reg = Signal(shift_reg_len)
        # The count can equal shift_reg_len (register completely full), so
        # the range must include that value; range(shift_reg_len) would wrap
        # whenever shift_reg_len is a power of two.
        valid_bit_count = Signal(range(shift_reg_len + 1))

        upstream_transaction = Signal(1)
        downstream_transaction = Signal(1)

        m.d.comb += [
            upstream_transaction.eq(
                bus.upstream_ready_out & bus.upstream_valid_in),
            downstream_transaction.eq(
                bus.downstream_ready_in & bus.downstream_valid_out),
            # Ready when there is room for one more upstream word.
            bus.upstream_ready_out.eq(
                shift_reg_len - valid_bit_count >= upstream_width),
            # Valid only when a full downstream word is actually buffered.
            # (Comparing the constant shift_reg_len here would be always
            # true and would let the count underflow.)
            bus.downstream_valid_out.eq(valid_bit_count >= downstream_width),
            bus.data_out.eq(shift_reg[0:downstream_width]),
        ]

        with m.If(upstream_transaction & downstream_transaction):
            # Upstream and downstream in the same cycle.
            # The simultaneous transaction is hard, but updating the valid
            # count isn't.
            m.d.sync += valid_bit_count.eq(
                valid_bit_count + upstream_width - downstream_width)
            # The updated shift register is composed of two parts, one from
            # the old bits and one from the new bits.
            #
            # First, downstream_width bits are shifted out of the bottom of
            # the register; what remains is (shift_reg >> downstream_width).
            #
            # To fit the new bits in, they are shifted to the position just
            # above the remaining valid bits, so no valid bit is overwritten
            # and no junk bits are left:
            #   (data_in << (valid_bit_count - downstream_width))
            m.d.sync += shift_reg.eq(
                (bus.data_in << (valid_bit_count - downstream_width))
                | (shift_reg >> downstream_width))
        with m.Elif(upstream_transaction):
            # Upstream only: new bits go in on the high side of the valid
            # region. Bits above valid_bit_count are zero (reset value and
            # zero-filling shifts), so OR-ing is sufficient.
            m.d.sync += valid_bit_count.eq(valid_bit_count + upstream_width)
            m.d.sync += shift_reg.eq(
                (bus.data_in << valid_bit_count) | shift_reg)
        with m.Elif(downstream_transaction):
            # Downstream only: drop the low bits that leave on the bus during
            # this transaction.
            m.d.sync += valid_bit_count.eq(valid_bit_count - downstream_width)
            m.d.sync += shift_reg.eq(shift_reg[downstream_width:])

        return m


class IdempotentGearbox(Elaboratable):
    """Unitary-ratio gearbox: upstream and downstream widths are equal.

    The bus is wired straight through a single ``PipeStage`` submodule.
    """

    def __init__(self, *, width):
        self.width = width
        self.bus = GearboxBus(upstream_width=width, downstream_width=width)

    def elaborate(self, platform):
        m = Module()
        bus = self.bus

        m.submodules.pipe = pipe = PipeStage(width=self.width,
                                             registered_ready=True)

        # Field names must match GearboxBusLayout; the short forms
        # (valid_in, ready_in, ...) do not exist on the bus record.
        m.d.comb += [
            pipe.upstream_valid_in.eq(bus.upstream_valid_in),
            pipe.downstream_ready_in.eq(bus.downstream_ready_in),
            pipe.upstream_data_in.eq(bus.data_in),
            bus.downstream_valid_out.eq(pipe.downstream_valid_out),
            bus.data_out.eq(pipe.downstream_data_out),
            bus.upstream_ready_out.eq(pipe.upstream_ready_out),
        ]

        return m


# This model is non-synthesizable. It is intended as a reference model for a
# non-unitary-ratio gearbox, so that formal verification and model checking
# can be run against both the unitary-ratio gearbox and the arbitrary-ratio
# gearbox.
class GoldenGearboxModel(Elaboratable):
    """Reference gearbox backed by one wide ``sim_memory_size``-bit signal.

    Upstream words are written at ``write_ptr`` and downstream words are read
    at ``read_ptr``; each pointer advances by its side's word width.

    NOTE(review): this model does not drive ``upstream_ready_out`` or
    ``downstream_valid_out`` and performs no full/empty checks -- confirm
    that is intended before using it as a proof reference.
    """

    def __init__(self, *, upstream_width, downstream_width, sim_memory_size):
        self.upstream_width = upstream_width
        self.downstream_width = downstream_width
        self.sim_memory_size = sim_memory_size
        self.memory = Signal(sim_memory_size)
        self.bus = GearboxBus(upstream_width=upstream_width,
                              downstream_width=downstream_width)

    def elaborate(self, platform):
        m = Module()
        bus = self.bus

        write_ptr = Signal(range(self.sim_memory_size))
        read_ptr = Signal(range(self.sim_memory_size))

        # Field names must match GearboxBusLayout (upstream_valid_in /
        # downstream_ready_in); the bus has no valid_in / ready_in fields.
        with m.If(bus.upstream_valid_in):
            m.d.sync += write_ptr.eq(write_ptr + self.upstream_width)
            # One case per possible write pointer so that every slice below
            # has a constant width: keep the low cand_wptr bits and place the
            # incoming word directly above them.
            with m.Switch(write_ptr):
                for cand_wptr in range(0, self.sim_memory_size):
                    with m.Case(cand_wptr):
                        m.d.sync += self.memory.eq(
                            Cat(self.memory.bit_select(0, cand_wptr),
                                bus.data_in))

        with m.If(bus.downstream_ready_in):
            m.d.comb += bus.data_out.eq(
                self.memory.bit_select(read_ptr, self.downstream_width))
            m.d.sync += read_ptr.eq(read_ptr + self.downstream_width)

        return m


class DummyPlug(Elaboratable):
    """Top-level wrapper that feeds an ArbitraryGearbox constant stimulus.

    Instantiates a 4-bit to 2-bit gearbox and ties ``downstream_ready_in``
    and ``upstream_valid_in`` high with ``data_in`` fixed at ``0xe``.
    """

    def elaborate(self, platform):
        m = Module()

        m.submodules.gearbox = gearbox = ArbitraryGearbox(
            upstream_width=4, downstream_width=2)

        # Free-running counter; not yet used to gate the stimulus below.
        # TODO: use it (e.g. counter % 4) to vary valid/ready, or replace the
        # constant drivers with AnySeq(...) inputs for formal runs.
        counter = Signal(8)
        m.d.sync += counter.eq(counter + 1)

        m.d.comb += [
            gearbox.bus.downstream_ready_in.eq(1),
            gearbox.bus.upstream_valid_in.eq(1),
            gearbox.bus.data_in.eq(0xe),
        ]

        return m


if __name__ == '__main__':
    top = DummyPlug()
    main(top)
    # TODO: platform.build(DummyPlug()) once a target platform is chosen.