Core API
Package Surface
Public package surface for shmpipeline.
The top-level package re-exports the primary user-facing objects from the
underlying modules so most applications can import from shmpipeline
directly.
Common imports include:
Configuration Models
Immutable configuration models for shared-memory pipelines.
Bases:
objectConfiguration for one named shared-memory resource.
A shared-memory record defines the storage backend, tensor shape, and dtype for one named stream in the pipeline graph. GPU streams may additionally declare a CUDA device and an optional CPU mirror for host-side readers.
- Parameters:
name (str)
shape (tuple[int, ...])
dtype (dtype)
storage (str)
gpu_device (str | None)
cpu_mirror (bool | None)
Build a normalized shared-memory configuration from a mapping.
- Parameters:
raw (Mapping[str, Any])
- Return type:
- class shmpipeline.config.AuxiliaryBinding(alias, name)[source]
Bases:
objectBind one kernel-local auxiliary alias to a shared-memory stream.
- Parameters:
alias (str)
name (str)
- class shmpipeline.config.KernelConfig(name, kind, input, output, auxiliary=<factory>, operation=None, parameters=<factory>, read_timeout=1.0, pause_sleep=0.01)[source]
Bases:
objectConfiguration for one compute kernel in the pipeline.
Each kernel consumes one trigger input stream, may read zero or more auxiliary streams, and writes one output stream. The kind field resolves through the active
shmpipeline.registry.KernelRegistry.- Parameters:
name (str)
kind (str)
input (str)
output (str)
auxiliary (tuple[AuxiliaryBinding, ...])
operation (str | None)
parameters (dict[str, Any])
read_timeout (float)
pause_sleep (float)
- classmethod from_dict(raw)[source]
Build a normalized kernel configuration from a mapping.
- Parameters:
raw (Mapping[str, Any])
- Return type:
- property all_inputs: tuple[str, ...]
Return the trigger input followed by ordered auxiliary streams.
- property auxiliary_names: tuple[str, ...]
Return auxiliary shared-memory names in config order.
- property auxiliary_aliases: tuple[str, ...]
Return auxiliary aliases in config order.
- property auxiliary_by_alias: dict[str, str]
Return auxiliary bindings keyed by expression alias.
- class shmpipeline.config.PipelineConfig(shared_memory, kernels)[source]
Bases:
objectComplete pipeline configuration loaded from YAML or a mapping.
This is the primary configuration object used by the CLI, GUI, and
shmpipeline.manager.PipelineManager. It groups the named stream definitions and the ordered kernel stages that consume them.- Parameters:
shared_memory (tuple[SharedMemoryConfig, ...])
kernels (tuple[KernelConfig, ...])
- classmethod from_dict(raw)[source]
Create a pipeline configuration from a plain mapping.
- Parameters:
raw (Mapping[str, Any])
- Return type:
- classmethod from_yaml(path)[source]
Load a pipeline configuration from a YAML file.
- Parameters:
path (str | Path)
- Return type:
Return shared-memory definitions keyed by name.
Kernel Base API
Kernel abstractions used by worker processes.
- class shmpipeline.kernel.KernelContext(config, shared_memory)[source]
Bases:
objectStatic information available to a kernel instance.
The runtime constructs one context per worker so kernels can inspect their validated configuration and the shared-memory specifications for the streams they read and write.
- Parameters:
config (KernelConfig)
shared_memory (Mapping[str, SharedMemoryConfig])
- property input_specs: tuple[SharedMemoryConfig, ...]
Return trigger plus auxiliary stream specifications in config order.
- property trigger_input_spec: SharedMemoryConfig
Return the primary input stream specification.
- property auxiliary_specs: tuple[SharedMemoryConfig, ...]
Return auxiliary stream specifications in config order.
- property output_spec: SharedMemoryConfig
Return the primary output stream specification.
- class shmpipeline.kernel.Kernel(context)[source]
Bases:
ABCBase class for compute kernels executed by the runtime.
Custom kernels normally override
validate_config()when they need stage-specific parameter checks and implementcompute_into()to write results into the provided output buffer.Store validated kernel context and normalized parameters.
- Parameters:
context (KernelContext)
- classmethod validate_config(config, shared_memory)[source]
Validate arity and storage constraints before build.
- Parameters:
config (KernelConfig)
shared_memory (Mapping[str, SharedMemoryConfig])
- Return type:
None
Graph Introspection
Pipeline graph introspection and validation helpers.
- class shmpipeline.graph.GraphEdge(source, target, role, stream)[source]
Bases:
objectOne directed graph edge between a stream and a kernel.
- Parameters:
source (str)
target (str)
role (str)
stream (str)
- class shmpipeline.graph.PipelineGraph(config)[source]
Bases:
objectDerived graph view of one pipeline configuration.
- Parameters:
config (PipelineConfig)
- classmethod from_config(config)[source]
Build a graph view from one loaded configuration.
- Parameters:
config (PipelineConfig)
- Return type:
- source_streams()[source]
Return streams that are only written externally into the graph.
- Return type:
tuple[str, …]
- sink_streams()[source]
Return streams that are produced but not consumed downstream.
- Return type:
tuple[str, …]
- upstream_kernels(kernel_name)[source]
Return kernels that feed any input of the target kernel.
- Parameters:
kernel_name (str)
- Return type:
tuple[str, …]
- downstream_kernels(kernel_name)[source]
Return kernels that consume the target kernel’s output.
- Parameters:
kernel_name (str)
- Return type:
tuple[str, …]
- validation_errors()[source]
Return graph-level validation errors.
The current graph validation rejects ambiguous write ownership where more than one kernel produces the same shared-memory stream.
- Return type:
list[str]
- raise_for_errors()[source]
Raise the first graph validation error, if any exists.
- Return type:
None
- shmpipeline.graph.validate_pipeline_config(config)[source]
Return all config, graph, and kernel-binding validation errors.
- Parameters:
config (PipelineConfig)
- Return type:
list[str]
Pipeline Manager
Pipeline manager responsible for building and supervising workers.
- class shmpipeline.manager.PipelineManager(config, *, spawn_method='spawn', placement_policy=None, registry=None)[source]
Bases:
objectCreate shared memory, spawn workers, and supervise pipeline state.
The manager owns the pipeline lifecycle from validated config through runtime monitoring and teardown. It is the primary user-facing runtime API for Python callers.
Initialize a manager from a config object or YAML path.
- Parameters:
config (PipelineConfig | str | Path)
spawn_method (str)
placement_policy (WorkerPlacementPolicy | None)
registry (KernelRegistry | None)
- property graph: PipelineGraph
Return a derived graph view of the current configuration.
- build()[source]
Validate configuration and create shared-memory resources.
This step prepares the pipeline graph and opens or creates the named streams without starting any worker processes.
- Return type:
None
- start()[source]
Start worker processes for every configured kernel.
The manager must already be in the built state before workers can be spawned.
- Return type:
None
- stop(*, timeout=5.0, force=False)[source]
Stop worker processes but keep shared-memory resources built.
After a stop, callers may inspect stream contents, restart workers, or proceed to a final shutdown.
- Parameters:
timeout (float)
force (bool)
- Return type:
None
- shutdown(*, unlink=True, force=False)[source]
Stop workers, close local handles, and optionally unlink streams.
This is the terminal cleanup step for a manager instance.
- Parameters:
unlink (bool)
force (bool)
- Return type:
None
- poll_events()[source]
Drain worker events and update manager failure state.
- Return type:
list[dict[str, Any]]
- status(*, poll=True)[source]
Return a snapshot of manager state, workers, and failures.
The result is intentionally JSON-friendly so CLI, GUI, and external tooling can consume the same structure.
- Parameters:
poll (bool)
- Return type:
dict[str, Any]
- runtime_snapshot(*, poll=True)[source]
Return a richer status snapshot for CLI and GUI consumers.
This extends
status()with a timestamp and the derived graph description.- Parameters:
poll (bool)
- Return type:
dict[str, Any]
- raise_if_failed()[source]
Raise the first worker failure, if any has been recorded.
- Return type:
None
- get_stream(name)[source]
Return the manager-owned shared-memory handle for one stream.
- Parameters:
name (str)
- start_synthetic_input(spec)[source]
Start a synthetic input writer for one built stream.
Synthetic writers are useful for demos, viewer testing, smoke tests, and deterministic regression scenarios.
- Parameters:
spec (SyntheticInputConfig | dict[str, Any])
- Return type:
dict[str, Any]
- stop_synthetic_input(stream_name, *, timeout=2.0)[source]
Stop one active synthetic input writer if it exists.
- Parameters:
stream_name (str)
timeout (float)
- Return type:
None
- stop_all_synthetic_inputs(*, timeout=2.0)[source]
Stop every active synthetic input writer.
- Parameters:
timeout (float)
- Return type:
None
- synthetic_input_status()[source]
Return status snapshots for active synthetic input writers.
- Return type:
dict[str, dict[str, Any]]
- property failures: tuple[dict[str, Any], ...]
Return recorded worker failures.
- property events: tuple[dict[str, Any], ...]
Return all events received from worker processes so far.
Kernel Registry
Kernel registry used by the manager and worker runtime.
- class shmpipeline.registry.KernelRegistry(kernels, lazy_kernels=None)[source]
Bases:
objectResolve kernel kinds to implementation classes.
Registries are the extension point for third-party kernels. The default registry contains the built-in CPU kernels and lazily loads GPU kernels when the optional torch dependency is available.
Store a static mapping of registered kernel implementations.
- Parameters:
- get(kind)[source]
Return the implementation class for a kernel kind.
- Parameters:
kind (str)
- Return type:
type[Kernel]
- register(kernel_cls, *, replace=False)[source]
Register one kernel implementation class on this registry.
- Parameters:
kernel_cls (type[Kernel])
replace (bool)
- Return type:
None
- extended(*kernel_classes, replace=False)[source]
Return a new registry extended with additional kernel classes.
- Parameters:
kernel_classes (type[Kernel])
replace (bool)
- Return type:
- validate(config, shared_memory)[source]
Validate one kernel binding against shared-memory definitions.
- Parameters:
config (KernelConfig)
shared_memory (Mapping[str, SharedMemoryConfig])
- Return type:
None
- create(config, shared_memory)[source]
Instantiate a kernel after validation.
- Parameters:
config (KernelConfig)
shared_memory (Mapping[str, SharedMemoryConfig])
- Return type:
Synthetic Inputs
Synthetic shared-memory inputs for testing and interactive demos.
- shmpipeline.synthetic.available_synthetic_patterns()[source]
Return supported synthetic input pattern names.
- Return type:
tuple[str, …]
- class shmpipeline.synthetic.SyntheticInputConfig(stream_name, pattern='random', rate_hz=None, seed=0, amplitude=1.0, offset=0.0, period=64.0, constant=0.0, impulse_interval=64)[source]
Bases:
objectConfiguration for one synthetic input writer.
Synthetic writers can drive source streams without an external producer, which makes them useful for demos, GUI exploration, and automated tests.
- Parameters:
stream_name (str)
pattern (str)
rate_hz (float | None)
seed (int)
amplitude (float)
offset (float)
period (float)
constant (float)
impulse_interval (int)
- class shmpipeline.synthetic.SyntheticPatternGenerator(spec, *, shape, dtype, storage, gpu_device=None)[source]
Bases:
objectGenerate deterministic test frames for CPU and GPU streams.
- Parameters:
spec (SyntheticInputConfig)
shape (tuple[int, ...])
dtype (Any)
storage (str)
gpu_device (str | None)
- class shmpipeline.synthetic.SyntheticSourceController(stream, spec)[source]
Bases:
objectBackground writer that feeds a stream with synthetic test frames.
The controller owns the worker thread and the timing loop for one active synthetic input source.
- Parameters:
stream (Any)
spec (SyntheticInputConfig)
State Model
Pipeline state model.
- class shmpipeline.state.PipelineState(*values)[source]
Bases:
str,EnumStates managed by
shmpipeline.manager.PipelineManager.
CLI Module
Command-line entry points for shmpipeline.