Source code for snntoolbox.simulation.target_simulators.INI_temporal_mean_rate_target_sim

# -*- coding: utf-8 -*-
"""INI simulator with temporal mean rate code.

@author: rbodo
"""

import os
import sys

from tensorflow import keras
import numpy as np

from snntoolbox.parsing.utils import get_inbound_layers_with_params
from snntoolbox.simulation.utils import AbstractSNN, remove_name_counter

remove_classifier = False


[docs]class SNN(AbstractSNN): """ The compiled spiking neural network, using layers derived from Keras base classes (see `snntoolbox.simulation.backends.inisim.temporal_mean_rate_tensorflow`). Aims at simulating the network on a self-implemented Integrate-and-Fire simulator using a timestepped approach. Attributes ---------- snn: keras.models.Model Keras model. This is the output format of the compiled spiking model because INI simulator runs networks of layers that are derived from Keras layer base classes. """ def __init__(self, config, queue=None): AbstractSNN.__init__(self, config, queue) self.snn = None self._spiking_layers = {} self._input_images = None self._binary_activation = None self._input_spikecount = None @property def is_parallelizable(self): return True
[docs] def add_input_layer(self, input_shape): self._input_images = keras.layers.Input(batch_shape=input_shape) self._spiking_layers[self.parsed_model.layers[0].name] = \ self._input_images
[docs] def add_layer(self, layer): from snntoolbox.parsing.utils import get_type spike_layer_name = getattr(self.sim, 'Spike' + get_type(layer)) # noinspection PyProtectedMember inbound = layer._inbound_nodes[0].inbound_layers if not isinstance(inbound, (list, tuple)): inbound = [inbound] inbound = [self._spiking_layers[inb.name] for inb in inbound] if len(inbound) == 1: inbound = inbound[0] layer_kwargs = layer.get_config() layer_kwargs['config'] = self.config # Check if layer uses binary activations. In that case, we will want to # tell the following to MaxPool layer because then we can use a # cheaper operation. if 'Conv' in layer.name and 'binary' in layer.activation.__name__: self._binary_activation = layer.activation.__name__ if 'MaxPool' in layer.name and self._binary_activation is not None: layer_kwargs['activation'] = self._binary_activation self._binary_activation = None # Replace activation from kwargs by 'linear' before initializing # superclass, because the relu activation is applied by the spike- # generation mechanism automatically. In some cases (quantized # activation), we need to apply the activation manually. This # information is taken from the 'activation' key during conversion. activation_str = str(layer_kwargs.pop(str('activation'), None)) spike_layer = spike_layer_name(**layer_kwargs) spike_layer.activation_str = activation_str spike_layer.is_first_spiking = \ len(get_inbound_layers_with_params(layer)) == 0 self._spiking_layers[layer.name] = spike_layer(inbound)
[docs] def build_dense(self, layer): pass
[docs] def build_convolution(self, layer): pass
[docs] def build_pooling(self, layer): pass
[docs] def compile(self): self.snn = keras.models.Model( self._input_images, self._spiking_layers[self.parsed_model.layers[-1].name]) self.snn.compile('sgd', 'categorical_crossentropy', ['accuracy']) # Tensorflow 2 lists all variables as weights, including our state # variables (membrane potential etc). So a simple # snn.set_weights(parsed_model.get_weights()) does not work any more. # Need to extract the actual weights here. parameter_map = {remove_name_counter(p.name): v for p, v in zip(self.parsed_model.weights, self.parsed_model.get_weights())} count = 0 for p in self.snn.weights: name = remove_name_counter(p.name) if name in parameter_map: keras.backend.set_value(p, parameter_map[name]) count += 1 assert count == len(parameter_map), "Not all weights have been " \ "transferred from ANN to SNN." for layer in self.snn.layers: if hasattr(layer, 'bias'): # Adjust biases to time resolution of simulator. bias = keras.backend.get_value(layer.bias) * self._dt keras.backend.set_value(layer.bias, bias) if self.config.getboolean('cell', 'bias_relaxation'): keras.backend.set_value( layer.b0, keras.backend.get_value(layer.bias))
[docs] def simulate(self, **kwargs): from snntoolbox.utils.utils import echo from snntoolbox.simulation.utils import get_layer_synaptic_operations input_b_l = kwargs[str('x_b_l')] * self._dt output_b_l_t = np.zeros((self.batch_size, self.num_classes, self._num_timesteps)) print("Current accuracy of batch:") # Loop through simulation time. self._input_spikecount = 0 for sim_step_int in range(self._num_timesteps): sim_step = (sim_step_int + 1) * self._dt self.set_time(sim_step) # Generate new input in case it changes with each simulation step. if self._poisson_input: input_b_l = self.get_poisson_frame_batch(kwargs[str('x_b_l')]) elif self._dataset_format == 'aedat': input_b_l = kwargs[str('dvs_gen')].next_eventframe_batch() if self.config.getboolean('simulation', 'early_stopping') and \ np.count_nonzero(input_b_l) == 0: print("\nInput empty: Finishing simulation {} steps early." "".format(self._num_timesteps - sim_step_int)) break # Main step: Propagate input through network and record output # spikes. out_spikes = self.snn.predict_on_batch(input_b_l) # Add current spikes to previous spikes. if remove_classifier: # Need to flatten output. output_b_l_t[:, :, sim_step_int] = np.argmax(np.reshape( out_spikes > 0, (out_spikes.shape[0], -1)), 1) else: output_b_l_t[:, :, sim_step_int] = out_spikes > 0 # Record neuron variables. i = j = 0 for layer in self.snn.layers: # Excludes Input, Flatten, Concatenate, etc: if hasattr(layer, 'spiketrain') \ and layer.spiketrain is not None: spiketrains_b_l = keras.backend.get_value(layer.spiketrain) if self.spiketrains_n_b_l_t is not None: self.spiketrains_n_b_l_t[i][0][ Ellipsis, sim_step_int] = spiketrains_b_l if self.synaptic_operations_b_t is not None: self.synaptic_operations_b_t[:, sim_step_int] += \ get_layer_synaptic_operations(spiketrains_b_l, self.fanout[i + 1]) if self.neuron_operations_b_t is not None: self.neuron_operations_b_t[:, sim_step_int] += \ self.num_neurons_with_bias[i + 1] i += 1 if hasattr(layer, 'mem') and self.mem_n_b_l_t is not None: self.mem_n_b_l_t[j][0][Ellipsis, sim_step_int] = \ keras.backend.get_value(layer.mem) j += 1 if 'input_b_l_t' in self._log_keys: self.input_b_l_t[Ellipsis, sim_step_int] = input_b_l if self._poisson_input or self._dataset_format == 'aedat': if self.synaptic_operations_b_t is not None: self.synaptic_operations_b_t[:, sim_step_int] += \ get_layer_synaptic_operations(input_b_l, self.fanout[0]) else: if self.neuron_operations_b_t is not None: if sim_step_int == 0: self.neuron_operations_b_t[:, 0] += self.fanin[1] * \ self.num_neurons[1] * np.ones(self.batch_size) * 2 spike_sums_b_l = np.sum(output_b_l_t, 2) undecided_b = np.sum(spike_sums_b_l, 1) == 0 guesses_b = np.argmax(spike_sums_b_l, 1) none_class_b = -1 * np.ones(self.batch_size) clean_guesses_b = np.where(undecided_b, none_class_b, guesses_b) current_acc = np.mean(kwargs[str('truth_b')] == clean_guesses_b) if self.config.getint('output', 'verbose') > 0 \ and sim_step % 1 == 0: echo('{:.2%}_'.format(current_acc)) else: sys.stdout.write('\r{:>7.2%}'.format(current_acc)) sys.stdout.flush() if self._dataset_format == 'aedat': remaining_events = \ len(kwargs[str('dvs_gen')].event_deques_batch[0]) elif self._poisson_input and self._num_poisson_events_per_sample > 0: remaining_events = self._num_poisson_events_per_sample - \ self._input_spikecount else: remaining_events = 0 if remaining_events > 0: print("SNN Toolbox WARNING: Simulation of current batch finished, " "but {} input events were not processed. Consider " "increasing the simulation time.".format(remaining_events)) return np.cumsum(output_b_l_t, 2)
[docs] def reset(self, sample_idx): for layer in self.snn.layers[1:]: # Skip input layer layer.reset(sample_idx)
[docs] def end_sim(self): pass
[docs] def save(self, path, filename): filepath = str(os.path.join(path, filename + '.h5')) print("Saving model to {}...\n".format(filepath)) self.snn.save(filepath, self.config.getboolean('output', 'overwrite'))
[docs] def load(self, path, filename): from snntoolbox.simulation.backends.inisim.temporal_mean_rate_theano \ import custom_layers filepath = os.path.join(path, filename + '.h5') try: self.snn = keras.models.load_model(filepath, custom_layers) except KeyError: raise NotImplementedError( "Loading SNN for INIsim is not supported yet.")
# Loading does not work anymore because the configparser object # needed by the custom layers is not stored when saving the model. # Could be implemented by overriding Keras' save / load methods, # but since converting even large Keras models from scratch is so # fast, there's really no need.
[docs] def get_poisson_frame_batch(self, x_b_l): """Get a batch of Poisson input spikes. Parameters ---------- x_b_l: ndarray The input frame. Shape: (`batch_size`, ``layer_shape``). Returns ------- input_b_l: ndarray Array of Poisson input spikes, with same shape as ``x_b_l``. """ if self._input_spikecount < self._num_poisson_events_per_sample \ or self._num_poisson_events_per_sample < 0: spike_snapshot = np.random.random_sample(x_b_l.shape) \ * self.rescale_fac * np.max(x_b_l) input_b_l = (spike_snapshot <= np.abs(x_b_l)).astype('float32') self._input_spikecount += \ int(np.count_nonzero(input_b_l) / self.batch_size) # For BinaryNets, with input that is not normalized and # not all positive, we stimulate with spikes of the same # size as the maximum activation, and the same sign as # the corresponding activation. Is there a better # solution? input_b_l *= np.max(x_b_l) * np.sign(x_b_l) else: # No more input spikes if _input_spikecount exceeded limit. input_b_l = np.zeros(x_b_l.shape) return input_b_l
[docs] def set_time(self, t): """Set the simulation time variable of all layers in the network. Parameters ---------- t: float Current simulation time. """ for layer in self.snn.layers[1:]: if layer.get_time() is not None: # Has time attribute layer.set_time(np.float32(t))
[docs] def set_spiketrain_stats_input(self): # Added this here because PyCharm complains about not all abstract # methods being implemented (even though this is not abstract). AbstractSNN.set_spiketrain_stats_input(self)
[docs] def get_spiketrains_input(self): # Added this here because PyCharm complains about not all abstract # methods being implemented (even though this is not abstract). AbstractSNN.get_spiketrains_input(self)
[docs] def scale_first_layer_parameters(self, t, input_b_l, tau=1): w, b = self.snn.layers[0].get_weights() alpha = (self._duration + tau) / (t + tau) beta = b + tau * (self._duration - t) / (t + tau) * w * input_b_l keras.backend.set_value(self.snn.layers[0].kernel, alpha * w) keras.backend.set_value(self.snn.layers[0].bias, beta)