Source code for cnn2snn.transforms.sequential

#!/usr/bin/env python
# ******************************************************************************
# Copyright 2021 Brainchip Holdings Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
"""Transformation utilities for Keras/CNN2SNN Sequential models.
"""

from keras import Model, Sequential, Input
from keras.layers import ReLU, Dropout, Concatenate, InputLayer, MaxPool2D

from ..quantization_layers import QuantizedActivation
from .clone import (clone_model_with_weights, clone_layer,
                    clone_layer_and_add_to_model)
from .batch_normalization import invert_batchnorm_pooling, fold_batchnorm


def prepare_to_convert(model):
    # Make sure the model is sequential
    seq_model = sequentialize(model)

    # For now, we support only models with a single branch
    if not isinstance(seq_model, Sequential):
        raise RuntimeError(
            "The model contains more than one sequential branch.")

    # Transform model to prepare conversion: change the order of layers,
    # fold BN, freeze quantizers, remove useless layers.
    sync_model = syncretize(seq_model)

    return sync_model


def sequential_remove_useless_layers(model):
    """Removes useless layers in a Sequential model.

    Useless layers are:
    - Dropout

    Args:
        model (:obj:`tf.keras.Model`): a Sequential Keras model.
    Returns:
        :obj:`tf.keras.Model`: a Sequential Keras model.
    """

    _raise_error_if_model_not_sequential(model)

    useless_layers = (Dropout,)

    new_model = Sequential()
    new_model.add(Input(model.input_shape[1:]))
    for layer in model.layers:
        if isinstance(layer, useless_layers):
            continue
        clone_layer_and_add_to_model(layer, new_model)

    return new_model


def sequential_invert_pooling_activation(model):
    """Invert activation and MaxPool2D layer in a Sequential model to have
    MaxPool2D before the activation.

    Having activation->MaxPool2D or MaxPool2D->activation is equivalent only if
    the activation is increasing (ok for ReLU and QuantizedActivation).
    Note that GlobalAvgPool2D cannot be inverted with an activation because
    there is no equivalence between activation->GAP and GAP->activation.

    Args:
        model (:obj:`tf.keras.Model`): a Sequential Keras model.
    Returns:
        :obj:`tf.keras.Model`: a Sequential Keras model.
    """

    _raise_error_if_model_not_sequential(model)

    new_model = Sequential()
    new_model.add(Input(model.input_shape[1:]))
    i = 0
    while i < len(model.layers) - 1:
        layer = model.layers[i]
        next_layer = model.layers[i + 1]
        if (isinstance(layer, (ReLU, QuantizedActivation)) and isinstance(next_layer, MaxPool2D)):
            clone_layer_and_add_to_model(next_layer, new_model)
            clone_layer_and_add_to_model(layer, new_model)
            i = i + 2
        else:
            clone_layer_and_add_to_model(layer, new_model)
            i = i + 1

    if i < len(model.layers):
        clone_layer_and_add_to_model(model.layers[-1], new_model)

    return new_model


def _raise_error_if_model_not_sequential(model):
    """Raises a ValueError if model is not a Sequential Keras model.
    """
    if not isinstance(model, Sequential):
        raise ValueError(f"Model is expected to be Sequential. Receives type "
                         f"{model.__class__.__name__}.")


def _check_layers_data_format(model):
    """Asserts that all layers in the model are 'channels_last'.
    Args:
        model (tf.keras.model): the Keras model to check.
    """

    # Error if 'data_format' is 'channels_first'
    for layer in model.layers:
        if hasattr(layer, 'data_format'):
            if layer.data_format == "channels_first":
                raise RuntimeError("Unsupported data format 'channels_first' "
                                   f"in layer '{layer.name}'.")


def _check_layers_dilation(model):
    """Asserts that all layers in the model have default dilation.
    Args:
        model (tf.keras.model): the Keras model to check.
    """

    # Error if 'dilation_rate' is not default
    for layer in model.layers:
        if hasattr(layer, 'dilation_rate') and layer.dilation_rate != (1, 1):
            raise ValueError(f"Unsupported dilation rate {layer.dilation_rate} in layer "
                             f"'{layer.name}'.")


def _check_model_input_output(model):
    """Asserts that model inputs/outputs are supported for conversion.
    The Keras model must have only one input layer and one output layer. The
    input shape must 4-D (N, H, W, C).

    Args:
        model (tf.keras.model): the Keras model to check.
    """

    # Error if multiple inputs
    if len(model.input_names) > 1:
        raise RuntimeError("Model must have only one input layer. Receives "
                           f"inputs {model.input_names}.")

    # Error if multiple outputs
    if len(model.output_names) != 1:
        raise RuntimeError("Model must have only one output layer. Receives"
                           f"outputs {model.output_names}.")

    # Error if input shape is not 4D, i.e. (N, H, W, C)
    if len(model.input_shape) not in (2, 4):
        raise RuntimeError(
            "Input shape of model must be 2-D or 4-D (batch size + 1-D or 3-D "
            f"tensors). Receives input shape {model.input_shape}.")


def _check_layer_inbounds(model):
    """Asserts that all layers in the model have only one inbound node and
    inbound layer (except for input layers and Concatenate layers).
    """
    for layer in model.layers:
        if len(layer.inbound_nodes) != 1:
            raise RuntimeError("Layers must have only one inbound node. "
                               f"Receives layer {layer.name} with "
                               f"{len(layer.inbound_nodes)} nodes.")

        if (layer.inbound_nodes[0].is_input or isinstance(layer, Concatenate)):
            continue

        num_inbound_layers = len(layer.inbound_nodes[0].parent_nodes)
        if num_inbound_layers != 1:
            raise RuntimeError(f"Layer {layer.name} must have only one inbound "
                               f"layer. Receives {num_inbound_layers}.")


def _check_concat_layer_compatibility(layer, inbound_layers):
    """Checks that the Concatenate layer is compatible for conversion:

    - A Concatenate layer axis parameter must be equal to -1.
    - A Concatenate layer must have exactly two inbound layers.

    Args:
        layer (:obj:`tf.keras.Layer`): a Keras Concatenate layer.
        inbound_layers (list): the inbound layers of the Concatenate layer.
    """

    if layer.axis != -1:
        raise RuntimeError(f"The Concatenate layer '{layer.name}' must have "
                           f"axis=-1. Receives axis={layer.axis}.")

    if len(inbound_layers) != 2:
        inbound_names = [layer.name for layer in inbound_layers]
        raise RuntimeError(f"The Concatenate layer '{layer.name}' must have "
                           f"exactly two inbound layers. Receives inbound "
                           f"layers {inbound_names}.")


[docs]def sequentialize(model): """Transform a Model into Sequential sub-models and Concatenate layers. This function returns an equivalent model where all linear branches are replaced by a Sequential sub-model. Args: model (:obj:`tf.keras.Model`): a Keras model Returns: :obj:`tf.keras.Model`: a Keras model with Sequential sub-models """ # Clone model to avoid shared layers model_clone = clone_model_with_weights(model) _check_layers_data_format(model_clone) _check_layers_dilation(model) _check_model_input_output(model_clone) _check_layer_inbounds(model_clone) if isinstance(model_clone, Sequential): return model_clone def parse_layer(layer, visited_layers, current_branch, output_tensors): """Go through a TensorFlow/Keras graph by recursively looking at outbound layers. Each linear branch is detected and converted to a Sequential sub-model. A linear branch ends if: - the current layer has multiple outbounds (split connections) - the current layer has no outbound (output layer of the model) - the next layer is a Concatenate layer """ if layer in visited_layers: raise RuntimeError(f"Layer {layer.name} already visited.") # Do not visit this layer if all inbound layers were not visited yet # (e.g. for Concatenate inbounds) inbound_layers = [n.layer for n in layer.inbound_nodes[0].parent_nodes] if set(inbound_layers).difference(visited_layers): return visited_layers.append(layer) # Skip input layer but store its output tensor for graph connection if isinstance(layer, InputLayer): output_tensors[layer] = layer.output # Add layer to graph if layer is Concatenate elif isinstance(layer, Concatenate): _check_concat_layer_compatibility(layer, inbound_layers) # Get input tensors of Concatenate layer concat_input_tensors = [output_tensors[layer] for layer in inbound_layers] # Add Concatenate layer to the graph output_tensors[layer] = layer(concat_input_tensors) else: # Add current layer to current branch current_branch.append(layer) # End current branch and add it to the graph if: # - current layer has multiple outbounds (split connections) # - current layer has no outbound (output layer of the model) # - next layer is Concatenate if (len(layer.outbound_nodes) != 1 or isinstance( layer.outbound_nodes[0].layer, Concatenate)): parent_nodes = current_branch[0].inbound_nodes[0].parent_nodes input_tensor = output_tensors[parent_nodes[0].layer] # Create sub-model for current branch and add it to the graph submodel = Sequential(current_branch) output_tensors[layer] = submodel(input_tensor) current_branch.clear() # Go to next layer for next_layer in [node.layer for node in layer.outbound_nodes]: parse_layer(next_layer, visited_layers, current_branch, output_tensors) # Go through model layers to detect Sequential branches input_layer = model_clone.get_layer(model_clone.input_names[0]) output_layer = model_clone.get_layer(model_clone.output_names[0]) output_tensors = {} parse_layer(input_layer, visited_layers=[], current_branch=[], output_tensors=output_tensors) # Create new model with Sequential sub-models model_cut = Model(model_clone.input, output_tensors[output_layer]) # Clone model to avoid shared layers model_cut = clone_model_with_weights(model_cut) # Sanity check: the first layer should always be an InputLayer if not isinstance(model_cut.layers[0], InputLayer): raise ValueError("Incompatible model") # Subsequent layers are either Concatenate or Sequential for layer in model_cut.layers[1:]: if not isinstance(layer, (Concatenate, Sequential)): raise ValueError("Incompatible model") # If the Model contains a single branch, return it if len(model_cut.layers) == 2: return model_cut.layers[1] return model_cut
[docs]def syncretize(model): """Align all linear branches of a Model with akida layer sequences. The input model must be composed of Sequential submodels and Concatenate layers. This function will apply transformations on every Sequential submodel and returns an equivalent functional model with akida compatible sequences of layers. Args: model (:obj:`tf.keras.Model`): a Keras model with Sequential submodels. Returns: :obj:`tf.keras.Model`: a Keras model with akida-compatible Sequential submodels. """ def syncretize_sequential(submodel): submodel = sequential_remove_useless_layers(submodel) submodel = sequential_invert_pooling_activation(submodel) submodel = invert_batchnorm_pooling(submodel) submodel = fold_batchnorm(submodel) return submodel if isinstance(model, Sequential): return syncretize_sequential(model) def parse_submodel(layer, visited_layers, output_tensors): if layer in visited_layers: raise RuntimeError(f"Layer {layer.name} already visited.") # Do not visit this layer if all inbound layers were not visited yet # (e.g. for Concatenate inbounds) inbound_layers = [n.layer for n in layer.inbound_nodes[0].parent_nodes] if set(inbound_layers).difference(visited_layers): return visited_layers.append(layer) # Add layer to graph if layer is InputLayer or Concatenate if isinstance(layer, InputLayer): input_clone = clone_layer(layer) output_tensors[layer] = input_clone.output elif isinstance(layer, Concatenate): # Get input tensors of Concatenate layer concat_input_tensors = [output_tensors[layer] for layer in inbound_layers] # Add Concatenate layer to the graph concat_clone = clone_layer(layer) output_tensors[layer] = concat_clone(concat_input_tensors) elif isinstance(layer, Sequential): # Get input tensors of submodel parent_nodes = layer.inbound_nodes[0].parent_nodes input_tensor = output_tensors[parent_nodes[0].layer] # Transform submodel and add it to the graph new_submodel = syncretize_sequential(layer) output_tensors[layer] = new_submodel(input_tensor) else: raise RuntimeError(f"Layer {layer.name} of type " f"{layer.__class__.__name__} is not supported " f"here.") # Go to next layer for next_layer in [node.layer for node in layer.outbound_nodes]: parse_submodel(next_layer, visited_layers, output_tensors) # Go through model layers to transform Sequential branches input_layer = model.get_layer(model.input_names[0]) output_tensors = {} parse_submodel(input_layer, visited_layers=[], output_tensors=output_tensors) # Create new model with transformed Sequential sub-models output_layer = model.get_layer(model.output_names[0]) return Model(output_tensors[input_layer], output_tensors[output_layer])