Source code for fluxlit.runtime.resolve

"""Import target and gateway bind resolution for unified / sidecar mode."""

from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any, TypeAlias

if TYPE_CHECKING:
    from fluxlit.app import FluxLit

    FluxLitType: TypeAlias = FluxLit[Any]


[docs] def resolve_import_target_for_unified(fl: FluxLitType) -> str: """Resolve ``module:attr`` for the Streamlit child and import stability. Order: :attr:`~fluxlit.app.FluxLit.import_target`, ``FLUXLIT_APP``, then project file (``fluxlit.toml`` / ``pyproject.toml``), then ``app:app``. """ from fluxlit.config import load_project_config, resolve_target if fl.import_target: return fl.import_target.strip() env_t = (os.environ.get("FLUXLIT_APP") or "").strip() if env_t: return env_t return resolve_target(None, load_project_config())
def _gateway_bind_for_streamlit_child(fl: FluxLitType) -> tuple[str, int]: """Host/port the Streamlit sidecar uses to reach the API (loopback-safe URL).""" from fluxlit.config import load_project_config pc = load_project_config() bind_host = (os.environ.get("FLUXLIT_GATEWAY_HOST") or "").strip() if not bind_host and pc and pc.gateway_host: bind_host = pc.gateway_host.strip() if not bind_host: bind_host = fl.settings.gateway_host.strip() bind_port_s = (os.environ.get("FLUXLIT_GATEWAY_PORT") or "").strip() if bind_port_s: try: bind_port = int(bind_port_s) except ValueError: bind_port = fl.settings.gateway_port elif pc and pc.gateway_port is not None: bind_port = pc.gateway_port else: bind_port = fl.settings.gateway_port return bind_host, bind_port