Source code for shmpipeline.kernels.gpu.leaky_integrator

"""GPU leaky-integrator control kernel."""

from __future__ import annotations

from typing import Any, Mapping

import torch

from shmpipeline.config import KernelConfig, SharedMemoryConfig
from shmpipeline.errors import ConfigValidationError
from shmpipeline.kernels.gpu._common import (
    require_numeric_parameter,
    validate_unary_same_shape_and_dtype,
)
from shmpipeline.kernels.gpu.base import GpuKernel, as_gpu_tensor


[docs] class LeakyIntegratorGpuKernel(GpuKernel): """Apply the control law u_k = leak * u_{k-1} + gain * e_k.""" kind = "gpu.leaky_integrator"
[docs] @classmethod def validate_config( cls, config: KernelConfig, shared_memory: Mapping[str, SharedMemoryConfig], ) -> None: super().validate_config(config, shared_memory) validate_unary_same_shape_and_dtype(config, shared_memory) input_spec = shared_memory[config.input] output_spec = shared_memory[config.output] if len(input_spec.shape) != 1 or len(output_spec.shape) != 1: raise ConfigValidationError( f"kernel {config.name!r} requires 1D input and output vectors" ) require_numeric_parameter(config, name="leak") require_numeric_parameter(config, name="gain")
def __init__(self, context) -> None: super().__init__(context) self.leak = require_numeric_parameter(context.config, name="leak") self.gain = require_numeric_parameter(context.config, name="gain") self.state = torch.zeros_like(self.output_buffer)
[docs] def compute_into( self, trigger_input: Any, output: Any, auxiliary_inputs: Mapping[str, Any], ) -> None: del auxiliary_inputs torch.mul(self.state, self.leak, out=output) torch.add( output, as_gpu_tensor(trigger_input, device=self.device), alpha=self.gain, out=output, ) self.state.copy_(output) torch.cuda.synchronize(output.device)