Source code for snntoolbox.conversion.utils

# -*- coding: utf-8 -*-
"""

This module performs modifications on the network parameters during conversion
from analog to spiking.

.. autosummary::
    :nosignatures:

    normalize_parameters

@author: rbodo
"""

import os

import json
from collections import OrderedDict
from tensorflow.keras.models import Model
import numpy as np


def normalize_parameters(model, config, **kwargs):
    """Normalize the parameters of a network.

    The parameters of each layer are normalized with respect to the maximum
    activation, or the ``n``-th percentile of activations.

    Generates plots of the activity- and weight-distribution before and after
    normalization. Note that plotting the activity-distribution can be very
    time- and memory-consuming for larger networks.
    """

    from snntoolbox.parsing.utils import get_inbound_layers_with_params

    print("Normalizing parameters...")

    norm_dir = kwargs[str('path')] if 'path' in kwargs else \
        os.path.join(config.get('paths', 'log_dir_of_current_run'),
                     'normalization')

    activ_dir = os.path.join(norm_dir, 'activations')
    if not os.path.exists(activ_dir):
        os.makedirs(activ_dir)

    # Store original weights for later plotting
    if not os.path.isfile(os.path.join(activ_dir, 'weights.npz')):
        weights = {}
        for layer in model.layers:
            w = layer.get_weights()
            if len(w) > 0:
                weights[layer.name] = w[0]
        np.savez_compressed(os.path.join(activ_dir, 'weights.npz'), **weights)

    batch_size = config.getint('simulation', 'batch_size')

    # Either load scale factors from disk, or get normalization data set to
    # calculate them.
    x_norm = None
    if 'scale_facs' in kwargs:
        scale_facs = kwargs[str('scale_facs')]
    elif 'x_norm' in kwargs or 'dataflow' in kwargs:
        if 'x_norm' in kwargs:
            x_norm = kwargs[str('x_norm')]
        elif 'dataflow' in kwargs:
            x_norm = []
            num_samples_norm = config.getint('simulation', 'num_to_test')
            while len(x_norm) * batch_size < num_samples_norm:
                x, y = kwargs[str('dataflow')].next()
                x_norm.append(x)
            x_norm = np.concatenate(x_norm)
        print("Using {} samples for normalization.".format(len(x_norm)))
        sizes = [
            len(x_norm) * np.array(layer.output_shape[1:]).prod() * 32 /
            (8 * 1e9) for layer in model.layers if len(layer.weights) > 0]
        size_str = ['{:.2f}'.format(s) for s in sizes]
        print("INFO: Need {} GB for layer activations.\n".format(size_str) +
              "May have to reduce size of data set used for normalization.")
        scale_facs = OrderedDict({model.layers[0].name: 1})
    else:
        import warnings
        warnings.warn("Scale factors or normalization data set could not be "
                      "loaded. Proceeding without normalization.",
                      RuntimeWarning)
        return

    # If scale factors have not been computed in a previous run, do so now.
    if len(scale_facs) == 1:
        i = 0
        sparsity = []
        for layer in model.layers:
            # Skip if layer has no parameters
            if len(layer.weights) == 0:
                continue

            activations = try_reload_activations(layer, model, x_norm,
                                                 batch_size, activ_dir)
            nonzero_activations = activations[np.nonzero(activations)]
            sparsity.append(1 - nonzero_activations.size / activations.size)
            del activations
            perc = get_percentile(config, i)
            scale_facs[layer.name] = get_scale_fac(nonzero_activations, perc)
            print("Scale factor: {:.2f}.".format(scale_facs[layer.name]))

            # Since we have calculated output activations here, check at this
            # point if the output is mostly negative, in which case we should
            # stick to softmax. Otherwise ReLU is preferred.
            # Todo: Determine the input to the activation by replacing the
            # combined output layer by two distinct layers ``Dense`` and
            # ``Activation``!
            # if layer.activation == 'softmax' and settings['softmax_to_relu']:
            #     softmax_inputs = ...
            #     if np.median(softmax_inputs) < 0:
            #         print("WARNING: You allowed the toolbox to replace "
            #               "softmax by ReLU activations. However, more than "
            #               "half of the activations are negative, which "
            #               "could reduce accuracy. Consider setting "
            #               "settings['softmax_to_relu'] = False.")
            #         settings['softmax_to_relu'] = False
            i += 1

        # Write scale factors to disk
        filepath = os.path.join(norm_dir, config.get(
            'normalization', 'percentile') + '.json')
        from snntoolbox.utils.utils import confirm_overwrite
        if config.get('output', 'overwrite') or confirm_overwrite(filepath):
            with open(filepath, str('w')) as f:
                json.dump(scale_facs, f)
        np.savez_compressed(os.path.join(norm_dir, 'activations', 'sparsity'),
                            sparsity=sparsity)

    # Apply scale factors to normalize the parameters.
    for layer in model.layers:
        # Skip if layer has no parameters
        if len(layer.weights) == 0:
            continue

        # Scale parameters
        parameters = layer.get_weights()
        if layer.activation.__name__ == 'softmax':
            # When using a certain percentile or even the max, the scaling
            # factor can be extremely low in case of many output classes
            # (e.g. 0.01 for ImageNet). This amplifies weights and biases
            # greatly. But large biases cause large offsets in the beginning
            # of the simulation (spike input absent).
            scale_fac = 1.0
            print("Using scale factor {:.2f} for softmax layer.".format(
                scale_fac))
        else:
            scale_fac = scale_facs[layer.name]

        inbound = get_inbound_layers_with_params(layer)
        if len(inbound) == 0:  # Input layer
            parameters_norm = [
                parameters[0] * scale_facs[model.layers[0].name] / scale_fac,
                parameters[1] / scale_fac]
        elif len(inbound) == 1:
            parameters_norm = [
                parameters[0] * scale_facs[inbound[0].name] / scale_fac,
                parameters[1] / scale_fac]
        else:
            # In case of this layer receiving input from several layers, we
            # can apply the scale factor to the bias as usual, but need to
            # rescale the weights according to their respective input.
            parameters_norm = [parameters[0], parameters[1] / scale_fac]
            if parameters[0].ndim == 4:
                # In conv layers, just need to split up along channel dim.
                offset = 0  # Index offset at input filter dimension
                for inb in inbound:
                    f_out = inb.filters  # Num output features of inbound layer
                    f_in = range(offset, offset + f_out)
                    parameters_norm[0][:, :, f_in, :] *= \
                        scale_facs[inb.name] / scale_fac
                    offset += f_out
            else:
                # Fully-connected layers need more consideration, because they
                # could receive input from several conv layers that are
                # concatenated and then flattened. The neuron position in the
                # flattened layer depends on the image_data_format.
                raise NotImplementedError

        # Check if the layer happens to be Sparse.
        # If the layer is sparse, add the mask to the list of parameters.
        if len(parameters) == 3:
            parameters_norm.append(parameters[-1])

        # Update model with modified parameters
        layer.set_weights(parameters_norm)

    # Plot distributions of weights and activations before and after norm.
    if 'normalization_activations' in eval(config.get('output', 'plot_vars')):
        from snntoolbox.simulation.plotting import plot_hist
        from snntoolbox.simulation.plotting import plot_max_activ_hist

        # All layers in one plot. Assumes model.get_weights() returns
        # [w, b, w, b, ...].
        # from snntoolbox.simulation.plotting import plot_weight_distribution
        # plot_weight_distribution(norm_dir, model)

        print("Plotting distributions of weights and activations before and "
              "after normalizing...")

        # Load original parsed model to get parameters before normalization
        weights = np.load(os.path.join(activ_dir, 'weights.npz'))
        for idx, layer in enumerate(model.layers):
            # Skip if layer has no parameters
            if len(layer.weights) == 0:
                continue

            label = str(idx) + layer.__class__.__name__ \
                if config.getboolean('output', 'use_simple_labels') \
                else layer.name
            parameters = weights[layer.name]
            parameters_norm = layer.get_weights()[0]
            weight_dict = {'weights': parameters.flatten(),
                           'weights_norm': parameters_norm.flatten()}
            plot_hist(weight_dict, 'Weight', label, norm_dir)

            # Load activations of model before normalization
            activations = try_reload_activations(layer, model, x_norm,
                                                 batch_size, activ_dir)

            if activations is None or x_norm is None:
                continue

            # Compute activations with modified parameters
            nonzero_activations = activations[np.nonzero(activations)]
            activations_norm = get_activations_layer(model.input,
                                                     layer.output, x_norm,
                                                     batch_size)
            activation_dict = {
                'Activations': nonzero_activations,
                'Activations_norm':
                    activations_norm[np.nonzero(activations_norm)]}
            scale_fac = scale_facs[layer.name]
            plot_hist(activation_dict, 'Activation', label, norm_dir,
                      scale_fac)
            ax = tuple(np.arange(len(layer.output_shape))[1:])
            plot_max_activ_hist(
                {'Activations_max': np.max(activations, axis=ax)},
                'Maximum Activation', label, norm_dir, scale_fac)
    print('')
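

# Illustrative usage sketch (hypothetical, not part of the toolbox): one way
# ``normalize_parameters`` might be called directly for a parsed Keras model
# ``parsed_model`` and a normalization data set ``x_norm``. The config
# sections and options below are the ones read along this code path; all
# values are example placeholders.
def _example_normalize_parameters(parsed_model, x_norm):
    import configparser
    config = configparser.ConfigParser()
    config.read_dict({
        'paths': {'log_dir_of_current_run': '/tmp/snn_run'},
        'simulation': {'batch_size': '32'},
        'normalization': {'percentile': '99.9',
                          'normalization_schedule': 'False'},
        'output': {'overwrite': 'True', 'plot_vars': '[]'}})
    # Computes one scale factor per parameterized layer from ``x_norm``,
    # stores the factors under <log_dir_of_current_run>/normalization, and
    # rescales the layer weights of ``parsed_model`` in place.
    normalize_parameters(parsed_model, config, x_norm=x_norm)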


def get_scale_fac(activations, percentile):
    """
    Determine the activation value at ``percentile`` of the layer
    distribution.

    Parameters
    ----------

    activations: np.array
        The activations of cells in a specific layer, flattened to 1-d.

    percentile: int
        Percentile at which to determine activation.

    Returns
    -------

    scale_fac: float
        Maximum (or percentile) of activations in this layer. Parameters of
        the respective layer are scaled by this value.
    """

    return np.percentile(activations, percentile) if activations.size else 1
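

# Small numeric sketch (values made up for illustration): the scale factor is
# simply the requested percentile of the flattened activations, so lowering
# the percentile discards outliers, and an empty array falls back to 1.
def _example_get_scale_fac():
    activations = np.array([0.1, 0.2, 0.5, 1.0, 50.0])
    print(get_scale_fac(activations, 100))   # 50.0 (true maximum)
    print(get_scale_fac(activations, 75))    # 1.0 (outlier ignored)
    print(get_scale_fac(np.array([]), 99))   # 1 (no activations recorded)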


def get_percentile(config, layer_idx=None):
    """Get percentile at which to draw the maximum activation of a layer.

    Parameters
    ----------

    config: configparser.ConfigParser
        Settings.

    layer_idx: Optional[int]
        Layer index.

    Returns
    -------

    : float
        Percentile.
    """

    perc = config.getfloat('normalization', 'percentile')

    if config.getboolean('normalization', 'normalization_schedule'):
        assert layer_idx >= 0, "Layer index needed for normalization schedule."
        perc = apply_normalization_schedule(perc, layer_idx)

    return perc
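

# Minimal sketch (hypothetical config values): without a normalization
# schedule the configured percentile is returned unchanged; with a schedule it
# is reduced according to the layer index.
def _example_get_percentile():
    import configparser
    config = configparser.ConfigParser()
    config.read_dict({'normalization': {'percentile': '99.9',
                                        'normalization_schedule': 'False'}})
    print(get_percentile(config))      # 99.9
    config.set('normalization', 'normalization_schedule', 'True')
    print(get_percentile(config, 5))   # 99 (via apply_normalization_schedule)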


def apply_normalization_schedule(perc, layer_idx):
    """Transform percentile according to some rule, depending on layer index.

    Parameters
    ----------

    perc: float
        Original percentile.

    layer_idx: int
        Layer index, used to decrease the scale factor in higher layers, to
        maintain high spike rates.

    Returns
    -------

    : int
        Modified percentile.
    """

    return int(perc - layer_idx * 0.02)
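

# Worked example (illustrative): with the fixed decrement of 0.02 per layer
# and truncation to int, the percentile decreases only slowly with depth:
#   apply_normalization_schedule(99.9, 0)  -> 99
#   apply_normalization_schedule(99.9, 10) -> 99
#   apply_normalization_schedule(99.9, 50) -> 98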


def get_activations_layer(layer_in, layer_out, x, batch_size=None):
    """
    Get activations of a specific layer, iterating batch-wise over the
    complete data set.

    Parameters
    ----------

    layer_in: keras.layers.Layer
        The input to the network.

    layer_out: keras.layers.Layer
        The layer for which we want to get the activations.

    x: np.array
        The samples to compute activations for. With data of the form
        (channels, num_rows, num_cols), x_train has dimension
        (batch_size, channels*num_rows*num_cols) for a multi-layer perceptron,
        and (batch_size, channels, num_rows, num_cols) for a convolutional
        net.

    batch_size: Optional[int]
        Batch size.

    Returns
    -------

    activations: ndarray
        The activations of cells in a specific layer. Has the same shape as
        ``layer_out``.
    """

    if batch_size is None:
        batch_size = 10

    if len(x) % batch_size != 0:
        x = x[: -(len(x) % batch_size)]

    return Model(layer_in, layer_out).predict(x, batch_size)
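

# Usage sketch for a small hypothetical network: the number of samples is
# truncated to a multiple of ``batch_size`` (here 25 -> 20), and the returned
# array has the shape of the requested layer output.
def _example_get_activations_layer():
    from tensorflow.keras.layers import Dense, Input
    inp = Input(shape=(16,))
    hidden = Dense(8, activation='relu')(inp)
    x = np.random.rand(25, 16)
    activations = get_activations_layer(inp, hidden, x, batch_size=10)
    print(activations.shape)  # (20, 8)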


def get_activations_batch(ann, x_batch):
    """Compute layer activations of an ANN.

    Parameters
    ----------

    ann: keras.models.Model
        Needed to compute activations.

    x_batch: np.array
        The input samples to use for determining the layer activations. With
        data of the form (channels, num_rows, num_cols), X has dimension
        (batch_size, channels*num_rows*num_cols) for a multi-layer perceptron,
        and (batch_size, channels, num_rows, num_cols) for a convolutional
        net.

    Returns
    -------

    activations_batch: list[tuple[np.array, str]]
        Each tuple ``(activations, label)`` represents a layer in the ANN for
        which an activation can be calculated (e.g. ``Dense``, ``Conv2D``).
        ``activations`` contains the activations of a layer. It has the same
        shape as the original layer, e.g. (batch_size, n_features, n_rows,
        n_cols) for a convolution layer. ``label`` is the name of the layer.
    """

    activations_batch = []
    for layer in ann.layers:
        # Todo: This list should be replaced by
        # ``not in eval(config.get('restrictions', 'spiking_layers'))``.
        if layer.__class__.__name__ in ['Input', 'InputLayer', 'Flatten',
                                        'Concatenate', 'ZeroPadding2D',
                                        'Reshape']:
            continue
        activations = Model(ann.input, layer.output).predict_on_batch(x_batch)
        activations_batch.append((activations, layer.name))
    return activations_batch
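

# Usage sketch for a small hypothetical model: input-like layers are skipped,
# so the result holds one (activations, layer_name) tuple per remaining layer.
def _example_get_activations_batch():
    from tensorflow.keras.layers import Dense, Input
    inp = Input(shape=(16,))
    out = Dense(4, activation='softmax')(Dense(8, activation='relu')(inp))
    ann = Model(inp, out)
    x_batch = np.random.rand(10, 16)
    for activations, name in get_activations_batch(ann, x_batch):
        print(name, activations.shape)  # e.g. dense (10, 8), dense_1 (10, 4)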


def try_reload_activations(layer, model, x_norm, batch_size, activ_dir):
    """Load cached activations of ``layer`` from ``activ_dir`` if possible.

    If no cached file exists, the activations are computed on ``x_norm`` and
    written to disk. Returns ``None`` when neither cached activations nor a
    normalization data set are available.
    """

    try:
        activations = np.load(os.path.join(activ_dir,
                                           layer.name + '.npz'))['arr_0']
    except IOError:
        if x_norm is None:
            return

        print("Calculating activations of layer {} ...".format(layer.name))
        activations = get_activations_layer(model.input, layer.output, x_norm,
                                            batch_size)
        print("Writing activations to disk...")
        np.savez_compressed(os.path.join(activ_dir, layer.name), activations)
    else:
        print("Loading activations stored during a previous run.")

    return np.array(activations)