|
| 1 | +# cocotb_test.py: |
| 2 | +# Copyright (C) 2024 CESNET z. s. p. o. |
| 3 | +# Author(s): Daniel Kondys <[email protected]> |
| 4 | +# |
| 5 | +# SPDX-License-Identifier: BSD-3-Clause |
| 6 | + |
| 7 | +import itertools |
| 8 | +import sys |
| 9 | +from random import randint |
| 10 | + |
| 11 | +import cocotb |
| 12 | +from cocotb.clock import Clock |
| 13 | +from cocotb.triggers import RisingEdge, ClockCycles |
| 14 | +from cocotb_bus.drivers import BitDriver |
| 15 | +from cocotb_bus.scoreboard import Scoreboard |
| 16 | + |
| 17 | +from cocotbext.ofm.mfb.drivers import MFBDriver |
| 18 | +from cocotbext.ofm.mfb.monitors import MFBMonitor |
| 19 | +from cocotbext.ofm.mvb.drivers import MVBDriver, MvbTrClassic |
| 20 | +from cocotbext.ofm.base.generators import ItemRateLimiter |
| 21 | +from cocotbext.ofm.ver.generators import random_packets |
| 22 | + |
| 23 | + |
| 24 | +class testbench(): |
| 25 | + def __init__(self, dut, debug=False): |
| 26 | + self.dut = dut |
| 27 | + self.mfb_rx_drv = MFBDriver(dut, "RX_MFB", dut.CLK) |
| 28 | + self.mfb_tx_drv = BitDriver(dut.TX_MFB_DST_RDY, dut.CLK) |
| 29 | + self.mvb_rx_drv = MVBDriver(dut, "RX_MVB", dut.CLK) |
| 30 | + self.mfb_tx_mon = MFBMonitor(dut, "TX_MFB", dut.CLK) |
| 31 | + |
| 32 | + self.pkts_sent = 0 |
| 33 | + self.expected_output = [] |
| 34 | + self.scoreboard = Scoreboard(dut) |
| 35 | + self.scoreboard.add_interface(self.mfb_tx_mon, self.expected_output) |
| 36 | + |
| 37 | + if debug: |
| 38 | + self.mfb_rx_drv.log.setLevel(cocotb.logging.DEBUG) |
| 39 | + self.mvb_rx_drv.log.setLevel(cocotb.logging.DEBUG) |
| 40 | + self.mfb_tx_mon.log.setLevel(cocotb.logging.DEBUG) |
| 41 | + |
| 42 | + def model(self, append_tr: bytes, packet_tr: bytes): |
| 43 | + """Model of the DUT""" |
| 44 | + appended_tr = packet_tr + append_tr |
| 45 | + self.expected_output.append(appended_tr) |
| 46 | + self.pkts_sent += 1 |
| 47 | + |
| 48 | + async def reset(self): |
| 49 | + self.dut.RESET.value = 1 |
| 50 | + await ClockCycles(self.dut.CLK, 10) |
| 51 | + self.dut.RESET.value = 0 |
| 52 | + await RisingEdge(self.dut.CLK) |
| 53 | + |
| 54 | + |
| 55 | +@cocotb.test() |
| 56 | +async def run_test(dut, pkt_count=10000, frame_size_min=60, frame_size_max=1500): |
| 57 | + dut.RESET.value = 1 |
| 58 | + cocotb.start_soon(Clock(dut.CLK, 5, units='ns').start()) |
| 59 | + |
| 60 | + tb = testbench(dut) |
| 61 | + # Change MVB driver's IdleGenerator to ItemRateLimiter |
| 62 | + idle_gen_conf = dict(random_idles=True, max_idles=5, zero_idles_chance=50) |
| 63 | + tb.mvb_rx_drv.set_idle_generator(ItemRateLimiter(rate_percentage=30, **idle_gen_conf)) |
| 64 | + await tb.reset() |
| 65 | + |
| 66 | + cocotb.log.info("\n--- Beginning the test ---\n") |
| 67 | + |
| 68 | + tb.mfb_tx_drv.start((1, i % 3) for i in itertools.count()) |
| 69 | + # Option to change MVB drivers Rate Limiter (0 = random Idles = inconsistent rate) |
| 70 | + tb.mvb_rx_drv.set_idle_generator(ItemRateLimiter(rate_percentage=0)) |
| 71 | + |
| 72 | + await ClockCycles(tb.dut.CLK, 10) |
| 73 | + |
| 74 | + for mfb_pkt in random_packets(frame_size_min, frame_size_max, pkt_count): |
| 75 | + tb.mfb_rx_drv.append(mfb_pkt) |
| 76 | + |
| 77 | + mvb_tr = MvbTrClassic() |
| 78 | + mvb_bus_width = tb.mvb_rx_drv.item_widths['data'] |
| 79 | + mvb_append = randint(0, 2**mvb_bus_width-1) |
| 80 | + mvb_tr.data = mvb_append |
| 81 | + tb.mvb_rx_drv.append(mvb_tr) |
| 82 | + |
| 83 | + tb.model(append_tr=(mvb_append.to_bytes(mvb_bus_width//8, sys.byteorder)), packet_tr=(mfb_pkt)) |
| 84 | + |
| 85 | + last_num = 0 |
| 86 | + while (tb.mfb_tx_mon.frame_cnt < pkt_count): |
| 87 | + if (this_num := tb.mfb_tx_mon.frame_cnt // (pkt_count // 10)) > last_num: |
| 88 | + last_num = this_num |
| 89 | + cocotb.log.info(f"Number of transactions processed: {tb.mfb_tx_mon.frame_cnt}/{pkt_count}") |
| 90 | + await ClockCycles(dut.CLK, 100) |
| 91 | + |
| 92 | + cocotb.log.info("\n--- Test complete, getting results ---\n") |
| 93 | + raise tb.scoreboard.result |
0 commit comments