"""FastAPI-style dependency markers and resolution for Streamlit page handlers."""
from __future__ import annotations
import asyncio
import contextvars
import inspect
import json
import os
import threading
from collections.abc import Callable, Mapping
from typing import Annotated, Any, ForwardRef, get_args, get_origin, get_type_hints
from fluxlit.client import ApiClient
from fluxlit.config import FluxlitSettings
from fluxlit.url_session import SessionStore
# Context-local header map read by ``resolve_page_kwargs`` when it fills a
# ``Header``-annotated parameter.  ``None`` (the default) means no map is set.
_header_context: contextvars.ContextVar[Mapping[str, str] | None] = contextvars.ContextVar(
    "fluxlit_page_headers",
    default=None,
)
# Context-local cookie map read by ``resolve_page_kwargs`` when it fills a
# ``Cookie``-annotated parameter.  ``None`` (the default) means no map is set.
_cookie_context: contextvars.ContextVar[Mapping[str, str] | None] = contextvars.ContextVar(
    "fluxlit_page_cookies",
    default=None,
)
# Aliases for the token types handed out by ``ContextVar.set`` on the two
# variables above; used in the set_*/reset_* function signatures.
_HeaderCtxToken = contextvars.Token[Mapping[str, str] | None]
_CookieCtxToken = contextvars.Token[Mapping[str, str] | None]
def set_page_header_context(headers: Mapping[str, str] | None) -> _HeaderCtxToken:
    """Install *headers* as the map used for :class:`Header` injection.

    Meant for tests or a future gateway hook.  The returned token can be passed
    to :func:`reset_page_header_context` to restore the previous value.
    """
    token = _header_context.set(headers)
    return token
def reset_page_header_context(token: _HeaderCtxToken) -> None:
    """Restore the header context to the value it had before *token* was issued.

    *token* is the object returned by :func:`set_page_header_context`.
    """
    _header_context.reset(token)
def set_page_cookie_context(cookies: Mapping[str, str] | None) -> _CookieCtxToken:
    """Install *cookies* as the map used for :class:`Cookie` injection.

    Meant for tests or trusted hooks.  The returned token can be passed to
    :func:`reset_page_cookie_context` to restore the previous value.
    """
    token = _cookie_context.set(cookies)
    return token
def reset_page_cookie_context(token: _CookieCtxToken) -> None:
    """Restore the cookie context to the value it had before *token* was issued.

    *token* is the object returned by :func:`set_page_cookie_context`.
    """
    _cookie_context.reset(token)
[docs]
class Depends:
    """Marker for a page parameter whose value is produced by a callable.

    The wrapped callable has the shape ``() -> T`` and is resolved before the
    page handler runs.
    """

    def __init__(
        self,
        dependency: Callable[..., Any] | None = None,
        *,
        use_cache: bool = True,
    ) -> None:
        # ``dependency`` may be left as ``None``; ``use_cache`` is stored as given.
        self.dependency, self.use_cache = dependency, use_cache
[docs]
class Cookie:
    """Marker requesting injection of a cookie value, looked up by name.

    Values come from overrides, :func:`set_page_cookie_context`, or ``st.context.cookies``.
    """

    def __init__(self, name: str) -> None:
        # The name is stored lower-cased; lookups use this normalized form.
        lowered = name.lower()
        self.name = lowered
def env_page_overrides() -> dict[str, Any]:
    """Return the JSON object stored in ``FLUXLIT_TEST_PAGE_OVERRIDES``.

    Used by tests to inject values.  An unset or blank variable, malformed JSON,
    or JSON whose top level is not an object all yield an empty dict.
    """
    text = os.environ.get("FLUXLIT_TEST_PAGE_OVERRIDES", "").strip()
    if text:
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            # Hand back a fresh copy of the decoded object.
            return dict(parsed)
    return {}
def _strip_annotated(tp: Any) -> Any:
origin = get_origin(tp)
if origin is Annotated:
return get_args(tp)[0]
return tp
def _unwrap_depends(param: inspect.Parameter, hints: dict[str, Any], name: str) -> Depends | None:
    """Find the :class:`Depends` marker attached to a parameter, if any.

    ``Annotated[...]`` metadata is searched first (first match wins); failing
    that, a ``Depends`` instance used as the parameter default is returned.
    """
    annotation = hints.get(name, param.annotation)
    if annotation is inspect.Parameter.empty:
        annotation = Any
    if get_origin(annotation) is Annotated:
        _, *extras = get_args(annotation)
        marker = next((m for m in extras if isinstance(m, Depends)), None)
        if marker is not None:
            return marker
    default = param.default
    return default if isinstance(default, Depends) else None
def _unwrap_header(param: inspect.Parameter, hints: dict[str, Any], name: str) -> Header | None:
    """Return the first ``Header`` marker in the parameter's ``Annotated`` metadata, else ``None``."""
    annotation = hints.get(name, param.annotation)
    if get_origin(annotation) is not Annotated:
        return None
    _, *extras = get_args(annotation)
    return next((m for m in extras if isinstance(m, Header)), None)
def _unwrap_cookie(param: inspect.Parameter, hints: dict[str, Any], name: str) -> Cookie | None:
    """Return the first ``Cookie`` marker in the parameter's ``Annotated`` metadata, else ``None``."""
    annotation = hints.get(name, param.annotation)
    if get_origin(annotation) is not Annotated:
        return None
    _, *extras = get_args(annotation)
    return next((m for m in extras if isinstance(m, Cookie)), None)
def _call_kw_for_fn(fn: Callable[..., Any], kw: dict[str, Any]) -> dict[str, Any]:
sig = inspect.signature(fn)
if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()):
return dict(kw)
return {k: v for k, v in kw.items() if k in sig.parameters}
def _header_from_streamlit_context(st: Any, name_lc: str) -> str | None:
"""Best-effort read from ``st.context.headers`` (Streamlit 1.30+)."""
try:
proxy = getattr(st, "context", None)
headers = getattr(proxy, "headers", None) if proxy is not None else None
if headers is None:
return None
get = getattr(headers, "get", None)
if get is None:
return None
v = get(name_lc)
if v is not None:
return str(v)
items = getattr(headers, "items", None)
if callable(items):
for k, val in items():
if str(k).lower() == name_lc:
return str(val)
except Exception:
return None
return None
def _cookie_from_streamlit_context(st: Any, name_lc: str) -> str | None:
"""Best-effort read from ``st.context.cookies`` (Streamlit 1.30+)."""
try:
proxy = getattr(st, "context", None)
cookies = getattr(proxy, "cookies", None) if proxy is not None else None
if cookies is None:
return None
get = getattr(cookies, "get", None)
if get is not None and callable(get):
v = get(name_lc)
if v is not None:
return str(v)
items = getattr(cookies, "items", None)
if callable(items):
for k, val in items():
if str(k).lower() == name_lc:
return str(val)
except Exception:
return None
return None
def _resolve_depends_callable(dependency: Callable[..., Any], *, app: Any) -> Any:
allow_async = bool(getattr(app.settings, "async_page_depends", False))
def _run_coro_no_loop(coro: Any) -> Any:
import anyio
async def _await() -> Any:
return await coro
return anyio.run(_await)
def _run_coro_isolated_thread(coro: Any) -> Any:
"""Run *coro* on a fresh event loop in a short-lived daemon thread.
Avoids ``RuntimeError`` when Streamlit (or another caller) already has a running
loop on this thread while ``anyio.run`` / ``asyncio.run`` cannot nest.
If :class:`TimeoutError` is raised after the join limit, cooperative cancellation
is not applied; the daemon thread may still complete the coroutine in the background.
"""
result: list[Any] = []
error: list[BaseException] = []
def _worker() -> None:
try:
async def _await_one() -> Any:
return await coro
result.append(asyncio.run(_await_one()))
except BaseException as exc: # noqa: BLE001 — propagate dependency failures
error.append(exc)
th = threading.Thread(target=_worker, name="fluxlit-async-dep", daemon=True)
th.start()
th.join(timeout=300.0)
if th.is_alive():
# The worker keeps running on its own event loop until the coroutine finishes;
# the thread is a daemon. Avoid unbounded work inside async deps.
msg = "async Depends resolution timed out"
raise TimeoutError(msg)
if error:
raise error[0]
return result[0]
def _await_async_dep(coro: Any) -> Any:
try:
asyncio.get_running_loop()
except RuntimeError:
return _run_coro_no_loop(coro)
return _run_coro_isolated_thread(coro)
if inspect.iscoroutinefunction(dependency):
if not allow_async:
msg = (
f"Depends({dependency!r}) is async; set FLUXLIT_ASYNC_PAGE_DEPENDS=1 "
"or use a synchronous callable."
)
raise TypeError(msg)
return _await_async_dep(dependency())
out = dependency()
if inspect.iscoroutine(out):
if not allow_async:
msg = (
"Depends(...) returned a coroutine; set FLUXLIT_ASYNC_PAGE_DEPENDS=1 "
"or return a plain value from the dependency."
)
raise TypeError(msg)
return _await_async_dep(out)
return out
def resolve_page_kwargs(
    fn: Callable[..., Any],
    *,
    st: Any,
    client: ApiClient,
    app: Any,
    overrides: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
    """Build kwargs for *fn* (``st``, ``client``, typed injections, ``Depends``).

    Every parameter of *fn* is handled by the first rule that matches:

    1. A ``Depends`` marker with a callable: the callable's result, cached per
       callable for this call when ``use_cache`` is set.
    2. A ``Header`` marker: the override for that parameter name if present,
       else the header context map, else ``st.context.headers``.
    3. A ``Cookie`` marker: same order, using the cookie sources.
    4. A parameter literally named ``st`` or ``client``.
    5. The annotated type: ``FluxlitSettings`` -> ``app.settings``,
       ``FluxLitPublicUrls`` -> ``app.urls``, ``FluxLit`` -> ``app``,
       ``ApiClient`` -> *client*, ``SessionStore`` -> ``app.session_store``
       (when set), ``FluxlitFeatureFlags`` -> flags from the environment
       combined with ``app.settings.experimental_yield_pages``.

    Parameters matching no rule are left out of the result.

    Args:
        fn: Page handler whose signature is inspected.
        st: Streamlit module (or stand-in) injected as ``st``.
        client: API client injected by name or by type.
        app: Application object providing ``settings``, ``urls`` and
            optionally ``session_store``.
        overrides: Per-parameter values layered on top of
            ``FLUXLIT_TEST_PAGE_OVERRIDES``; consulted for header and cookie
            parameters.

    Returns:
        Mapping of parameter name to resolved value.
    """
    from fluxlit.application.public_urls import FluxLitPublicUrls

    merged = {**env_page_overrides(), **dict(overrides or {})}
    sig = inspect.signature(fn)
    globalns = getattr(fn, "__globals__", None) or {}
    hints = get_type_hints(fn, globalns=globalns, include_extras=True)
    kwargs: dict[str, Any] = {}
    # Keyed by id() of the dependency callable; the callables stay alive through
    # the handler's annotations/defaults for the duration of this call.
    dep_cache: dict[int, Any] = {}
    for name, param in sig.parameters.items():
        dep = _unwrap_depends(param, hints, name)
        if dep is not None and dep.dependency is not None:
            dep_fn = dep.dependency
            cache_key = id(dep_fn)
            if dep.use_cache and cache_key in dep_cache:
                kwargs[name] = dep_cache[cache_key]
            else:
                resolved = _resolve_depends_callable(dep_fn, app=app)
                if dep.use_cache:
                    dep_cache[cache_key] = resolved
                kwargs[name] = resolved
            continue
        hdr = _unwrap_header(param, hints, name)
        if hdr is not None:
            if name in merged:
                kwargs[name] = merged[name]
            else:
                ctx = _header_context.get() or {}
                val = ctx.get(hdr.name)
                if val is None:
                    val = _header_from_streamlit_context(st, hdr.name)
                kwargs[name] = val
            continue
        ck = _unwrap_cookie(param, hints, name)
        if ck is not None:
            if name in merged:
                kwargs[name] = merged[name]
            else:
                ctx = _cookie_context.get() or {}
                val = ctx.get(ck.name)
                if val is None:
                    val = _cookie_from_streamlit_context(st, ck.name)
                kwargs[name] = val
            continue
        if name == "st":
            kwargs[name] = st
            continue
        if name == "client":
            kwargs[name] = client
            continue
        ann = hints.get(name, param.annotation)
        base = _strip_annotated(ann)
        if base is inspect.Parameter.empty:
            base = Any  # pragma: no cover
        if isinstance(base, ForwardRef):  # pragma: no cover
            # ``recursive_guard`` must be passed by keyword: newer CPython
            # releases inserted ``type_params`` before it and made it
            # keyword-only, so the positional form raises TypeError there.
            base = base._evaluate(  # noqa: SLF001
                globalns, globalns, recursive_guard=frozenset()
            )
        if base is Any:  # pragma: no cover
            continue
        # ``issubclass`` raises TypeError when ``base`` is not a class (e.g. a
        # generic alias); such annotations simply do not match the rule.
        try:
            if issubclass(base, FluxlitSettings):
                kwargs[name] = app.settings
                continue
        except TypeError:
            pass
        try:
            if issubclass(base, FluxLitPublicUrls):
                kwargs[name] = app.urls
                continue
        except TypeError:
            pass
        try:
            from fluxlit.app import FluxLit as FluxLitCls

            if issubclass(base, FluxLitCls):
                kwargs[name] = app
                continue
        except TypeError:
            pass
        if base is ApiClient or (isinstance(base, type) and issubclass(base, ApiClient)):
            kwargs[name] = client
            continue
        store = getattr(app, "session_store", None)
        if store is not None and base is SessionStore:
            kwargs[name] = store
            continue
        from fluxlit.pages.flags import FluxlitFeatureFlags

        if base is FluxlitFeatureFlags:
            env_flags = FluxlitFeatureFlags.from_environ()
            if getattr(app.settings, "experimental_yield_pages", False):
                kwargs[name] = FluxlitFeatureFlags(
                    experimental_yield_pages=(
                        env_flags.experimental_yield_pages
                        or bool(app.settings.experimental_yield_pages)
                    )
                )
            else:
                kwargs[name] = env_flags
    return kwargs
def resolve_and_call_page(
    rec: Any,
    st: Any,
    client: ApiClient,
    app: Any,
    overrides: Mapping[str, Any] | None,
) -> Any:
    """Resolve kwargs, call handler, apply returned :class:`~fluxlit.pages.meta.PageMeta`.

    When yield pages are enabled (environment flag or
    ``app.settings.experimental_yield_pages``) and the handler is a generator
    function, the generator is advanced at most twice; each yielded value is
    coerced and applied as page metadata, and the last yielded value is
    returned (``None`` if nothing was yielded).  Otherwise the handler is
    called once and its return value is coerced, applied and returned.

    Args:
        rec: The ``PageRecord`` whose ``fn`` is the page handler.
        st: Streamlit module (or stand-in) passed to the handler and to the
            metadata helpers.
        client: API client made available for injection.
        app: Application object providing ``settings`` and injectables.
        overrides: Optional per-parameter overrides, layered on top of
            ``FLUXLIT_TEST_PAGE_OVERRIDES``.

    Raises:
        TypeError: *rec* is not a ``PageRecord``.
    """
    from fluxlit.pages.apply_meta import apply_returned_page_meta, coerce_page_return
    from fluxlit.pages.flags import FluxlitFeatureFlags
    from fluxlit.pages.records import PageRecord

    if not isinstance(rec, PageRecord):
        msg = "resolve_and_call_page expects PageRecord"
        raise TypeError(msg)
    merged_overrides: dict[str, Any] = {**env_page_overrides(), **dict(overrides or {})}
    kw = resolve_page_kwargs(rec.fn, st=st, client=client, app=app, overrides=merged_overrides)
    call_kw = _call_kw_for_fn(rec.fn, kw)
    fn = rec.fn
    flags = FluxlitFeatureFlags.from_environ()
    yield_pages = flags.experimental_yield_pages or bool(
        getattr(app.settings, "experimental_yield_pages", False)
    )
    if yield_pages and inspect.isgeneratorfunction(fn):
        gen = fn(**call_kw)
        try:
            try:
                first = next(gen)
            except StopIteration:
                return None
            apply_returned_page_meta(st, coerce_page_return(st, first))
            try:
                second = next(gen)
            except StopIteration:
                return first
            apply_returned_page_meta(st, coerce_page_return(st, second))
            return second
        finally:
            # The generator may still be suspended (after its second yield, or
            # because applying metadata raised).  Close it here so its
            # ``finally``/``with`` cleanup runs now rather than whenever the
            # object happens to be garbage-collected.  No-op if already finished.
            gen.close()
    out = fn(**call_kw)
    meta = coerce_page_return(st, out)
    apply_returned_page_meta(st, meta)
    return out
# Names exported by ``from <this module> import *``.
# NOTE(review): "Header" is exported here and used by ``_unwrap_header``, but its
# class definition is not visible in this view of the file — confirm it exists.
__all__ = [
    "Cookie",
    "Depends",
    "Header",
    "env_page_overrides",
    "reset_page_cookie_context",
    "reset_page_header_context",
    "resolve_and_call_page",
    "resolve_page_kwargs",
    "set_page_cookie_context",
    "set_page_header_context",
]