Source code for akida_models.sparsity

#!/usr/bin/env python
# ******************************************************************************
# Copyright 2024 Brainchip Holdings Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
__all__ = ["compute_sparsity"]

import numpy as np
import tensorflow as tf
from collections import deque, defaultdict

import akida
from tensorflow.keras import Model

from akida.core import LayerType, Model as ak_model

import onnxruntime
from onnx import ModelProto
from quantizeml.onnx_support.quantization.quantize import ONNXModel
from quantizeml.onnx_support.quantization.transforms import sanitize
from quantizeml.onnx_support.graph_tools import value_info_to_tensor_shape
from quantizeml.random import generate_np_random_samples
from quantizeml.models.transforms.transforms_utils import get_layers_by_type
from quantizeml.layers import BufferTempConv, DepthwiseBufferTempConv


def compute_sparsity(model, layer_names=None, layer_types=None, samples=None, batch_size=100,
                     verbose=False):
    """Compute the sparsity of a model across selected layers and samples.

    This function computes the sparsity of the given model for the specified layers (if none
    are specified: ReLU layers for float models, layers with an OutputQuantizer for quantized
    models, all layers for Akida models and Clip and ReLU nodes for ONNX models) using input
    samples.

    Args:
        model (keras.Model or onnx.ModelProto or akida.Model): the model to be analyzed.
        layer_names (list, optional): List of layer names to compute sparsity for. If None,
            sparsity is computed for relevant layers. Defaults to None.
        layer_types (tuple, optional): The types of layers for which sparsity will be computed.
            layer_types is a tuple of type when compute_sparsity is applied on Keras or Akida
            models, and a tuple of str when applied on ONNX models. Defaults to None.
        samples (np.ndarray, optional): the samples on which to compute the sparsity. If None,
            random samples are generated. Defaults to None.
        batch_size (int, optional): The number of tensors to generate or extract.
            Defaults to 100.
        verbose (bool, optional): Whether to print sparsity results. Defaults to False.

    Raises:
        ValueError: If both layer_names and layer_types are provided, or if batch_size exceeds
            the number of available samples.
        NotImplementedError: If an unsupported model format is provided.

    Returns:
        dict: A dictionary where keys are layer/node names and values are the computed sparsity
            values.
    """
    if layer_names and layer_types:
        raise ValueError(
            "It is not possible to request both options, " +
            "provide either the layer_names or the layer_types parameter.")

    # Infer the shape and dtype of the input samples from the model
    if isinstance(model, Model):
        samples_shape = model.input_shape[1:]
        dtype = model.dtype
    elif isinstance(model, ak_model):
        samples_shape = tuple(model.input_shape)
        first_layer = model.layers[0]
        if first_layer.parameters.layer_type == akida.LayerType.InputData:
            dtype = "int" if first_layer.output_signed else "uint"
            dtype += f"{np.ceil(first_layer.input_bits/8)*8:.0f}"
            dtype = np.dtype(dtype)
        else:
            dtype = np.uint8
    elif isinstance(model, ModelProto):
        model = ONNXModel(model)
        samples_shape, dtype = value_info_to_tensor_shape(model.input[0])
        samples_shape = samples_shape[1:]
    else:
        raise NotImplementedError("Sparsity computation is only " +
                                  "supported for Keras, Akida and ONNX models.")

    if samples is not None:
        if batch_size > len(samples):
            raise ValueError("Batch size exceeds the available number " +
                             "of tensors in the dataset.")
        samples = samples[:batch_size, ...].astype(dtype)
    else:
        samples = generate_np_random_samples(size=(batch_size,) + samples_shape, dtype=dtype)

    if isinstance(model, Model):
        results = _compute_sparsity_tf(model=model, layer_names=layer_names,
                                       layer_types=layer_types, samples=samples)
    elif isinstance(model, ak_model):
        results = _compute_sparsity_ak(model=model, layer_names=layer_names,
                                       layer_types=layer_types, samples=samples)
    elif isinstance(model, ONNXModel):
        results = _compute_sparsity_onnx(model=model, node_names=layer_names,
                                         node_types=layer_types, samples=samples)

    if verbose:
        for layer, sparsity in results.items():
            print(f"{layer} : {float(f'{sparsity:.3f}')}")
        print(f'Mean sparsity : {np.mean(list(results.values()))}')
    return results
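
# Illustrative usage sketch (not part of the module): computing the sparsity of a small
# float Keras model on randomly generated samples. The toy model below is a hypothetical
# placeholder; with no layer_names/layer_types given, only the ReLU layer is reported.
#
#   import tensorflow as tf
#   from akida_models.sparsity import compute_sparsity
#
#   toy_model = tf.keras.Sequential([
#       tf.keras.layers.Input((32, 32, 3)),
#       tf.keras.layers.Conv2D(8, 3),
#       tf.keras.layers.ReLU(),
#   ])
#   sparsities = compute_sparsity(toy_model, batch_size=16, verbose=True)
#   # sparsities maps the ReLU layer name to its ratio of zero-valued activations
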
def _compute_sparsity_tf(model, samples, layer_names=None, layer_types=None):
    """Compute the sparsity of selected layers in a TensorFlow model.

    This function calculates the sparsity (proportion of zero-valued elements) in specific
    layers of a TensorFlow model across a set of input samples. If layer names are provided,
    it computes sparsity for those layers; otherwise, it computes sparsity for all ReLU layers
    or layers with an OutputQuantizer.

    Args:
        model (tf.keras.Model): The TensorFlow model to analyze.
        samples (np.ndarray): Input samples to evaluate sparsity on.
        layer_names (list, optional): List of layer names to compute sparsity for.
            Defaults to None.
        layer_types (tuple of type, optional): The types of layers for which sparsity will be
            computed. Defaults to None. If None, it will compute sparsity for all ReLU or
            QuantizedReLU layers.

    Returns:
        dict: A dictionary where keys are layer names and values are the computed sparsity
            values.
    """
    outputs = []
    results = {}
    if layer_names:
        target_layers = [model.get_layer(lname) for lname in layer_names]
    elif layer_types:
        target_layers = [layer for layer in model.layers if isinstance(layer, layer_types)]
    else:
        target_layers = [layer for layer in model.layers
                         if isinstance(layer, tf.keras.layers.ReLU) or
                         hasattr(layer, 'out_quantizer') and layer.out_quantizer]
    if target_layers == []:
        raise ValueError("No layers found to compute the sparsity")
    for layer in target_layers:
        outputs.append(layer.output)
    # Force the new model to generate a list of tensors
    if len(outputs) == 1:
        outputs = [outputs]
    new_model = Model(inputs=model.inputs, outputs=outputs, name="sparsity")

    if get_layers_by_type(model, (BufferTempConv, DepthwiseBufferTempConv)):
        # Buffered (stateful) models: run frame by frame and concatenate the per-frame outputs
        all_outputs = new_model(samples[0][None, ...])
        for frame in samples[1:]:
            frame = frame[None, ...]
            outputs = new_model(frame)
            for i, output in enumerate(outputs):
                all_outputs[i] = tf.concat([all_outputs[i], output], axis=0)
    else:
        all_outputs = new_model(samples)

    # If there is only one tensor in `target_layers`, wrap `all_outputs` in a list
    # to ensure `all_outputs` is always a list
    if len(target_layers) == 1:
        all_outputs = [all_outputs]
    for layer, out in zip(target_layers, all_outputs):
        if isinstance(out, tf.Tensor):
            results[layer.name] = np.sum(out == 0) / np.prod(out.shape)
        else:
            # out is a FixedPoint or QTensor
            results[layer.name] = np.sum(out.values == 0) / np.prod(out.shape)
    return results


def _compute_sparsity_ak(model, samples, layer_names=None, layer_types=None):
    """Compute the sparsity of selected layers in an Akida model.

    This function calculates the sparsity (proportion of zero-valued elements) for specific
    layers in an Akida model by running forward passes on input samples. If layer names are
    provided, it computes sparsity for those layers; otherwise, it computes sparsity for all
    layers, excluding input and dequantizer layers.

    Args:
        model (akida.Model): The Akida model to analyze.
        samples (numpy.ndarray): Input samples to evaluate sparsity on.
        layer_names (list, optional): List of layer names to compute sparsity for.
            Defaults to None.
        layer_types (tuple of type, optional): The types of layers for which sparsity will be
            computed. Defaults to None. If None, sparsity is computed for all applicable layers
            with activations.

    Returns:
        dict: A dictionary where keys are layer names and values are the computed sparsity
            values.
""" def _search_layers_to_build_submodel(layer): queue, visited = [layer], [] in_degree, dependents, queue_sort = {}, defaultdict(list), deque() # Search all the inbounds up to the input while len(queue) > 0: target_layer = queue.pop(0) visited.insert(0, target_layer) # Insert in queue all the inbounds of target layer queue.extend([ly for ly in target_layer.inbounds if ly not in visited]) # Update dictionnaries requires to sort layer list in_degree[target_layer] = len(target_layer.inbounds) if in_degree[target_layer] == 0: queue_sort.append(target_layer) for inbound in target_layer.inbounds: dependents[inbound].append(target_layer) # Topologically sort the inbounds layers, using Kahn's algorithm sorted_layers = [] while queue_sort: current = queue_sort.popleft() sorted_layers.append(current) # Reduce the in-degree of its dependents for dependent in dependents[current]: in_degree[dependent] -= 1 if in_degree[dependent] == 0: queue_sort.append(dependent) return sorted_layers results = {} if layer_names: target_layers = [model.get_layer(lname) for lname in layer_names] elif layer_types: target_layers = [layer for layer in model.layers if layer.parameters.layer_type in layer_types] else: target_layers = [layer for layer in model.layers if layer.parameters.layer_type not in (LayerType.InputData, LayerType.Dequantizer)] if target_layers == []: raise ValueError("No layers found to compute the sparsity") for layer in target_layers: sub_model = akida.Model(layers=_search_layers_to_build_submodel(layer)) output = sub_model.forward(samples) results[layer.name] = np.count_nonzero(output == 0) / np.prod(output.shape) return results def _compute_sparsity_onnx(model, samples, node_names=None, node_types=None): """Compute the sparsity of selected nodes in an ONNX model. This function calculates the sparsity (proportion of zero-valued elements) for specific nodes in an ONNX model by running forward passes on input samples. If node names are provided, it computes sparsity for those nodes; otherwise, it computes sparsity for Clip and ReLU nodes. Args: model (onnx.ModelProto): The ONNX model to analyze. samples (numpy.ndarray): Input samples to evaluate sparsity on. node_names (list, optional): List of node names to compute sparsity for. Defautls to None. node_types (tuple of str, optional): The types of nodes for which sparsity will be computed. Defaults to None. If None, sparsity is computed for Clip and ReLU nodes. Returns: dict: A dictionary where keys are node names and values are the computed sparsity values. """ is_quantized = any(node.domain == "com.brainchip" for node in model.nodes()) if not is_quantized: model = sanitize(model) if node_names: target_nodes = [] for name in node_names: if (tnode := model.find_node_by_name(name)) is None: all_node_names = [node.name for node in model.nodes()] raise ValueError(f'No such node: {name}. 
Existing nodes are: [{all_node_names}].') target_nodes.append(tnode) elif node_types: target_nodes = [node for node in model.nodes() if node.op_type in node_types] elif is_quantized: target_nodes = [node for node in model.nodes() if node.op_type not in ("InputQuantizer", "Dequantizer")] else: target_nodes = [node for node in model.nodes() if node.op_type in ["Relu", "Clip"]] if len(target_nodes) == 0: raise ValueError("No nodes found to compute the sparsity") # Create an intermediate model with the inputs of target_nodes model.graph().ClearField("output") out_names = [] for node in target_nodes: for oname in node.output: vi = model.find_value_info_by_name(oname) if vi is not None: model.output.append(vi) out_names.append(vi.name) samples_dict = {} samples_dict[model.input[0].name] = samples outputs = onnxruntime.InferenceSession(model.serialized).run(out_names, samples_dict) # Compute sparsity per node results = {} for node, output in zip(target_nodes, outputs): results[node.name] = np.count_nonzero(output == 0) / np.prod(output.shape) return results
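
# Illustrative usage sketch (not part of the module): restricting the computation to the
# ReLU nodes of a float ONNX model. "model.onnx" is a hypothetical placeholder path; for
# ONNX models, layer_types is a tuple of node op_type strings.
#
#   import onnx
#   from akida_models.sparsity import compute_sparsity
#
#   onnx_model = onnx.load("model.onnx")
#   sparsities = compute_sparsity(onnx_model, layer_types=("Relu",), batch_size=16)
#   # sparsities maps each ReLU node name to its ratio of zero-valued outputs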