# Source code for quantizeml.models.calibrate

#!/usr/bin/env python
# ******************************************************************************
# Copyright 2023 Brainchip Holdings Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************

__all__ = ["calibrate", "calibration_required"]

import numpy as np
import tensorflow as tf

from copy import deepcopy
from keras.layers import (Conv2D, Conv2DTranspose, SeparableConv2D, DepthwiseConv2D, Dense, Dropout,
                          Reshape, Flatten, MaxPool2D, GlobalAvgPool2D)

from .transforms.insert_layer import insert_in_config
from .transforms.transforms_utils import get_layers_by_type
from .utils import apply_weights_to_model
from ..layers import (OutputQuantizer, OutputObserver, PaddedConv2D, DepthwiseConv2DTranspose,
                      Attention, BufferTempConv, DepthwiseBufferTempConv, StatefulRecurrent)
from ..tensors import FixedPoint, floor_log2


def calibration_required(model):
    """Checks if a model requires calibration.

    If one of the 'OutputQuantizer' layers in the model has its range_max variable set to 1, it
    requires calibration.

    Args:
        model (keras.Model): the model to check

    Returns:
        bool: True if calibration is required, False otherwise.
    """
    # If the model has never been calibrated, all range_max of the OutputQuantizer objects will
    # be set to 1: a single quantizer whose first variable is all ones is enough to require
    # calibration. When no quantizer matches (or there is none), all calibrable objects are set.
    return any(bool(tf.reduce_all(calibrable.variables[0] == 1))
               for calibrable in get_layers_by_type(model, OutputQuantizer))
def _get_calibration_model(model, qmodel):
    """Builds a calibration model with OutputObserver added between blocks.

    Args:
        model (keras.Model): input model
        qmodel (keras.Model): quantized model

    Returns:
        keras.Model, dict: the calibration ready model and dict mapping end of block layer names
        to the name of their OutputObserver.
    """
    # Work on a copy of the model config so that the input model config is left untouched
    config = deepcopy(model.get_config())

    # Insert OutputObservers where OutputQuantizers are
    end_of_blocks = {}
    for layer in qmodel.layers:
        if getattr(layer, 'out_quantizer', None):
            # Build an observer and store it for future use
            observer = OutputObserver(layer.out_quantizer.axis)
            end_of_blocks[layer.name] = observer.name
            # Insert observer in config
            insert_in_config(model, layer.name, observer, config)

    # Build the calibration model from the config
    custom_objects = {"OutputObserver": OutputObserver}
    calibration_model = model.from_config(config, custom_objects)

    # Load original weights
    variables_dict = {var.name: var for var in model.variables}
    apply_weights_to_model(calibration_model, variables_dict, False)
    return calibration_model, end_of_blocks


def _get_next_layer(layer, supported_layers, skippable_layers, reshape_ops):
    """Finds the layer following a target layer.

    Args:
        layer (keras.Layer): the layer of interest
        supported_layers (tuple): layer types that will support equalization
        skippable_layers (tuple): layer types that can be skipped
        reshape_ops (list): list of reshape operation encountered as
            [(input_shape,), (output_shape,)]. It is filled in place when a Reshape or Flatten
            layer is crossed.

    Returns:
        keras.Layer: the layer following the layer of interest if valid, None otherwise
    """
    # Limit support to single outbound
    outbounds = layer.outbound_nodes
    if len(outbounds) != 1:
        return None
    next_layer = outbounds[0].layer

    # If the layer is supported, it is a valid candidate
    if isinstance(next_layer, supported_layers):
        return next_layer

    # If the next layer can be skipped, recursively call the function
    if isinstance(next_layer, skippable_layers):
        return _get_next_layer(next_layer, supported_layers, skippable_layers, reshape_ops)

    # If the next layer is a Reshape or Flatten store the performed ops and continue (limit
    # reshape ops to a single layer)
    if isinstance(next_layer, (Reshape, Flatten)) and len(reshape_ops) == 0:
        reshape_ops.append([next_layer.input_shape[1:], next_layer.output_shape[1:]])
        return _get_next_layer(next_layer, supported_layers, skippable_layers, reshape_ops)

    # If next layer is not supported, alignment cannot happen
    return None


def _set_and_equalize(qmodel, layer_name, range_max):
    """Set 'layer_name' output quantizer range_max to the ideal value.

    The ideal range_max is computed from the maximum value the target layer OutputQuantizer can
    represent given its bitwidth, and the float calibrated range_max. The ratio between the
    calibrated value and the ideal value is stored in the OutputQuantizer and will be applied
    when the scale_out operation happens.

    Equalization happens by dividing weights of the next layer by the ratio to preserve global
    outputs.

    Args:
        qmodel (keras.Model): quantized keras model
        layer_name (str): layer name where to set the range_max
        range_max (tf.Tensor): the float calibrated range_max
    """
    # First set the calibrated range_max in the target OutputQuantizer
    target_layer = qmodel.get_layer(layer_name)
    out_quantizer = target_layer.out_quantizer
    out_quantizer.range_max.assign(range_max)

    # Skip Attention layers because they do not perform a scale out so rescaling rate cannot be
    # applied
    if isinstance(target_layer, Attention):
        return

    # Define layers that will support or can be skipped when equalizing
    supported_layers = (Conv2D, PaddedConv2D, Conv2DTranspose, SeparableConv2D, DepthwiseConv2D,
                        Dense, DepthwiseConv2DTranspose, BufferTempConv, DepthwiseBufferTempConv)
    skippable_layers = (Dropout, MaxPool2D, GlobalAvgPool2D)

    # When quantized per-tensor, Reshaping layers can be skipped as the rescaling rate single
    # value will be used during equalization on all weights values
    if out_quantizer.axis == 'per-tensor':
        skippable_layers += (Reshape, Flatten)

    # Retrieve next_layer: if there is no candidate layer following layer_name, equalization
    # cannot happen
    reshape_ops = []
    next_layer = _get_next_layer(target_layer, supported_layers, skippable_layers, reshape_ops)
    if next_layer is None:
        return

    # For now cross layer equalization for (Depthwise)BufferTempConv layers is possible only if
    # their input is quantized per-tensor.
    if (isinstance(next_layer, (BufferTempConv, DepthwiseBufferTempConv))
            and out_quantizer.axis == 'per-axis'):
        return

    # Compute ideal range_max
    bitwidth = out_quantizer.bitwidth
    frac_bits = bitwidth - floor_log2(range_max)
    int_max = FixedPoint.int_max(bitwidth)
    ideal_range_max = FixedPoint(int_max, bitwidth, frac_bits).to_float().numpy()

    # Check that values are different
    if np.all(ideal_range_max == range_max):
        return

    # Set ideal_range_max into the target OutputQuantizer
    out_quantizer.range_max.assign(ideal_range_max)

    # range_max can be approximately 0 on some channels (eg. a ReLU where an input channel had
    # all negative values). When that happens, range_max set on output quantizer
    # (ideal_range_max) is exactly zero. In that case rescaling_rate is forced to 1 so that
    # cross-layer equalization does nothing.
    if out_quantizer.axis == 'per-axis':
        range_max = range_max.numpy()
        # The mask is computed once, before ideal_range_max itself is patched
        zero_channels = ideal_range_max == 0
        range_max[zero_channels] = 1
        ideal_range_max[zero_channels] = 1

    # Compute the rescaling rate
    rescaling_rate = ideal_range_max / range_max

    # Set rate in the target OutputQuantizer
    out_quantizer.rescaling_rate.assign(rescaling_rate)

    # Divide weights of the next layer by the rescaling rate
    next_weights = next_layer.get_weights()
    new_weights = next_weights[0]

    # Apply reshape ops
    has_reshape = bool(reshape_ops)
    if has_reshape:
        # Supporting a single reshape ops
        in_shape, out_shape = reshape_ops[0]
        last_dim_changed = in_shape[-1] != out_shape[-1]

        # Check that last dimension is unchanged by the reshape ops, if that's not the case, the
        # op needs to be undone on weights, eg. for Flatten: X*Y*C, F -> X, Y, C, F
        if last_dim_changed:
            F = (new_weights.shape[-1], )
            new_weights = np.reshape(new_weights, in_shape + F)

        # Invert last two dimensions (input and output channels), eg. for flatten:
        # X, Y, C, F -> X, Y, F, C
        axes = list(range(0, len(new_weights.shape)))
        assert len(axes) > 1
        axes[-2], axes[-1] = axes[-1], axes[-2]
        new_weights = np.transpose(new_weights, axes)
    else:
        # Expand dims to allow broadcasting on the expected dimension
        # NOTE(review): this broadcast assumes the input channels sit on the second to last axis
        # of the first weight tensor - confirm this holds for every type in supported_layers
        # (eg. transposed convolutions).
        rescaling_rate = np.expand_dims(rescaling_rate, -1)

    # Apply rescaling rate (in place, so the weights dtype is preserved)
    new_weights /= rescaling_rate

    # Undo reshape ops
    if has_reshape:
        # Revert last dimensions swap, eg. for Flatten: X, Y, F, C -> X, Y, C, F
        new_weights = np.transpose(new_weights, axes)
        # Undo the reshape op on weights when last dimension is changed, eg. for Flatten:
        # X, Y, C, F -> X*Y*C, F
        if last_dim_changed:
            F = (new_weights.shape[-1], )
            new_weights = np.reshape(new_weights, out_shape + F)

    next_weights[0] = new_weights
    next_layer.set_weights(next_weights)


def _warmup_calib(calibration_model, warmup_samples, end_of_blocks, batch_size):
    """Helper method to calibrate the buffer TENN models with warmup period.

    Args:
        calibration_model (keras.Model): the calibration model.
        warmup_samples (tf.Dataset, np.array or generator, optional): warmup samples.
        end_of_blocks (dict): dict mapping end of block layer names to the name of their
            OutputObserver.
        batch_size (int): calibration batch size.
    """
    # Fill the fifos.
    calibration_model.predict(x=warmup_samples, batch_size=batch_size)

    # Reset the range_max of every observer so that warmup values are discarded
    for observer_name in end_of_blocks.values():
        range_max = calibration_model.get_layer(observer_name).range_max
        range_max.assign(tf.ones_like(range_max))


def _split_samples(samples, buffer_layers, batch_size):
    """Helper to split the samples into warmup and calibration samples for the TENN models

    Args:
        samples (np.array): samples.
        buffer_layers (list: keras.Layers): the model (Depthwise)BufferTempConv layers
        batch_size (int): calibration batch_size.

    Returns:
        np.array, np.array, int: warmup samples, effective calibration samples and warmup steps.

    Raises:
        ValueError: if there are fewer samples than warmup steps * batch_size.
    """
    # Extract warmup_factor. The number of samples required to fill all model fifos.
    warmup_factor = sum(buf_layer.kernel_size for buf_layer in buffer_layers)
    warmup_size = warmup_factor * batch_size

    if warmup_size > samples.shape[0]:
        raise ValueError(f"Number of samples must be greater than {warmup_factor} * batch_size.")

    # Warmup samples come first, the remaining ones are the effective calibration samples
    warmup_samples = samples[:warmup_size]
    calib_samples = samples[warmup_size:]
    return warmup_samples, calib_samples, warmup_factor
def calibrate(model, qmodel, samples=None, num_samples=1024, batch_size=None, epochs=1):
    """Calibrates the model using the provided samples.

    With TENN models only np.array samples are supported for calibration. Those should have a
    temporally coherent data, which means that their expected shape is
    [batch_size*Seq, dim_0, ..., dim_n] for spatiotemporal TENNs where:

        - batch_size is the same batch_size provided to the calibration.
        - Seq is a dataset parameter that defines the temporally coherent data (eg number of
          frames per video clips).

    and [batch_size, (model.input_shape)] for recurrent TENNs.

    When no samples are provided, random samples are generated.

    Nothing is done when the quantized model does not require calibration. Otherwise the
    'subsample_ratio' of the StatefulRecurrent layers of 'model' is set to None, and the
    OutputQuantizers of 'qmodel' are updated in place.

    Args:
        model (keras.Model): the original model
        qmodel (keras.Model): the quantized model to calibrate
        samples (tf.Dataset, np.array or generator, optional): calibration samples. When no
            samples are provided, random samples are generated. Defaults to None.
        num_samples (int, optional): number of samples to use in the provided samples or number
            of samples to generate. Defaults to 1024.
        batch_size (int, optional): the batch size. Defaults to None.
        epochs (int, optional): the number of epochs. Defaults to 1.

    Raises:
        TypeError: if the model is a TENN and samples are not a np.array.
        ValueError: if the model is a spatiotemporal TENN and batch_size is missing or does not
            divide the number of samples.
    """
    if not calibration_required(qmodel):
        return

    # Disable subsampling in StatefulRecurrent layers before building the calibration model
    rec_layers = get_layers_by_type(model, StatefulRecurrent)
    for ly in rec_layers:
        ly.subsample_ratio = None

    # Build a calibration model which is a float model with OutputObservers at locations where
    # the quantized model has OutputQuantizers.
    calibration_model, end_of_blocks = _get_calibration_model(model, qmodel)

    if samples is None:
        # Generate random samples
        samples_shape = (num_samples, ) + qmodel.input_shape[1:]
        # Handle image like samples (channels in [1, 3]) as uint8, recurrent TENNs as int16 and
        # other as int8
        if samples_shape[-1] in [1, 3]:
            samples_type = np.uint8
        elif len(rec_layers) > 0:
            samples_type = np.int16
        else:
            samples_type = np.int8
        rng = np.random.default_rng()
        iinfo = np.iinfo(samples_type)
        samples = rng.integers(iinfo.min, iinfo.max, samples_shape, dtype=samples_type)

    # Extract (Depthwise)BufferTempConv layers
    buf_layers = get_layers_by_type(calibration_model, (BufferTempConv, DepthwiseBufferTempConv))

    if buf_layers or rec_layers:
        if not isinstance(samples, np.ndarray):
            raise TypeError("TENN models calibration is only possible with np.array samples."
                            f" Received {type(samples)}. Please convert them to a compatible"
                            " format before the calibration. \n"
                            " Spatiotemporal expects the following shape:"
                            " [batch_size*Seq, dim_0,, ..., dim_n] where:"
                            " - batch_size is the same batch_size provided to the calibration."
                            " - Seq is a dataset parameter that defines the temporally coherent"
                            " data (eg number of frames per video clips). \n"
                            " Recurrent expects [batch_size, (model.input_shape)] array.")

    if buf_layers:
        # With ST TENN models, the number of samples is inferred from the provided samples
        num_samples = len(samples)
        if batch_size is None:
            raise ValueError("batch_size not specified. With spatiotemporal TENN models "
                             "batch_size must be given for the calibration, otherwise the "
                             "result is unpredictible. Refer to 'calibrate' function docstring "
                             "for more details.")
        # The number of samples must be a multiple of the batch_size to prevent a reset of the
        # FIFO
        if num_samples % batch_size != 0:
            raise ValueError("When calibrating spatiotemporal TENN, num_samples must be a "
                             f"multiple of batch_size, got num_samples={num_samples} and "
                             f"batch_size={batch_size}.")

    # Compute step value otherwise 'predict' will run until samples are exhausted (ie.
    # indefinitely if samples is a dataset with repeat enabled)
    if batch_size is None:
        steps = num_samples
    else:
        assert batch_size > 0, "The batch size should be strictly positive."
        # 'predict' expects an integer number of steps while np.ceil returns a float
        steps = int(np.ceil(num_samples / batch_size))

    # Ensure graph execution for recurrent models. The wrapped function is built once and reused
    # by every epoch.
    model_func = tf.function(calibration_model) if rec_layers else None

    for epoch in range(epochs):
        if rec_layers:
            # Feed the samples one index of their second axis at a time, keeping that axis with
            # a size of 1
            for step in range(samples.shape[1]):
                model_func(np.expand_dims(samples[:, step], 1))
        else:
            # Set default calib samples and number of steps
            calib_samples = samples
            calibration_steps = steps
            # If the model is a buffer TENN, it normally requires a warmup period on the first
            # epoch in order to calibrate it. For that purpose batch_size is mandatory.
            if buf_layers and batch_size and epoch == 0:
                warmup_samples, calib_samples, warmup_steps = _split_samples(samples,
                                                                             buf_layers,
                                                                             batch_size)
                _warmup_calib(calibration_model, warmup_samples, end_of_blocks, batch_size)
                # Steps consumed by the warmup are not available for calibration anymore
                calibration_steps = steps - warmup_steps
            calibration_model.predict(x=calib_samples,
                                      steps=calibration_steps,
                                      batch_size=batch_size)

    # Update quantized model OutputQuantizers range_max using OutputObservers calibrated values
    for eob, observer in end_of_blocks.items():
        _set_and_equalize(qmodel, eob, calibration_model.get_layer(observer).range_max)