Source code for shmpipeline.gui.remote

"""Remote GUI helpers for talking to the pipeline control plane."""

from __future__ import annotations

from dataclasses import dataclass
from ipaddress import ip_address
from typing import Any
from urllib.parse import urlparse

from shmpipeline.control import RemoteManagerClient
from shmpipeline.document import clone_document
from shmpipeline.state import PipelineState


def normalize_server_url(value: str) -> str:
    """Canonicalize a user-supplied control-server address.

    Surrounding whitespace is removed, ``http://`` is prepended when the
    text contains no ``://`` separator, and trailing slashes are dropped.

    Args:
        value: Raw address text, with or without a scheme.

    Returns:
        The cleaned-up base URL.

    Raises:
        ValueError: If the text is blank after stripping, or if the
            cleaned-up URL has no scheme or no network location.
    """
    candidate = value.strip()
    if not candidate:
        raise ValueError("server URL must not be blank")

    # A bare "host:port" is treated as plain HTTP.
    has_scheme_separator = "://" in candidate
    base = candidate if has_scheme_separator else f"http://{candidate}"
    base = base.rstrip("/")

    parts = urlparse(base)
    if not (parts.scheme and parts.netloc):
        # Report the caller's original text, not the rewritten form.
        raise ValueError(f"invalid server URL: {value!r}")
    return base
def is_local_server_url(base_url: str) -> bool:
    """Tell whether a control-server URL addresses this machine.

    The URL counts as local when its host is ``localhost`` or an IP
    address in a loopback range. A URL without a host, or with any other
    host name, is reported as not local.

    Args:
        base_url: URL whose host component is inspected.

    Returns:
        ``True`` for ``localhost`` or a loopback IP address, else ``False``.
    """
    hostname = urlparse(base_url).hostname
    if hostname is None:
        return False
    if hostname == "localhost":
        return True
    try:
        address = ip_address(hostname)
    except ValueError:
        # Not an IP literal, so it is some other (non-local) host name.
        return False
    return address.is_loopback
@dataclass(frozen=True)
class ServerConnection:
    """Immutable URL-plus-token pair identifying one remote control server."""

    # Base URL of the server; from_values() stores it in normalized form.
    base_url: str
    # Optional token; from_values() maps blank or whitespace-only text to None.
    token: str | None = None

    @classmethod
    def from_values(
        cls,
        base_url: str,
        token: str | None = None,
    ) -> "ServerConnection":
        """Build a connection from raw, user-entered values.

        Args:
            base_url: Address text, passed through ``normalize_server_url``.
            token: Optional token; surrounding whitespace is stripped and
                an empty result becomes ``None``.

        Returns:
            A new ``ServerConnection`` holding the cleaned values.
        """
        trimmed = token.strip() if token else ""
        return cls(
            base_url=normalize_server_url(base_url),
            token=trimmed or None,
        )

    @property
    def is_local(self) -> bool:
        """Whether ``base_url`` points at the local host."""
        return is_local_server_url(self.base_url)

    @property
    def display_name(self) -> str:
        """The network-location part of ``base_url``."""
        parts = urlparse(self.base_url)
        return parts.netloc
class RemotePipelineSession:
    """Manager-style facade over one remote control server for GUI windows.

    Calls are forwarded to a ``RemoteManagerClient``. The latest status or
    snapshot payload received is kept in ``_last_status``.
    """

    def __init__(
        self,
        connection: ServerConnection,
        *,
        timeout: float = 5.0,
        client: Any | None = None,
    ) -> None:
        """Create the underlying client for ``connection``.

        Args:
            connection: Server URL and optional token to connect with.
            timeout: Timeout value handed to the client.
            client: Optional object handed to the client as ``client``.
        """
        self.connection = connection
        self._client = RemoteManagerClient(
            connection.base_url,
            token=connection.token,
            timeout=timeout,
            client=client,
        )
        self._last_status: dict[str, Any] | None = None

    def _remember(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Record ``payload`` as the latest status and hand it back."""
        self._last_status = payload
        return payload

    @property
    def is_local(self) -> bool:
        """Whether the connection targets the local host."""
        return self.connection.is_local

    @property
    def state(self) -> PipelineState:
        """Pipeline state taken from a freshly fetched status payload."""
        return PipelineState(self.status()["state"])

    def close(self) -> None:
        """Close the underlying client."""
        self._client.close()

    def info(self) -> dict[str, Any]:
        """Return the client's ``info()`` payload."""
        return self._client.info()

    def document(self) -> dict[str, Any]:
        """Fetch the document payload with its ``"document"`` entry cloned."""
        response = self._client.document()
        response["document"] = clone_document(response["document"])
        return response

    def update_document(self, document: dict[str, Any]) -> dict[str, Any]:
        """Send a clone of ``document`` to the server."""
        outgoing = clone_document(document)
        return self._client.update_document(outgoing)

    def validate_document(
        self,
        document: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Validate a clone of ``document``, or pass ``None`` through as-is."""
        if document is None:
            return self._client.validate_document(None)
        return self._client.validate_document(clone_document(document))

    def load_document_path(self, path: str) -> dict[str, Any]:
        """Forward ``path`` to the client's ``load_document_path``."""
        return self._client.load_document_path(path)

    def status(self) -> dict[str, Any]:
        """Fetch the current status and remember it."""
        return self._remember(self._client.status())

    def snapshot(self) -> dict[str, Any]:
        """Fetch a snapshot and remember it as the latest status."""
        return self._remember(self._client.snapshot())

    def graph(self) -> dict[str, Any]:
        """Return the client's ``graph()`` payload."""
        return self._client.graph()

    def build(self) -> dict[str, Any]:
        """Issue ``build`` and remember the returned snapshot."""
        return self._remember(self._client.build())

    def start(self) -> dict[str, Any]:
        """Issue ``start`` and remember the returned snapshot."""
        return self._remember(self._client.start())

    def pause(self) -> dict[str, Any]:
        """Issue ``pause`` and remember the returned snapshot."""
        return self._remember(self._client.pause())

    def resume(self) -> dict[str, Any]:
        """Issue ``resume`` and remember the returned snapshot."""
        return self._remember(self._client.resume())

    def stop(
        self, *, timeout: float = 5.0, force: bool = False
    ) -> dict[str, Any]:
        """Issue ``stop`` with the given options and remember the snapshot."""
        return self._remember(
            self._client.stop(timeout=timeout, force=force)
        )

    def shutdown(
        self,
        *,
        unlink: bool = True,
        force: bool = False,
    ) -> dict[str, Any]:
        """Issue ``shutdown`` with the given options and remember the snapshot."""
        return self._remember(
            self._client.shutdown(unlink=unlink, force=force)
        )

    def synthetic_input_status(self) -> dict[str, Any]:
        """Return ``"synthetic_sources"`` from a fresh status, or ``{}``."""
        current = self.status()
        return current.get("synthetic_sources", {})

    def start_synthetic_input(self, spec: Any) -> dict[str, Any]:
        """Start a synthetic input described by ``spec``.

        The latest status becomes the response's ``"snapshot"`` entry, or
        ``None`` when the response has no such key.
        """
        response = self._client.start_synthetic_input(spec)
        self._last_status = response.get("snapshot")
        return response

    def stop_synthetic_input(
        self,
        stream_name: str,
        *,
        timeout: float = 2.0,
    ) -> dict[str, Any]:
        """Stop the synthetic input feeding ``stream_name``.

        The latest status becomes the response's ``"snapshot"`` entry, or
        ``None`` when the response has no such key.
        """
        response = self._client.stop_synthetic_input(
            stream_name,
            timeout=timeout,
        )
        self._last_status = response.get("snapshot")
        return response