import warnings
from collections import namedtuple
from math import ceil, floor, sqrt
from .core import (NP, AKD1500_v1, Device, FPGA_v2, Pico_FPGA, IpVersion, NSoC_v1, NSoC_v2,
TwoNodesIP_v1, LayerType, Model)
from .mapping import MapMode
# Minimal container exposing a `layers` field. `_model_generator` wraps each pair of
# consecutive layers in it so the pair can be handed to the NP-counting helpers,
# which only read `.layers` from their argument.
LayerSequence = namedtuple('LayerSequence', ['layers'])
[docs]
def AKD1000():
    """Build the virtual device describing an AKD1000 NSoC.

    The NP layout of the Brainchip's AKD1000 NSoC is described node by node
    and wrapped into a virtual device.

    Returns:
        :obj:`Device`: a virtual device.
    """
    def regular_node(col, row):
        # Regular node: NP 0 is CNP1/FNP3, NPs 1 to 3 are CNP1/CNP2.
        node = [NP.Info(NP.Ident(col, row, 0), {NP.Type.CNP1, NP.Type.FNP3}, False)]
        node.extend(NP.Info(NP.Ident(col, row, np_id), {NP.Type.CNP1, NP.Type.CNP2}, False)
                    for np_id in range(1, 4))
        return node

    event_dma = NP.Ident(3, 1, 0)
    conf_dma = NP.Ident(3, 1, 1)

    nps = []
    # Columns 1 and 2 hold regular nodes on rows 3 to 5.
    for col in (1, 2):
        for row in (3, 4, 5):
            nps.extend(regular_node(col, row))
    # Column 3, row 1: idents 0 and 1 are the DMAs declared above, 2 and 3 are CNP1 only.
    nps.extend(NP.Info(NP.Ident(3, 1, np_id), {NP.Type.CNP1}, False) for np_id in (2, 3))
    for row in (2, 3, 4, 5):
        nps.extend(regular_node(3, row))
    # Column 4, row 1: the four NPs are CNP1/FNP2.
    nps.extend(NP.Info(NP.Ident(4, 1, np_id), {NP.Type.CNP1, NP.Type.FNP2}, False)
               for np_id in range(4))
    # Columns 4 and 5 hold regular nodes on rows 2 to 5.
    for col in (4, 5):
        for row in (2, 3, 4, 5):
            nps.extend(regular_node(col, row))

    return Device(NSoC_v2,
                  NP.Mesh(IpVersion.v1, event_dma, conf_dma, NP.Info.hrc(False), nps))
[docs]
def TwoNodesIPv1():
    """Build the virtual device describing a two nodes Akida IP.

    Returns:
        :obj:`Device`: a virtual device.
    """
    event_dma = NP.Ident(1, 1, 0)
    conf_dma = NP.Ident(1, 1, 1)

    # Node (1, 2): NP 0 is CNP1/FNP2, NPs 1 to 3 are CNP1 only.
    nps = [NP.Info(NP.Ident(1, 2, 0), {NP.Type.CNP1, NP.Type.FNP2}, False)]
    nps.extend(NP.Info(NP.Ident(1, 2, np_id), {NP.Type.CNP1}, False)
               for np_id in range(1, 4))
    # Node (1, 3): NP 0 is CNP1/FNP3, NPs 1 to 3 are CNP1/CNP2.
    nps.append(NP.Info(NP.Ident(1, 3, 0), {NP.Type.CNP1, NP.Type.FNP3}, False))
    nps.extend(NP.Info(NP.Ident(1, 3, np_id), {NP.Type.CNP1, NP.Type.CNP2}, False)
               for np_id in range(1, 4))

    return Device(TwoNodesIP_v1,
                  NP.Mesh(IpVersion.v1, event_dma, conf_dma, NP.Info.hrc(False), nps))
[docs]
def AKD1500():
    """Build the virtual device describing an AKD1500 chip.

    Returns:
        :obj:`Device`: a virtual device.
    """
    def regular_node(col, row):
        # Regular node: NP 0 is CNP1/FNP3, NPs 1 to 3 are CNP1/CNP2.
        node = [NP.Info(NP.Ident(col, row, 0), {NP.Type.CNP1, NP.Type.FNP3}, False)]
        node.extend(NP.Info(NP.Ident(col, row, np_id), {NP.Type.CNP1, NP.Type.CNP2}, False)
                    for np_id in range(1, 4))
        return node

    event_dma = NP.Ident(1, 1, 0)
    conf_dma = NP.Ident(1, 1, 1)

    # Node (1, 2): NP 0 is CNP1/FNP2, NPs 1 to 3 are CNP1 only.
    nps = [NP.Info(NP.Ident(1, 2, 0), {NP.Type.CNP1, NP.Type.FNP2}, False)]
    nps.extend(NP.Info(NP.Ident(1, 2, np_id), {NP.Type.CNP1}, False)
               for np_id in range(1, 4))
    # Every remaining node is a regular one: (1, 3), then rows 1 to 3 of columns 2 and 3.
    nps.extend(regular_node(1, 3))
    for col in (2, 3):
        for row in (1, 2, 3):
            nps.extend(regular_node(col, row))

    return Device(AKD1500_v1,
                  NP.Mesh(IpVersion.v1, event_dma, conf_dma, NP.Info.hrc(False), nps))
[docs]
def TwoNodesIPv2():
    """Build a 2-node virtual device for FPGA v2.

    Returns:
        :obj:`Device`: a virtual device.
    """
    event_dma = NP.Ident(1, 1, 0)
    conf_dma = NP.Ident(1, 1, 1)

    # A single skip DMA (store + load) exposing two channels.
    skip_dma = NP.Info(NP.Ident(1, 1, 3, 2),
                       {NP.Type.SKIP_DMA_STORE, NP.Type.SKIP_DMA_LOAD},
                       False)

    # Node (1, 2): NP 0 is CNP1/FNP2, NPs 1 to 3 are CNP1 only.
    nps = [NP.Info(NP.Ident(1, 2, 0), {NP.Type.CNP1, NP.Type.FNP2}, True)]
    nps.extend(NP.Info(NP.Ident(1, 2, np_id), {NP.Type.CNP1}, True)
               for np_id in range(1, 4))
    # Node (2, 2): NP 0 is CNP1/FNP3, NPs 1 to 3 are CNP1/CNP2.
    nps.append(NP.Info(NP.Ident(2, 2, 0), {NP.Type.CNP1, NP.Type.FNP3}, True))
    nps.extend(NP.Info(NP.Ident(2, 2, np_id), {NP.Type.CNP1, NP.Type.CNP2}, True)
               for np_id in range(1, 4))

    mesh = NP.Mesh(IpVersion.v2, event_dma, conf_dma, NP.Info.hrc(True), nps, [skip_dma])
    return Device(FPGA_v2, mesh)
[docs]
def SixNodesIPv2():
    """Build a 6-node virtual device for FPGA v2.

    Returns:
        :obj:`Device`: a virtual device.
    """
    tnp_cnp = (NP.Type.TNP_B, NP.Type.CNP1, NP.Type.CNP2)
    tnp_cnp_fnp3 = (NP.Type.TNP_B, NP.Type.CNP1, NP.Type.FNP3)

    def node(col, row, first_np_types, other_np_types=tnp_cnp):
        # One node = 4 NPs; NP 0 may carry different types than NPs 1 to 3.
        # A fresh set is built for every NP.
        per_np_types = (first_np_types, other_np_types, other_np_types, other_np_types)
        return [NP.Info(NP.Ident(col, row, np_id), set(np_types), True)
                for np_id, np_types in enumerate(per_np_types)]

    event_dma = NP.Ident(1, 1, 0)
    conf_dma = NP.Ident(1, 1, 1)

    # A single skip DMA (store + load) exposing four channels.
    skip_dma = NP.Info(NP.Ident(1, 1, 3, 4),
                       {NP.Type.SKIP_DMA_STORE, NP.Type.SKIP_DMA_LOAD},
                       False)

    nps = []
    # Column 1: the FNP2 node (other NPs are TNP_B/CNP1 only), then a TNP_B/CNP1/CNP2 node.
    nps.extend(node(1, 2, (NP.Type.TNP_B, NP.Type.CNP1, NP.Type.FNP2),
                    (NP.Type.TNP_B, NP.Type.CNP1)))
    nps.extend(node(1, 3, tnp_cnp))
    # Columns 2 and 3: a node whose NP 0 has FNP3, then a TNP_B/CNP1/CNP2 node.
    for col in (2, 3):
        nps.extend(node(col, 2, tnp_cnp_fnp3))
        nps.extend(node(col, 3, tnp_cnp))

    mesh = NP.Mesh(IpVersion.v2, event_dma, conf_dma, NP.Info.hrc(True), nps, [skip_dma])
    return Device(FPGA_v2, mesh)
[docs]
def PicoIP():
    """Build the Pico virtual device for Pico FPGA.

    Returns:
        :obj:`Device`: a virtual device.
    """
    event_dma = NP.Ident(1, 1, 0)
    conf_dma = NP.Ident(1, 1, 1)
    # The only NP of the device is a TNP_R; no HRC is given to the mesh.
    single_np = NP.Info(NP.Ident(1, 2, 0), {NP.Type.TNP_R}, False)
    return Device(Pico_FPGA,
                  NP.Mesh(IpVersion.pico, event_dma, conf_dma, None, [single_np]))
[docs]
def create_device(num_cnp_tnp,
                  num_fnp,
                  num_skip_dma_channel=0,
                  include_hrc=True,
                  sram_size=None,
                  hw_version=FPGA_v2,
                  ):
    """Creates an Akida device with the specified hardware components.

    NPs are laid out four per node on a grid that is kept as square as possible.
    When the requested number of NPs is not a multiple of four, the last node is
    completed with extra CNP (and TNP_B on FPGA_v2) NPs.

    Args:
        num_cnp_tnp (int): Number of CNP and TNP units (TNP is available on 2.x and Pico
            devices). Ignored on Pico devices; treated as 1.
        num_fnp (int): Number of FNP units to include. An FNP2 with external memory is added first,
            followed by FNP3 units. Ignored on Pico devices; treated as 0.
        num_skip_dma_channel (int, optional): Number of skip DMA channels (only applicable for
            2.x devices). Defaults to 0.
        include_hrc (bool, optional): Whether to include the HRC. Defaults to True.
        sram_size (akida.NP.SramSize, optional): Size of shared SRAM available inside the mesh.
            When None, the default size of the IP version is used. Defaults to None.
        hw_version (akida.HwVersion, optional): The version of the device. Defaults to FPGA_v2.

    Returns:
        akida.Device: An Akida device.

    Raises:
        ValueError: if `hw_version` is not one of the supported versions for its IP
            version, if skip DMA channels are requested on a device other than FPGA_v2,
            or if the device would contain neither NPs nor an HRC.
    """
    # General akida node info
    SKIP_DMA_ROW = 1
    SKIP_DMA_ID = 3
    MAX_SKIP_DMA_CHANNELS_PER_COL = 4
    NUM_NPS_PER_NODE = 4
    # Get Ip version
    ip_version = hw_version.ip_version
    # Lut is a v2 feature
    has_lut = ip_version == IpVersion.v2

    def _get_supported_hw_version(ip_version):
        # Hardware versions accepted for a given IP version.
        if ip_version == IpVersion.v2:
            return [FPGA_v2]
        elif ip_version == IpVersion.pico:
            return [Pico_FPGA]
        return [NSoC_v1, NSoC_v2, TwoNodesIP_v1, AKD1500_v1]

    def _compute_total_nps(num_cnp_tnp, num_fnp):
        total_nps = num_cnp_tnp + num_fnp
        # The nodes are completed with NPs of type CNP1, CNP2 (and TNP_B if hw_version = FPGA_v2)
        # if the requested NPs are not a multiple of NUM_NPS_PER_NODE.
        nps_to_add = (-total_nps) % NUM_NPS_PER_NODE
        num_cnp_tnp += nps_to_add
        total_nps += nps_to_add
        return total_nps, num_cnp_tnp

    def _compute_optimal_nps_grid_shape(total_nps):
        if total_nps == 0:
            return 0, 0
        num_nodes = total_nps / NUM_NPS_PER_NODE
        # Fractional part of sqrt(num_nodes): tells how far the node count is from
        # a perfect square.
        fractional_diff = (num_nodes / sqrt(num_nodes)) - (num_nodes // sqrt(num_nodes))
        # Increment columns first and then rows
        num_cols = floor(sqrt(num_nodes)) + ceil(fractional_diff)
        num_rows = floor(sqrt(num_nodes)) + round(fractional_diff)
        return num_rows, num_cols

    def _make_skip_dmas(num_cols, num_skip_dma_channel):
        skip_dmas = []
        if hw_version != FPGA_v2 and num_skip_dma_channel > 0:
            raise ValueError(f"Skip DMAs are only supported on v2 devices (hw_version=FPGA_v2). "
                             f"Current hardware version: {hw_version}.")
        if num_skip_dma_channel == 0:
            return skip_dmas
        # Distribute Skip DMAs across columns as much as possible
        # When the number of Skip DMAs exceeds the number of columns used by nps, we increase
        # the number of columns.
        current_max_skip_dma_channels = num_cols * MAX_SKIP_DMA_CHANNELS_PER_COL
        if (extra_channels := num_skip_dma_channel - current_max_skip_dma_channels) > 0:
            num_cols += ceil((extra_channels) / MAX_SKIP_DMA_CHANNELS_PER_COL)
        # Compute number of channels per skip dma
        num_channels_per_skip_dma = ceil(num_skip_dma_channel / num_cols)
        # Deduce the number of skip dmas in the device
        num_skip_dmas = min(num_skip_dma_channel, num_cols)
        for col in range(1, num_skip_dmas + 1):
            skip_dmas.append(
                NP.Info(NP.Ident(col, SKIP_DMA_ROW, SKIP_DMA_ID, num_channels_per_skip_dma),
                        {NP.Type.SKIP_DMA_STORE, NP.Type.SKIP_DMA_LOAD}, False)
            )
        return skip_dmas

    def _make_nps(num_rows, num_cols, num_cnp_tnp, num_fnp):
        if hw_version == Pico_FPGA:
            # There is no mesh on Pico devices
            return [NP.Info(NP.Ident(0, 0, 0), [NP.Type.TNP_R], False)]
        # Construct NP types
        cnp_tnp_types = [NP.Type.CNP1, NP.Type.CNP2]
        if hw_version == FPGA_v2:
            cnp_tnp_types.insert(0, NP.Type.TNP_B)
        fnp_types = [NP.Type.FNP2]
        # If a device with only 1 FNP is requested, the corresponding NP will only
        # have FNP installed.
        if num_fnp > 1:
            fnp_types = [NP.Type.CNP1, NP.Type.CNP2] + fnp_types
            if hw_version == FPGA_v2:
                fnp_types.insert(0, NP.Type.TNP_B)
        nps = []
        # Starting from row 2
        for row in range(2, num_rows + 2):
            for col in range(1, num_cols + 1):
                # Now loop over nps (CNP/TNP are placed first, FNP fill what is left)
                for np_id in range(NUM_NPS_PER_NODE):
                    if num_cnp_tnp > 0:
                        nps.append(NP.Info(NP.Ident(col, row, np_id), cnp_tnp_types, has_lut))
                        num_cnp_tnp -= 1
                    elif num_fnp > 0:
                        nps.append(NP.Info(NP.Ident(col, row, np_id), fnp_types, has_lut))
                        # Change FNP2 with FNP3: only the first FNP placed is an FNP2
                        if fnp_types[-1] == NP.Type.FNP2:
                            fnp_types[-1] = NP.Type.FNP3
                        num_fnp -= 1
        return nps

    # Check HW version
    supported_hw_version = _get_supported_hw_version(ip_version)
    if hw_version not in supported_hw_version:
        raise ValueError(f"Invalid HW version '{hw_version}'. "
                         f"Expected one of: {supported_hw_version}.")
    if hw_version == Pico_FPGA:
        if num_cnp_tnp != 1 or num_fnp != 0:
            warnings.warn("Pico device supports only 1 TNP-R.")
            num_cnp_tnp = 1
            num_fnp = 0
        total_nps = num_cnp_tnp = num_rows = num_cols = 1
    else:
        # Compute total nps and construct the optimal grid for NPs
        # The mesh should be as square as possible
        total_nps, num_cnp_tnp = _compute_total_nps(num_cnp_tnp, num_fnp)
        num_rows, num_cols = _compute_optimal_nps_grid_shape(total_nps)
    if total_nps == 0 and not include_hrc:
        raise ValueError("It is not possible to create a completely empty device. "
                         f"num_cnp_tnp + num_fnp ({total_nps}) must be greater than zero or "
                         "HRC must be included).")
    # Make DMA event and conf
    dma_event = NP.Ident(1, 1, 0)
    dma_conf = NP.Ident(1, 1, 1)
    # Make SkipDMAs
    skip_dmas = _make_skip_dmas(num_cols, num_skip_dma_channel)
    # Make NPs
    nps = _make_nps(num_rows, num_cols, num_cnp_tnp, num_fnp)
    # Default SRAM size if not specified
    if sram_size is None:
        if ip_version == IpVersion.pico:
            sram_size = NP.SramSize_pico
        elif ip_version == IpVersion.v2:
            sram_size = NP.SramSize_v2
        else:
            sram_size = NP.SramSize_v1
    # Make the mesh
    hrc = NP.Info.hrc(has_lut) if include_hrc else None
    mesh = NP.Mesh(ip_version, dma_event, dma_conf, hrc, nps, skip_dmas, sram_size)
    return Device(hw_version, mesh)
[docs]
def compute_minimal_memory(model):
    """Compute the minimal memory required on the device.

    Each returned value is the maximum, over every NP the model layers are mapped on,
    of the corresponding memory requirement.

    Args:
        model (akida.Model): an Akida model. It must be mapped.

    Returns:
        int, int, int, int, int, int: minimal input_buffer memory, weight memory in bytes,
        stsram_32b_size, fsram_64b_size, tsram_51b_size, evsram_32b_size. The last four integer
        values are Pico-specific.

    Raises:
        AssertionError: if none of the model sequences has a program, i.e. the
            model is not mapped.
    """
    # Check that model is mapped. Raised explicitly (rather than with an `assert`
    # statement) so that the check is not stripped when Python runs with -O.
    if not any(s.program is not None for s in model.sequences):
        raise AssertionError("Model needs to be mapped")
    max_input_buffer_memory = 0
    max_weight_memory = 0
    max_stsram_32b_size = 0
    max_fsram_64b_size = 0
    max_tsram_51b_size = 0
    max_evsram_32b_size = 0
    for layer in model.layers:
        # Layers without mapping do not use any NP memory
        if not layer.mapping:
            continue
        for mapped_np in layer.mapping.nps:
            mem_info = mapped_np.mem_info
            np_weight_size = mem_info.weight_size
            if mapped_np.type == NP.Type.FNP3:
                # FNP weight SRAM in 32-bit words, 48 bits are used per 50-bit word.
                # We need to convert first to 32 bit by dividing by 4 to compute weight size
                np_weight_size /= 4
                # Compute weight size and convert back to bytes by multiplying by 4
                np_weight_size = ceil(50 * np_weight_size / 48) * 4
            max_input_buffer_memory = max(max_input_buffer_memory, mem_info.input_size)
            max_weight_memory = max(max_weight_memory, np_weight_size)
            max_stsram_32b_size = max(max_stsram_32b_size, mem_info.stsram_32b_size)
            max_fsram_64b_size = max(max_fsram_64b_size, mem_info.fsram_64b_size)
            max_tsram_51b_size = max(max_tsram_51b_size, mem_info.tsram_51b_size)
            max_evsram_32b_size = max(max_evsram_32b_size, mem_info.evsram_32b_size)
    return (max_input_buffer_memory, max_weight_memory, max_stsram_32b_size,
            max_fsram_64b_size, max_tsram_51b_size, max_evsram_32b_size)
def _get_outbounds(layer, layers):
return [ly for ly in layers if layer in ly.inbounds]
def _model_generator(layers):
    # Walk the layers backwards, starting from the last one, and yield each pair of
    # consecutive layers as a LayerSequence (inbound, target_layer). Notes:
    # - one of the two branches must not have nodes (not implemented yet)
    # - merge layer is performed in the following NP, so we take their inbounds
    #
    # Raises NotImplementedError when a layer has several inbounds and there is not
    # exactly one of them with a single outbound.
    queue = [layers[-1]]
    while len(queue) > 0:
        t_layer = queue.pop(0)
        inbounds = t_layer.inbounds
        # Skip a layer if it is a merge one: when the only inbound has itself several
        # inbounds, look through it and use its inbounds instead.
        if len(inbounds) == 1 and len(inbounds[0].inbounds) > 1:
            inbounds = inbounds[0].inbounds
        # Check inbounds constraints.
        if len(inbounds) > 1:
            # In case of multiple branches, one of them must not contain layers.
            # This translates to some inbound having multiple outbounds.
            new_inbounds = []
            for ly in inbounds:
                # Remove the branch with empty layers: only inbounds with exactly
                # one outbound are kept.
                if len(_get_outbounds(ly, layers)) == 1:
                    new_inbounds.append(ly)
            if len(new_inbounds) != 1:
                raise NotImplementedError(f"{t_layer} has multiple inbounds, "
                                          "but there is no empty branch.")
            # Remove the inbounds that are not empty branches.
            inbounds = new_inbounds
        # Yield the pair (inbound, target_layer) if both have been mapped
        # Or if target_layer is mapped and the inbound is an InputData layer.
        if len(inbounds) == 1 and t_layer.mapping is not None:
            if inbounds[0].parameters.layer_type == LayerType.InputData or \
                    inbounds[0].mapping is not None:
                yield LayerSequence((inbounds[0], t_layer))
            # Then, update the queue with the inbound layer. Nothing is queued for a
            # layer without inbound or without mapping, which ends the walk.
            queue.append(inbounds[0])
def _get_initial_skip_dma_channels(model):
    # Initial estimate of the skip DMA channels: one per skip layer (Add, Concatenate)
    # plus one per buffer temporal convolution layer found in the model.
    counted_types = (LayerType.Add, LayerType.Concatenate,
                     LayerType.BufferTempConv, LayerType.DepthwiseBufferTempConv)
    return sum(1 for layer in model.layers
               if layer.parameters.layer_type in counted_types)
def _get_initial_number_of_fnp(model):
    # Initial estimate of the FNPs: one per Dense1D layer, since they are not split.
    fnp_layer_types = (LayerType.Dense1D,)
    return sum(1 for layer in model.layers
               if layer.parameters.layer_type in fnp_layer_types)
def _get_np_components(model_or_pass, np_types=None):
total_nps = []
for layer in model_or_pass.layers:
if hasattr(layer.mapping, 'nps'):
for np in layer.mapping.nps:
if np_types is None or np.type in np_types:
total_nps.append(np)
if hasattr(layer.mapping, 'skipdma_loads'):
for np in layer.mapping.skipdma_loads:
if np_types is None or np.type in np_types:
total_nps.append(np)
if hasattr(layer.mapping, 'skipdma_stores'):
for np in layer.mapping.skipdma_stores:
if np_types is None or np.type in np_types:
total_nps.append(np)
return total_nps
def _compute_skip_dma_channels(model_or_pass):
    # The number of skip DMA channels is the larger of the SKIP_DMA_LOAD and
    # SKIP_DMA_STORE component counts.
    num_loads = len(_get_np_components(model_or_pass, (NP.SKIP_DMA_LOAD,)))
    num_stores = len(_get_np_components(model_or_pass, (NP.SKIP_DMA_STORE,)))
    return max(num_loads, num_stores)
def _compute_number_of_cnp_tnp(model_or_pass):
    # Count the components whose type is one of the CNP/TNP types.
    return len(_get_np_components(model_or_pass,
                                  (NP.CNP1, NP.CNP2, NP.TNP_B, NP.TNP_R)))
def _compute_number_of_fnp(model_or_pass):
    # Count the components whose type is one of the FNP types.
    return len(_get_np_components(model_or_pass, (NP.FNP2, NP.FNP3)))
def _compute_params_with_hwpr(model, **params):
    # Compute (num_cnp_tnp, num_fnp, num_skip_dma_channel) for a device on which the
    # model is mapped using partial reconfiguration. Extra `params` are forwarded
    # to create_device. Note that the model is (re)mapped by the search below.
    max_skip_dma_channels = _get_initial_skip_dma_channels(model)

    # The number of CNP/FNP needed to map the model in multiple passes is the
    # largest amount required by 2 consecutive layers.
    cnp_tnp_needed = 0
    fnp_needed = 0
    for layer_pair in _model_generator(model.layers):
        cnp_tnp_needed = max(cnp_tnp_needed, _compute_number_of_cnp_tnp(layer_pair))
        fnp_needed = max(fnp_needed, _compute_number_of_fnp(layer_pair))
    params["num_cnp_tnp"] = cnp_tnp_needed
    params["num_fnp"] = fnp_needed

    # The minimum number of skip DMA channels is searched by trying devices with an
    # increasing number of channels until the model can be mapped. Failures are
    # expected here: a candidate that cannot be built or mapped is simply skipped.
    # If no candidate works, the last value tried is returned (0 when none is tried).
    channels = 0
    for channels in range(1, max_skip_dma_channels + 1):
        try:
            candidate = create_device(num_skip_dma_channel=channels, **params)
            model.map(candidate, mode=MapMode.Minimal, hw_only=True)
        except Exception:
            continue
        break
    return cnp_tnp_needed, fnp_needed, channels
def _update_to_max(dst, other):
# Update `dst` in place by taking the maximum value of each SRAM field
# between `dst` and `other`.
dst.input_bytes = max(dst.input_bytes, other.input_bytes)
dst.weight_bytes = max(dst.weight_bytes, other.weight_bytes)
dst.stsram_32b_size = max(dst.stsram_32b_size, other.stsram_32b_size)
dst.fsram_64b_size = max(dst.fsram_64b_size, other.fsram_64b_size)
dst.tsram_51b_size = max(dst.tsram_51b_size, other.tsram_51b_size)
dst.evsram_32b_size = max(dst.evsram_32b_size, other.evsram_32b_size)
return dst
[docs]
def compute_min_device(model,
                       enable_hwpr=False,
                       sram_size=None,
                       minimal_memory=False):
    """Builds the Akida virtual device that can fit the model entirely
    with or without reconfiguration.

    The model is first mapped on a large base device; the components used by that
    mapping give the requirements of the minimal device. The original model is not
    modified: a copy built from its layers is used for every mapping.

    Args:
        model (akida.Model): the model used to determine the device.
        enable_hwpr (bool, optional): if True, the device is computed leveraging partial
            reconfiguration as much as possible. Ignored for Pico devices.
            Defaults to False.
        sram_size (NP.SramSize, optional): Size of shared SRAM available inside the mesh.
            Ignored when `minimal_memory` is True. Defaults to None.
        minimal_memory (bool, optional): if True, computes and sets the minimal required
            inputs and weights memory for the device. Defaults to False.

    Returns:
        akida.Device: the computed device

    Raises:
        TypeError: if `model` is not an akida Model.
        ValueError: if the model IP version is neither v2 nor pico, or if the model
            has more FNP layers than the base device can hold.
        RuntimeError: if the model cannot be mapped on the computed device.
    """
    if not isinstance(model, Model):
        raise TypeError(f"Expected model to be an {Model}, got {type(model)}.")
    # Partial reconfiguration is not supported for Pico devices.
    if model.ip_version == IpVersion.pico:
        enable_hwpr = False
    NUM_NPS_PER_NODE = 4
    INITIAL_NUM_NODES = 256
    if model.ip_version != IpVersion.v2 and model.ip_version != IpVersion.pico:
        raise ValueError("Only IpVersion.v2 and IpVersion.pico models are supported. "
                         f"Current model version={model.ip_version}")
    # Create a copy of the model to avoid modifying the original one.
    model = Model(layers=model.layers)
    # Compute a base device with which to compute the next parameters.
    params = {"num_skip_dma_channel": _get_initial_skip_dma_channels(model),
              "num_fnp": _get_initial_number_of_fnp(model),
              "sram_size": sram_size}
    if model.ip_version == IpVersion.pico:
        params["num_cnp_tnp"] = 1  # Pico device has only 1 TNP
    else:
        # Every NP of the base device that is not an FNP is a CNP/TNP
        params["num_cnp_tnp"] = NUM_NPS_PER_NODE * INITIAL_NUM_NODES - params["num_fnp"]
        if params["num_cnp_tnp"] < 0:
            raise ValueError("Impossible to compute base device: "
                             f"the number of initial nodes ({INITIAL_NUM_NODES}) is not enough.")
    # Specify HW version from model
    params["hw_version"] = Pico_FPGA if model.ip_version == IpVersion.pico else FPGA_v2
    device = create_device(**params)
    # Map model with the default parameters.
    model.map(device, mode=MapMode.Minimal, hw_only=True)
    # Now that the model has been mapped onto the base device,
    # we can compute the parameters to build the required device.
    if enable_hwpr:
        num_cnp_tnp, num_fnp, num_skip_dma_channel = (
            _compute_params_with_hwpr(model))
        params["num_cnp_tnp"] = num_cnp_tnp
        params["num_fnp"] = num_fnp
        params["num_skip_dma_channel"] = num_skip_dma_channel
    else:
        params["num_cnp_tnp"] = _compute_number_of_cnp_tnp(model)
        params["num_fnp"] = _compute_number_of_fnp(model)
        params["num_skip_dma_channel"] = _compute_skip_dma_channels(model)
    if minimal_memory:
        if sram_size is not None:
            warnings.warn(
                "The 'sram_size' argument will be ignored because 'minimal_memory' is set to True. "
                "The required memory will be computed automatically. Continuing execution"
            )
        # Walk the mapping only once: the first two values are the input/weight
        # memory, the next four are the Pico-specific SRAM sizes.
        minimal_memory_sizes = compute_minimal_memory(model)
        sram_size = NP.SramSize(*minimal_memory_sizes[:2])
        sram_size_pico = NP.SramSize(*minimal_memory_sizes[2:6])
        params["sram_size"] = _update_to_max(sram_size, sram_size_pico)
    # Create a virtual device with the requirements.
    device = create_device(**params)
    # Sanity check: map model on device.
    try:
        model.map(device, mode=MapMode.Minimal, hw_only=True)
    except Exception as e:
        # Chain explicitly so the mapping error is reported as the direct cause
        raise RuntimeError("It was not possible to find a device for this model. "
                           f"Reason:\n{e}") from e
    return device
[docs]
def compute_common_device(ak_models, enable_hwpr=False):
    """Computes a common Akida device that can run all the given models.

    Every model must already be mapped: the requirements of the common device are
    the largest ones found among the models, and its SRAM sizes are the largest
    ones among the devices the models are mapped on.

    Args:
        ak_models (List[akida.Model]): A list of Akida models whose hardware
            requirements will be combined.
        enable_hwpr (bool, optional): if True, the device is computed leveraging partial
            reconfiguration as much as possible. Ignored for Pico devices.
            Defaults to False.

    Returns:
        akida.Device: A new device that can map all the given models.

    Raises:
        ValueError: if the list is empty, if a model is not mapped on a device, or
            if the models devices do not share the same version.
        TypeError: if an element of the list is not an akida Model.
    """
    if not ak_models:
        raise ValueError("The list of Akida models cannot be empty.")
    wrong_model_types = [type(m) for m in ak_models if not isinstance(m, Model)]
    if wrong_model_types:
        raise TypeError(f"Devices cannot be computed for models of type {wrong_model_types}.")
    for model in ak_models:
        if model.device is None:
            raise ValueError("All models must be mapped on a device.")

    # Check that all models devices have the same version
    first_model = ak_models[0]
    for model in ak_models:
        if not (model.device.version == first_model.device.version):
            raise ValueError("Models devices have different versions.")

    # Partial reconfiguration is not supported for Pico devices.
    if first_model.ip_version == IpVersion.pico:
        enable_hwpr = False

    # The HRC is included as soon as one of the models devices has one
    include_hrc = any(model.device.mesh.hrc for model in ak_models)

    max_num_cnp_tnp = 0
    max_num_fnp = 0
    max_num_skip_dma_channel = 0
    sram_size = NP.SramSize(0, 0)
    for model in ak_models:
        # Gather the (cnp/tnp, fnp, skip dma channels) requirements of the model:
        # a single triple with partial reconfiguration, one triple per pass otherwise.
        if enable_hwpr:
            requirements = [
                _compute_params_with_hwpr(model,
                                          sram_size=model.device.mesh.np_sram_size,
                                          include_hrc=include_hrc)
            ]
        else:
            requirements = [
                (_compute_number_of_cnp_tnp(pass_),
                 _compute_number_of_fnp(pass_),
                 _compute_skip_dma_channels(pass_))
                for sequence in model.sequences
                for pass_ in sequence.passes
            ]
        for num_cnp_tnp, num_fnp, num_skip_dma_channel in requirements:
            max_num_cnp_tnp = max(max_num_cnp_tnp, num_cnp_tnp)
            max_num_fnp = max(max_num_fnp, num_fnp)
            max_num_skip_dma_channel = max(max_num_skip_dma_channel, num_skip_dma_channel)
        # Update Sram size
        _update_to_max(sram_size, model.device.mesh.np_sram_size)

    return create_device(num_cnp_tnp=max_num_cnp_tnp,
                         num_fnp=max_num_fnp,
                         num_skip_dma_channel=max_num_skip_dma_channel,
                         include_hrc=include_hrc,
                         sram_size=sram_size,
                         hw_version=first_model.device.version)