Source code for snntoolbox.simulation.backends.inisim.ttfs

# -*- coding: utf-8 -*-
"""INI time-to-first-spike simulator backend.

This module defines the layer objects used to create a spiking neural network
for our built-in INI simulator
:py:mod:`~snntoolbox.simulation.target_simulators.INI_ttfs_target_sim`.

The coding scheme underlying this conversion is that the instantaneous firing
rate is given by the inverse time-to-first-spike.

This simulator works only with the Keras backend set to TensorFlow.

@author: rbodo
"""

import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten, AveragePooling2D, \
    MaxPooling2D, Conv2D, Layer, Concatenate, ZeroPadding2D, Reshape, \
    DepthwiseConv2D


class SpikeLayer(Layer):
    """Base class for layer with spiking neurons.

    The constructor expects a ``config`` keyword argument: an object
    offering ``getint`` / ``getfloat`` / ``getboolean`` lookups by section
    and option. The state variables (``mem``, ``time``, ``refrac_until``,
    ``spiketrain``, ``last_spiketimes``) start out as ``None`` and are
    allocated by :py:meth:`init_neurons`.
    """

    def __init__(self, **kwargs):
        self.config = kwargs.pop('config', None)
        self.layer_type = self.class_name
        self.batch_size = self.config.getint('simulation', 'batch_size')
        self.dt = self.config.getfloat('simulation', 'dt')
        self.duration = self.config.getint('simulation', 'duration')
        self.tau_refrac = self.config.getfloat('cell', 'tau_refrac')
        self._v_thresh = self.config.getfloat('cell', 'v_thresh')
        self.v_thresh = None
        self.time = None
        self.mem = self.spiketrain = self.impulse = None
        self.refrac_until = None
        self.last_spiketimes = None

        # Only these keyword arguments are forwarded to ``Layer.__init__``;
        # everything else is dropped silently.
        allowed_kwargs = {'input_shape',
                          'batch_input_shape',
                          'batch_size',
                          'dtype',
                          'name',
                          'trainable',
                          'weights',
                          'input_dtype',  # legacy
                          }
        for kwarg in kwargs.copy():
            if kwarg not in allowed_kwargs:
                kwargs.pop(kwarg)
        Layer.__init__(self, **kwargs)
        self.stateful = True
        self._floatx = tf.keras.backend.floatx()

    def reset(self, sample_idx):
        """Reset layer variables."""

        self.reset_spikevars(sample_idx)

    @property
    def class_name(self):
        """Get class name."""

        return self.__class__.__name__

    def update_neurons(self):
        """Update neurons according to activation function.

        Returns
        -------

        psp: tf.Tensor
            Post-synaptic potential produced by :py:meth:`get_psp`, cast to
            the Keras float type.
        """

        is_softmax = getattr(self, 'activation_str', None) == 'softmax'

        # Update membrane potentials.
        new_mem = self.get_new_mem()

        # Generate spikes.
        if is_softmax:
            output_spikes = self.softmax_activation(new_mem)
        else:
            output_spikes = self.linear_activation(new_mem)

        # Reset membrane potential after spikes.
        self.set_reset_mem(new_mem, output_spikes)

        # Store refractory period after spikes. ``refrac_until`` is only
        # allocated by ``init_neurons`` when ``tau_refrac > 0``, so it must
        # not be touched otherwise. Softmax output neurons are not
        # constrained, i.e. their refractory state stays as it is.
        if self.refrac_until is not None and not is_softmax:
            new_refrac = tf.where(tf.not_equal(output_spikes, 0),
                                  self.time + self.tau_refrac,
                                  self.refrac_until)
            self.refrac_until.assign(new_refrac)

        if self.spiketrain is not None:
            self.spiketrain.assign(self.time * tf.cast(
                tf.not_equal(output_spikes, 0), self._floatx))

        # Compute post-synaptic potential.
        psp = self.get_psp(output_spikes)

        return tf.cast(psp, self._floatx)

    def linear_activation(self, mem):
        """Linear activation.

        Returns a float tensor that is 1 where ``mem`` is greater than or
        equal to ``self.v_thresh`` and 0 elsewhere.
        """

        return tf.cast(tf.greater_equal(mem, self.v_thresh), self._floatx)

    def softmax_activation(self, mem):
        """Softmax activation.

        Returns a float tensor that is 1 where a uniform random sample is
        less than or equal to ``softmax(mem)`` and 0 elsewhere.
        """

        return tf.cast(tf.less_equal(tf.random.uniform(tf.shape(mem)),
                                     tf.nn.softmax(mem)), self._floatx)

    def get_new_mem(self):
        """Add input to membrane potential.

        Returns
        -------

        new_mem: tf.Tensor
            ``self.mem`` plus the (possibly masked) ``self.impulse``, with
            the optional leak applied.
        """

        # Destroy impulse if in refractory period
        if self.tau_refrac == 0:
            masked_impulse = self.impulse
        else:
            masked_impulse = tf.where(
                tf.greater(self.refrac_until, self.time),
                tf.zeros_like(self.impulse), self.impulse)

        new_mem = self.mem + masked_impulse

        if self.config.getboolean('cell', 'leak'):
            # Todo: Implement more flexible version of leak!
            new_mem = tf.where(tf.greater(new_mem, 0),
                               new_mem - 0.1 * self.dt, new_mem)

        return new_mem

    def set_reset_mem(self, mem, spikes):
        """
        Reset membrane potential ``mem`` array where ``spikes`` array is
        nonzero.

        With a softmax activation, ``mem`` is stored without any reset.
        """

        if getattr(self, 'activation_str', None) == 'softmax':
            new = tf.identity(mem)
        else:
            new = tf.where(tf.not_equal(spikes, 0), tf.zeros_like(mem), mem)
        self.mem.assign(new)

    def get_psp(self, output_spikes):
        """Compute the post-synaptic potential from ``output_spikes``.

        With a softmax activation the spikes are passed through unchanged.
        Otherwise ``self.last_spiketimes`` is set to the current time
        wherever ``output_spikes`` is nonzero, and the result is ``self.dt``
        wherever the stored spike time is greater than zero, else 0.
        """

        if getattr(self, 'activation_str', None) == 'softmax':
            psp = tf.identity(output_spikes)
        else:
            new_spiketimes = tf.where(tf.not_equal(output_spikes, 0),
                                      tf.ones_like(output_spikes) * self.time,
                                      self.last_spiketimes)
            assign_new_spiketimes = self.last_spiketimes.assign(
                new_spiketimes)
            # Read the variable only after the assignment has run.
            with tf.control_dependencies([assign_new_spiketimes]):
                last_spiketimes = self.last_spiketimes + 0  # Dummy op
                psp = tf.where(tf.greater(last_spiketimes, 0),
                               tf.ones_like(output_spikes) * self.dt,
                               tf.zeros_like(output_spikes))
        return psp

    def get_time(self):
        """Get simulation time variable.

        Returns
        -------

        time: float
            Current simulation time.
        """

        # Return the value itself, not the bound ``eval`` method.
        if tf.executing_eagerly():
            return float(self.time.numpy())
        return float(self.time.eval())

    def set_time(self, time):
        """Set simulation time variable.

        Parameters
        ----------

        time: float
            Current simulation time.
        """

        self.time.assign(time)

    def init_membrane_potential(self, output_shape=None, mode='zero'):
        """Initialize membrane potential.

        Helpful to avoid transient response in the beginning of the
        simulation. Not needed when reset between frames is turned off, e.g.
        with a video data set.

        Parameters
        ----------

        output_shape: Optional[tuple]
            Output shape. Defaults to ``self.output_shape``.
        mode: str
            Initialization mode.

            - ``'uniform'``: Random numbers from uniform distribution in
              ``[-thr, thr]``.
            - ``'bias'``: The layer bias is moved into the membrane
              potential and the bias variable is zeroed.
            - ``'zero'``: Zero (default).

        Returns
        -------

        init_mem: tf.Tensor
            A tensor of shape ``output_shape``.
        """

        if output_shape is None:
            output_shape = self.output_shape

        if mode == 'uniform':
            init_mem = tf.random.uniform(output_shape,
                                         -self._v_thresh, self._v_thresh)
        elif mode == 'bias':
            init_mem = tf.zeros(output_shape, self._floatx)
            if getattr(self, 'bias', None) is not None:
                bias = self.get_weights()[1]
                # Tensors are immutable, so the per-channel bias is added
                # by broadcasting rather than by item assignment.
                # Todo: This assumes data_format = 'channels_first'
                broadcast_shape = \
                    [1, len(bias)] + [1] * (len(output_shape) - 2)
                init_mem = init_mem + tf.reshape(
                    tf.cast(bias, self._floatx), broadcast_shape)
                self.bias.assign(tf.zeros_like(bias))
        else:  # mode == 'zero':
            init_mem = tf.zeros(output_shape, self._floatx)
        return init_mem

    @tf.function
    def reset_spikevars(self, sample_idx):
        """
        Reset variables present in spiking layers. Can be turned off for
        instance when a video sequence is tested.

        The membrane potential is re-initialized only when ``sample_idx`` is
        a multiple of the ``reset_between_nth_sample`` setting. If that
        setting is 0, this only happens for ``sample_idx == 0``. Time,
        refractory state, spike train and last spike times are reset on
        every call.
        """

        mod = self.config.getint('simulation', 'reset_between_nth_sample')
        mod = mod if mod else sample_idx + 1
        do_reset = sample_idx % mod == 0
        if do_reset:
            self.mem.assign(self.init_membrane_potential())
        self.time.assign(self.dt)
        zeros_output_shape = tf.zeros(self.output_shape, self._floatx)
        if self.tau_refrac > 0:
            self.refrac_until.assign(zeros_output_shape)
        if self.spiketrain is not None:
            self.spiketrain.assign(zeros_output_shape)
        self.last_spiketimes.assign(zeros_output_shape - 1)

    @tf.function
    def init_neurons(self, input_shape):
        """Init layer neurons.

        Creates the state variables that are still ``None``; variables that
        already exist are left untouched.

        Parameters
        ----------

        input_shape: list
            Input shape, passed on to ``self.compute_output_shape``.
        """

        from snntoolbox.bin.utils import get_log_keys, get_plot_keys

        output_shape = self.compute_output_shape(input_shape)
        if self.v_thresh is None:
            self.v_thresh = tf.Variable(self._v_thresh, name='v_thresh',
                                        trainable=False)
        if self.mem is None:
            self.mem = tf.Variable(self.init_membrane_potential(output_shape),
                                   name='v_mem', trainable=False)
        if self.time is None:
            self.time = tf.Variable(self.dt, name='dt', trainable=False)
        # To save memory and computations, allocate only where needed:
        if self.tau_refrac > 0 and self.refrac_until is None:
            self.refrac_until = tf.Variable(
                tf.zeros(output_shape), name='refrac_until', trainable=False)
        if any({'spiketrains', 'spikerates', 'correlation', 'spikecounts',
                'hist_spikerates_activations', 'operations',
                'synaptic_operations_b_t', 'neuron_operations_b_t',
                'spiketrains_n_b_l_t'} & (get_plot_keys(self.config)
                                          | get_log_keys(self.config))) \
                and self.spiketrain is None:
            self.spiketrain = tf.Variable(tf.zeros(output_shape),
                                          name='spiketrains',
                                          trainable=False)
        if self.last_spiketimes is None:
            self.last_spiketimes = tf.Variable(-tf.ones(output_shape),
                                               name='last_spiketimes',
                                               trainable=False)

    def get_layer_idx(self):
        """Get index of layer.

        Returns
        -------

        layer_idx: Optional[int]
            The leading run of digits in the part of ``self.name`` before
            the first underscore, as an integer, or ``None`` if that part
            does not start with a digit.
        """

        label = self.name.split('_')[0]
        # Grow the prefix up to and including the full label, so that an
        # all-digit label is not truncated by one character.
        end = 0
        while end < len(label) and label[:end + 1].isdigit():
            end += 1
        return int(label[:end]) if end else None
def spike_call(call):
    """Wrap a layer's ``call`` so that its output drives the neuron update.

    The returned function stores the result of ``call`` in ``self.impulse``
    and returns ``self.update_neurons()``.

    Parameters
    ----------

    call: Callable
        The layer's ``call(self, x)`` implementation.
    """

    @tf.function
    def decorator(self, x):

        def compute_impulse():
            return call(self, x)

        def silent_impulse():
            return tf.zeros_like(self.mem)

        # Only call layer if there are input spikes. This is to prevent
        # accumulation of bias.
        has_input_spikes = tf.math.reduce_any(tf.not_equal(x, 0))
        self.impulse = tf.cond(has_input_spikes,
                               compute_impulse, silent_impulse)
        return self.update_neurons()

    return decorator
class SpikeConcatenate(Concatenate):
    """Spike merge layer"""

    def __init__(self, axis, **kwargs):
        # ``config`` is not used by this layer. It is discarded if given and
        # tolerated if missing, so the layer can also be constructed
        # without it.
        kwargs.pop('config', None)
        Concatenate.__init__(self, axis, **kwargs)

    @staticmethod
    def get_time():
        """Return ``None``; this layer defines no time variable."""

        pass

    @staticmethod
    def reset(sample_idx):
        """Reset layer variables (nothing to reset in this layer)."""

        pass

    @property
    def class_name(self):
        """Get class name."""

        return self.__class__.__name__
class SpikeZeroPadding2D(ZeroPadding2D):
    """Spike padding layer"""

    def __init__(self, *args, **kwargs):
        # ``config`` is not used by this layer. It is discarded if given and
        # tolerated if missing, so the layer can also be constructed
        # without it.
        kwargs.pop('config', None)
        ZeroPadding2D.__init__(self, *args, **kwargs)

    @staticmethod
    def get_time():
        """Return ``None``; this layer defines no time variable."""

        pass

    @staticmethod
    def reset(sample_idx):
        """Reset layer variables (nothing to reset in this layer)."""

        pass

    @property
    def class_name(self):
        """Get class name."""

        return self.__class__.__name__
class SpikeReshape(Reshape):
    """Spike reshape layer"""

    def __init__(self, *args, **kwargs):
        # ``config`` is not used by this layer. It is discarded if given and
        # tolerated if missing, so the layer can also be constructed
        # without it.
        kwargs.pop('config', None)
        Reshape.__init__(self, *args, **kwargs)

    @staticmethod
    def get_time():
        """Return ``None``; this layer defines no time variable."""

        pass

    @staticmethod
    def reset(sample_idx):
        """Reset layer variables (nothing to reset in this layer)."""

        pass

    @property
    def class_name(self):
        """Get class name."""

        return self.__class__.__name__
class SpikeFlatten(Flatten):
    """Spike flatten layer."""

    def __init__(self, **kwargs):
        # Keep the settings object (``None`` if absent) and hand the
        # remaining keyword arguments to the Keras layer.
        settings = kwargs.pop('config', None)
        self.config = settings
        Flatten.__init__(self, **kwargs)

    @staticmethod
    def get_time():
        """Return ``None``; this layer defines no time variable."""

        return None

    def reset(self, sample_idx):
        """Reset layer variables (nothing to reset in this layer)."""

        pass

    @property
    def class_name(self):
        """Get class name."""

        return type(self).__name__
class SpikeDense(Dense, SpikeLayer):
    """Spike Dense layer."""

    def build(self, input_shape):
        """Creates the layer neurons and connections.

        Parameters
        ----------

        input_shape: Union[list, tuple, Any]
            Keras tensor (future input to layer) or list/tuple of Keras
            tensors to reference for weight shape computations.
        """

        Dense.build(self, input_shape)
        # Normalize first: a plain list/tuple has no ``as_list`` method.
        self.init_neurons(tf.TensorShape(input_shape).as_list())

    @spike_call
    def call(self, x, **kwargs):
        """Compute the dense layer output for ``x``.

        The ``spike_call`` wrapper uses the result as input impulse.
        """

        return Dense.call(self, x)
class SpikeConv2D(Conv2D, SpikeLayer):
    """Spike 2D Convolution."""

    def build(self, input_shape):
        """Creates the layer weights.
        Must be implemented on all layers that have weights.

        Parameters
        ----------

        input_shape: Union[list, tuple, Any]
            Keras tensor (future input to layer) or list/tuple of Keras
            tensors to reference for weight shape computations.
        """

        Conv2D.build(self, input_shape)
        # Normalize first: a plain list/tuple has no ``as_list`` method.
        self.init_neurons(tf.TensorShape(input_shape).as_list())

    @spike_call
    def call(self, x, mask=None):
        """Compute the convolution output for ``x``.

        The ``spike_call`` wrapper uses the result as input impulse.
        """

        return Conv2D.call(self, x)
class SpikeDepthwiseConv2D(DepthwiseConv2D, SpikeLayer):
    """Spike 2D depthwise-separable convolution."""

    def build(self, input_shape):
        """Creates the layer weights.
        Must be implemented on all layers that have weights.

        Parameters
        ----------

        input_shape: Union[list, tuple, Any]
            Keras tensor (future input to layer) or list/tuple of Keras
            tensors to reference for weight shape computations.
        """

        DepthwiseConv2D.build(self, input_shape)
        # Normalize first: a plain list/tuple has no ``as_list`` method.
        self.init_neurons(tf.TensorShape(input_shape).as_list())

    @spike_call
    def call(self, x, mask=None):
        """Compute the depthwise convolution output for ``x``.

        The ``spike_call`` wrapper uses the result as input impulse.
        """

        return DepthwiseConv2D.call(self, x)
class SpikeAveragePooling2D(AveragePooling2D, SpikeLayer):
    """Average Pooling."""

    def build(self, input_shape):
        """Creates the layer weights.
        Must be implemented on all layers that have weights.

        Parameters
        ----------

        input_shape: Union[list, tuple, Any]
            Keras tensor (future input to layer) or list/tuple of Keras
            tensors to reference for weight shape computations.
        """

        AveragePooling2D.build(self, input_shape)
        # Normalize first: a plain list/tuple has no ``as_list`` method.
        self.init_neurons(tf.TensorShape(input_shape).as_list())

    @spike_call
    def call(self, x, mask=None):
        """Compute the average-pooling output for ``x``.

        The ``spike_call`` wrapper uses the result as input impulse.
        """

        return AveragePooling2D.call(self, x)
class SpikeMaxPooling2D(MaxPooling2D, SpikeLayer):
    """Spiking Max Pooling."""

    def build(self, input_shape):
        """Creates the layer neurons and connections.

        Parameters
        ----------

        input_shape: Union[list, tuple, Any]
            Keras tensor (future input to layer) or list/tuple of Keras
            tensors to reference for weight shape computations.
        """

        MaxPooling2D.build(self, input_shape)
        # Normalize first: a plain list/tuple has no ``as_list`` method.
        self.init_neurons(tf.TensorShape(input_shape).as_list())

    def call(self, x, mask=None):
        """Layer functionality."""

        # Skip integration of input spikes in membrane potential. Directly
        # transmit new spikes. The output psp is nonzero wherever there has
        # been an input spike at any time during simulation.

        input_psp = MaxPooling2D.call(self, x)

        if self.spiketrain is not None:
            # A spike counts as new where the pooled input is positive but
            # no positive spike time has been stored yet (or vice versa).
            new_spikes = tf.math.logical_xor(
                tf.greater(input_psp, 0),
                tf.greater(self.last_spiketimes, 0))
            self.spiketrain.assign(self.time * tf.cast(new_spikes,
                                                       self._floatx))

        psp = self.get_psp(input_psp)

        return tf.cast(psp, self._floatx)
# Lookup table from class name to the layer class defined in this module.
custom_layers = {
    'SpikeFlatten': SpikeFlatten,
    'SpikeDense': SpikeDense,
    'SpikeConv2D': SpikeConv2D,
    'SpikeAveragePooling2D': SpikeAveragePooling2D,
    'SpikeMaxPooling2D': SpikeMaxPooling2D,
    'SpikeConcatenate': SpikeConcatenate,
    'SpikeDepthwiseConv2D': SpikeDepthwiseConv2D,
    'SpikeZeroPadding2D': SpikeZeroPadding2D,
    'SpikeReshape': SpikeReshape,
}