Source code for fluxlit.runtime.process_control

"""PID file helpers, process liveness checks, and graceful shutdown of ``run_unified`` stacks."""

from __future__ import annotations

import contextlib
import os
import signal
import subprocess
import sys
import time
from pathlib import Path

from fluxlit.runtime.constants import DEFAULT_PIDFILE_NAME


[docs] def default_pidfile_path(explicit: Path | None = None) -> Path: """Path for ``fluxlit dev|run`` PID file (current directory unless overridden).""" if explicit is not None: return Path(explicit).expanduser() env = os.environ.get("FLUXLIT_PIDFILE", "").strip() if env: return Path(env).expanduser() return Path.cwd() / DEFAULT_PIDFILE_NAME
def _write_pidfile(path: Path) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(f"{os.getpid()}\n", encoding="ascii") def _remove_pidfile(path: Path) -> None: with contextlib.suppress(FileNotFoundError): path.unlink() def _pid_is_zombie_unix(pid: int) -> bool: """True if *pid* is a zombie (defunct) — :func:`os.kill` with 0 still succeeds.""" try: out = subprocess.run( ["ps", "-p", str(pid), "-o", "stat="], capture_output=True, text=True, timeout=10, check=False, ) except (OSError, subprocess.TimeoutExpired): return False if out.returncode != 0: return False stat = (out.stdout or "").strip() return bool(stat) and stat[0] == "Z" def _pid_running(pid: int) -> bool: if sys.platform.startswith("win"): # Avoid parsing ``tasklist`` output (locale-dependent). OpenProcess alone is not # enough: a terminated child can still be opened until the parent reaps it, so we # must consult ``GetExitCodeProcess`` (``STILL_ACTIVE`` means still running). import ctypes from ctypes import wintypes kernel32 = ctypes.windll.kernel32 PROCESS_QUERY_LIMITED_INFORMATION = 0x1000 ERROR_ACCESS_DENIED = 5 STILL_ACTIVE = 259 handle = kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid) if handle: exit_code = wintypes.DWORD() ok = int(kernel32.GetExitCodeProcess(handle, ctypes.byref(exit_code))) kernel32.CloseHandle(handle) if ok: return int(exit_code.value) == STILL_ACTIVE return True # ``GetLastError`` is often typed as ``Any`` in stubs; coerce for ``no-any-return``. return int(kernel32.GetLastError()) == ERROR_ACCESS_DENIED try: os.kill(pid, 0) except ProcessLookupError: return False except PermissionError: return True if _pid_is_zombie_unix(pid): return False return True def _windows_taskkill_tree(pid: int, *, force: bool) -> subprocess.CompletedProcess[str]: cmd = ["taskkill", "/PID", str(pid), "/T"] if force: cmd.append("/F") return subprocess.run( cmd, capture_output=True, text=True, timeout=90, check=False, )
def _wait_until_exited(pid: int, timeout_s: float) -> bool:
    """Poll every 50 ms until *pid* stops running; False if *timeout_s* elapses first."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if not _pid_running(pid):
            return True
        time.sleep(0.05)
    return False


def shutdown_unified_process(
    pidfile: Path | None = None,
    *,
    force: bool = False,
    wait_s: float = 5.0,
) -> tuple[int, str]:
    """Stop a stack started by :func:`fluxlit.runtime.run_unified` using its PID file.

    Sends ``SIGTERM`` to the recorded PID (the process running Uvicorn + supervision)
    and waits up to *wait_s* seconds for it to exit.

    On POSIX, if *force* is True and the process is still running after *wait_s*,
    ``SIGKILL`` is sent and the process is given two more seconds.

    On Windows, ``os.kill(pid, SIGTERM)`` is tried first; if it raises anything other
    than ``ProcessLookupError``, ``taskkill /T`` is used instead. If the process
    survives *wait_s*, ``taskkill /T /F`` is run whether or not *force* was given.

    A PID file whose content is unreadable, not an integer, or not a positive
    integer is removed and reported as invalid. Non-positive values must never
    reach ``os.kill``: on POSIX they address a process group or every process the
    caller may signal, not a single process.

    Args:
        pidfile: PID file to use; resolved through :func:`default_pidfile_path`.
        force: Escalate to a hard kill if the process outlives *wait_s*.
        wait_s: Seconds to wait after the first termination request.

    Returns:
        ``(exit_code, message)`` where ``exit_code`` is 0 on success (including
        removal of an invalid or stale PID file), 1 on failure (still running
        after timeout / permission error), 2 if the pidfile is missing.
    """
    path = default_pidfile_path(pidfile)
    if not path.is_file():
        return 2, f"No pid file at {path}"

    try:
        raw = path.read_text(encoding="ascii").strip()
        pid = int(raw)
    except (OSError, ValueError):
        path.unlink(missing_ok=True)
        return 0, f"Removed invalid pid file at {path}"
    if pid <= 0:
        # ``kill(0, ...)`` / ``kill(-n, ...)`` target process groups (or everything);
        # a corrupted file must not turn this helper into a broadcast signal.
        path.unlink(missing_ok=True)
        return 0, f"Removed invalid pid file at {path}"

    if not _pid_running(pid):
        path.unlink(missing_ok=True)
        return 0, f"Removed stale pid file (pid {pid} not running)"

    on_windows = sys.platform.startswith("win")

    if on_windows:
        # Prefer `os.kill(..., SIGTERM)` first: it reliably terminates Python processes
        # (including the ones spawned in our tests). Fall back to taskkill for non-Python
        # or permission edge cases.
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            path.unlink(missing_ok=True)
            return 0, f"Process {pid} exited before signal was delivered"
        except Exception:
            tk = _windows_taskkill_tree(pid, force=False)
            combined = f"{tk.stdout or ''}{tk.stderr or ''}"
            if tk.returncode != 0:
                lowered = combined.lower()
                if (
                    "could not find" in lowered
                    or "not found" in lowered
                    or "not running" in lowered
                ):
                    path.unlink(missing_ok=True)
                    return 0, f"Process {pid} exited before signal was delivered"
                # Any other taskkill failure falls through to the wait below.
    else:
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            path.unlink(missing_ok=True)
            return 0, f"Process {pid} exited before signal was delivered"
        except PermissionError as e:
            return 1, f"Cannot signal pid {pid}: {e}"

    if _wait_until_exited(pid, wait_s):
        path.unlink(missing_ok=True)
        return 0, f"Stopped process {pid}"

    if on_windows and not force:
        # If SIGTERM didn't work, escalate with taskkill /F (same behavior as --force on
        # POSIX where we follow SIGTERM with SIGKILL).
        _windows_taskkill_tree(pid, force=True)
        if _wait_until_exited(pid, 2.0):
            path.unlink(missing_ok=True)
            return 0, f"Stopped process {pid}"

    if force:
        if on_windows:
            _windows_taskkill_tree(pid, force=True)
        else:
            # The process may exit between the last poll and this signal.
            with contextlib.suppress(ProcessLookupError):
                os.kill(pid, signal.SIGKILL)
        if _wait_until_exited(pid, 2.0):
            path.unlink(missing_ok=True)
            return 0, f"Killed process {pid}"

    # One last look: the process may have exited right at a deadline.
    if not _pid_running(pid):
        path.unlink(missing_ok=True)
        return 0, f"Stopped process {pid}"
    return 1, f"Process {pid} still running after {wait_s:.1f}s (try --force)"