"""ASGI gateway factory: API dispatch, Streamlit HTTP/WS proxy, metrics, access logs."""
from __future__ import annotations
import asyncio
import os
from collections.abc import Callable
from typing import Any, cast
import httpx
from starlette.types import ASGIApp
from fluxlit.api_mount import normalize_api_mount_path
from fluxlit.config import FluxlitSettings
from fluxlit.gateway._log import gateway_log
from fluxlit.gateway.dispatch import make_gateway_app
from fluxlit.gateway.metrics import get_gateway_prom_metrics
from fluxlit.gateway.options import gateway_opts
from fluxlit.gateway.paths import normalize_root_mount
from fluxlit.logging import DEFAULT_SENSITIVE_QUERY_KEYS
[docs]
def build_gateway(
api_app: ASGIApp,
upstream_base: str,
*,
upstream_resolver: Callable[[], str] | None = None,
access_log: bool = False,
api_prefix: str = "/api",
root_mount: str = "",
proxy_settings: FluxlitSettings | None = None,
) -> ASGIApp:
"""Build the composite ASGI application used as Uvicorn's entrypoint.
Routes whose path equals ``api_prefix`` or starts with ``api_prefix/`` are
forwarded to ``api_app`` with that prefix stripped from ``path`` and ``raw_path``.
All other HTTP traffic and WebSockets are reverse-proxied to ``upstream_base``
(typically the internal Streamlit origin), or to the URL returned by
``upstream_resolver`` when that is provided (evaluated per request).
Lifespan events are delegated only to ``api_app``.
Args:
api_app: Inner FastAPI / Starlette app (mount path not included in its routes).
upstream_base: Base URL for Streamlit when ``upstream_resolver`` is unset.
upstream_resolver: If set, called for each proxied request to get the current
upstream base (e.g. after Streamlit restarts on a new port). ``upstream_base``
is ignored for proxying when this is set.
access_log: If True, emit one INFO log per request with structured ``extra`` fields.
api_prefix: Public URL prefix for the API (default ``/api``).
root_mount: Optional browser-visible path prefix when the app is published
under a subpath (e.g. Posit Connect / Workbench). Must match
:class:`~fluxlit.config.FluxlitSettings.root_path` and Streamlit
``server.baseUrlPath``. When the proxy forwards the full path, this
strip is applied before dispatch; Streamlit still receives paths that
include the prefix.
proxy_settings: Optional :class:`~fluxlit.config.FluxlitSettings` for upstream
timeouts, body limits, concurrency, and WebSocket tuning (defaults match
historical hardcoded gateway behavior when omitted).
Returns:
A callable ASGI3 application.
"""
fixed_upstream = upstream_base.rstrip("/")
opts = gateway_opts(proxy_settings)
http_client: httpx.AsyncClient | None = None
http_client_lock = asyncio.Lock()
async def _shared_httpx_client() -> httpx.AsyncClient:
nonlocal http_client
async with http_client_lock:
if http_client is None:
timeout = httpx.Timeout(opts.read_timeout, connect=opts.connect_timeout)
limits: httpx.Limits | None = None
if opts.httpx_max_connections > 0:
mk = (
opts.httpx_max_keepalive_connections
if opts.httpx_max_keepalive_connections > 0
else opts.httpx_max_connections
)
limits = httpx.Limits(
max_connections=opts.httpx_max_connections,
max_keepalive_connections=mk,
)
client_kw: dict[str, Any] = {"timeout": timeout}
if limits is not None:
client_kw["limits"] = limits
http_client = httpx.AsyncClient(**client_kw)
return http_client
async def _close_shared_httpx_client() -> None:
nonlocal http_client
async with http_client_lock:
if http_client is not None:
await http_client.aclose()
http_client = None
upstream_sem: asyncio.Semaphore | None = None
if opts.max_concurrent_upstream_http > 0:
upstream_sem = asyncio.Semaphore(opts.max_concurrent_upstream_http)
def resolve_upstream() -> str:
if upstream_resolver is not None:
return upstream_resolver().strip().rstrip("/")
return fixed_upstream
prefix = normalize_api_mount_path(api_prefix)
mount = normalize_root_mount(root_mount)
_log_qs_keys = set(DEFAULT_SENSITIVE_QUERY_KEYS)
if proxy_settings is not None:
_p = (getattr(proxy_settings, "url_session_query_param", "") or "").strip()
if _p:
_log_qs_keys.add(_p)
log_sensitive_query_keys: frozenset[str] = frozenset(_log_qs_keys)
prom_metrics = None
prom_path = "/__fluxlit/metrics"
if proxy_settings and proxy_settings.enable_gateway_prometheus_metrics:
prom_metrics = get_gateway_prom_metrics()
raw_mp = (proxy_settings.gateway_prometheus_metrics_path or "/__fluxlit/metrics").strip()
prom_path = raw_mp if raw_mp.startswith("/") else f"/{raw_mp}"
if prom_path == prefix or prom_path.startswith(f"{prefix}/"):
gateway_log.warning(
"gateway_prometheus_metrics_path must not start with api_mount_path %r; "
"disabling Prometheus metrics for this gateway build",
prefix,
)
prom_metrics = None
debug_mode = bool(proxy_settings and proxy_settings.debug)
debug_snapshot: dict[str, Any] | None = None
debug_path = "/__fluxlit/debug"
if debug_mode and proxy_settings:
if debug_path == prefix or debug_path.startswith(f"{prefix}/"):
gateway_log.warning(
"FluxLit debug endpoint %r conflicts with api_mount_path %r; disabling it",
debug_path,
prefix,
)
debug_mode = False
else:
from fluxlit.config.config_print import redact_fluxlit_settings_dict
debug_snapshot = {
"fluxlit_debug": True,
"gateway": {"api_prefix": prefix, "root_mount": mount},
"settings": redact_fluxlit_settings_dict(proxy_settings),
"streamlit_upstream_env": (
"set"
if (os.environ.get("FLUXLIT_STREAMLIT_UPSTREAM") or "").strip()
else "unset"
),
}
gateway_log.info(
"fluxlit debug mode: GET %s redacted snapshot (api_prefix=%r root_mount=%r)",
debug_path,
prefix,
mount,
)
gateway_app = make_gateway_app(
api_app=api_app,
resolve_upstream=resolve_upstream,
prefix=prefix,
mount=mount,
opts=opts,
upstream_sem=upstream_sem,
shared_httpx_client=_shared_httpx_client,
prom_metrics=prom_metrics,
prom_path=prom_path,
access_log=access_log,
log_sensitive_query_keys=log_sensitive_query_keys,
debug_mode=debug_mode,
debug_snapshot=debug_snapshot,
debug_path=debug_path,
on_lifespan_shutdown=_close_shared_httpx_client,
)
# Unified stack lifespan is handled outside the gateway; shutdown hook for httpx pool.
cast(Any, gateway_app).fluxlit_shutdown = _close_shared_httpx_client
return gateway_app