
API Reference

Generated from the source with mkdocstrings. The stable, user-facing surface is shown here; connector internals are documented in their modules.

Facade conveniences

Top-level helpers on the csfs package facade (import csfs), which also re-exports everything below: models, store, runner, and registry.

csfs.open_store

open_store(db_path: str | Path = 'csfs.duckdb', read_only: bool = True) -> DuckDBStore

Open a CSFS observation store (thin alias for DuckDBStore).

Returns an async context manager; the database is opened on __aenter__. Defaults to read-only, the safe mode for analysis and model-calibration workloads; pass read_only=False to create or write to the database (e.g. before calling run_acquisition()).

Example:

async with csfs.open_store("csfs.duckdb") as store:
    stations = await store.get_stations(provider="usgs")

csfs.fetch_observations async

fetch_observations(provider_slug: str, station_id: str, start: datetime, end: datetime, config: dict | None = None) -> TimeSeriesChunk

Fetch one station's observations directly from a provider, no store needed.

One-shot wrapper around discover() + get_connector(slug) + the connector's async context manager. station_id is the canonical CSFS ID ("<provider>:<native_id>"). config carries provider-specific settings such as API keys; most providers need none.
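Canonical station IDs combine a provider slug and the provider's native ID as "<provider>:<native_id>". The two helpers below are hypothetical (they are not part of the documented csfs API) and sketch how such IDs might be built and split; splitting on the first colon only is an assumption, chosen so that a native ID containing colons survives intact.

```python
def make_station_id(provider: str, native_id: str) -> str:
    """Build a canonical ID "<provider>:<native_id>" (hypothetical helper)."""
    if not provider or ":" in provider:
        raise ValueError(f"invalid provider slug: {provider!r}")
    return f"{provider}:{native_id}"


def split_station_id(station_id: str) -> tuple[str, str]:
    """Split a canonical ID into (provider, native_id) (hypothetical helper).

    Assumption: only the first colon separates the two parts, so a native ID
    that itself contains colons is returned unchanged.
    """
    provider, sep, native_id = station_id.partition(":")
    if not sep or not provider or not native_id:
        raise ValueError(f"not a canonical station ID: {station_id!r}")
    return provider, native_id


# Usage: recover the provider slug and native ID from a canonical ID.
provider, native = split_station_id("usgs:01646500")
```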

csfs.fetch_observations_sync

fetch_observations_sync(provider_slug: str, station_id: str, start: datetime, end: datetime, config: dict | None = None) -> TimeSeriesChunk

Synchronous convenience wrapper around fetch_observations().

Runs the fetch in a fresh event loop via asyncio.run(). Must be called from synchronous code; raises RuntimeError if an event loop is already running (use await csfs.fetch_observations(...) there instead).
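The sync wrapper follows a common asyncio pattern. A minimal sketch, assuming a hypothetical coroutine fetch_async in place of csfs.fetch_observations: it checks for a running loop first, so it raises RuntimeError without ever creating a coroutine that would go unawaited, and otherwise hands the coroutine to asyncio.run().

```python
import asyncio


async def fetch_async(value: int) -> int:
    """Hypothetical stand-in for csfs.fetch_observations (a coroutine)."""
    await asyncio.sleep(0)
    return value * 2


def fetch_sync(value: int) -> int:
    """Run fetch_async in a fresh event loop, refusing to nest inside a running one."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run can create one.
        return asyncio.run(fetch_async(value))
    # A loop is already running (e.g. Jupyter or an async web handler).
    raise RuntimeError("fetch_sync() called from a running event loop; await fetch_async() instead")
```

In a notebook or async web handler, where a loop is already running, await the coroutine directly instead.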

Canonical models

csfs.core.models.Station

Bases: BaseModel

A gauging station with provider-agnostic metadata.

csfs.core.models.Observation

Bases: BaseModel

A single streamflow observation.

csfs.core.models.TimeSeriesChunk

Bases: BaseModel

A batch of observations returned by a connector.

csfs.core.models.QualityFlag

Bases: StrEnum

Store

csfs.store.duckdb_store.DuckDBStore

Bases: BaseStore

get_stations_arrow async

get_stations_arrow(provider: str | None = None, country_code: str | None = None, bbox: tuple[float, float, float, float] | None = None, limit: int | None = None, offset: int = 0) -> pa.Table

Like get_stations(), but returns a zero-copy pyarrow.Table.

get_observations_arrow async

get_observations_arrow(station_id: str | Sequence[str], start: datetime | None = None, end: datetime | None = None, limit: int | None = None, offset: int = 0) -> pa.Table

Like get_observations(), but returns a zero-copy pyarrow.Table.

station_id may also be a sequence of station IDs (e.g. for calibration over many gauges); rows are ordered by timestamp, then station ID.
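With several station IDs, the stated ordering interleaves stations at each timestamp. This pure-Python sketch (no pyarrow; rows are hypothetical (station_id, timestamp, discharge_m3s) tuples) reproduces that ordering and shows that regrouping by station afterwards leaves each per-station series in time order, which is convenient for calibration loops.

```python
from collections import defaultdict
from datetime import datetime, timezone

# Hypothetical rows as (station_id, timestamp, discharge_m3s) tuples.
rows = [
    ("usgs:b", datetime(2024, 1, 1, 1, tzinfo=timezone.utc), 3.0),
    ("usgs:a", datetime(2024, 1, 1, 1, tzinfo=timezone.utc), 1.5),
    ("usgs:a", datetime(2024, 1, 1, 0, tzinfo=timezone.utc), 1.2),
]


def order_multi_station(rows):
    """Order rows by timestamp first, then by station ID to break ties."""
    return sorted(rows, key=lambda r: (r[1], r[0]))


def group_by_station(ordered_rows):
    """Split timestamp-ordered rows into per-station (timestamp, discharge) lists.

    Because the input is already time-ordered, each list stays time-ordered.
    """
    series = defaultdict(list)
    for station_id, ts, q in ordered_rows:
        series[station_id].append((ts, q))
    return dict(series)


ordered = order_multi_station(rows)
```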

get_stations_df async

get_stations_df(provider: str | None = None, country_code: str | None = None, bbox: tuple[float, float, float, float] | None = None, limit: int | None = None, offset: int = 0) -> pd.DataFrame

Like get_stations(), but returns a pandas DataFrame.

Requires the optional pandas extra (pip install "community-streamflow-service[pandas]").

get_observations_df async

get_observations_df(station_id: str | Sequence[str], start: datetime | None = None, end: datetime | None = None, limit: int | None = None, offset: int = 0) -> pd.DataFrame

Like get_observations(), but returns a pandas DataFrame.

The frame is indexed by timestamp (ascending, UTC) with columns discharge_m3s and quality; a station_id column is kept only when a sequence of station IDs is queried. Requires the optional pandas extra (pip install "community-streamflow-service[pandas]").

get_connector_health async

get_connector_health(stale_after_hours: float = 168.0) -> list[dict]

Per-provider health derived from stored data and the acquisition log.

Consolidates the data-coverage view (station/observation counts and freshness) with the acquisition-log view (last run, status, error, success rate) into one row per provider. Providers that appear in either the stations table or the acquisition log are included.

data_health classifies the stored data:

- none: no stations on record
- empty: stations exist but no observations
- stale: newest observation older than stale_after_hours
- ok: fresh observations present

Acquisition

csfs.scheduler.runner.run_acquisition async

run_acquisition(store: BaseStore, providers: list[str] | None = None, lookback_hours: int = 48, max_stations: int | None = None, concurrency: int = DEFAULT_CONCURRENCY, provider_configs: dict[str, dict] | None = None) -> dict[str, dict]

Run one acquisition cycle across selected (or all) providers.

csfs.scheduler.cron.run_scheduled_cycle async

run_scheduled_cycle(db_path: str, tier: str | None = None, providers: list[str] | None = None, max_stations: int | None = None, concurrency: int = 10, provider_configs: dict[str, dict] | None = None) -> dict[str, dict]

Run one acquisition cycle for a tier or specific providers.

csfs.scheduler.cron.run_daemon async

run_daemon(db_path: str, schedule: str = 'daily', tier: str | None = None, max_stations: int | None = None, provider_configs: dict[str, dict] | None = None) -> None

Run as a long-lived daemon, executing on a cron schedule.

Registry

csfs.core.registry.register

register(slug: str)

Decorator to register a connector class under a provider slug.

csfs.core.registry.get_connector

get_connector(slug: str) -> type[BaseConnector]

csfs.core.registry.list_providers

list_providers() -> list[str]

csfs.core.registry.discover

discover() -> None

Import all connector modules to trigger registration.
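Decorator-based registration means a connector becomes visible only once its module has been imported, which is why discover() exists. A minimal in-memory sketch; the module-level dict, the duplicate-slug check, the error message, and the sorted output of list_providers() are assumptions rather than documented csfs behavior.

```python
from typing import Callable, TypeVar

T = TypeVar("T", bound=type)

_REGISTRY: dict[str, type] = {}  # hypothetical slug -> connector class map


def register(slug: str) -> Callable[[T], T]:
    """Class decorator: record the class under `slug` and return it unchanged."""
    def decorator(cls: T) -> T:
        if slug in _REGISTRY and _REGISTRY[slug] is not cls:
            raise ValueError(f"provider slug already registered: {slug!r}")
        _REGISTRY[slug] = cls
        return cls
    return decorator


def get_connector(slug: str) -> type:
    try:
        return _REGISTRY[slug]
    except KeyError:
        raise KeyError(f"unknown provider {slug!r}; was discover() called?") from None


def list_providers() -> list[str]:
    return sorted(_REGISTRY)


# Registration runs as a side effect of defining (i.e. importing) the class,
# so a discover() step only needs to import every connector module.
@register("demo")
class DemoConnector:
    pass
```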

Connector base class

csfs.connectors.base.BaseConnector

Bases: ABC

Interface that every provider connector must implement.

Subclasses handle the specifics of one data provider: authentication, URL construction, response parsing, and rate-limit handling.

fetch_stations abstractmethod async

fetch_stations() -> list[Station]

Return all stations available from this provider.

fetch_observations abstractmethod async

fetch_observations(station_id: str, start: datetime, end: datetime) -> TimeSeriesChunk

Fetch observations for a single station over a time range.

fetch_latest async

fetch_latest(station_id: str) -> TimeSeriesChunk

Fetch the most recent observations. Override if the provider has a dedicated endpoint.

fetch_bulk async

fetch_bulk(station_ids: list[str], start: datetime, end: datetime) -> AsyncIterator[TimeSeriesChunk]

Fetch observations for multiple stations. Override for providers with bulk endpoints.
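fetch_latest and fetch_bulk are concrete methods with generic fallbacks, while fetch_stations and fetch_observations are abstract. The sketch below mirrors that contract with hypothetical names (SketchConnector, Chunk, ConstantConnector); the 24-hour window in fetch_latest and the sequential per-station loop in fetch_bulk are assumptions about sensible defaults, not the actual base-class code.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class Chunk:
    """Simplified stand-in for TimeSeriesChunk (hypothetical fields)."""
    station_id: str
    values: list[tuple[datetime, float]] = field(default_factory=list)


class SketchConnector(ABC):
    """Hypothetical mirror of the BaseConnector contract."""

    latest_window = timedelta(hours=24)  # assumed lookback for fetch_latest

    @abstractmethod
    async def fetch_stations(self) -> list[str]: ...

    @abstractmethod
    async def fetch_observations(self, station_id: str, start: datetime, end: datetime) -> Chunk: ...

    async def fetch_latest(self, station_id: str) -> Chunk:
        # Generic fallback: request a recent window through fetch_observations.
        end = datetime.now(timezone.utc)
        return await self.fetch_observations(station_id, end - self.latest_window, end)

    async def fetch_bulk(self, station_ids: list[str], start: datetime, end: datetime) -> AsyncIterator[Chunk]:
        # Generic fallback: one request per station, yielded sequentially.
        for sid in station_ids:
            yield await self.fetch_observations(sid, start, end)


class ConstantConnector(SketchConnector):
    """Toy provider reporting a constant discharge at the end of each window."""

    async def fetch_stations(self) -> list[str]:
        return ["toy:1", "toy:2"]

    async def fetch_observations(self, station_id, start, end):
        return Chunk(station_id, [(end, 1.0)])


async def demo() -> list[str]:
    conn = ConstantConnector()
    end = datetime.now(timezone.utc)
    return [c.station_id async for c in conn.fetch_bulk(["toy:1", "toy:2"], end - timedelta(days=1), end)]


print(asyncio.run(demo()))  # ['toy:1', 'toy:2']
```

A provider with a real bulk endpoint would override fetch_bulk with its own async generator, issuing one request for many stations and yielding a chunk per station.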

Health

csfs.core.health

Shared connector-health helpers used by both the API and the CLI.

The store's get_connector_health() reports only providers that have appeared in the stations table or the acquisition log. These helpers layer on the registered roster (so connectors that have never run still surface with data_health == "none") and provide the summary and degraded-filtering logic shared by the API endpoint and the csfs health command.

gather_connector_health async

gather_connector_health(store: BaseStore, *, stale_after_hours: float = 168.0, include_registered: bool = True) -> list[dict]

Per-provider health, optionally padded with the full registered roster.

Returns the same row shape as BaseStore.get_connector_health(), sorted by provider slug when include_registered is set.
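Padding with the registered roster can be sketched as a merge keyed on provider slug. Assumptions: rows carry a "provider" key, padded rows leave other fields (here a hypothetical last_run_status) as None, and pad_with_roster is a hypothetical name.

```python
def pad_with_roster(rows: list[dict], registered: list[str]) -> list[dict]:
    """Add a data_health == "none" row for each registered provider missing from rows.

    Existing rows win over padding; the result is sorted by provider slug.
    """
    by_provider = {row["provider"]: row for row in rows}
    for slug in registered:
        by_provider.setdefault(
            slug, {"provider": slug, "data_health": "none", "last_run_status": None}
        )
    return sorted(by_provider.values(), key=lambda row: row["provider"])
```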

summarize_health

summarize_health(rows: list[dict]) -> dict[str, int]

Count connectors per data_health bucket.
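Counting per bucket is a one-liner with collections.Counter. In this sketch, summarize is hypothetical, and reporting zero for buckets with no connectors is an assumption that keeps the output shape stable from one run to the next.

```python
from collections import Counter

BUCKETS = ("none", "empty", "stale", "ok")  # the four documented data_health values


def summarize(rows: list[dict]) -> dict[str, int]:
    """Count rows per data_health bucket; unseen buckets report 0 (assumption)."""
    summary = {bucket: 0 for bucket in BUCKETS}
    summary.update(Counter(row["data_health"] for row in rows))
    return summary
```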

is_degraded

is_degraded(row: dict, *, data_health: tuple[str, ...] = DEGRADED_DATA_HEALTH, run_status: tuple[str, ...] = DEGRADED_RUN_STATUS) -> bool

Whether a health row should trip an alert.

A connector is degraded if its stored data is in a flagged data_health bucket, or its most recent acquisition run ended in a flagged status.

degraded_connectors

degraded_connectors(rows: list[dict], *, data_health: tuple[str, ...] = DEGRADED_DATA_HEALTH, run_status: tuple[str, ...] = DEGRADED_RUN_STATUS) -> list[dict]

Subset of rows that are degraded per is_degraded().
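Both filters are a disjunction over two fields of a health row. A minimal sketch in which the default tuples and the last_run_status key are hypothetical, since the actual values of DEGRADED_DATA_HEALTH and DEGRADED_RUN_STATUS are not given here.

```python
# Hypothetical defaults; the real csfs constants may differ.
DEGRADED_DATA_HEALTH: tuple[str, ...] = ("empty", "stale")
DEGRADED_RUN_STATUS: tuple[str, ...] = ("error", "failed")


def is_degraded(
    row: dict,
    *,
    data_health: tuple[str, ...] = DEGRADED_DATA_HEALTH,
    run_status: tuple[str, ...] = DEGRADED_RUN_STATUS,
) -> bool:
    """True if stored data is in a flagged bucket OR the last run ended in a flagged status."""
    return row.get("data_health") in data_health or row.get("last_run_status") in run_status


def degraded_connectors(
    rows: list[dict],
    *,
    data_health: tuple[str, ...] = DEGRADED_DATA_HEALTH,
    run_status: tuple[str, ...] = DEGRADED_RUN_STATUS,
) -> list[dict]:
    """Keep only the rows that is_degraded flags, preserving input order."""
    return [r for r in rows if is_degraded(r, data_health=data_health, run_status=run_status)]
```

Because the thresholds are keyword-only parameters, a caller can widen or narrow the alert without touching the defaults, for example passing data_health=("none",) to flag connectors that have never produced data.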