Source code for akida.virtual_devices

import warnings
from collections import namedtuple
from math import ceil, floor, sqrt

from .core import (NP, AKD1500_v1, Device, FPGA_v2, IpVersion,
                   NSoC_v1, NSoC_v2, TwoNodesIP_v1, LayerType, Model)
from .mapping import MapMode


LayerSequence = namedtuple('LayerSequence', ['layers'])


def AKD1000():
    """Returns a virtual device for an AKD1000 NSoC.

    This function returns a virtual device for Brainchip's AKD1000 NSoC.

    Returns:
        :obj:`Device`: a virtual device.
    """
    dma_event = NP.Ident(3, 1, 0)
    dma_conf = NP.Ident(3, 1, 1)
    nps = [
        NP.Info(NP.Ident(1, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(1, 3, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 3, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 3, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 4, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(1, 4, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 4, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 4, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 5, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(1, 5, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 5, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 5, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(2, 3, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 3, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 3, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 4, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(2, 4, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 4, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 4, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 5, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(2, 5, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 5, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 5, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 1, 2), {NP.Type.CNP1}, False),
        NP.Info(NP.Ident(3, 1, 3), {NP.Type.CNP1}, False),
        NP.Info(NP.Ident(3, 2, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(3, 2, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 2, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 2, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(3, 3, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 3, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 3, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 4, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(3, 4, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 4, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 4, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 5, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(3, 5, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 5, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 5, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 1, 0), {NP.Type.CNP1, NP.Type.FNP2}, False),
        NP.Info(NP.Ident(4, 1, 1), {NP.Type.CNP1, NP.Type.FNP2}, False),
        NP.Info(NP.Ident(4, 1, 2), {NP.Type.CNP1, NP.Type.FNP2}, False),
        NP.Info(NP.Ident(4, 1, 3), {NP.Type.CNP1, NP.Type.FNP2}, False),
        NP.Info(NP.Ident(4, 2, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(4, 2, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 2, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 2, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(4, 3, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 3, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 3, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 4, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(4, 4, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 4, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 4, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 5, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(4, 5, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 5, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(4, 5, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 2, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(5, 2, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 2, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 2, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(5, 3, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 3, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 3, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 4, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(5, 4, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 4, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 4, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 5, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(5, 5, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 5, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(5, 5, 3), {NP.Type.CNP1, NP.Type.CNP2}, False)
    ]
    mesh = NP.Mesh(IpVersion.v1, dma_event, dma_conf, NP.Info.hrc(False), nps)
    return Device(NSoC_v2, mesh)
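
# Usage sketch (illustrative only, not part of the factory functions above): the factory
# returns a plain akida Device whose hardware version can be inspected directly.
def _example_akd1000_device():
    device = AKD1000()
    # The virtual AKD1000 reports the NSoC_v2 hardware version
    return device.version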

def TwoNodesIPv1():
    """Returns a virtual device for a two-node Akida IP.

    Returns:
        :obj:`Device`: a virtual device.
    """
    dma_event = NP.Ident(1, 1, 0)
    dma_conf = NP.Ident(1, 1, 1)
    nps = [
        NP.Info(NP.Ident(1, 2, 0), {NP.Type.CNP1, NP.Type.FNP2}, False),
        NP.Info(NP.Ident(1, 2, 1), {NP.Type.CNP1}, False),
        NP.Info(NP.Ident(1, 2, 2), {NP.Type.CNP1}, False),
        NP.Info(NP.Ident(1, 2, 3), {NP.Type.CNP1}, False),
        NP.Info(NP.Ident(1, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(1, 3, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 3, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 3, 3), {NP.Type.CNP1, NP.Type.CNP2}, False)
    ]
    mesh = NP.Mesh(IpVersion.v1, dma_event, dma_conf, NP.Info.hrc(False), nps)
    return Device(TwoNodesIP_v1, mesh)

def AKD1500():
    """Returns a virtual device for an AKD1500 chip.

    Returns:
        :obj:`Device`: a virtual device.
    """
    dma_event = NP.Ident(1, 1, 0)
    dma_conf = NP.Ident(1, 1, 1)
    nps = [
        NP.Info(NP.Ident(1, 2, 0), {NP.Type.CNP1, NP.Type.FNP2}, False),
        NP.Info(NP.Ident(1, 2, 1), {NP.Type.CNP1}, False),
        NP.Info(NP.Ident(1, 2, 2), {NP.Type.CNP1}, False),
        NP.Info(NP.Ident(1, 2, 3), {NP.Type.CNP1}, False),
        NP.Info(NP.Ident(1, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(1, 3, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 3, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(1, 3, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 1, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(2, 1, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 1, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 1, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 2, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(2, 2, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 2, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 2, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(2, 3, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 3, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(2, 3, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 1, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(3, 1, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 1, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 1, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 2, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(3, 2, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 2, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 2, 3), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False),
        NP.Info(NP.Ident(3, 3, 1), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 3, 2), {NP.Type.CNP1, NP.Type.CNP2}, False),
        NP.Info(NP.Ident(3, 3, 3), {NP.Type.CNP1, NP.Type.CNP2}, False)
    ]
    mesh = NP.Mesh(IpVersion.v1, dma_event, dma_conf, NP.Info.hrc(False), nps)
    return Device(AKD1500_v1, mesh)

def TwoNodesIPv2():
    """Returns a 2-node virtual device for FPGA v2.

    Returns:
        :obj:`Device`: a virtual device.
    """
    dma_event = NP.Ident(1, 1, 0)
    dma_conf = NP.Ident(1, 1, 1)
    skipdmas_num_channels = 2
    skip_dmas = [
        NP.Info(
            NP.Ident(1, 1, 3, skipdmas_num_channels),
            {NP.Type.SKIP_DMA_STORE, NP.Type.SKIP_DMA_LOAD}, False)]
    nps = [
        NP.Info(NP.Ident(1, 2, 0), {NP.Type.CNP1, NP.Type.FNP2}, True),
        NP.Info(NP.Ident(1, 2, 1), {NP.Type.CNP1}, True),
        NP.Info(NP.Ident(1, 2, 2), {NP.Type.CNP1}, True),
        NP.Info(NP.Ident(1, 2, 3), {NP.Type.CNP1}, True),
        NP.Info(NP.Ident(2, 2, 0), {NP.Type.CNP1, NP.Type.FNP3}, True),
        NP.Info(NP.Ident(2, 2, 1), {NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(2, 2, 2), {NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(2, 2, 3), {NP.Type.CNP1, NP.Type.CNP2}, True)
    ]
    mesh = NP.Mesh(IpVersion.v2, dma_event, dma_conf, NP.Info.hrc(True), nps, skip_dmas)
    return Device(FPGA_v2, mesh)

def SixNodesIPv2():
    """Returns a 6-node virtual device for FPGA v2.

    Returns:
        :obj:`Device`: a virtual device.
    """
    dma_event = NP.Ident(1, 1, 0)
    dma_conf = NP.Ident(1, 1, 1)
    skipdmas_num_channels = 4
    skip_dmas = [
        NP.Info(
            NP.Ident(1, 1, 3, skipdmas_num_channels),
            {NP.Type.SKIP_DMA_STORE, NP.Type.SKIP_DMA_LOAD}, False)]
    nps = [
        NP.Info(NP.Ident(1, 2, 0), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.FNP2}, True),
        NP.Info(NP.Ident(1, 2, 1), {NP.Type.TNP_B, NP.Type.CNP1}, True),
        NP.Info(NP.Ident(1, 2, 2), {NP.Type.TNP_B, NP.Type.CNP1}, True),
        NP.Info(NP.Ident(1, 2, 3), {NP.Type.TNP_B, NP.Type.CNP1}, True),
        NP.Info(NP.Ident(1, 3, 0), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(1, 3, 1), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(1, 3, 2), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(1, 3, 3), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(2, 2, 0), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.FNP3}, True),
        NP.Info(NP.Ident(2, 2, 1), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(2, 2, 2), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(2, 2, 3), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(2, 3, 0), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(2, 3, 1), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(2, 3, 2), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(2, 3, 3), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(3, 2, 0), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.FNP3}, True),
        NP.Info(NP.Ident(3, 2, 1), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(3, 2, 2), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(3, 2, 3), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(3, 3, 0), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(3, 3, 1), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(3, 3, 2), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True),
        NP.Info(NP.Ident(3, 3, 3), {NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2}, True)
    ]
    mesh = NP.Mesh(IpVersion.v2, dma_event, dma_conf, NP.Info.hrc(True), nps, skip_dmas)
    return Device(FPGA_v2, mesh)
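
# Usage sketch (illustrative only): the v2 virtual devices above are typically used as mapping
# targets. `model` is assumed to be an already-built akida.Model targeting IpVersion.v2.
def _example_map_on_six_nodes_v2(model):
    device = SixNodesIPv2()
    # Hardware-only, minimal mapping, as used by the helpers further down in this module
    model.map(device, mode=MapMode.Minimal, hw_only=True)
    return model.device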

def create_device(num_cnp_tnp,
                  num_fnp,
                  num_skip_dma_channel=0,
                  include_hrc=True,
                  sram_size=None,
                  hw_version=FPGA_v2,
                  ):
    """Creates an Akida device with the specified hardware components.

    Args:
        num_cnp_tnp (int): Number of CNP and TNP_B units (TNP_B is only available on 2.x
            devices).
        num_fnp (int): Number of FNP units to include. An FNP2 with external memory is added
            first, followed by FNP3 units.
        num_skip_dma_channel (int, optional): Number of skip DMA channels (only applicable to
            2.x devices). Defaults to 0.
        include_hrc (bool, optional): Whether to include the HRC. Defaults to True.
        sram_size (akida.NP.SramSize, optional): Size of the shared SRAM available inside the
            mesh. Defaults to None.
        hw_version (akida.HwVersion, optional): The version of the device. Defaults to FPGA_v2.

    Returns:
        akida.Device: An Akida device.
    """
    # General Akida node info
    SKIP_DMA_ROW = 1
    SKIP_DMA_ID = 3
    MAX_SKIP_DMA_CHANNELS_PER_COL = 4
    NUM_NPS_PER_NODE = 4

    # Get the IP version
    ip_version = hw_version.ip_version
    # The LUT is a v2 feature
    has_lut = ip_version == IpVersion.v2

    def _get_supported_hw_version(ip_version):
        if ip_version == IpVersion.v2:
            return [FPGA_v2]
        return [NSoC_v1, NSoC_v2, TwoNodesIP_v1, AKD1500_v1]

    def _compute_total_nps(num_cnp_tnp, num_fnp):
        total_nps = num_cnp_tnp + num_fnp
        # The nodes are completed with NPs of type CNP1, CNP2 (and TNP_B if hw_version is
        # FPGA_v2) if the requested NPs are not a multiple of NUM_NPS_PER_NODE.
        nps_to_add = (-total_nps) % NUM_NPS_PER_NODE
        num_cnp_tnp += nps_to_add
        total_nps += nps_to_add
        return total_nps, num_cnp_tnp

    def _compute_optimal_nps_grid_shape(total_nps):
        if total_nps == 0:
            return 0, 0
        num_nodes = total_nps / NUM_NPS_PER_NODE
        fractional_diff = (num_nodes / sqrt(num_nodes)) - (num_nodes // sqrt(num_nodes))
        # Increment columns first and then rows
        num_cols = floor(sqrt(num_nodes)) + ceil(fractional_diff)
        num_rows = floor(sqrt(num_nodes)) + round(fractional_diff)
        return num_rows, num_cols

    def _make_skip_dmas(num_cols, num_skip_dma_channel):
        skip_dmas = []
        if hw_version != FPGA_v2 and num_skip_dma_channel > 0:
            raise ValueError(f"Skip DMAs are only supported on v2 devices (hw_version=FPGA_v2). "
                             f"Current hardware version: {hw_version}.")
        if num_skip_dma_channel == 0:
            return skip_dmas
        # Distribute skip DMAs across columns as much as possible.
        # When the number of skip DMA channels exceeds the capacity of the columns used by the
        # NPs, we increase the number of columns.
        current_max_skip_dma_channels = num_cols * MAX_SKIP_DMA_CHANNELS_PER_COL
        if (extra_channels := num_skip_dma_channel - current_max_skip_dma_channels) > 0:
            num_cols += ceil(extra_channels / MAX_SKIP_DMA_CHANNELS_PER_COL)
        # Compute the number of channels per skip DMA
        num_channels_per_skip_dma = ceil(num_skip_dma_channel / num_cols)
        # Deduce the number of skip DMAs in the device
        num_skip_dmas = min(num_skip_dma_channel, num_cols)
        for col in range(1, num_skip_dmas + 1):
            skip_dmas.append(
                NP.Info(NP.Ident(col, SKIP_DMA_ROW, SKIP_DMA_ID, num_channels_per_skip_dma),
                        {NP.Type.SKIP_DMA_STORE, NP.Type.SKIP_DMA_LOAD}, False)
            )
        return skip_dmas

    def _make_nps(num_rows, num_cols, num_cnp_tnp, num_fnp):
        # Construct NP types
        cnp_types = [NP.Type.CNP1, NP.Type.CNP2]
        fnp_types = [NP.Type.FNP2]
        # If a device with only 1 FNP is requested, the corresponding NP will only
        # have an FNP installed.
        if num_fnp > 1:
            fnp_types = [NP.Type.CNP1, NP.Type.CNP2] + fnp_types
            if hw_version == FPGA_v2:
                fnp_types.insert(0, NP.Type.TNP_B)
        if hw_version == FPGA_v2:
            cnp_types.insert(0, NP.Type.TNP_B)
        nps = []
        # Starting from row 2
        for row in range(2, num_rows + 2):
            for col in range(1, num_cols + 1):
                # Now loop over the NPs of the node
                for id in range(NUM_NPS_PER_NODE):
                    if num_cnp_tnp > 0:
                        nps.append(NP.Info(NP.Ident(col, row, id), cnp_types, has_lut))
                        num_cnp_tnp -= 1
                    elif num_fnp > 0:
                        nps.append(NP.Info(NP.Ident(col, row, id), fnp_types, has_lut))
                        # Replace FNP2 with FNP3 after the first FNP
                        if fnp_types[-1] == NP.Type.FNP2:
                            fnp_types[-1] = NP.Type.FNP3
                        num_fnp -= 1
        return nps

    # Check the HW version
    supported_hw_version = _get_supported_hw_version(ip_version)
    if hw_version not in supported_hw_version:
        raise ValueError(f"Invalid HW version '{hw_version}'. "
                         f"Expected one of: {supported_hw_version}.")

    # Compute the total number of NPs and construct the optimal grid for them.
    # The mesh should be as square as possible.
    total_nps, num_cnp_tnp = _compute_total_nps(num_cnp_tnp, num_fnp)
    num_rows, num_cols = _compute_optimal_nps_grid_shape(total_nps)
    if total_nps == 0 and not include_hrc:
        raise ValueError("It is not possible to create a completely empty device. "
                         f"num_cnp_tnp + num_fnp ({total_nps}) must be greater than zero or "
                         "HRC must be included.")

    # Make the DMA event and conf endpoints
    dma_event = NP.Ident(1, 1, 0)
    dma_conf = NP.Ident(1, 1, 1)
    # Make the skip DMAs
    skip_dmas = _make_skip_dmas(num_cols, num_skip_dma_channel)
    # Make the NPs
    nps = _make_nps(num_rows, num_cols, num_cnp_tnp, num_fnp)
    # Default SRAM size if not specified
    if sram_size is None:
        sram_size = NP.SramSize_v2 if ip_version == IpVersion.v2 else NP.SramSize_v1
    # Make the mesh
    hrc = NP.Info.hrc(has_lut) if include_hrc else None
    mesh = NP.Mesh(ip_version, dma_event, dma_conf, hrc, nps, skip_dmas, sram_size)
    return Device(hw_version, mesh)
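
# Usage sketch (illustrative only): request 10 CNP/TNP_B and 2 FNP units. The total (12) is a
# multiple of NUM_NPS_PER_NODE (4), so the mesh gets 3 nodes; asking for 11 + 2 instead would
# be rounded up to 16 NPs (4 nodes) by _compute_total_nps.
def _example_create_small_v2_device():
    return create_device(num_cnp_tnp=10,
                         num_fnp=2,
                         num_skip_dma_channel=2,
                         include_hrc=True,
                         hw_version=FPGA_v2)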

def compute_minimal_memory(model):
    """Computes the minimal memory required for inputs and weights on the device.

    Args:
        model (akida.Model): an Akida model.

    Returns:
        int, int: minimal input buffer memory and minimal weight memory, in bytes.
    """
    # Check that the model is mapped
    assert any([s.program is not None for s in model.sequences]), "Model needs to be mapped"
    minimal_input_buffer_memory = 0
    minimal_weight_memory = 0
    for layer in model.layers:
        if not layer.mapping:
            continue
        for np in layer.mapping.nps:
            np_weight_size = np.mem_info.weight_size
            if np.type == NP.Type.FNP3:
                # The FNP weight SRAM is counted in 32-bit words, with 48 bits used per
                # 50-bit word. Convert to 32-bit words first by dividing by 4.
                np_weight_size /= 4
                # Scale to 50-bit words and convert back to bytes by multiplying by 4
                np_weight_size = ceil(50 * np_weight_size / 48) * 4
            minimal_input_buffer_memory = max(minimal_input_buffer_memory,
                                              np.mem_info.input_size)
            minimal_weight_memory = max(minimal_weight_memory, np_weight_size)
    return minimal_input_buffer_memory, minimal_weight_memory
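
# Usage sketch (illustrative only): derive an NP.SramSize from the minimal memory requirements
# of a mapped model, as compute_min_device() does below when `minimal_memory` is True.
# `mapped_model` is assumed to be an akida.Model already mapped on a device.
def _example_minimal_sram_size(mapped_model):
    input_bytes, weight_bytes = compute_minimal_memory(mapped_model)
    return NP.SramSize(input_bytes, weight_bytes)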

def _get_outbounds(layer, layers):
    return [ly for ly in layers if layer in ly.inbounds]


def _model_generator(layers):
    # Scrolls through a list of layers, yielding pairs of consecutive layers. Notes:
    # - one of the two branches must not have nodes (not implemented yet)
    # - a merge layer is performed in the following NP, so we take its inbounds
    queue = [layers[-1]]
    while len(queue) > 0:
        t_layer = queue.pop(0)
        inbounds = t_layer.inbounds
        # Skip a layer if it is a merge one.
        if len(inbounds) == 1 and len(inbounds[0].inbounds) > 1:
            inbounds = inbounds[0].inbounds
        # Check inbounds constraints.
        if len(inbounds) > 1:
            # In case of multiple branches, one of them must not contain layers.
            # This translates to some inbound having multiple outbounds.
            new_inbounds = []
            for ly in inbounds:
                # Remove the branch with empty layers
                if len(_get_outbounds(ly, layers)) == 1:
                    new_inbounds.append(ly)
            if len(new_inbounds) != 1:
                raise NotImplementedError(f"{t_layer} has multiple inbounds, "
                                          "but there is no empty branch.")
            # Remove the inbounds that are not empty branches.
            inbounds = new_inbounds
        # Yield the pair (inbound, target_layer) if both have been mapped,
        # or if target_layer is mapped and the inbound is an InputData layer.
        if len(inbounds) == 1 and t_layer.mapping is not None:
            if inbounds[0].parameters.layer_type == LayerType.InputData or \
                    inbounds[0].mapping is not None:
                yield LayerSequence((inbounds[0], t_layer))
            # Then, update the queue with the inbound layer.
            queue.append(inbounds[0])


def _get_initial_skip_dma_channels(model):
    # The initial number of skip DMA channels is len(btc) + len(skips)
    SKIP_LAYER_TYPES = [LayerType.Add, LayerType.Concatenate]
    BTC_LAYER_TYPES = [LayerType.BufferTempConv, LayerType.DepthwiseBufferTempConv]
    skip_dma_channels = 0
    for ly in model.layers:
        if ly.parameters.layer_type in SKIP_LAYER_TYPES + BTC_LAYER_TYPES:
            skip_dma_channels += 1
    return skip_dma_channels


def _get_initial_number_of_fnp(model):
    # The initial number of FNPs is len(dense), since they are not split
    FNP_LAYER_TYPES = [LayerType.Dense1D]
    nb_fnp = 0
    for ly in model.layers:
        if ly.parameters.layer_type in FNP_LAYER_TYPES:
            nb_fnp += 1
    return nb_fnp


def _get_np_components(model_or_pass, np_types=None):
    total_nps = []
    for layer in model_or_pass.layers:
        if hasattr(layer.mapping, 'nps'):
            for np in layer.mapping.nps:
                if np_types is None or np.type in np_types:
                    total_nps.append(np)
        if hasattr(layer.mapping, 'skipdma_loads'):
            for np in layer.mapping.skipdma_loads:
                if np_types is None or np.type in np_types:
                    total_nps.append(np)
        if hasattr(layer.mapping, 'skipdma_stores'):
            for np in layer.mapping.skipdma_stores:
                if np_types is None or np.type in np_types:
                    total_nps.append(np)
    return total_nps


def _compute_skip_dma_channels(model_or_pass):
    # Compute the number of skip DMA channels as max(len(SKIP_DMA_STORE), len(SKIP_DMA_LOAD))
    skip_dma_load = _get_np_components(model_or_pass, (NP.SKIP_DMA_LOAD,))
    skip_dma_store = _get_np_components(model_or_pass, (NP.SKIP_DMA_STORE,))
    return max(len(skip_dma_load), len(skip_dma_store))


def _compute_number_of_cnp_tnp(model_or_pass):
    # Compute the number of CNP/TNP-B units.
    CNP_TNP_B_TYPES = (NP.CNP1, NP.CNP2, NP.TNP_B)
    total_cnps = _get_np_components(model_or_pass, CNP_TNP_B_TYPES)
    return len(total_cnps)


def _compute_number_of_fnp(model_or_pass):
    # Compute the number of FNP units.
    FNP_TYPES = (NP.FNP2, NP.FNP3)
    total_fnps = _get_np_components(model_or_pass, FNP_TYPES)
    return len(total_fnps)
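
# Usage sketch (illustrative only): count the NP resources consumed by a mapped model with the
# helpers above; this is what compute_min_device() does when partial reconfiguration is off.
# `mapped_model` is assumed to be an akida.Model already mapped on a device.
def _example_np_budget(mapped_model):
    return {"cnp_tnp": _compute_number_of_cnp_tnp(mapped_model),
            "fnp": _compute_number_of_fnp(mapped_model),
            "skip_dma_channels": _compute_skip_dma_channels(mapped_model)}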

def compute_min_device(model, enable_hwpr=False, sram_size=None, minimal_memory=False,
                       initial_num_nodes=36):
    """Builds the Akida virtual device that can fit the model entirely, with or without
    reconfiguration.

    Args:
        model (akida.Model): the model used to determine the device.
        enable_hwpr (bool, optional): if True, the device is computed assuming partial
            reconfiguration. Defaults to False.
        sram_size (NP.SramSize, optional): size of the shared SRAM available inside the mesh.
            Ignored when `minimal_memory` is True. Defaults to None.
        minimal_memory (bool, optional): if True, computes and sets the minimal required inputs
            and weights memory for the device. Defaults to False.
        initial_num_nodes (int, optional): the initial number of nodes with which to compute
            the base device. Defaults to 36.

    Returns:
        akida.Device: the computed device.
    """
    if not isinstance(model, Model):
        raise TypeError(f"Expected model to be an {Model}, got {type(model)}.")

    NUM_NPS_PER_NODE = 4

    if model.ip_version != IpVersion.v2:
        raise ValueError("Only IpVersion.v2 models are supported. "
                         f"Current model version={model.ip_version}")

    # Create a copy of the model to avoid modifying the original one.
    model = Model(layers=model.layers)

    # Compute a base device with which to compute the next parameters.
    params = {"num_skip_dma_channel": _get_initial_skip_dma_channels(model),
              "num_fnp": _get_initial_number_of_fnp(model),
              "sram_size": sram_size}
    params["num_cnp_tnp"] = NUM_NPS_PER_NODE * initial_num_nodes - params["num_fnp"]
    if params["num_cnp_tnp"] < 0:
        raise ValueError("Impossible to compute base device: "
                         f"the number of initial nodes ({initial_num_nodes}) is not enough.")
    device = create_device(**params)

    # Map the model with the default parameters.
    model.map(device, mode=MapMode.Minimal, hw_only=True)

    # Now that the model has been mapped onto the base device,
    # we can compute the parameters to build the required device.
    if enable_hwpr:
        params["num_cnp_tnp"] = params["num_fnp"] = 0
        for layer_seq in _model_generator(model.layers):
            # Compute the number of CNP/FNP needed to map the model in multiple passes,
            # as the largest requirement over 2 consecutive layers.
            params["num_cnp_tnp"] = max(params["num_cnp_tnp"],
                                        _compute_number_of_cnp_tnp(layer_seq))
            params["num_fnp"] = max(params["num_fnp"], _compute_number_of_fnp(layer_seq))
        # To compute the minimum number of skip DMA channels needed when partial
        # reconfiguration is allowed, we iterate over candidate devices until we find
        # a valid one.
        for num_skip_dma_channel in range(1, params.pop("num_skip_dma_channel") + 1):
            try:
                device = create_device(num_skip_dma_channel=num_skip_dma_channel, **params)
                model.map(device, mode=MapMode.Minimal, hw_only=True)
                params["num_skip_dma_channel"] = num_skip_dma_channel
                break
            except Exception:
                continue
    else:
        params["num_cnp_tnp"] = _compute_number_of_cnp_tnp(model)
        params["num_fnp"] = _compute_number_of_fnp(model)
        params["num_skip_dma_channel"] = _compute_skip_dma_channels(model)

    if minimal_memory:
        if sram_size is not None:
            warnings.warn(
                "The 'sram_size' argument will be ignored because 'minimal_memory' is set to "
                "True. The required memory will be computed automatically. Continuing "
                "execution.")
        params["sram_size"] = NP.SramSize(*compute_minimal_memory(model))

    # Create a virtual device with the requirements.
    device = create_device(**params)

    # Sanity check: map the model on the device.
    try:
        model.map(device, mode=MapMode.Minimal, hw_only=True)
    except Exception as e:
        raise RuntimeError("It was not possible to find a device for this model. "
                           f"Reason:\n{str(e)}")
    return device
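
# Usage sketch (illustrative only): shrink the device to a model's needs, once assuming
# partial reconfiguration and once without it. `model` is assumed to be an IpVersion.v2
# akida.Model.
def _example_min_devices(model):
    device_hwpr = compute_min_device(model, enable_hwpr=True, minimal_memory=True)
    device_full = compute_min_device(model, enable_hwpr=False)
    return device_hwpr, device_full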

def compute_common_device(ak_models):
    """Computes a common Akida device that can run all the given models.

    All models must already be mapped.

    Args:
        ak_models (List[akida.Model]): a list of Akida models whose hardware requirements will
            be combined.

    Returns:
        akida.Device: a new device that can map all the given models.
    """
    if not ak_models:
        raise ValueError("The list of Akida models cannot be empty.")
    if wrong_model_types := [type(m) for m in ak_models if not isinstance(m, Model)]:
        raise TypeError(f"Devices cannot be computed for models of type {wrong_model_types}.")
    if any(model.device is None for model in ak_models):
        raise ValueError("All models must be mapped on a device.")
    # For safety, check that all model devices have the same version
    assert all(model.device.version == ak_models[0].device.version for model in ak_models), \
        "Model devices have different versions."

    include_hrc = any(model.device.mesh.hrc for model in ak_models)
    max_num_cnp_tnp = 0
    max_num_fnp = 0
    max_num_skip_dma_channel = 0
    sram_size = NP.SramSize(0, 0)
    for model in ak_models:
        # Update params
        for sequence in model.sequences:
            for pass_ in sequence.passes:
                max_num_cnp_tnp = max(max_num_cnp_tnp, _compute_number_of_cnp_tnp(pass_))
                max_num_fnp = max(max_num_fnp, _compute_number_of_fnp(pass_))
                max_num_skip_dma_channel = max(max_num_skip_dma_channel,
                                               _compute_skip_dma_channels(pass_))
        # Update the SRAM size
        sram_size = NP.SramSize(max(sram_size.input_bytes,
                                    model.device.mesh.np_sram_size.input_bytes),
                                max(sram_size.weight_bytes,
                                    model.device.mesh.np_sram_size.weight_bytes))

    return create_device(max_num_cnp_tnp, max_num_fnp, max_num_skip_dma_channel, include_hrc,
                         sram_size, ak_models[0].device.version)
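
# Usage sketch (illustrative only): build one device able to run two already-mapped models.
# Both models are assumed to be mapped on devices of the same hardware version.
def _example_common_device(model_a, model_b):
    device = compute_common_device([model_a, model_b])
    # Both models should now remap onto the common device
    model_a.map(device, mode=MapMode.Minimal, hw_only=True)
    model_b.map(device, mode=MapMode.Minimal, hw_only=True)
    return device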