"""Editable document helpers shared by GUI and control services."""
from __future__ import annotations
from copy import deepcopy
from pathlib import Path
from typing import Any, Mapping
import yaml
from shmpipeline.config import PipelineConfig
from shmpipeline.errors import ConfigValidationError
Document = dict[str, Any]
[docs]
def default_document() -> Document:
"""Return an empty editable pipeline document."""
return {
"shared_memory": [],
"kernels": [],
}
[docs]
def clone_document(document: Mapping[str, Any]) -> Document:
"""Return a deep editable copy of one document."""
return deepcopy(dict(document))
def normalize_document(document: Mapping[str, Any]) -> Document:
"""Ensure a document always has the expected top-level keys."""
normalized = clone_document(document)
normalized.setdefault("shared_memory", [])
normalized.setdefault("kernels", [])
return normalized
[docs]
def load_document(path: str | Path) -> Document:
"""Load a YAML config file into the editable document shape."""
raw = yaml.safe_load(Path(path).read_text(encoding="utf-8"))
if raw is None:
return default_document()
if not isinstance(raw, Mapping):
raise ConfigValidationError("pipeline config must be a mapping")
return normalize_document(raw)
[docs]
def document_to_yaml(document: Mapping[str, Any]) -> str:
"""Serialize one document to readable YAML."""
return yaml.safe_dump(
normalize_document(document),
sort_keys=False,
allow_unicode=False,
)
[docs]
def save_document(path: str | Path, document: Mapping[str, Any]) -> None:
"""Write one editable document to disk."""
Path(path).write_text(document_to_yaml(document), encoding="utf-8")
[docs]
def parse_inline_yaml(text: str, *, fallback: Any) -> Any:
"""Parse a small YAML fragment used by editors and dialogs."""
stripped = text.strip()
if not stripped:
return deepcopy(fallback)
value = yaml.safe_load(stripped)
if value is None:
return deepcopy(fallback)
return value
def config_to_document(config: PipelineConfig) -> Document:
"""Convert an immutable PipelineConfig back into editable document form."""
shared_memory = []
for spec in config.shared_memory:
item: dict[str, Any] = {
"name": spec.name,
"shape": list(spec.shape),
"dtype": str(spec.dtype),
"storage": spec.storage,
}
if spec.gpu_device is not None:
item["gpu_device"] = spec.gpu_device
if spec.cpu_mirror is not None:
item["cpu_mirror"] = spec.cpu_mirror
shared_memory.append(item)
kernels = []
for kernel in config.kernels:
item = {
"name": kernel.name,
"kind": kernel.kind,
"input": kernel.input,
"output": kernel.output,
"parameters": deepcopy(kernel.parameters),
"read_timeout": kernel.read_timeout,
"pause_sleep": kernel.pause_sleep,
}
if kernel.operation is not None:
item["operation"] = kernel.operation
if kernel.auxiliary:
if all(
binding.alias == binding.name for binding in kernel.auxiliary
):
item["auxiliary"] = [
binding.name for binding in kernel.auxiliary
]
else:
item["auxiliary"] = {
binding.alias: binding.name for binding in kernel.auxiliary
}
kernels.append(item)
return {
"shared_memory": shared_memory,
"kernels": kernels,
}