Source code for fluxlit.auth.trusted_proxy
"""Authentication helpers: forward-auth (trusted headers) and related utilities."""
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from fastapi import HTTPException, status
from starlette.requests import Request
AuthDependency = Callable[[Request], Any]
"""FastAPI dependency callable taking the incoming :class:`~fastapi.Request`."""
[docs]
@dataclass(frozen=True, slots=True)
class TrustedProxyUserConfig:
"""Policy for :class:`TrustedProxyUser`."""
header_name: str = "X-Remote-User"
require_https: bool = False
"""If True, require ``X-Forwarded-Proto: https`` or direct ``https`` URL scheme."""
trusted_client_hosts: frozenset[str] | None = None
"""If set, ``request.client.host`` must be in this set (use with reverse proxies)."""
require_non_empty_user: bool = True
"""If True, missing header yields 401 instead of allowing empty identity."""
[docs]
class TrustedProxyUser:
"""FastAPI dependency: trusted gateway user header with optional safety checks.
Use only when a **network path** guarantees that clients cannot spoof the header
(for example, the app listens only on loopback and nginx strips inbound identity
headers from untrusted clients).
"""
def __init__(self, config: TrustedProxyUserConfig | None = None) -> None:
self._config = config or TrustedProxyUserConfig()
def _effective_scheme(self, request: Request) -> str:
forwarded = request.headers.get("x-forwarded-proto")
if forwarded:
return forwarded.split(",")[0].strip().lower()
return request.url.scheme.lower()
def __call__(self, request: Request) -> str:
if self._config.trusted_client_hosts is not None:
client = request.client
host = client.host if client else None
if host not in self._config.trusted_client_hosts:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Untrusted client for forward-auth",
)
if self._config.require_https and self._effective_scheme(request) != "https":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="HTTPS required for forward-auth",
)
raw = request.headers.get(self._config.header_name)
if self._config.require_non_empty_user and not (raw and raw.strip()):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing trusted user header",
)
return (raw or "").strip()
__all__ = [
"AuthDependency",
"TrustedProxyUser",
"TrustedProxyUserConfig",
"proxy_user_header",
]