#!/usr/bin/env python
# ******************************************************************************
# Copyright 2023 Brainchip Holdings Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
__all__ = ["InputQuantizer", "Dequantizer", "get_input_quantizer"]
import numpy as np
import uuid
from onnx.helper import make_node, make_tensor_value_info, np_dtype_to_tensor_dtype
from onnx import AttributeProto as AP
from onnx import ValueInfoProto
from onnxruntime.quantization.calibrate import TensorData
from .base_layer import OnnxLayer
from .layer_compatibility import check_node_link_to_input, check_node_has_one_initializer
from ..graph_tools import (TENSOR_SHAPE, value_info_to_tensor_shape, array_to_tp, get_field,
get_variable, get_node, has_field, to_field)
from ..quantization.input_scale import input_zp_scale
from ...models import get_quantization_params
def get_input_quantizer(nodes, graph):
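    """Build an InputQuantizer from the nodes linked to the graph input.

    The optional Transpose/Mul/Add nodes of the input pattern are folded into the
    quantizer: 'perm' is read from the Transpose attributes, while the Mul and Add
    initializers become the 'input_scale' and 'offset' rescaling weights.

    Args:
        nodes (list of NodeProto): the input pattern nodes.
        graph (GraphProto): the graph containing the nodes.

    Returns:
        InputQuantizer: the quantizer representing the graph input.
    """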
check_node_link_to_input(nodes[0], graph)
perm = None
    if (transpose_node := get_node(nodes, "Transpose")) is not None:
perm = get_field(transpose_node, "perm")
input_ts = graph.input[0]
input_signed = get_quantization_params().input_dtype.kind == "i"
input_quantizer = InputQuantizer(name="quantize",
input_tp=input_ts,
input_signed=input_signed,
perm=perm)
    # Set the rescaling weights (if any)
if (mul_node := get_node(nodes, "Mul")) is not None:
check_node_has_one_initializer(mul_node, graph)
input_quantizer.set_weight("input_scale", get_variable(mul_node.input[1], graph))
if (add_node := get_node(nodes, "Add")) is not None:
check_node_has_one_initializer(add_node, graph)
input_quantizer.set_weight("offset", get_variable(add_node.input[1], graph))
return input_quantizer
def _compute_quantization_parameters(r_scale, r_offset, out_tensor_range, signed):
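    """Compute the quantization parameters when folding a rescale into the inputs.

    Given the rescaling parameters (r_scale, r_offset) and the calibrated range of
    the tensor after the rescale, the range before the rescale (i.e. at the model
    inputs) is recovered and a (scale, zero point) pair is derived for each side.
    For example, with r_scale = 1/255, r_offset = 0 and a calibrated range of
    [0, 1], the recovered input range is [0, 255].

    Args:
        r_scale (np.ndarray): the rescale scale.
        r_offset (np.ndarray): the rescale offset.
        out_tensor_range (TensorData): the calibrated range after the rescale.
        signed (bool): whether inputs are quantized with a signed dtype.

    Returns:
        tuple: the input and output (scale, zero point) pairs.
    """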
assert isinstance(out_tensor_range, TensorData)
    # Compute the ranges before the Rescale node (i.e. at the model inputs)
in_tensor_range = [(x - r_offset) / r_scale for x in out_tensor_range.range_value]
in_tensor_range = TensorData(lowest=np.minimum(*in_tensor_range),
highest=np.maximum(*in_tensor_range))
    # Compute scale and zero point to quantize the inputs. Signed inputs are
    # quantized symmetrically, so a zero point is only allowed when unsigned.
    input_scale, input_zp = input_zp_scale(in_tensor_range, allow_zp=not signed)
    # Compute scale and zero point to dequantize the outputs
    output_scale, output_zp = input_zp_scale(out_tensor_range, allow_zp=not signed)
# Check constraints
err_msg = "Impossible to quantize inputs when folding rescale: "
if not signed:
        # Check that the rescale is valid
np.testing.assert_allclose(input_scale / output_scale,
r_scale,
atol=1e-3,
err_msg=err_msg + "input/output scales ratio is not valid.")
else:
np.testing.assert_equal(r_offset, 0, err_msg + "offset must be zero when input is signed.")
return (input_scale, input_zp), (output_scale, output_zp)
class Dequantizer(OnnxLayer):
"""Intermediate representation of DequantizeLinear(), use to dequantize the inputs.
Args:
name (str, optional): the node name. Defaults to ''.
"""
def __init__(self, name=''):
super().__init__("Dequantizer", name=name)
def __build__(self, *input_ts):
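        """Compute the output TensorShape for each input: same shape, float32 dtype."""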
assert len(input_ts) >= 1
        assert all(ts.dtype in (np.int8, np.int32) for ts in input_ts)
# Compute output shapes
output_ts = [TENSOR_SHAPE(ts.shape, np.dtype("float32")) for ts in input_ts]
return output_ts
@property
def op_type(self):
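        """The operation type, suffixed with the number of inputs when there is
        more than one (e.g. 'Dequantizer3' for three inputs).
        """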
op_name = self.base_name
if self.serialize_attr["num_inputs"] > 1:
op_name += str(self.serialize_attr["num_inputs"])
return op_name
def build(self, *inputs_vi):
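        """Build the layer from the input ValueInfoProto, one float32 output per input.

        Args:
            *inputs_vi (ValueInfoProto): the value infos of the inputs to dequantize.
        """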
assert all(isinstance(x, ValueInfoProto) for x in inputs_vi)
# Serialize the number of inputs
self.serialize_attr["num_inputs"] = len(inputs_vi)
# Replace empty name
if not self.name:
self.name = str(uuid.uuid4())
# Convert ValueInfoProto into TensorShape for each input
self._input = inputs_vi
input_ts = [value_info_to_tensor_shape(x) for x in inputs_vi]
output_ts = self.__build__(*input_ts)
self._output = [make_tensor_value_info(f"{vi.name}/dequantize",
elem_type=np_dtype_to_tensor_dtype(out_ts.dtype),
shape=out_ts.shape)
for vi, out_ts in zip(inputs_vi, output_ts)]
def quantize(self, *qlayers):
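        """Return the ONNX node and weights dequantizing the given quantized layers.

        Args:
            *qlayers (OnnxLayer): the input layers, each carrying a 'scale' weight.

        Returns:
            tuple: the ONNX node and its weights as TensorProto.
        """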
        # To keep homogeneity with the other layers, this function is called 'quantize'
        # even though it does the opposite (dequantize): it applies the scale to the
        # integer inputs.
if self._output is None or self._input is None:
# Build the layer if required
input_ts = [qly.output for qly in qlayers]
self.build(*input_ts)
        # The scale to set in the weights is the reciprocal of the calibrated ONNX one.
i_scales = [qlayer.weights["scale"] for qlayer in qlayers]
scales = [np.array(1 / i_scale, dtype=np.float32) for i_scale in i_scales]
# Return ONNX node and weights
output_names = [out.name for out in self.output]
weights = {f"{self.name}_scale_{i+1}": scale for i, scale in enumerate(scales)}
if len(self.output) == 1:
# Remove suffix when number of inputs/outputs is one
weights[f"{self.name}_scale"] = weights.pop(f"{self.name}_scale_1")
        # Interleave the input names with the scale names, in the expected order:
        # X1, S1, X2, S2...
        input_names = [ts.name for ts in self._input]
        inputs = sum(zip(input_names, weights), ())
onnx_node = self.make_node(inputs, output_names)
onnx_weights = array_to_tp(**weights)
return onnx_node, onnx_weights
@staticmethod
def build_subgraph(op_type):
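        """Return the DequantizeLinear node(s) implementing the given op type.

        Args:
            op_type (str): the operation type, optionally suffixed with the number
                of inputs (e.g. 'Dequantizer2').

        Returns:
            list of NodeProto: one DequantizeLinear node per input.
        """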
        # When there is only one input, the op_type is simply 'Dequantizer'
node_params = []
if op_type != 'Dequantizer':
num_inputs = int(op_type.replace('Dequantizer', ''))
for i in range(1, num_inputs + 1):
node_params.append({"inputs": [f"X{i}", f"scale_{i}"], "outputs": [f"Y{i}"]})
else:
node_params.append({"inputs": ["X", "scale"], "outputs": ["Y"]})
nodes = [make_node('DequantizeLinear', **nparams) for nparams in node_params]
return nodes
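

# Illustrative sketch: expand the subgraph for a two-input Dequantizer. The tensor
# names follow the X{i}/scale_{i}/Y{i} convention used by build_subgraph above.
if __name__ == "__main__":
    for node in Dequantizer.build_subgraph("Dequantizer2"):
        print(node.op_type, list(node.input), "->", list(node.output))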