#!/usr/bin/env python
# ******************************************************************************
# Copyright 2023 Brainchip Holdings Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
__all__ = ["InputQuantizer", "Dequantizer", "get_input_quantizer"]

import uuid
import numpy as np
from onnx import AttributeProto as AP
from onnx import ValueInfoProto, TensorProto
from onnx.helper import make_node, make_tensor_value_info, np_dtype_to_tensor_dtype
from onnxruntime.quantization.calibrate import TensorData

from ...models import get_quantization_params
from ..graph_tools import (TENSOR_SHAPE, array_to_tp, get_field, get_node, has_field,
to_field, value_info_to_tensor_shape)
from ..quantization.core import input_zp_scale, round_to_nearest_pow2
from .base_layer import OnnxLayer
from .layer_compatibility import check_node_link_to_input


def get_input_quantizer(nodes, graph, tensor_ranges):
    """Return an ``InputQuantizer`` for the graph input.

    Args:
        nodes (list of NodeProto): the nodes linked to the graph input, limited to an
            optional "Cast" and/or "Transpose"; may be empty.
        graph (GraphProto): the ONNX graph being quantized.
        tensor_ranges (dict): calibration ranges (``TensorData``) indexed by tensor name.

    Returns:
        InputQuantizer: the input quantizer, with its calibration range set.
    """
    if nodes:
check_node_link_to_input(nodes[0], graph)
if (cast_node := get_node(nodes, "Cast")) is not None:
assert get_field(cast_node, "to") == TensorProto.FLOAT, "Cast node dtype must be float32"
    perm = None
    if (transpose_node := get_node(nodes, "Transpose")) is not None:
        perm = get_field(transpose_node, "perm")
input_ts = graph.input[0]
input_signed = get_quantization_params().input_dtype.kind == "i"
input_quantizer = InputQuantizer(name="quantize",
input_tp=input_ts,
input_signed=input_signed,
perm=perm)
# Set calibration ranges
# The range key is the output of the last node ("Cast" or "Transpose") if nodes is not empty,
# otherwise it is the name of the graph input.
range_key = nodes[-1].output[0] if nodes else graph.input[0].name
ranges = tensor_ranges[range_key]
input_quantizer.set_weight("range_min", ranges.lowest)
input_quantizer.set_weight("range_max", ranges.highest)
return input_quantizer
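
# A minimal usage sketch (illustrative only): given the leading "Cast"/"Transpose"
# nodes of a calibrated graph and its per-tensor calibration ranges, one would
# typically write
#
#   input_quantizer = get_input_quantizer(leading_nodes, graph, tensor_ranges)
#
# where ``leading_nodes`` may be empty if the graph input feeds the first layer directly.
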
class Dequantizer(OnnxLayer):
"""Intermediate representation of DequantizeLinear(), use to dequantize the inputs.
Args:
name (str, optional): the node name. Defaults to ''.
"""
def __init__(self, name=''):
super().__init__("Dequantizer", name=name)

    def __build__(self, *input_ts):
assert len(input_ts) >= 1
        assert all(ts.dtype in (np.int8, np.int32) for ts in input_ts)
# Compute output shapes
output_ts = [TENSOR_SHAPE(ts.shape, np.dtype("float32")) for ts in input_ts]
return output_ts

    @property
def op_type(self):
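        # Suffix the base name with the number of inputs when the layer
        # dequantizes more than one tensor (e.g. "Dequantizer2" for two inputs).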
op_name = self.base_name
if self.serialize_attr["num_inputs"] > 1:
op_name += str(self.serialize_attr["num_inputs"])
return op_name

    def build(self, *inputs_vi):
assert all(isinstance(x, ValueInfoProto) for x in inputs_vi)
# Serialize the number of inputs
self.serialize_attr["num_inputs"] = len(inputs_vi)
# Replace empty name
if not self.name:
self.name = str(uuid.uuid4())
# Convert ValueInfoProto into TensorShape for each input
self._input = inputs_vi
input_ts = [value_info_to_tensor_shape(x) for x in inputs_vi]
output_ts = self.__build__(*input_ts)
self._output = [make_tensor_value_info(f"{vi.name}/dequantize",
elem_type=np_dtype_to_tensor_dtype(out_ts.dtype),
shape=out_ts.shape)
for vi, out_ts in zip(inputs_vi, output_ts)]

    def quantize(self, *qlayers):
        # To keep homogeneity with the other layers, this function is called 'quantize'
        # even though it does the opposite (dequantize): it applies the scale to the
        # integer inputs.
if self._output is None or self._input is None:
# Build the layer if required
input_ts = [qly.output for qly in qlayers]
self.build(*input_ts)
# Scale to set in weights is the reciprocal of ONNX calibrated one.
i_scales = [qlayer.weights["scale"] for qlayer in qlayers]
scales = [np.array(1 / i_scale, dtype=np.float32) for i_scale in i_scales]
# Return ONNX node and weights
output_names = [out.name for out in self.output]
weights = {f"{self.name}_scale_{i+1}": scale for i, scale in enumerate(scales)}
if len(self.output) == 1:
# Remove suffix when number of inputs/outputs is one
weights[f"{self.name}_scale"] = weights.pop(f"{self.name}_scale_1")
        # Inputs should be ordered as follows: X1, S1, X2, S2...
input_names = [ts.name for ts in self._input]
inputs = sum(list(zip(input_names, weights)), ())
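        # e.g. with two inputs: ("X1", "<name>_scale_1", "X2", "<name>_scale_2")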
onnx_node = self.make_node(inputs, output_names)
onnx_weights = array_to_tp(**weights)
return onnx_node, onnx_weights
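
    # A minimal usage sketch (illustrative; assumes each quantized layer in
    # ``qlayers`` exposes an ``output`` ValueInfoProto and a "scale" weight):
    #
    #   dequantizer = Dequantizer(name="dequantize")
    #   node, weights = dequantizer.quantize(*qlayers)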

    @staticmethod
def build_subgraph(op_type):
        # When there is only one input, the op_type is simply 'Dequantizer'
node_params = []
if op_type != 'Dequantizer':
num_inputs = int(op_type.replace('Dequantizer', ''))
for i in range(1, num_inputs + 1):
node_params.append({"inputs": [f"X{i}", f"scale_{i}"], "outputs": [f"Y{i}"]})
else:
node_params.append({"inputs": ["X", "scale"], "outputs": ["Y"]})
nodes = [make_node('DequantizeLinear', **nparams) for nparams in node_params]
return nodes
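
# A quick sketch of what ``Dequantizer.build_subgraph`` returns (illustrative only):
#
#   Dequantizer.build_subgraph("Dequantizer")
#   # -> [DequantizeLinear(inputs=["X", "scale"], outputs=["Y"])]
#   Dequantizer.build_subgraph("Dequantizer2")
#   # -> [DequantizeLinear(["X1", "scale_1"] -> ["Y1"]),
#   #     DequantizeLinear(["X2", "scale_2"] -> ["Y2"])]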