Source code for fluxlit.runtime.uvicorn_runner

"""CLI-style entry: spawn Streamlit sidecar, optional reload watcher, run Uvicorn."""

from __future__ import annotations

import contextlib
import os
import subprocess
import sys
import threading
from pathlib import Path
from typing import Any

import uvicorn

from fluxlit.gateway import build_gateway, normalize_root_mount
from fluxlit.runtime.constants import _STREAMLIT_TCP_WAIT_S
from fluxlit.runtime.import_target import find_free_port, internal_api_base_url, load_fluxlit
from fluxlit.runtime.process_control import _remove_pidfile, _write_pidfile, default_pidfile_path
from fluxlit.runtime.public_mount import _inject_public_root_path
from fluxlit.runtime.reload_watcher import _start_streamlit_reload_watcher
from fluxlit.runtime.streamlit_proc import (
    _build_streamlit_cmd,
    _build_streamlit_env,
    _StreamlitPopenKwargs,
    _terminate_process,
)
from fluxlit.runtime.upstream_state import (
    STREAMLIT_UPSTREAM_FILE_ENV,
    read_streamlit_upstream_url,
    update_streamlit_upstream_file,
    write_streamlit_upstream_state,
)
from fluxlit.runtime.wait_tcp import _invoke_wait_for_tcp

# Streamlit entry script handed to the sidecar process: ``streamlit/main.py`` located two
# directory levels above this module's resolved file path.
_STREAMLIT_MAIN = Path(__file__).resolve().parent.parent.joinpath("streamlit", "main.py")


def run_unified(
    target: str,
    *,
    host: str = "127.0.0.1",
    port: int = 8000,
    reload: bool = False,
    reload_scope: str = "gateway",
    log_level: str = "info",
    proxy_headers: bool = False,
    forwarded_allow_ips: str | None = None,
    pidfile: Path | None = None,
    write_pidfile: bool = True,
    workbench_mode: bool = False,
) -> None:
    """Start Streamlit on a free localhost port and Uvicorn on ``host:port``.

    Sets process environment so ``create_gateway_app`` / Streamlit entry can resolve the app
    and internal API base (loopback-safe URL derived from ``host``, ``port``, and
    ``api_mount_path``). If Streamlit exits, the gateway is stopped. On shutdown, the
    Streamlit child receives SIGINT / terminate / kill (platform-dependent).

    Args:
        target: ``module:fluxlit_instance`` import path.
        host: Uvicorn bind host.
        port: Uvicorn bind port (public).
        reload: If True, use Uvicorn reload with :func:`create_gateway_app`.
        reload_scope: ``gateway`` (default) or ``full``. ``full`` also restarts Streamlit
            when watched files change (requires ``watchfiles``); see CLI ``--reload-scope``.
        log_level: Uvicorn log level.
        proxy_headers: Forwarded to :class:`uvicorn.Config`.
        forwarded_allow_ips: Forwarded to :class:`uvicorn.Config`.
        pidfile: Optional explicit path for the PID file (see :func:`default_pidfile_path`).
        write_pidfile: If False, do not create a PID file (also skipped when
            ``FLUXLIT_NO_PIDFILE`` is ``1`` / ``true`` / ``yes``).
        workbench_mode: If True, print a Workbench/Connect-oriented startup banner and force
            ``proxy_headers`` on for Uvicorn (still merge ``forwarded_allow_ips``).

    Raises:
        ValueError: If ``reload`` is true and ``reload_scope`` is neither ``gateway`` nor
            ``full``.
    """
    # Validate arguments first so a bad scope fails before a port is probed, the user's app
    # module is imported, or a child process is spawned.
    scope = (reload_scope or "gateway").strip().lower()
    if reload and scope not in {"gateway", "full"}:
        msg = "reload_scope must be 'gateway' or 'full'"
        raise ValueError(msg)

    streamlit_port = find_free_port()
    runner = _STREAMLIT_MAIN
    fl = load_fluxlit(target)

    api_prefix = fl.settings.api_mount_path
    internal_api_base = internal_api_base_url(bind_host=host, port=port, api_mount_path=api_prefix)
    mount = normalize_root_mount(fl.settings.public_mount_path())
    if fl.settings.debug:
        sys.stderr.write(
            f"[fluxlit-debug] internal_api_base={internal_api_base!r} api_prefix={api_prefix!r} "
            f"public_mount={mount!r} proxy_headers={proxy_headers!r} "
            f"trust_proxy={fl.settings.trust_proxy!r} workbench_mode={workbench_mode!r}\n"
        )
        sys.stderr.flush()

    # Proxy header handling is enabled by the explicit flag, the app settings, or workbench mode.
    use_proxy = proxy_headers or fl.settings.trust_proxy or workbench_mode
    allow_ips = forwarded_allow_ips
    if use_proxy and allow_ips is None:
        # No explicit allow-list from the caller: fall back to the app setting, then to "*".
        allow_ips = fl.settings.forwarded_allow_ips or "*"

    env = _build_streamlit_env(
        target=target,
        api_prefix=api_prefix,
        internal_api_base=internal_api_base,
    )
    cmd = _build_streamlit_cmd(
        runner=runner,
        port=streamlit_port,
        base_url_path=fl.settings.public_mount_path(),
        extra_cli_args=fl.settings.streamlit_run_cli_args,
    )
    popen_kwargs: _StreamlitPopenKwargs = {"env": env}
    if sys.platform.startswith("win"):
        # New process group so we can send CTRL_BREAK_EVENT.
        popen_kwargs["creationflags"] = getattr(subprocess, "CREATE_NEW_PROCESS_GROUP")  # noqa: B009
    else:
        popen_kwargs["start_new_session"] = True  # pragma: no cover

    # One-element list used as a mutable cell: the reload path swaps in a new child process and
    # both the monitor thread and the final cleanup must see the current one.
    proc_box: list[subprocess.Popen[bytes]] = [subprocess.Popen(cmd, **popen_kwargs)]
    streamlit_restart_lock = threading.Lock()
    streamlit_reload_state: dict[str, bool] = {"intentional": False}
    streamlit_reload_done = threading.Event()
    pidfile_path: Path | None = None
    pidfile_written = False
    upstream_state_path: Path | None = None
    no_pf = os.environ.get("FLUXLIT_NO_PIDFILE", "").strip().lower() in {"1", "true", "yes"}

    try:
        _invoke_wait_for_tcp("127.0.0.1", streamlit_port, timeout_s=_STREAMLIT_TCP_WAIT_S)
        upstream = f"http://127.0.0.1:{streamlit_port}"
        os.environ["FLUXLIT_APP"] = target
        os.environ["FLUXLIT_API_PREFIX"] = api_prefix
        upstream_state_path = write_streamlit_upstream_state(upstream)

        if write_pidfile and not no_pf:
            pidfile_path = default_pidfile_path(pidfile)
            _write_pidfile(pidfile_path)
            pidfile_written = True

        if workbench_mode:
            from fluxlit.runtime.workbench import format_workbench_startup_message

            sys.stderr.write(
                format_workbench_startup_message(
                    app_title=fl.settings.title,
                    bind_host=host,
                    bind_port=port,
                    root_mount_norm=mount,
                    api_mount_path=api_prefix,
                    public_base_url=fl.settings.public_base_url,
                    proxy_headers_on=use_proxy,
                )
            )
            sys.stderr.flush()

        # Uvicorn's Config accepts many optional kwargs; keep a loose dict for forward compat.
        # The reload and non-reload configurations share every one of these options.
        uvicorn_kw: dict[str, Any] = {
            "host": host,
            "port": port,
            "log_level": log_level,
            "root_path": "",
            "proxy_headers": use_proxy,
            "forwarded_allow_ips": allow_ips,
        }
        grace = fl.settings.uvicorn_graceful_shutdown_timeout_s
        if grace is not None:
            uvicorn_kw["timeout_graceful_shutdown"] = grace

        if reload:
            if scope == "full":
                sys.stderr.write(
                    "[fluxlit] --reload --reload-scope=full: Uvicorn reload plus Streamlit "
                    "restart on file changes (best-effort; WebSockets reconnect).\n"
                )
            else:
                sys.stderr.write(
                    "[fluxlit] --reload --reload-scope=gateway: only the API gateway reloads; "
                    "Streamlit does not. Use --reload-scope=full to restart Streamlit too.\n"
                )
            sys.stderr.flush()
            # Reload mode passes an import string plus ``factory=True`` instead of an app object.
            config = uvicorn.Config(
                "fluxlit.runtime:create_gateway_app",
                factory=True,
                reload=True,
                **uvicorn_kw,
            )
        else:
            config = uvicorn.Config(
                _inject_public_root_path(
                    build_gateway(
                        fl.api,
                        "",
                        upstream_resolver=read_streamlit_upstream_url,
                        access_log=fl.settings.enable_gateway_access_log,
                        api_prefix=fl.settings.api_mount_path,
                        root_mount=mount,
                        proxy_settings=fl.settings,
                    ),
                    mount,
                ),
                **uvicorn_kw,
            )

        server = uvicorn.Server(config)

        def monitor_streamlit() -> None:
            """Stop the gateway when the Streamlit child exits outside an intentional reload."""
            while not server.should_exit:
                p = proc_box[0]
                code = p.wait()
                with streamlit_restart_lock:
                    reloading = streamlit_reload_state["intentional"]
                if reloading:
                    # NOTE(review): this wait uses the same timeout as the restart's own TCP
                    # wait, so a very slow restart could be treated as a crash here - confirm
                    # that is intended.
                    if streamlit_reload_done.wait(timeout=_STREAMLIT_TCP_WAIT_S):
                        streamlit_reload_done.clear()
                        with streamlit_restart_lock:
                            streamlit_reload_state["intentional"] = False
                        # Loop again to watch whichever process is now in ``proc_box``.
                        continue
                if not server.should_exit:
                    sys.stderr.write(
                        f"[fluxlit] Streamlit exited (code={code}); stopping gateway.\n"
                    )
                    sys.stderr.flush()
                    server.should_exit = True
                return

        threading.Thread(target=monitor_streamlit, daemon=True).start()

        if reload and scope == "full" and upstream_state_path is not None:
            path_for_updates = upstream_state_path

            def restart_streamlit_sidecar() -> None:
                """Replace the Streamlit child with a fresh one on a newly chosen port."""
                try:
                    # Flag the upcoming exit as intentional before the old process is stopped,
                    # so the monitor thread does not shut the gateway down.
                    with streamlit_restart_lock:
                        streamlit_reload_state["intentional"] = True
                        streamlit_reload_done.clear()
                    _terminate_process(proc_box[0])
                    new_port = find_free_port()
                    cmd_local = _build_streamlit_cmd(
                        runner=runner,
                        port=new_port,
                        base_url_path=fl.settings.public_mount_path(),
                        extra_cli_args=fl.settings.streamlit_run_cli_args,
                    )
                    new_proc = subprocess.Popen(cmd_local, **popen_kwargs)
                    proc_box[0] = new_proc
                    try:
                        _invoke_wait_for_tcp(
                            "127.0.0.1", new_port, timeout_s=_STREAMLIT_TCP_WAIT_S
                        )
                    except TimeoutError:
                        _terminate_process(new_proc)
                        sys.stderr.write(
                            "[fluxlit] Streamlit sidecar reload failed: timed out waiting for the "
                            "new process to listen; restart `fluxlit dev` (or `fluxlit run`).\n"
                        )
                        sys.stderr.flush()
                        if not server.should_exit:
                            server.should_exit = True
                        return
                    new_upstream = f"http://127.0.0.1:{new_port}"
                    # Record the new upstream URL in the state file written at startup.
                    update_streamlit_upstream_file(path_for_updates, new_upstream)
                finally:
                    # Always release the monitor thread, whether the restart worked or not.
                    with streamlit_restart_lock:
                        streamlit_reload_done.set()

            _start_streamlit_reload_watcher(
                restart_streamlit_sidecar,
                debounce_s=0.25,
                stop_flag=lambda: server.should_exit,
            )

        server.run()
    finally:
        try:
            if upstream_state_path is not None:
                with contextlib.suppress(OSError):
                    upstream_state_path.unlink(missing_ok=True)
            os.environ.pop(STREAMLIT_UPSTREAM_FILE_ENV, None)
            if pidfile_written and pidfile_path is not None:
                _remove_pidfile(pidfile_path)
        finally:
            # Terminate the sidecar even if the file/env cleanup above raised; otherwise the
            # Streamlit child would be left running after this function exits.
            _terminate_process(proc_box[0])