Core API

Package Surface

Public package surface for shmpipeline.

The top-level package re-exports the primary user-facing objects from the underlying modules so most applications can import from shmpipeline directly.

Common imports include:

Configuration Models

Immutable configuration models for shared-memory pipelines.

class shmpipeline.config.SharedMemoryConfig(name, shape, dtype, storage='cpu', gpu_device=None, cpu_mirror=None)

Bases: object

Configuration for one named shared-memory resource.

A shared-memory record defines the storage backend, tensor shape, and dtype for one named stream in the pipeline graph. GPU streams may additionally declare a CUDA device and an optional CPU mirror for host-side readers.

Parameters:
  • name (str)

  • shape (tuple[int, ...])

  • dtype (dtype)

  • storage (str)

  • gpu_device (str | None)

  • cpu_mirror (bool | None)

classmethod from_dict(raw)

Build a normalized shared-memory configuration from a mapping.

Parameters:

raw (Mapping[str, Any])

Return type:

SharedMemoryConfig
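
Because a record is fully described by its shape and dtype, the raw payload size of a stream can be estimated before anything is built. The following is a minimal sketch under stated assumptions: the mapping keys are assumed to match the constructor parameter names, "gpu" and "cuda:0" are assumed spellings for the storage and device values, and payload_bytes and ITEMSIZE are hypothetical helpers that are not part of shmpipeline.

```python
import math

# Hypothetical entries; key names assume from_dict accepts the documented
# parameter names, and "gpu" / "cuda:0" are assumed value spellings.
cpu_stream = {"name": "raw_frame", "shape": (480, 640), "dtype": "float32"}
gpu_stream = {
    "name": "filtered_frame",
    "shape": (480, 640),
    "dtype": "float32",
    "storage": "gpu",
    "gpu_device": "cuda:0",
    "cpu_mirror": True,
}

# Bytes per element for a few common dtype names.
ITEMSIZE = {"uint8": 1, "int16": 2, "float32": 4, "float64": 8}


def payload_bytes(entry):
    """Return the raw tensor payload size implied by shape and dtype."""
    return math.prod(entry["shape"]) * ITEMSIZE[entry["dtype"]]


print(payload_bytes(cpu_stream))  # 480 * 640 * 4 = 1228800
```

Any header or metadata a storage backend adds on top of the payload is not covered by this estimate.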

class shmpipeline.config.AuxiliaryBinding(alias, name)

Bases: object

Bind one kernel-local auxiliary alias to a shared-memory stream.

Parameters:
  • alias (str)

  • name (str)

class shmpipeline.config.KernelConfig(name, kind, input, output, auxiliary=<factory>, operation=None, parameters=<factory>, read_timeout=1.0, pause_sleep=0.01)

Bases: object

Configuration for one compute kernel in the pipeline.

Each kernel consumes one trigger input stream, may read zero or more auxiliary streams, and writes one output stream. The kind field resolves through the active shmpipeline.registry.KernelRegistry.

Parameters:
  • name (str)

  • kind (str)

  • input (str)

  • output (str)

  • auxiliary (tuple[AuxiliaryBinding, ...])

  • operation (str | None)

  • parameters (dict[str, Any])

  • read_timeout (float)

  • pause_sleep (float)

classmethod from_dict(raw)

Build a normalized kernel configuration from a mapping.

Parameters:

raw (Mapping[str, Any])

Return type:

KernelConfig

property all_inputs: tuple[str, ...]

Return the trigger input followed by ordered auxiliary streams.

property auxiliary_names: tuple[str, ...]

Return auxiliary shared-memory names in config order.

property auxiliary_aliases: tuple[str, ...]

Return auxiliary aliases in config order.

property auxiliary_by_alias: dict[str, str]

Return auxiliary bindings keyed by expression alias.
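
The four derived properties above can be pictured with a small stand-in. The sketch below is not the library's class: KernelConfigSketch and AuxiliaryBindingSketch are hypothetical dataclasses that mirror only the documented fields input, auxiliary, alias, and name, and the stream names are invented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AuxiliaryBindingSketch:
    """Hypothetical stand-in for AuxiliaryBinding(alias, name)."""
    alias: str
    name: str


@dataclass(frozen=True)
class KernelConfigSketch:
    """Hypothetical stand-in that mirrors only the documented properties."""
    input: str
    auxiliary: tuple = ()

    @property
    def auxiliary_names(self):
        return tuple(binding.name for binding in self.auxiliary)

    @property
    def auxiliary_aliases(self):
        return tuple(binding.alias for binding in self.auxiliary)

    @property
    def auxiliary_by_alias(self):
        return {binding.alias: binding.name for binding in self.auxiliary}

    @property
    def all_inputs(self):
        # Trigger input first, then auxiliary streams in config order.
        return (self.input,) + self.auxiliary_names


config = KernelConfigSketch(
    input="raw_frame",
    auxiliary=(
        AuxiliaryBindingSketch(alias="dark", name="dark_frame"),
        AuxiliaryBindingSketch(alias="gain", name="gain_map"),
    ),
)
print(config.all_inputs)
print(config.auxiliary_by_alias)
```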

class shmpipeline.config.PipelineConfig(shared_memory, kernels)

Bases: object

Complete pipeline configuration loaded from YAML or a mapping.

This is the primary configuration object used by the CLI, GUI, and shmpipeline.manager.PipelineManager. It groups the named stream definitions and the ordered kernel stages that consume them.

Parameters:
  • shared_memory

  • kernels

classmethod from_dict(raw)

Create a pipeline configuration from a plain mapping.

Parameters:

raw (Mapping[str, Any])

Return type:

PipelineConfig
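
A plain mapping of the kind from_dict consumes might look like the sketch below. The schema here is an assumption: the key names are taken from the documented constructor parameters, while the stream names, the "cpu.scale" kind, the "factor" parameter, and the string spelling of dtype are invented for illustration. The pre-check at the end is a hypothetical convenience, not a library feature.

```python
import json

# Hypothetical mapping; key names are assumed to mirror the documented
# constructor parameters of SharedMemoryConfig and KernelConfig.
raw_config = {
    "shared_memory": [
        {"name": "raw_frame", "shape": [256, 256], "dtype": "float32", "storage": "cpu"},
        {"name": "scaled_frame", "shape": [256, 256], "dtype": "float32", "storage": "cpu"},
    ],
    "kernels": [
        {
            "name": "scale",
            "kind": "cpu.scale",
            "input": "raw_frame",
            "output": "scaled_frame",
            "parameters": {"factor": 2.0},
            "read_timeout": 1.0,
            "pause_sleep": 0.01,
        },
    ],
}

# Cheap pre-check: every stream a kernel references should be declared.
declared = {entry["name"] for entry in raw_config["shared_memory"]}
referenced = {
    kernel[key] for kernel in raw_config["kernels"] for key in ("input", "output")
}
missing = sorted(referenced - declared)

print(json.dumps(raw_config, indent=2))
print("undeclared streams:", missing)
```

With the package installed, a mapping like this would be handed to PipelineConfig.from_dict(raw_config), provided the assumed key names match what the library expects.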

classmethod from_yaml(path)

Load a pipeline configuration from a YAML file.

Parameters:

path (str | Path)

Return type:

PipelineConfig

property shared_memory_by_name: dict[str, SharedMemoryConfig]

Return shared-memory definitions keyed by name.

Kernel Base API

Kernel abstractions used by worker processes.

class shmpipeline.kernel.KernelContext(config, shared_memory)

Bases: object

Static information available to a kernel instance.

The runtime constructs one context per worker so kernels can inspect their validated configuration and the shared-memory specifications for the streams they read and write.

Parameters:
  • config

  • shared_memory

property input_specs: tuple[SharedMemoryConfig, ...]

Return trigger plus auxiliary stream specifications in config order.

property trigger_input_spec: SharedMemoryConfig

Return the primary input stream specification.

property auxiliary_specs: tuple[SharedMemoryConfig, ...]

Return auxiliary stream specifications in config order.

property output_spec: SharedMemoryConfig

Return the primary output stream specification.

class shmpipeline.kernel.Kernel(context)

Bases: ABC

Base class for compute kernels executed by the runtime.

Custom kernels normally override validate_config() when they need stage-specific parameter checks and implement compute_into() to write results into the provided output buffer.

The constructor stores the validated kernel context and normalized parameters.

Parameters:

context (KernelContext)

classmethod validate_config(config, shared_memory)

Validate arity and storage constraints before build.

Parameters:
  • config

  • shared_memory

Return type:

None

abstractmethod compute_into(trigger_input, output, auxiliary_inputs)

Compute into the provided reusable output buffer.

Parameters:
  • trigger_input (Any)

  • output (Any)

  • auxiliary_inputs (Mapping[str, Any])

Return type:

None
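
The override pattern described for custom kernels can be sketched without the package installed. Everything below is a stand-in under assumptions: KernelSketch imitates only the documented method names of Kernel, SubtractDarkKernel and the "dark" alias are hypothetical, auxiliary_inputs is assumed to be keyed by alias, and plain lists replace the real stream buffers.

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace


class KernelSketch(ABC):
    """Hypothetical stand-in for shmpipeline.kernel.Kernel."""

    def __init__(self, context):
        self.context = context

    @classmethod
    def validate_config(cls, config, shared_memory):
        """Default: accept any configuration."""

    @abstractmethod
    def compute_into(self, trigger_input, output, auxiliary_inputs):
        ...


class SubtractDarkKernel(KernelSketch):
    """Write trigger_input minus the 'dark' auxiliary frame into output."""

    @classmethod
    def validate_config(cls, config, shared_memory):
        # Stage-specific check: this kernel needs an auxiliary aliased "dark".
        if "dark" not in config.auxiliary_aliases:
            raise ValueError("SubtractDarkKernel requires a 'dark' auxiliary")

    def compute_into(self, trigger_input, output, auxiliary_inputs):
        dark = auxiliary_inputs["dark"]
        # Fill the caller's buffer in place instead of allocating a result.
        for i, (value, bias) in enumerate(zip(trigger_input, dark)):
            output[i] = value - bias


config = SimpleNamespace(auxiliary_aliases=("dark",))
SubtractDarkKernel.validate_config(config, shared_memory={})

kernel = SubtractDarkKernel(context=None)
out = [0.0, 0.0, 0.0]
kernel.compute_into([5.0, 6.0, 7.0], out, {"dark": [1.0, 1.0, 1.0]})
print(out)
```

Writing into the supplied buffer, rather than returning a new object, is what lets the runtime reuse one output allocation across frames.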

Graph Introspection

Pipeline graph introspection and validation helpers.

class shmpipeline.graph.GraphEdge(source, target, role, stream)

Bases: object

One directed graph edge between a stream and a kernel.

Parameters:
  • source (str)

  • target (str)

  • role (str)

  • stream (str)

class shmpipeline.graph.PipelineGraph(config)

Bases: object

Derived graph view of one pipeline configuration.

Parameters:

config (PipelineConfig)

classmethod from_config(config)

Build a graph view from one loaded configuration.

Parameters:

config (PipelineConfig)

Return type:

PipelineGraph

property edges: tuple[GraphEdge, ...]

Return the directed edges in stream-kernel-stream form.

source_streams()

Return streams that no kernel produces, that is, streams written into the graph only by external producers.

Return type:

tuple[str, ...]

sink_streams()

Return streams that are produced but not consumed downstream.

Return type:

tuple[str, ...]

orphaned_streams()

Return streams that no kernel reads or writes.

Return type:

tuple[str, ...]
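
The three stream classifications can be illustrated with plain set operations. This is a sketch of one reading of the descriptions above, not the library's implementation: a source is taken to be a stream some kernel reads but none produces, and the kernel table and stream names are hypothetical.

```python
# Hypothetical kernel table: name -> (input streams, output stream).
kernels = {
    "subtract": (("raw", "dark"), "clean"),
    "scale": (("clean",), "scaled"),
}
declared_streams = ("raw", "dark", "clean", "scaled", "debug")

consumed = {stream for inputs, _ in kernels.values() for stream in inputs}
produced = {output for _, output in kernels.values()}

# Source: read by some kernel, produced by none (fed from outside).
source_streams = tuple(
    s for s in declared_streams if s in consumed and s not in produced
)
# Sink: produced by some kernel, read by none.
sink_streams = tuple(
    s for s in declared_streams if s in produced and s not in consumed
)
# Orphaned: declared but touched by no kernel.
orphaned_streams = tuple(
    s for s in declared_streams if s not in (consumed | produced)
)

print(source_streams, sink_streams, orphaned_streams)
```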

upstream_kernels(kernel_name)

Return kernels that feed any input of the target kernel.

Parameters:

kernel_name (str)

Return type:

tuple[str, ...]

downstream_kernels(kernel_name)

Return kernels that consume the target kernel’s output.

Parameters:

kernel_name (str)

Return type:

tuple[str, ...]
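
Upstream and downstream lookups reduce to matching one kernel's output against another kernel's inputs. The sketch below assumes direct neighbours only (the documentation does not say whether the real methods are transitive) and uses a hypothetical kernel table; the function names merely echo the documented methods.

```python
# Hypothetical kernel table: name -> (input streams, output stream).
kernels = {
    "subtract": (("raw", "dark"), "clean"),
    "scale": (("clean",), "scaled"),
    "threshold": (("scaled", "clean"), "mask"),
}


def upstream_kernels(kernel_name):
    """Kernels whose output is one of this kernel's inputs."""
    inputs, _ = kernels[kernel_name]
    return tuple(name for name, (_, out) in kernels.items() if out in inputs)


def downstream_kernels(kernel_name):
    """Kernels that list this kernel's output among their inputs."""
    _, output = kernels[kernel_name]
    return tuple(name for name, (ins, _) in kernels.items() if output in ins)


print(upstream_kernels("threshold"))
print(downstream_kernels("subtract"))
```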

validation_errors()

Return graph-level validation errors.

The current graph validation rejects ambiguous write ownership where more than one kernel produces the same shared-memory stream.

Return type:

list[str]
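
The write-ownership rule amounts to grouping kernels by output stream and flagging any stream with more than one writer. The following sketch shows that check in isolation; the kernel names, stream names, and the wording of the error message are all hypothetical and may differ from what the library reports.

```python
from collections import defaultdict

# Hypothetical mapping of kernel name -> output stream.
kernel_outputs = {"a": "shared_out", "b": "shared_out", "c": "other_out"}


def find_duplicate_writers(outputs):
    """Return one message per stream that more than one kernel writes."""
    writers = defaultdict(list)
    for kernel_name, stream in outputs.items():
        writers[stream].append(kernel_name)
    return [
        f"stream {stream!r} has multiple writers: {', '.join(names)}"
        for stream, names in sorted(writers.items())
        if len(names) > 1
    ]


errors = find_duplicate_writers(kernel_outputs)
print(errors)
```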

raise_for_errors()

Raise the first graph validation error, if any exists.

Return type:

None

to_dict()

Serialize the graph into a CLI- and GUI-friendly mapping.

Return type:

dict[str, Any]

describe()

Return a human-readable pipeline graph summary.

Return type:

str

shmpipeline.graph.validate_pipeline_config(config)

Return all config, graph, and kernel-binding validation errors.

Parameters:

config (PipelineConfig)

Return type:

list[str]

Pipeline Manager

Pipeline manager responsible for building and supervising workers.

class shmpipeline.manager.PipelineManager(config, *, spawn_method='spawn', placement_policy=None, registry=None)

Bases: object

Create shared memory, spawn workers, and supervise pipeline state.

The manager owns the pipeline lifecycle from validated config through runtime monitoring and teardown. It is the primary user-facing runtime API for Python callers.

The constructor accepts either a config object or a path to a YAML file.

Parameters:
  • config

  • spawn_method

  • placement_policy

  • registry

property graph: PipelineGraph

Return a derived graph view of the current configuration.

build()

Validate configuration and create shared-memory resources.

This step prepares the pipeline graph and opens or creates the named streams without starting any worker processes.

Return type:

None

start()

Start worker processes for every configured kernel.

The manager must already be in the built state before workers can be spawned.

Return type:

None

pause()

Pause all workers without tearing down the built pipeline.

Return type:

None

resume()

Resume work after a pause.

Return type:

None

stop(*, timeout=5.0, force=False)

Stop worker processes but keep shared-memory resources built.

After a stop, callers may inspect stream contents, restart workers, or proceed to a final shutdown.

Parameters:
  • timeout (float)

  • force (bool)

Return type:

None

shutdown(*, unlink=True, force=False)

Stop workers, close local handles, and optionally unlink streams.

This is the terminal cleanup step for a manager instance.

Parameters:
  • unlink (bool)

  • force (bool)

Return type:

None
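
The lifecycle methods above are meant to be called in order: build, start, optionally pause and resume, stop, then shutdown. The sketch below drives that sequence against any object exposing those methods. run_once and pause_and_resume are hypothetical helpers, and RecordingManager is a stub that only records calls so the example runs without the package; it does not model real worker behaviour.

```python
def run_once(manager, work):
    """Drive the documented build/start/stop/shutdown sequence."""
    manager.build()                  # validate config, create shared memory
    try:
        manager.start()              # spawn one worker per configured kernel
        work(manager)                # caller-supplied activity while running
        manager.raise_if_failed()    # surface any recorded worker failure
        manager.stop(timeout=5.0)    # workers end, streams stay built
    finally:
        # Terminal cleanup, even if start() or work() raised.
        manager.shutdown(unlink=True)


def pause_and_resume(manager):
    manager.pause()
    manager.resume()


class RecordingManager:
    """Hypothetical stub that records method names in call order."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def method(*args, **kwargs):
            self.calls.append(name)
        return method


stub = RecordingManager()
run_once(stub, pause_and_resume)
print(stub.calls)
```

With the real package, the stub would be replaced by a PipelineManager built from a configuration object or a YAML path.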

poll_events()

Drain worker events and update manager failure state.

Return type:

list[dict[str, Any]]

status(*, poll=True)

Return a snapshot of manager state, workers, and failures.

The result is intentionally JSON-friendly so CLI, GUI, and external tooling can consume the same structure.

Parameters:

poll (bool)

Return type:

dict[str, Any]

runtime_snapshot(*, poll=True)

Return a richer status snapshot for CLI and GUI consumers.

This extends status() with a timestamp and the derived graph description.

Parameters:

poll (bool)

Return type:

dict[str, Any]

raise_if_failed()

Raise the first worker failure, if any has been recorded.

Return type:

None
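
poll_events() and raise_if_failed() combine naturally into a small supervision loop. The sketch below is one possible shape for such a loop, not a library feature: monitor is a hypothetical helper, StubManager stands in for a running PipelineManager, and the "heartbeat" event payload is invented because the event schema is not documented here.

```python
import time


def monitor(manager, *, duration, interval=0.5):
    """Poll events until `duration` seconds pass or a worker fails."""
    deadline = time.monotonic() + duration
    seen = []
    while time.monotonic() < deadline:
        seen.extend(manager.poll_events())  # drain newly arrived events
        manager.raise_if_failed()           # raises if a failure was recorded
        time.sleep(interval)
    return seen


class StubManager:
    """Hypothetical stub that emits one made-up event per poll."""

    def poll_events(self):
        return [{"event": "heartbeat"}]

    def raise_if_failed(self):
        return None


events = monitor(StubManager(), duration=0.05, interval=0.01)
print(len(events) >= 1)
```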

get_stream(name)

Return the manager-owned shared-memory handle for one stream.

Parameters:

name (str)

start_synthetic_input(spec)

Start a synthetic input writer for one built stream.

Synthetic writers are useful for demos, viewer testing, smoke tests, and deterministic regression scenarios.

Parameters:

spec (SyntheticInputConfig | dict[str, Any])

Return type:

dict[str, Any]

stop_synthetic_input(stream_name, *, timeout=2.0)

Stop one active synthetic input writer if it exists.

Parameters:
  • stream_name (str)

  • timeout (float)

Return type:

None

stop_all_synthetic_inputs(*, timeout=2.0)

Stop every active synthetic input writer.

Parameters:

timeout (float)

Return type:

None

synthetic_input_status()

Return status snapshots for active synthetic input writers.

Return type:

dict[str, dict[str, Any]]
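
Since every started writer should eventually be stopped, the start and stop calls pair well with a context manager. The sketch below is a hypothetical wrapper, not part of shmpipeline: the spec keys are assumed to match the SyntheticInputConfig field names, the stream name is invented, and StubManager only tracks which writers are active so the example runs standalone.

```python
from contextlib import contextmanager


@contextmanager
def synthetic_input(manager, spec):
    """Run one synthetic writer for the duration of a with-block."""
    manager.start_synthetic_input(spec)
    try:
        yield
    finally:
        manager.stop_synthetic_input(spec["stream_name"], timeout=2.0)


class StubManager:
    """Hypothetical stub that tracks active writers by stream name."""

    def __init__(self):
        self.active = set()

    def start_synthetic_input(self, spec):
        self.active.add(spec["stream_name"])
        return {"stream_name": spec["stream_name"]}

    def stop_synthetic_input(self, stream_name, *, timeout=2.0):
        self.active.discard(stream_name)


# "random" is the documented default pattern; the other values are examples.
spec = {"stream_name": "raw_frame", "pattern": "random", "rate_hz": 100.0, "seed": 42}

manager = StubManager()
with synthetic_input(manager, spec):
    active_inside = set(manager.active)
active_after = set(manager.active)
print(active_inside, active_after)
```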

property failures: tuple[dict[str, Any], ...]

Return recorded worker failures.

property events: tuple[dict[str, Any], ...]

Return all events received from worker processes so far.

Kernel Registry

Kernel registry used by the manager and worker runtime.

class shmpipeline.registry.KernelRegistry(kernels, lazy_kernels=None)

Bases: object

Resolve kernel kinds to implementation classes.

Registries are the extension point for third-party kernels. The default registry contains the built-in CPU kernels and lazily loads GPU kernels when the optional torch dependency is available.

The constructor stores a static mapping of registered kernel implementations.

Parameters:
  • kernels (Mapping[str, type[Kernel]])

  • lazy_kernels (Mapping[str, Callable[[], type[Kernel]]] | None)

get(kind)

Return the implementation class for a kernel kind.

Parameters:

kind (str)

Return type:

type[Kernel]

kinds()

Return registered kernel kinds in sorted order.

Return type:

tuple[str, ...]

register(kernel_cls, *, replace=False)

Register one kernel implementation class on this registry.

Parameters:
  • kernel_cls (type[Kernel])

  • replace (bool)

Return type:

None

extended(*kernel_classes, replace=False)

Return a new registry extended with additional kernel classes.

Parameters:
  • kernel_classes (type[Kernel])

  • replace (bool)

Return type:

KernelRegistry

validate(config, shared_memory)

Validate one kernel binding against shared-memory definitions.

Parameters:
  • config

  • shared_memory

Return type:

None

create(config, shared_memory)

Instantiate a kernel after validation.

Parameters:
  • config

  • shared_memory

Return type:

Kernel

shmpipeline.registry.get_default_registry()

Return the built-in kernel registry.

Return type:

KernelRegistry
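
The split between kernels and lazy_kernels suggests a lookup that defers expensive imports until a kind is first requested. The sketch below shows that idea with a stand-in class; it is an illustration under assumptions and not the library's implementation. LazyRegistrySketch, the "cpu.add" and "gpu.add" kinds, and the KeyError raised for unknown kinds are all hypothetical.

```python
class LazyRegistrySketch:
    """Hypothetical stand-in showing eager and lazy kind lookup."""

    def __init__(self, kernels, lazy_kernels=None):
        self._kernels = dict(kernels)
        self._lazy = dict(lazy_kernels or {})

    def get(self, kind):
        if kind not in self._kernels and kind in self._lazy:
            # Run the loader on first use only, then cache the class.
            self._kernels[kind] = self._lazy.pop(kind)()
        try:
            return self._kernels[kind]
        except KeyError:
            raise KeyError(f"unknown kernel kind: {kind!r}") from None

    def kinds(self):
        return tuple(sorted(set(self._kernels) | set(self._lazy)))


class CpuAdd:
    """Placeholder for an eagerly registered kernel class."""


loads = []


def load_gpu_add():
    # Stands in for importing a module that needs an optional dependency.
    loads.append("gpu.add")

    class GpuAdd:
        """Placeholder for a lazily loaded kernel class."""

    return GpuAdd


registry = LazyRegistrySketch({"cpu.add": CpuAdd}, {"gpu.add": load_gpu_add})
print(registry.kinds())
first = registry.get("gpu.add")
second = registry.get("gpu.add")
print(loads)
```

Listing kinds does not trigger the loader, so an optional dependency is only imported when a pipeline actually uses a kernel that needs it.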

Synthetic Inputs

Synthetic shared-memory inputs for testing and interactive demos.

shmpipeline.synthetic.available_synthetic_patterns()

Return supported synthetic input pattern names.

Return type:

tuple[str, ...]

class shmpipeline.synthetic.SyntheticInputConfig(stream_name, pattern='random', rate_hz=None, seed=0, amplitude=1.0, offset=0.0, period=64.0, constant=0.0, impulse_interval=64)

Bases: object

Configuration for one synthetic input writer.

Synthetic writers can drive source streams without an external producer, which makes them useful for demos, GUI exploration, and automated tests.

Parameters:
  • stream_name (str)

  • pattern (str)

  • rate_hz (float | None)

  • seed (int)

  • amplitude (float)

  • offset (float)

  • period (float)

  • constant (float)

  • impulse_interval (int)

class shmpipeline.synthetic.SyntheticPatternGenerator(spec, *, shape, dtype, storage, gpu_device=None)

Bases: object

Generate deterministic test frames for CPU and GPU streams.

Parameters:
  • spec (SyntheticInputConfig)

  • shape (tuple[int, ...])

  • dtype (Any)

  • storage (str)

  • gpu_device (str | None)

next_frame()

Return the next generated frame, reusing an internal buffer.
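
Reusing an internal buffer has a practical consequence: if the same buffer object is returned on every call, a reference held from an earlier call will show the newest frame. The sketch below demonstrates that aliasing with a hypothetical stand-in, ReusingGenerator, built on the standard library; it does not reproduce the real generator's patterns or buffer types.

```python
import random
from array import array


class ReusingGenerator:
    """Hypothetical stand-in: refills one preallocated buffer per call."""

    def __init__(self, size, seed=0):
        self._rng = random.Random(seed)
        self._buffer = array("d", [0.0] * size)

    def next_frame(self):
        for i in range(len(self._buffer)):
            self._buffer[i] = self._rng.random()
        return self._buffer  # the same object on every call


gen = ReusingGenerator(size=4, seed=0)
first = gen.next_frame()
kept = array("d", first)   # explicit copy survives later calls
second = gen.next_frame()

print(first is second)     # the earlier reference aliases the new frame
print(kept != second)      # the copy still holds the earlier values
```

Callers that need to retain a frame should therefore copy it before requesting the next one.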

class shmpipeline.synthetic.SyntheticSourceController(stream, spec)

Bases: object

Background writer that feeds a stream with synthetic test frames.

The controller owns the worker thread and the timing loop for one active synthetic input source.

Parameters:
  • stream

  • spec

start()

Start the background writer thread.

Return type:

None

stop(*, timeout=2.0)

Request that the background writer stop and wait for it.

Parameters:

timeout (float)

Return type:

None

snapshot()

Return a stable status snapshot for GUI and CLI consumers.

Return type:

dict[str, Any]

State Model

Pipeline state model.

class shmpipeline.state.PipelineState(*values)

Bases: str, Enum

States managed by shmpipeline.manager.PipelineManager.
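
Deriving from both str and Enum means each member is also a plain string, which is what lets state values pass straight through JSON encoding and string comparison. The sketch below shows the effect with a stand-in enum; the member names and values are hypothetical, since the real members of PipelineState are not listed here.

```python
import json
from enum import Enum


class PipelineStateSketch(str, Enum):
    """Hypothetical members; the real enum's names and values may differ."""

    BUILT = "built"
    RUNNING = "running"
    PAUSED = "paused"


snapshot = {"state": PipelineStateSketch.RUNNING}
encoded = json.dumps(snapshot)            # members serialize as their values
restored = PipelineStateSketch(json.loads(encoded)["state"])

print(encoded)
print(restored is PipelineStateSketch.RUNNING)
print(PipelineStateSketch.PAUSED == "paused")
```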

CLI Module

Command-line entry points for shmpipeline.

shmpipeline.cli.build_parser()

Build the top-level CLI parser.

Return type:

ArgumentParser

shmpipeline.cli.main(argv=None)

Run the shmpipeline CLI.

Parameters:

argv (Sequence[str] | None)

Return type:

int
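
A main(argv=None) function that returns an int can be called from tests with an explicit argument list and from a script entry point with the real command line. The sketch below illustrates that convention with stand-ins; build_parser_sketch, main_sketch, and the positional config argument are hypothetical and say nothing about the actual shmpipeline subcommands or options.

```python
import argparse


def build_parser_sketch():
    """Hypothetical parser with a single invented positional argument."""
    parser = argparse.ArgumentParser(prog="shmpipeline-sketch")
    parser.add_argument("config", help="path to a pipeline YAML file")
    return parser


def main_sketch(argv=None):
    """Return a process exit code instead of calling sys.exit directly."""
    # With argv=None, argparse falls back to sys.argv[1:].
    args = build_parser_sketch().parse_args(argv)
    print(f"would load {args.config}")
    return 0


exit_code = main_sketch(["pipeline.yaml"])
print(exit_code)
```

A script entry point would typically end with raise SystemExit(main()), so the returned integer becomes the process exit status.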