"""URL-bound server session continuity (Phase 2 follow-on — no HTTP cookies).
Use an opaque query parameter (default ``fluxlit_sid``) plus a :class:`SessionStore`
implementation to survive **full browser reloads** without relying on the browser
cookie jar. The gateway already forwards path + query to Streamlit.
**Production:** replace :class:`InMemorySessionStore` with a shared store (Redis, etc.)
that implements the same protocol. In-memory is **single-process only** (typical
``fluxlit dev`` / one replica).
See the **URL session continuity** user guide for security (HTTPS, link leakage, TTL)
and multipage patterns.
"""
from __future__ import annotations
import logging
import secrets
import time
from collections.abc import Mapping, MutableMapping
from typing import Protocol, TypeVar, cast, runtime_checkable
from pydantic import BaseModel, ValidationError
from fluxlit.config import FluxlitSettings, JsonValue
from fluxlit.runtime.env_parse import truthy_env
from fluxlit.streamlit.facade import StreamlitSessionFacade
ModelT = TypeVar("ModelT", bound=BaseModel)
_log = logging.getLogger("fluxlit.url_session")
_FALLBACK_URL_SESSION_PARAM = "fluxlit_sid"
def _default_url_session_param() -> str:
"""Resolve the URL-session query key from :class:`~fluxlit.config.FluxlitSettings`."""
p = (FluxlitSettings().url_session_query_param or "").strip()
return p or _FALLBACK_URL_SESSION_PARAM
def _resolve_url_session_param(param: str | None) -> str:
if param is not None:
return param
return _default_url_session_param()
[docs]
@runtime_checkable
class SessionStore(Protocol):
"""Server-side persistence for URL-bound session blobs (JSON-serializable dicts)."""
[docs]
def get(self, session_id: str) -> dict[str, JsonValue] | None:
"""Return the stored dict or ``None`` if missing/expired."""
[docs]
def set(
self,
session_id: str,
data: dict[str, JsonValue],
*,
ttl_seconds: float | None = None,
) -> None:
"""Persist *data* for *session_id*; optional time-to-live in seconds."""
[docs]
def delete(self, session_id: str) -> None:
"""Remove *session_id* from the store if present."""
[docs]
class InMemorySessionStore:
"""Process-local :class:`SessionStore` (dev / single replica).
Not safe across multiple Uvicorn workers or horizontal replicas without a shared
backend.
"""
def __init__(
self,
*,
max_entries: int = 5000,
default_ttl_seconds: float | None = 86_400.0,
) -> None:
self._max_entries = max(1, max_entries)
self._default_ttl = default_ttl_seconds
# session_id -> (payload, deadline_monotonic or None if no ttl)
self._data: dict[str, tuple[dict[str, JsonValue], float | None]] = {}
def _evict_expired(self) -> None:
now = time.monotonic()
dead = [k for k, (_, d) in self._data.items() if d is not None and d <= now]
for k in dead:
self._data.pop(k, None)
def _trim_size(self) -> None:
while len(self._data) > self._max_entries:
# Dict iteration order: pop oldest insertion (not LRU by last access).
self._data.pop(next(iter(self._data)))
[docs]
def get(self, session_id: str) -> dict[str, JsonValue] | None:
self._evict_expired()
ent = self._data.get(session_id)
if not ent:
return None
payload, deadline = ent
if deadline is not None and deadline <= time.monotonic():
self._data.pop(session_id, None)
return None
return dict(payload)
[docs]
def set(
self,
session_id: str,
data: dict[str, JsonValue],
*,
ttl_seconds: float | None = None,
) -> None:
"""Persist *data* for *session_id*; optional time-to-live in seconds.
TTL semantics: ``ttl_seconds`` is ``None`` → use ``default_ttl_seconds`` (which may
itself be ``None`` for no expiry). A **positive** value sets a monotonic deadline.
``ttl_seconds`` of ``0`` does **not** mean "expire immediately"; it is treated like
"no TTL window" for this write (same as a negative value would be if passed, though
callers should use ``None`` for no expiry).
"""
self._evict_expired()
ttl = self._default_ttl if ttl_seconds is None else ttl_seconds
deadline: float | None = None
if ttl is not None and ttl > 0:
deadline = time.monotonic() + float(ttl)
self._data[session_id] = (dict(data), deadline)
self._trim_size()
[docs]
def delete(self, session_id: str) -> None:
self._data.pop(session_id, None)
[docs]
def new_session_id() -> str:
"""Return a new URL-safe opaque session identifier (≥128 bits)."""
return secrets.token_urlsafe(32)
def _url_session_disabled() -> bool:
if truthy_env("FLUXLIT_DISABLE_URL_SESSION"):
return True
return truthy_env("FLUXLIT_TESTS") and not truthy_env("FLUXLIT_FORCE_URL_SESSION_IN_TESTS")
def _query_param_get(st: StreamlitSessionFacade, param: str) -> str | None:
qp = st.query_params
if qp is None or not hasattr(qp, "get"):
return None
raw = qp.get(param)
if raw is None:
return None
if isinstance(raw, list):
return str(raw[0]) if raw else None
return str(raw)
def _query_param_set(st: StreamlitSessionFacade, param: str, value: str) -> bool:
"""Best-effort assign *param* on ``st.query_params`` (Streamlit 1.30+)."""
qp = st.query_params
if qp is None:
return False
try:
if isinstance(qp, MutableMapping):
cast(MutableMapping[str, str], qp)[param] = value
return True
setitem = getattr(qp, "__setitem__", None)
if callable(setitem):
setitem(param, value)
return True
return False
except Exception as exc:
if truthy_env("FLUXLIT_DEBUG"):
_log.debug(
"_query_param_set: failed to set query param %r: %s",
param,
exc,
exc_info=True,
)
return False
[docs]
def hydrate_url_session(
st: StreamlitSessionFacade,
store: SessionStore,
*,
param: str | None = None,
merge: bool = True,
blob: Mapping[str, JsonValue] | None = None,
) -> str | None:
"""If ``st.query_params[param]`` is set, load the store payload into ``st.session_state``.
- **merge** (default): ``session_state.setdefault(k, v)`` for each key in the blob
so in-flight widget state wins over stale store keys on first paint.
- If the param is missing, returns ``None`` and does nothing.
- If the param is present but the store has no entry, returns the **session id**
without mutating ``session_state`` (caller may seed with :meth:`SessionStore.set`).
Returns the session id string when the query param is present, else ``None``.
"""
if _url_session_disabled():
return None
resolved = _resolve_url_session_param(param)
sid = _query_param_get(st, resolved)
if not sid:
return None
if blob is not None:
payload = dict(blob)
else:
from_store = store.get(sid)
if from_store is None:
return sid
payload = dict(from_store)
ss = st.session_state
if merge:
for k, v in payload.items():
ss.setdefault(k, v)
else:
for k, v in payload.items():
ss[k] = v
return sid
[docs]
def ensure_url_session(
st: StreamlitSessionFacade,
store: SessionStore,
*,
param: str | None = None,
initial: Mapping[str, JsonValue] | None = None,
ttl_seconds: float | None = None,
) -> str:
"""Ensure ``st.query_params[param]`` exists; mint id, seed store, set query param.
Returns the session id (existing or new). If the query param cannot be set
(read-only ``query_params``), still returns a new id and persists *initial* so
callers can surface a warning or use client-side navigation.
"""
resolved = _resolve_url_session_param(param)
if _url_session_disabled():
return _query_param_get(st, resolved) or ""
existing = _query_param_get(st, resolved)
if existing:
if initial:
cur = store.get(existing)
merged: dict[str, JsonValue] = dict(cur or {})
merged.update(initial)
store.set(existing, merged, ttl_seconds=ttl_seconds)
return existing
sid = new_session_id()
store.set(sid, dict(initial or {}), ttl_seconds=ttl_seconds)
_query_param_set(st, resolved, sid)
return sid
[docs]
def persist_url_session(
st: StreamlitSessionFacade,
store: SessionStore,
*,
param: str | None = None,
ttl_seconds: float | None = None,
) -> str | None:
"""Write current ``st.session_state`` (shallow dict copy) to the store for ``param`` id.
Returns the session id if the param is present, else ``None``.
Values are copied best-effort; remote stores should JSON-encode—non-JSON-safe
values may need app-side filtering before :meth:`SessionStore.set`.
If snapshotting ``session_state`` fails, the store is **not** updated but the
session id is still returned; a **warning** is logged (with traceback when
``FLUXLIT_DEBUG=1``).
"""
resolved = _resolve_url_session_param(param)
if _url_session_disabled():
return None
sid = _query_param_get(st, resolved)
if not sid:
return None
ss = st.session_state
snap: dict[str, JsonValue] = {}
try:
fs = getattr(ss, "filtered_state", None)
if isinstance(fs, Mapping):
src = dict(fs)
else:
src = {str(k): ss[k] for k in ss}
for k, v in src.items():
if str(k).startswith("__"):
continue
snap[str(k)] = cast(JsonValue, v)
except Exception as exc:
if truthy_env("FLUXLIT_DEBUG"):
_log.exception(
"persist_url_session: failed to snapshot session_state for param=%r sid=%s; "
"store not updated",
resolved,
sid,
)
else:
_log.warning(
"persist_url_session: failed to snapshot session_state for param=%r sid=%s; "
"store not updated: %s (set FLUXLIT_DEBUG=1 for traceback)",
resolved,
sid,
exc,
)
return sid
store.set(sid, snap, ttl_seconds=ttl_seconds)
return sid
[docs]
def hydrate_url_session_typed(
st: StreamlitSessionFacade,
store: SessionStore,
model: type[ModelT],
*,
param: str | None = None,
merge: bool = True,
strict: bool = False,
) -> tuple[str | None, ModelT | None]:
"""Like :func:`hydrate_url_session`, then validate the store payload as *model*.
Validates the store blob **before** merging into ``st.session_state`` so invalid
payloads do not partially hydrate the UI session.
Returns ``(session_id_or_none, validated_model_or_none)``. On validation failure,
when *strict* is false, returns ``(sid, None)``; when *strict* is true, raises
:class:`pydantic.ValidationError`.
"""
if _url_session_disabled():
return None, None
resolved = _resolve_url_session_param(param)
sid = _query_param_get(st, resolved)
if not sid:
return None, None
blob = store.get(sid)
if blob is None:
return sid, None
try:
validated = model.model_validate(dict(blob))
except ValidationError:
if strict:
raise
return sid, None
hydrate_url_session(
st,
store,
param=resolved,
merge=merge,
blob=validated.model_dump(),
)
return sid, validated