"""HTTP client for calling the FastAPI app from Streamlit (server-side, same process host).
Typing mirrors :class:`httpx.Client` via ``httpx._types`` / ``httpx._client`` (see
``tests/test_httpx_import_contract.py``); re-run tests after **httpx** upgrades.
"""
from __future__ import annotations
import os
from collections.abc import Callable, Mapping
from typing import TypeVar
import httpx
from httpx import USE_CLIENT_DEFAULT
from httpx._client import UseClientDefault # noqa: PLC2701 — public type surface for defaults
from httpx._types import ( # noqa: PLC2701 — mirrors httpx Client.request annotations
AuthTypes,
CookieTypes,
HeaderTypes,
QueryParamTypes,
RequestContent,
RequestData,
RequestExtensions,
RequestFiles,
TimeoutTypes,
)
from pydantic import BaseModel, TypeAdapter
from fluxlit.config import JsonValue
from fluxlit.logging import REQUEST_ID_HEADER, get_request_id
T = TypeVar("T")
AuthHeaderFactory = Callable[[], Mapping[str, str]]
def _relative_api_path_or_raise(path: str) -> str:
"""Normalize ``path`` for httpx merge with ``base_url``; reject absolute URLs.
httpx treats ``http://...`` (and similar) as overriding ``base_url``, which would
send requests off the intended gateway/API host.
"""
url = path if path.startswith("/") else f"/{path}"
if "://" in url or url.startswith("//"):
msg = (
"ApiClient paths must be relative to the configured API base URL; "
f"absolute and scheme-relative fragments are rejected (got {path!r})"
)
raise ValueError(msg)
return url
[docs]
class ApiClient:
"""Sync HTTPX client scoped to your gateway-mounted API.
Paths are relative to the API base (including the ``/api`` prefix in the base URL).
For example use ``client.get("/users")``, not ``client.get("/api/users")``.
Paths must not be absolute URLs (``http://...``) or scheme-relative (``//...``);
those are rejected with :class:`ValueError` so user-controlled strings cannot bypass
``base_url`` to another host.
Use :meth:`for_fluxlit` or :meth:`with_bearer` when routes require ``Authorization``.
The runtime sets ``FLUXLIT_INTERNAL_API_BASE`` (e.g. ``http://127.0.0.1:8000/api``)
for Streamlit subprocesses so defaults work without passing ``base_url``.
"""
def __init__(
self,
base_url: str | None = None,
*,
timeout: float = 30.0,
default_headers: Mapping[str, str] | None = None,
auth_header_factory: AuthHeaderFactory | None = None,
propagate_request_id: bool = False,
) -> None:
"""
Args:
base_url: API root including mount prefix. Falls back to
``FLUXLIT_INTERNAL_API_BASE`` or ``http://127.0.0.1:8000/api``.
timeout: Per-request timeout in seconds.
default_headers: Merged into each request (caller headers override on key clash).
auth_header_factory: Callable returning headers (e.g. ``Authorization``) per request.
Use instead of putting long-lived secrets in Streamlit widget state. Per-call
``headers`` override these generated headers on key clash.
propagate_request_id: If True, send ``X-Request-ID`` when
:func:`fluxlit.logging.get_request_id` is set (usually empty in Streamlit).
"""
env_base = os.environ.get("FLUXLIT_INTERNAL_API_BASE", "").rstrip("/")
resolved = (base_url or env_base or "http://127.0.0.1:8000/api").rstrip("/")
self._resolved_base = resolved
self._timeout = timeout
self._default_headers = dict(default_headers) if default_headers else {}
self._auth_header_factory = auth_header_factory
self._propagate_request_id = propagate_request_id
self._client = httpx.Client(base_url=resolved, timeout=timeout)
[docs]
@classmethod
def for_fluxlit(
cls,
*,
bearer_token: str | None = None,
auth_header_factory: AuthHeaderFactory | None = None,
base_url: str | None = None,
timeout: float = 30.0,
default_headers: Mapping[str, str] | None = None,
propagate_request_id: bool = False,
) -> ApiClient:
"""Convenience constructor with static bearer token or factory (mutually exclusive)."""
if bearer_token is not None and auth_header_factory is not None:
msg = "Pass only one of bearer_token or auth_header_factory"
raise TypeError(msg)
factory: AuthHeaderFactory | None = auth_header_factory
if bearer_token is not None:
token = bearer_token
def factory() -> Mapping[str, str]:
return {"Authorization": f"Bearer {token}"}
return cls(
base_url=base_url,
timeout=timeout,
default_headers=default_headers,
auth_header_factory=factory,
propagate_request_id=propagate_request_id,
)
[docs]
def with_bearer(self, bearer_token: str) -> ApiClient:
"""Return a new client with the same base URL and options plus ``Authorization: Bearer``.
Use on the **injected page client** (no auth by default) when you already have a
token in ``st.session_state`` and want the same :class:`ApiClient` surface without
calling :meth:`for_fluxlit`. If this client already has an ``auth_header_factory``,
the new factory merges those headers first, then sets ``Authorization`` to this
bearer (call-time headers still win on key clash).
The returned client owns a separate ``httpx`` connection pool; close it when done
or use a ``with`` block.
"""
parent_factory = self._auth_header_factory
def factory() -> Mapping[str, str]:
merged: dict[str, str] = dict(parent_factory()) if parent_factory else {}
merged["Authorization"] = f"Bearer {bearer_token}"
return merged
return ApiClient(
self._resolved_base,
timeout=self._timeout,
default_headers=dict(self._default_headers),
auth_header_factory=factory,
propagate_request_id=self._propagate_request_id,
)
def _build_request_headers(self, headers: HeaderTypes | None) -> HeaderTypes:
merged = httpx.Headers(self._default_headers)
if self._auth_header_factory:
merged.update(self._auth_header_factory())
if headers is not None:
merged.update(headers)
if self._propagate_request_id:
rid = get_request_id()
if rid:
merged.setdefault(REQUEST_ID_HEADER, rid)
return merged
[docs]
def request(
self,
method: str,
path: str,
*,
content: RequestContent | None = None,
data: RequestData | None = None,
files: RequestFiles | None = None,
json: JsonValue | None = None,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
extensions: RequestExtensions | None = None,
) -> httpx.Response:
"""Send a request; ``path`` may omit a leading slash.
Raises:
ValueError: If ``path`` is an absolute or scheme-relative URL (see class doc).
"""
url = _relative_api_path_or_raise(path)
merged_headers = self._build_request_headers(headers)
return self._client.request(
method,
url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=merged_headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
[docs]
def get(
self,
path: str,
*,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
extensions: RequestExtensions | None = None,
) -> httpx.Response:
"""``GET`` request."""
return self.request(
"GET",
path,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
[docs]
def post(
self,
path: str,
*,
content: RequestContent | None = None,
data: RequestData | None = None,
files: RequestFiles | None = None,
json: JsonValue | None = None,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
extensions: RequestExtensions | None = None,
) -> httpx.Response:
"""``POST`` request."""
return self.request(
"POST",
path,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
[docs]
def put(
self,
path: str,
*,
content: RequestContent | None = None,
data: RequestData | None = None,
files: RequestFiles | None = None,
json: JsonValue | None = None,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
extensions: RequestExtensions | None = None,
) -> httpx.Response:
"""``PUT`` request."""
return self.request(
"PUT",
path,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
[docs]
def delete(
self,
path: str,
*,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
extensions: RequestExtensions | None = None,
) -> httpx.Response:
"""``DELETE`` request."""
return self.request(
"DELETE",
path,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
[docs]
def get_model(
self,
path: str,
model: type[T],
*,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
extensions: RequestExtensions | None = None,
) -> T:
"""``GET`` and parse JSON into a Pydantic model (raises on 4xx/5xx or validation).
Args:
path: Relative API path.
model: Pydantic model type for the response body.
Remaining kwargs: forwarded to :meth:`get`.
"""
response = self.get(
path,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
response.raise_for_status()
return TypeAdapter(model).validate_json(response.content)
[docs]
def post_model(
self,
path: str,
response_model: type[T],
*,
body: BaseModel | Mapping[str, JsonValue] | None = None,
content: RequestContent | None = None,
data: RequestData | None = None,
files: RequestFiles | None = None,
json: JsonValue | None = None,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
extensions: RequestExtensions | None = None,
) -> T:
"""``POST`` JSON body and parse the response as ``response_model``.
Args:
path: Relative API path.
response_model: Pydantic model type for the response body.
body: Request JSON (from ``model_dump()`` if a :class:`~pydantic.BaseModel`).
Mutually exclusive with ``json``.
Remaining kwargs: forwarded to :meth:`post`.
"""
if body is not None and json is not None:
msg = "Pass only one of body or json"
raise TypeError(msg)
json_body: JsonValue | None
if isinstance(body, BaseModel):
json_body = body.model_dump(mode="json")
else:
json_body = dict(body) if body is not None else json
response = self.post(
path,
content=content,
data=data,
files=files,
json=json_body,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
response.raise_for_status()
return TypeAdapter(response_model).validate_json(response.content)
[docs]
def close(self) -> None:
"""Close the underlying HTTPX client."""
self._client.close()
def __enter__(self) -> ApiClient:
return self
def __exit__(self, *exc: object) -> None:
self.close()