Source code for quantizeml.onnx_support.layers.quantizers
#!/usr/bin/env python
# ******************************************************************************
# Copyright 2023 Brainchip Holdings Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************

__all__ = ["InputQuantizer", "Dequantizer", "get_input_quantizer"]

import numpy as np
from onnx.helper import make_node
from onnx import AttributeProto as AP
from onnxruntime.quantization.calibrate import TensorData
from .base_layer import OnnxLayer
from .layer_compatibility import check_node_link_to_input, check_node_has_one_initializer
from ..graph_tools import (TENSOR_SHAPE, value_info_to_tensor_shape, array_to_tp, get_field,
                           get_variable, get_node, has_field, to_field)
from ..quantization.input_scale import input_zp_scale, needs_zp


def get_input_quantizer(nodes, graph):
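    """Parses the input rescale pattern (Transpose/Mul/Add) and returns the corresponding
    InputQuantizer, with the rescale folded into its weights.

    Args:
        nodes (list of NodeProto): the nodes composing the input rescale pattern.
        graph (GraphProto): the graph containing the nodes.

    Returns:
        InputQuantizer: the quantizer to apply on the model inputs.
    """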
    check_node_link_to_input(nodes[0], graph)

    perm = None
    if (transpose_node := get_node(nodes, "Transpose")) is not None:
        perm = get_field(transpose_node, "perm")
    input_ts = graph.input[0]
    input_unsigned = needs_zp(graph, nodes[-1].output[0])
    input_quantizer = InputQuantizer(name="quantize",
                                     input_tp=input_ts,
                                     input_signed=not input_unsigned,
                                     perm=perm)

    # Set the rescaling weights
    if (mul_node := get_node(nodes, "Mul")) is not None:
        check_node_has_one_initializer(mul_node, graph)
        input_quantizer.set_weight("input_scale", get_variable(mul_node.input[1], graph))
    if (add_node := get_node(nodes, "Add")) is not None:
        check_node_has_one_initializer(add_node, graph)
        input_quantizer.set_weight("offset", get_variable(add_node.input[1], graph))
    return input_quantizer


def _compute_quantization_parameters(r_scale, r_offset, out_tensor_range, signed):
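    """Computes the quantization parameters needed to fold a rescale (scale/offset)
    into the input quantizer.

    Args:
        r_scale (np.ndarray): the scale of the folded rescale.
        r_offset (np.ndarray): the offset of the folded rescale.
        out_tensor_range (TensorData): the calibrated range at the rescale output.
        signed (bool): whether the inputs are quantized as signed integers.

    Returns:
        tuple: the (scale, zero point) pairs to quantize the inputs and dequantize the outputs.
    """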
    assert isinstance(out_tensor_range, TensorData)

    # Compute the ranges before the Rescale node (i.e. the ranges of the model inputs)
    in_tensor_range = [(x - r_offset) / r_scale for x in out_tensor_range.range_value]
    in_tensor_range = TensorData(lowest=np.minimum(*in_tensor_range),
                                 highest=np.maximum(*in_tensor_range))

    # Compute scale and zero point to quantize the inputs
    input_scale, input_zp = input_zp_scale(in_tensor_range, allow_zp=not signed)

    # Compute scale and zero point to dequantize the outputs
    output_scale, output_zp = input_zp_scale(out_tensor_range, allow_zp=not signed)
    # Check constraints
    err_msg = "Impossible to quantize inputs when folding rescale: "
    if not signed:
        # Check that the folded rescale is compatible with the input/output scales
        np.testing.assert_allclose(input_scale / output_scale,
                                   r_scale,
                                   atol=1e-3,
                                   err_msg=err_msg + "input/output scales ratio is not valid.")
    else:
        np.testing.assert_equal(r_offset, 0, err_msg + "offset must be zero when input is signed.")
    return (input_scale, input_zp), (output_scale, output_zp)


class InputQuantizer(OnnxLayer):
"""Intermediate representation of QuantizeLinear(), use to quantize the input.
Args:
input_tp (TensorProto): the input of the ONNX model.
perm (list, optional): list representing the permutations of the rescale node.
Defaults to None.
input_signed (bool, optional): whether the input is signed. Defaults to False.
name (str, optional): the node name. Defaults to ''.
"""

    def __init__(self, input_tp, perm=None, input_signed=False, name=''):
        super().__init__("InputQuantizer", name=name, perm=perm)
        self.input_signed = input_signed
        self._input = [input_tp]

        # Declare weights
        self._add_weight("input_scale")
        self._add_weight("offset")

    def __build__(self, downscale=True):
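        """Initializes the quantizer weights and returns the output tensor shape."""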
        assert downscale, f"{self.name} ({self.base_name}) does not support 32bit output"
        input_ts = value_info_to_tensor_shape(self.input)
        assert input_ts.dtype == np.float32

        # Add/initialize weights
        zp_dtype = "int8" if self.input_signed else "uint8"
        self._add_weight("zero_point", value=np.zeros(input_ts.shape[1]), dtype=zp_dtype)
        if self.weights["input_scale"].size == 0:
            self.set_weight("input_scale", np.ones((), dtype="float32"))
        if self.weights["offset"].size == 0:
            self.set_weight("offset", np.zeros((), dtype="float32"))

        # Update perm attribute
        input_ndim = len(input_ts.shape)
        if has_field(self, "perm"):
            perm = get_field(self, "perm")
        else:
            perm = list(range(input_ndim))
            self.attribute.append(to_field("perm", perm))

        # Compute output shape
        output_shape = tuple(input_ts.shape[i] for i in perm)
        output_ts = TENSOR_SHAPE(output_shape, np.dtype(zp_dtype))

        # Check the weights format: scale and offset must be scalar or per-channel
        first_channel_expected_shape = (1,) + output_shape[1:2] + (1,) * (input_ndim - 2)
        if (self.weights["input_scale"].size != 1 and
                self.weights["input_scale"].shape != first_channel_expected_shape):
            raise ValueError(f"Unsupported 'input_scale' in {self.name} ({self.base_name}): "
                             "it must be broadcastable in the channels dimension.")
        if (self.weights["offset"].size != 1 and
                self.weights["offset"].shape != first_channel_expected_shape):
            raise ValueError(f"Unsupported 'offset' in {self.name} ({self.base_name}): "
                             "it must be broadcastable in the channels dimension.")
        return output_ts

    def __quantize__(self, out_tensor_range, force_fp=False):
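        """Computes the quantizer scale/zero point from the calibrated output range
        and returns the weights to serialize together with the output scale.
        """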
        if force_fp:
            raise NotImplementedError("Forcing the input scale to be a FP is not implemented yet.")

        # Calibration was done over axis=1, so the scale/offset dimensions can be squeezed
        rescale_scale = np.squeeze(self.weights["input_scale"])
        rescale_offset = np.squeeze(self.weights["offset"])

        # Compute the quantization parameters
        input_scale_zp, output_scale_zp = _compute_quantization_parameters(rescale_scale,
                                                                           rescale_offset,
                                                                           out_tensor_range,
                                                                           self.input_signed)

        # The scale to set in the weights is the reciprocal of the ONNX calibrated one
        input_scale = np.array(1 / input_scale_zp[0], dtype=np.float32)

        # Compute the weights to serialize
        weights = {f"{self.name}_scale": input_scale, f"{self.name}_zp": input_scale_zp[1]}

        # Save the zero point (used by the next layer)
        self.set_weight("zero_point", output_scale_zp[1])
        return weights, output_scale_zp[0]

    @staticmethod
    def build_subgraph(op_type):
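        """Returns the quantizer subgraph: a Transpose node (reading its 'perm'
        attribute by reference) followed by a QuantizeLinear node.
        """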
        nodes = [make_node('Transpose', inputs=["X"], outputs=["Yi"])]
        nodes[-1].attribute.append(AP(name="perm", ref_attr_name="perm", type=AP.INTS))
        nodes.append(make_node('QuantizeLinear', inputs=["Yi", "scale", "zp"], outputs=["Y"]))
        return nodes


class Dequantizer(OnnxLayer):
"""Intermediate representation of DequantizeLinear(), use to dequantize the input.
Args:
name (str, optional): the node name. Defaults to ''.
"""

    def __init__(self, name=''):
        super().__init__("Dequantizer", name=name)

    def __build__(self, input_ts, downscale=True):
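        """Checks that the inputs are int8/int32 and returns the float32 output tensor shape."""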
        assert input_ts.dtype in (np.int8, np.int32)

        # Compute output shape
        output_ts = TENSOR_SHAPE(input_ts.shape, np.dtype("float32"))
        return output_ts

    def quantize(self, qinput):
        # To keep homogeneity with the other layers this function is called 'quantize',
        # even though it does the opposite (dequantize): it applies the scale to the input integers.
        if self._output is None or self._input is None:
            # Build the layer if required
            self.build(qinput.output)

        # The scale to set in the weights is the reciprocal of the ONNX calibrated one
        i_scale = qinput.weights["scale"]
        scale = np.array(1 / i_scale, dtype=np.float32)

        # Return the ONNX node and its weights
        weights = {f"{self.name}_scale": scale}
        inputs = [ts.name for ts in self._input] + list(weights)
        onnx_node = self.make_node(inputs, [self.output.name])
        onnx_weights = array_to_tp(**weights)
        return onnx_node, onnx_weights

    @staticmethod
    def build_subgraph(op_type):
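        """Returns the subgraph, reduced to a single DequantizeLinear node."""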
        return [make_node('DequantizeLinear', inputs=["X", "scale"], outputs=["Y"])]