"""Minimal server and state-machine control GUI for shmpipeline."""
from __future__ import annotations
import subprocess
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from PySide6.QtCore import QSettings, Qt, QTimer
from PySide6.QtGui import QAction, QActionGroup, QCloseEvent, QTextCursor
from PySide6.QtWidgets import (
QApplication,
QComboBox,
QFileDialog,
QFormLayout,
QHBoxLayout,
QLabel,
QLineEdit,
QMainWindow,
QMessageBox,
QPlainTextEdit,
QPushButton,
QTabWidget,
QVBoxLayout,
QWidget,
)
from shmpipeline.control.discovery import (
LocalControlServerRecord,
discover_local_servers,
terminate_local_server,
)
from shmpipeline.gui.remote import RemotePipelineSession, ServerConnection
from shmpipeline.gui.themes import apply_application_theme, resolve_theme
[docs]
class ControlWindow(QMainWindow):
"""Minimal window for server handling and pipeline state-machine control."""
_FIELD_MIN_WIDTH = 300
def __init__(self, *, theme_name: str | None = None) -> None:
super().__init__()
self.setWindowTitle("shmpipeline Control")
self.resize(600, 400)
self.menuBar().setNativeMenuBar(False)
self._settings = QSettings("shmpipeline", "control-gui")
shared_gui_settings = QSettings("shmpipeline", "gui")
default_theme = shared_gui_settings.value("theme", "light", str)
self._theme = resolve_theme(
theme_name or self._settings.value("theme", default_theme, str)
)
self._session: RemotePipelineSession | None = None
self._reported_failures: set[tuple[str, str]] = set()
self._discovered_records: list[LocalControlServerRecord] = []
self._last_refresh_error: str | None = None
self._launch_log_path: Path | None = None
self._state_value = "DISCONNECTED"
self._connection_state = "DISCONNECTED"
self._url_edit = QLineEdit(
self._settings.value("server_url", "http://127.0.0.1:8765", str)
)
self._url_edit.setMinimumWidth(self._FIELD_MIN_WIDTH)
self._token_edit = QLineEdit(
self._settings.value("server_token", "", str)
)
self._token_edit.setEchoMode(QLineEdit.Password)
self._token_edit.setMinimumWidth(self._FIELD_MIN_WIDTH)
self._config_path_edit = QLineEdit(
self._settings.value("control_config_path", "", str)
)
self._config_path_edit.setPlaceholderText("/path/to/pipeline.yaml")
self._config_path_edit.setMinimumWidth(self._FIELD_MIN_WIDTH)
self._discovered_combo = QComboBox()
self._discovered_combo.setMinimumContentsLength(28)
self._server_value_label = QLabel("127.0.0.1:8765")
self._server_status_badge = QLabel("DISCONNECTED")
self._server_status_badge.setAlignment(Qt.AlignCenter)
self._config_value_label = QLabel("N/A")
self._config_value_label.setWordWrap(True)
self._state_badge = QLabel("DISCONNECTED")
self._state_badge.setAlignment(Qt.AlignCenter)
self._state_badge.setMinimumHeight(96)
self._state_hint_label = QLabel(
"Use START, PAUSE, STOP, or TEARDOWN to drive the pipeline."
)
self._state_hint_label.setAlignment(Qt.AlignCenter)
self._log_output = QPlainTextEdit()
self._log_output.setReadOnly(True)
self._primary_button = QPushButton("START")
self._primary_button.clicked.connect(self.advance_state_machine)
self._stop_button = QPushButton("STOP")
self._stop_button.clicked.connect(self.stop_pipeline)
self._teardown_button = QPushButton("TEARDOWN")
self._teardown_button.clicked.connect(self.teardown_pipeline)
self._build_ui()
self._build_actions()
self._apply_theme(self._theme.name, persist=False)
self.discover_servers()
self.refresh_status()
self._status_timer = QTimer(self)
self._status_timer.setInterval(500)
self._status_timer.timeout.connect(self.refresh_status)
self._status_timer.start()
@property
def current_theme_name(self) -> str:
"""Return the active theme name."""
return self._theme.name
def _build_ui(self) -> None:
self._tabs = QTabWidget()
self._tabs.addTab(self._build_state_tab(), "State")
self._tabs.addTab(self._build_server_tab(), "Server")
log_box = QWidget()
log_layout = QVBoxLayout(log_box)
log_layout.setContentsMargins(0, 0, 0, 0)
log_layout.addWidget(QLabel("Session Log"))
log_layout.addWidget(self._log_output)
central = QWidget()
layout = QVBoxLayout(central)
layout.addWidget(self._tabs, 0)
layout.addWidget(log_box, 1)
self.setCentralWidget(central)
def _build_server_tab(self) -> QWidget:
tab = QWidget()
layout = QVBoxLayout(tab)
discovery_row = QHBoxLayout()
discovery_row.addWidget(self._discovered_combo, 1)
for label, handler in [
("Discover", self.discover_servers),
("Connect", self.connect_selected_server),
("Kill", self.kill_selected_server),
]:
button = QPushButton(label)
button.clicked.connect(handler)
discovery_row.addWidget(button)
layout.addLayout(discovery_row)
form = QFormLayout()
form.setFieldGrowthPolicy(QFormLayout.ExpandingFieldsGrow)
form.addRow("Token", self._token_edit)
form.addRow(
"Server URL",
self._row_widget(
self._url_edit,
[
("Connect", self.connect_from_fields),
("Launch", self.launch_local_server),
("Disconnect", self.disconnect_from_server),
],
),
)
form.addRow(
"Config",
self._row_widget(
self._config_path_edit,
[
("Browse", self.browse_config_path),
("Use On Server", self.change_server_config),
],
),
)
layout.addLayout(form)
layout.addStretch(1)
return tab
def _build_state_tab(self) -> QWidget:
tab = QWidget()
layout = QVBoxLayout(tab)
server_row = QHBoxLayout()
server_label = QLabel("Server:")
server_label.setMinimumWidth(server_label.sizeHint().width())
status_label = QLabel("Status:")
status_label.setMinimumWidth(status_label.sizeHint().width())
server_row.addWidget(server_label)
server_row.addWidget(self._server_value_label, 1)
server_row.addWidget(status_label)
server_row.addWidget(self._server_status_badge)
layout.addLayout(server_row)
config_row = QHBoxLayout()
config_label = QLabel("Config:")
config_label.setMinimumWidth(server_label.sizeHint().width())
config_row.addWidget(config_label)
config_row.addWidget(self._config_value_label, 1)
layout.addLayout(config_row)
layout.addSpacing(8)
layout.addWidget(self._state_badge)
layout.addWidget(self._state_hint_label)
command_row = QHBoxLayout()
command_row.addWidget(self._primary_button)
command_row.addWidget(self._stop_button)
command_row.addWidget(self._teardown_button)
layout.addLayout(command_row)
layout.addStretch(1)
return tab
def _row_widget(
self, field: QWidget, actions: list[tuple[str, Any]]
) -> QWidget:
container = QWidget()
row = QHBoxLayout(container)
row.setContentsMargins(0, 0, 0, 0)
row.addWidget(field, 1)
for label, handler in actions:
button = QPushButton(label)
button.clicked.connect(handler)
row.addWidget(button)
return container
def _build_actions(self) -> None:
view_menu = self.menuBar().addMenu("View")
theme_menu = view_menu.addMenu("Theme")
theme_group = QActionGroup(self)
theme_group.setExclusive(True)
for theme_name, label in (("light", "Light"), ("dark", "Dark")):
action = QAction(label, self)
action.setCheckable(True)
action.setChecked(self._theme.name == theme_name)
action.triggered.connect(
lambda checked, selected=theme_name: (
checked and self._apply_theme(selected)
)
)
theme_group.addAction(action)
theme_menu.addAction(action)
def _apply_theme(self, theme_name: str, *, persist: bool = True) -> None:
app = QApplication.instance()
if app is None: # pragma: no cover - GUI runtime only
return
self._theme = apply_application_theme(app, theme_name)
if persist:
self._settings.setValue("theme", self._theme.name)
self._apply_connection_badge_style(self._connection_state)
self._apply_state_badge_style(self._state_value)
def _configured_server_name(self) -> str:
try:
return self._build_connection().display_name
except Exception:
return self._url_edit.text().strip() or "N/A"
def _set_connection_status(self, status: str) -> None:
self._connection_state = str(status).upper()
self._server_status_badge.setText(self._connection_state)
self._apply_connection_badge_style(self._connection_state)
def _apply_connection_badge_style(self, status: str) -> None:
palette = {
"CONNECTED": (self._theme.success_bg, self._theme.success),
"ERROR": (self._theme.error_bg, self._theme.error),
"DISCONNECTED": (self._theme.button, self._theme.muted_text),
}
background, color = palette.get(
status,
(self._theme.alternate_base, self._theme.text),
)
self._server_status_badge.setStyleSheet(
"QLabel {"
f" background-color: {background};"
f" color: {color};"
f" border: 1px solid {self._theme.border};"
" border-radius: 8px;"
" font-size: 12px;"
" font-weight: 700;"
" letter-spacing: 0.5px;"
" padding: 4px 10px;"
"}"
)
def _append_log_message(self, level: str, message: str) -> None:
timestamp = datetime.now().strftime("%H:%M:%S")
entry = f"[{timestamp}] {level}: {message}"
self._log_output.moveCursor(QTextCursor.End)
if self._log_output.toPlainText():
self._log_output.insertPlainText("\n")
self._log_output.insertPlainText(entry)
self._log_output.moveCursor(QTextCursor.End)
self._log_output.ensureCursorVisible()
def _log_info(self, message: str) -> None:
self._append_log_message("INFO", message)
def _log_error(self, title: str, error: Exception | str) -> None:
self._append_log_message("ERROR", f"{title}: {error}")
def _show_error(self, title: str, error: Exception | str) -> None:
self._log_error(title, error)
QMessageBox.critical(self, title, str(error))
def _selected_discovered_record(self) -> LocalControlServerRecord | None:
index = self._discovered_combo.currentIndex()
if index < 0 or index >= len(self._discovered_records):
return None
return self._discovered_records[index]
def _build_connection(self) -> ServerConnection:
return ServerConnection.from_values(
self._url_edit.text(),
self._token_edit.text(),
)
def _connect(
self, connection: ServerConnection, *, announce: bool = True
) -> bool:
session = None
try:
session = RemotePipelineSession(connection)
session.info()
except Exception as exc:
if session is not None:
session.close()
self._show_error("Connection Failed", exc)
return False
self.disconnect_from_server(announce=False)
self._session = session
self._settings.setValue("server_url", connection.base_url)
self._settings.setValue("server_token", connection.token or "")
if announce:
self._log_info(f"Connected to {connection.display_name}.")
self.refresh_status()
return True
def connect_from_fields(self) -> None:
self._connect(self._build_connection())
def browse_config_path(self) -> None:
path, _ = QFileDialog.getOpenFileName(
self,
"Select Pipeline Config",
str(Path.cwd()),
"YAML Files (*.yaml *.yml)",
)
if not path:
return
self._config_path_edit.setText(path)
self._settings.setValue("control_config_path", path)
self._log_info(f"Selected config file {path}.")
def _server_info_for_record(
self,
record: LocalControlServerRecord,
) -> dict[str, Any] | None:
token = self._token_edit.text().strip() or None
if record.token_required and token is None:
return None
session = None
try:
session = RemotePipelineSession(
ServerConnection.from_values(record.base_url, token),
timeout=0.5,
)
return session.info()
except Exception:
return None
finally:
if session is not None:
session.close()
def _describe_discovered_record(
self,
record: LocalControlServerRecord,
) -> str:
parts = [record.base_url, f"pid {record.pid}"]
if record.token_required:
parts.append("token")
info = self._server_info_for_record(record)
if info is not None:
parts.append(str(info.get("state", "unknown")).upper())
config_path = info.get("config_path")
if config_path:
parts.append(Path(config_path).name)
return " | ".join(parts)
def discover_servers(self) -> None:
self._discovered_records = discover_local_servers()
current_base_url = (
self._session.connection.base_url
if self._session is not None
else None
)
self._discovered_combo.clear()
if not self._discovered_records:
self._discovered_combo.addItem("No local servers discovered")
self._discovered_combo.setEnabled(False)
self._log_info("Discovered 0 local servers.")
return
self._discovered_combo.setEnabled(True)
selected_index = 0
for index, record in enumerate(self._discovered_records):
self._discovered_combo.addItem(
self._describe_discovered_record(record)
)
if current_base_url == record.base_url:
selected_index = index
self._discovered_combo.setCurrentIndex(selected_index)
self._log_info(
f"Discovered {len(self._discovered_records)} local servers."
)
def connect_selected_server(self) -> None:
record = self._selected_discovered_record()
if record is None:
self._show_error(
"Connect Failed", "Select a discovered server first."
)
return
self._url_edit.setText(record.base_url)
if record.token_required and not self._token_edit.text().strip():
self._show_error(
"Connect Failed",
"The selected server requires a bearer token.",
)
return
self._connect(
ServerConnection.from_values(
record.base_url, self._token_edit.text()
)
)
def _read_launch_log(self) -> str:
if self._launch_log_path is None:
return ""
try:
text = self._launch_log_path.read_text(
encoding="utf-8",
errors="replace",
).strip()
except Exception:
return ""
if len(text) <= 8000:
return text
return "...\n" + text[-8000:]
def launch_local_server(self) -> None:
config_path = self._config_path_edit.text().strip()
if not config_path:
self._show_error("Launch Failed", "Choose a config file first.")
return
config_file = Path(config_path).expanduser().resolve()
if not config_file.is_file():
self._show_error(
"Launch Failed", f"Config file not found: {config_file}"
)
return
connection = self._build_connection()
if not connection.is_local:
self._show_error(
"Launch Failed",
"The minimal control GUI only launches local servers.",
)
return
parsed = urlparse(connection.base_url)
host = parsed.hostname or "127.0.0.1"
port = parsed.port or 8765
self._settings.setValue("control_config_path", str(config_file))
with tempfile.NamedTemporaryFile(
prefix="shmpipeline-control-gui-",
suffix=".log",
delete=False,
) as handle:
self._launch_log_path = Path(handle.name)
command = [
sys.executable,
"-m",
"shmpipeline.cli",
"--log-level",
"ERROR",
"serve",
str(config_file),
"--host",
host,
"--port",
str(port),
]
token = connection.token
if token is not None:
command.extend(["--token", token])
self._log_info(
f"Launching local server at http://{host}:{port} using {config_file}."
)
try:
with self._launch_log_path.open(
"w", encoding="utf-8"
) as log_handle:
process = subprocess.Popen(
command,
stdout=log_handle,
stderr=subprocess.STDOUT,
text=True,
)
except Exception as exc:
self._show_error("Launch Failed", exc)
return
deadline = time.monotonic() + 5.0
last_error: Exception | None = None
while time.monotonic() < deadline:
if process.poll() is not None:
log_text = self._read_launch_log()
message = "The launched local control server exited before it became ready."
if log_text:
message += f"\n\nServer log:\n{log_text}"
self._show_error("Launch Failed", message)
self.discover_servers()
return
session = None
try:
session = RemotePipelineSession(connection, timeout=0.5)
session.info()
except Exception as exc:
last_error = exc
if session is not None:
session.close()
time.sleep(0.1)
continue
else:
session.close()
self.discover_servers()
self._connect(connection, announce=False)
self._log_info(
f"Local server is ready at {connection.display_name}."
)
return
process.terminate()
try:
process.wait(timeout=2.0)
except Exception:
process.kill()
process.wait(timeout=2.0)
log_text = self._read_launch_log()
message = "Timed out waiting for the launched local control server to accept connections."
if last_error is not None:
message += f"\n\nLast connection error: {last_error}"
if log_text:
message += f"\n\nServer log:\n{log_text}"
self._show_error("Launch Failed", message)
self.discover_servers()
def kill_selected_server(self) -> None:
record = self._selected_discovered_record()
if record is None:
self._show_error(
"Kill Failed", "Select a discovered server first."
)
return
try:
terminate_local_server(record)
except Exception as exc:
self._show_error("Kill Failed", exc)
return
if (
self._session is not None
and self._session.connection.base_url == record.base_url
):
self.disconnect_from_server(announce=False)
self._log_info(
f"Sent termination signal to {record.base_url} (pid {record.pid})."
)
QTimer.singleShot(500, self.discover_servers)
def disconnect_from_server(self, *, announce: bool = True) -> None:
if self._session is None:
self._server_value_label.setText(self._configured_server_name())
self._config_value_label.setText("N/A")
self._set_connection_status("DISCONNECTED")
self._set_state_display("DISCONNECTED")
return
base_url = self._session.connection.base_url
try:
self._session.close()
finally:
self._session = None
self._reported_failures.clear()
self._last_refresh_error = None
self._server_value_label.setText(self._configured_server_name())
self._config_value_label.setText("N/A")
self._set_connection_status("DISCONNECTED")
self._set_state_display("DISCONNECTED")
if announce:
self._log_info(f"Disconnected from {base_url}.")
def change_server_config(self) -> None:
config_path = self._config_path_edit.text().strip()
if not config_path:
self._show_error(
"Change Config Failed", "Choose a config file first."
)
return
config_file = Path(config_path).expanduser().resolve()
if not config_file.is_file():
self._show_error(
"Change Config Failed",
f"Config file not found: {config_file}",
)
return
if self._session is None and not self._connect(
self._build_connection(), announce=False
):
return
try:
assert self._session is not None
payload = self._session.load_document_path(str(config_file))
except Exception as exc:
self._show_error("Change Config Failed", exc)
return
self._settings.setValue("control_config_path", str(config_file))
self._log_info(
"Server config changed to "
f"{payload.get('config_path', str(config_file))}."
)
self.discover_servers()
self.refresh_status()
def _set_state_display(self, state: str) -> None:
self._state_value = str(state).upper()
self._state_badge.setText(self._state_value)
self._apply_state_badge_style(self._state_value)
if self._state_value == "RUNNING":
self._primary_button.setText("PAUSE")
self._primary_button.setEnabled(True)
elif self._state_value == "PAUSED":
self._primary_button.setText("RESUME")
self._primary_button.setEnabled(True)
elif self._state_value in {"FAILED", "CONNECTION ERROR"}:
self._primary_button.setText("START")
self._primary_button.setEnabled(False)
else:
self._primary_button.setText("START")
self._primary_button.setEnabled(True)
connected = self._state_value not in {
"DISCONNECTED",
"CONNECTION ERROR",
}
self._stop_button.setEnabled(connected)
self._teardown_button.setEnabled(connected)
def _apply_state_badge_style(self, state: str) -> None:
palette = {
"RUNNING": (self._theme.success_bg, self._theme.success),
"PAUSED": (self._theme.highlight, self._theme.highlight_text),
"FAILED": (self._theme.error_bg, self._theme.error),
"CONNECTION ERROR": (self._theme.error_bg, self._theme.error),
"INITIALIZED": (self._theme.alternate_base, self._theme.accent),
"BUILT": (self._theme.alternate_base, self._theme.accent),
"STOPPED": (self._theme.button, self._theme.muted_text),
"DISCONNECTED": (self._theme.button, self._theme.muted_text),
}
background, color = palette.get(
state,
(self._theme.alternate_base, self._theme.text),
)
self._state_badge.setStyleSheet(
"QLabel {"
f" background-color: {background};"
f" color: {color};"
f" border: 1px solid {self._theme.border};"
" border-radius: 10px;"
" font-size: 28px;"
" font-weight: 700;"
" letter-spacing: 1px;"
" padding: 12px;"
"}"
)
self._state_hint_label.setStyleSheet(
f"color: {self._theme.muted_text};"
)
def _report_failures(self, status: dict[str, Any]) -> None:
active_failures: set[tuple[str, str]] = set()
for failure in status.get("failures", []):
kernel = str(failure.get("kernel") or "unknown")
error = str(failure.get("error") or "unknown error")
key = (kernel, error)
active_failures.add(key)
if key not in self._reported_failures:
self._show_error("Pipeline Error", f"{kernel}: {error}")
self._reported_failures = active_failures
def refresh_status(self) -> None:
if self._session is None:
self._server_value_label.setText(self._configured_server_name())
self._config_value_label.setText("N/A")
self._set_connection_status("DISCONNECTED")
self._set_state_display("DISCONNECTED")
return
try:
status = self._session.status()
except Exception as exc:
message = str(exc)
if message != self._last_refresh_error:
self._log_error("Status Refresh Failed", message)
self._last_refresh_error = message
self._server_value_label.setText(
self._session.connection.display_name
)
self._config_value_label.setText("UNAVAILABLE")
self._set_connection_status("ERROR")
self._set_state_display("CONNECTION ERROR")
return
self._last_refresh_error = None
self._report_failures(status)
state = str(status.get("state", "unknown")).upper()
config_path = str(status.get("config_path") or "N/A")
self._server_value_label.setText(self._session.connection.display_name)
self._config_value_label.setText(config_path)
self._set_connection_status("CONNECTED")
self._set_state_display(state)
def _ensure_session(self) -> bool:
if self._session is not None:
return True
return self._connect(self._build_connection(), announce=False)
def advance_state_machine(self) -> None:
if not self._ensure_session():
return
try:
assert self._session is not None
state = self._session.state.value.upper()
if state == "RUNNING":
self._session.pause()
self._log_info("Pipeline paused.")
elif state == "PAUSED":
self._session.resume()
self._log_info("Pipeline resumed.")
else:
self._session.start()
self._log_info("Pipeline started.")
except Exception as exc:
self._show_error("State Change Failed", exc)
return
self.refresh_status()
def stop_pipeline(self) -> None:
if not self._ensure_session():
return
try:
assert self._session is not None
self._session.stop(force=True)
except Exception as exc:
self._show_error("Stop Failed", exc)
return
self._log_info("Pipeline stopped.")
self.refresh_status()
def teardown_pipeline(self) -> None:
if not self._ensure_session():
return
try:
assert self._session is not None
self._session.shutdown(force=True)
except Exception as exc:
self._show_error("Teardown Failed", exc)
return
self._log_info("Pipeline torn down.")
self.refresh_status()
def closeEvent(
self,
event: QCloseEvent,
) -> None: # pragma: no cover - GUI runtime only
self._status_timer.stop()
self.disconnect_from_server(announce=False)
super().closeEvent(event)
[docs]
def main() -> int:
"""Launch the minimal control GUI."""
app = QApplication.instance() or QApplication(sys.argv)
app.setApplicationName("shmpipeline Control GUI")
app.setOrganizationName("shmpipeline")
window = ControlWindow()
window.show()
return app.exec()
if __name__ == "__main__": # pragma: no cover - GUI entry point
raise SystemExit(main())