API Reference
Generated from the source with mkdocstrings. The stable, user-facing surface is shown here; connector internals are documented in their modules.
Facade conveniences
Top-level helpers exposed by the csfs facade (import csfs), which also
re-exports everything below: models, store, runner, and registry.
csfs.open_store
Open a CSFS observation store (thin alias for :class:DuckDBStore).
Returns an async context manager; the database is opened on
__aenter__. Defaults to read-only, the safe mode for analysis and
model-calibration workloads — pass read_only=False to create or
write to the database (e.g. before :func:run_acquisition).
Example::

    async with csfs.open_store("csfs.duckdb") as store:
        stations = await store.get_stations(provider="usgs")
csfs.fetch_observations (async)
fetch_observations(provider_slug: str, station_id: str, start: datetime, end: datetime, config: dict | None = None) -> TimeSeriesChunk
Fetch one station's observations directly from a provider, no store needed.
One-shot wrapper around discover() + get_connector(slug) + the
connector's async context manager. station_id is the canonical CSFS
ID ("<provider>:<native_id>"). config carries provider-specific
settings such as API keys; most providers need none.
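The canonical ID format described above can be handled with ordinary string operations. The following is a minimal sketch, not part of csfs: split_station_id is a hypothetical helper, the sample ID is only illustrative, and splitting on the first colon assumes that a native ID may itself contain colons.

```python
def split_station_id(station_id: str) -> tuple[str, str]:
    """Split a canonical "<provider>:<native_id>" ID into its two parts.

    Hypothetical helper for illustration; not part of the csfs API.
    """
    # Split on the first colon only, so a native ID that itself contains
    # a colon is kept intact (an assumption about native ID formats).
    provider, sep, native_id = station_id.partition(":")
    if not sep or not provider or not native_id:
        raise ValueError(f"not a canonical station ID: {station_id!r}")
    return provider, native_id


provider, native_id = split_station_id("usgs:01646500")
```

Validating the ID before calling a provider keeps a malformed ID from surfacing later as a less obvious lookup failure.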
csfs.fetch_observations_sync
fetch_observations_sync(provider_slug: str, station_id: str, start: datetime, end: datetime, config: dict | None = None) -> TimeSeriesChunk
Synchronous convenience wrapper around :func:fetch_observations.
Runs the fetch in a fresh event loop via :func:asyncio.run. Must be
called from synchronous code; raises :class:RuntimeError if an event
loop is already running (use await csfs.fetch_observations(...)
there instead).
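The RuntimeError described above comes from asyncio.run itself, which refuses to start while an event loop is already running in the same thread. The sketch below reproduces that behaviour with the standard library only. fetch_stub and fetch_stub_sync are hypothetical stand-ins, not the csfs functions, and the real wrapper may be implemented differently.

```python
import asyncio


async def fetch_stub() -> str:
    """Stand-in for an async fetch; returns a fixed value."""
    await asyncio.sleep(0)
    return "chunk"


def fetch_stub_sync() -> str:
    """Sync wrapper in the style described above: run in a fresh event loop."""
    coro = fetch_stub()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        # Close the coroutine so a refused call does not leave it
        # un-awaited (closing an already finished coroutine is a no-op).
        coro.close()
        raise


async def caller_inside_loop() -> str:
    # From async code the sync wrapper fails; await the async form instead.
    try:
        fetch_stub_sync()
    except RuntimeError:
        return await fetch_stub()
    return "unexpected"


from_sync = fetch_stub_sync()                    # plain synchronous caller
from_async = asyncio.run(caller_inside_loop())   # caller already in a loop
```

The same constraint applies in environments that keep an event loop running in the background, such as notebooks; there the awaitable form is the one to use.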
Canonical models
csfs.core.models.Station
Bases: BaseModel
A gauging station with provider-agnostic metadata.
csfs.core.models.Observation
Bases: BaseModel
A single streamflow observation.
csfs.core.models.TimeSeriesChunk
Bases: BaseModel
A batch of observations returned by a connector.
csfs.core.models.QualityFlag
Bases: StrEnum
Store
csfs.store.duckdb_store.DuckDBStore
Bases: BaseStore
get_stations_arrow (async)
get_stations_arrow(provider: str | None = None, country_code: str | None = None, bbox: tuple[float, float, float, float] | None = None, limit: int | None = None, offset: int = 0) -> pa.Table
Like :meth:get_stations, but returns a zero-copy :class:pyarrow.Table.
get_observations_arrow (async)
get_observations_arrow(station_id: str | Sequence[str], start: datetime | None = None, end: datetime | None = None, limit: int | None = None, offset: int = 0) -> pa.Table
Like :meth:get_observations, but returns a zero-copy :class:pyarrow.Table.
station_id may also be a sequence of station IDs (e.g. for
calibration over many gauges); rows are ordered by timestamp, then
station ID.
get_stations_df (async)
get_stations_df(provider: str | None = None, country_code: str | None = None, bbox: tuple[float, float, float, float] | None = None, limit: int | None = None, offset: int = 0) -> pd.DataFrame
Like :meth:get_stations, but returns a pandas DataFrame.
Requires the optional pandas extra
(pip install "community-streamflow-service[pandas]").
get_observations_df (async)
get_observations_df(station_id: str | Sequence[str], start: datetime | None = None, end: datetime | None = None, limit: int | None = None, offset: int = 0) -> pd.DataFrame
Like :meth:get_observations, but returns a pandas DataFrame.
The frame is indexed by timestamp (ascending, UTC) with columns
discharge_m3s and quality; a station_id column is kept
only when a sequence of station IDs is queried. Requires the
optional pandas extra
(pip install "community-streamflow-service[pandas]").
get_connector_health (async)
Per-provider health derived from stored data and the acquisition log.
Consolidates the data-coverage view (station/observation counts and freshness) with the acquisition-log view (last run, status, error, success rate) into one row per provider. Providers that appear in either the stations table or the acquisition log are included.
data_health classifies the stored data:
- none — no stations on record
- empty — stations exist but no observations
- stale — newest observation older than stale_after_hours
- ok — fresh observations present
Acquisition
csfs.scheduler.runner.run_acquisition (async)
run_acquisition(store: BaseStore, providers: list[str] | None = None, lookback_hours: int = 48, max_stations: int | None = None, concurrency: int = DEFAULT_CONCURRENCY, provider_configs: dict[str, dict] | None = None) -> dict[str, dict]
Run one acquisition cycle across selected (or all) providers.
csfs.scheduler.cron.run_scheduled_cycle (async)
run_scheduled_cycle(db_path: str, tier: str | None = None, providers: list[str] | None = None, max_stations: int | None = None, concurrency: int = 10, provider_configs: dict[str, dict] | None = None) -> dict[str, dict]
Run one acquisition cycle for a tier or specific providers.
csfs.scheduler.cron.run_daemon (async)
run_daemon(db_path: str, schedule: str = 'daily', tier: str | None = None, max_stations: int | None = None, provider_configs: dict[str, dict] | None = None) -> None
Run as a long-lived daemon, executing on a cron schedule.
Registry
csfs.core.registry.register
Decorator to register a connector class under a provider slug.
csfs.core.registry.discover
Import all connector modules to trigger registration.
Connector base class
csfs.connectors.base.BaseConnector
Bases: ABC
Interface that every provider connector must implement.
Subclasses handle the specifics of one data provider: authentication, URL construction, response parsing, and rate-limit handling.
fetch_stations (abstractmethod, async)
Return all stations available from this provider.
fetch_observations (abstractmethod, async)
Fetch observations for a single station over a time range.
fetch_latest (async)
Fetch the most recent observations. Override if provider has a dedicated endpoint.
fetch_bulk (async)
fetch_bulk(station_ids: list[str], start: datetime, end: datetime) -> AsyncIterator[TimeSeriesChunk]
Fetch observations for multiple stations. Override for providers with bulk endpoints.
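The advice to override fetch_bulk for bulk endpoints suggests a per-station fallback in the base class. The sketch below shows one plausible shape for such a fallback as an async generator. It uses stand-in classes rather than BaseConnector, and everything in it is an assumption: the fetch_observations signature (station_id, start, end), the list-of-floats stand-in for TimeSeriesChunk, and the sequential loop.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from datetime import datetime, timezone


class SketchConnector(ABC):
    """Stand-in base class; not csfs.connectors.base.BaseConnector."""

    @abstractmethod
    async def fetch_observations(
        self, station_id: str, start: datetime, end: datetime
    ) -> list[float]:
        """Fetch one station's values (a list stands in for a chunk)."""

    async def fetch_bulk(
        self, station_ids: list[str], start: datetime, end: datetime
    ) -> AsyncIterator[list[float]]:
        # Assumed fallback: one request per station, yielded as it arrives.
        for station_id in station_ids:
            yield await self.fetch_observations(station_id, start, end)


class FakeConnector(SketchConnector):
    """Test double that returns the length of the station ID."""

    async def fetch_observations(
        self, station_id: str, start: datetime, end: datetime
    ) -> list[float]:
        return [float(len(station_id))]


async def collect() -> list[list[float]]:
    start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    end = datetime(2024, 1, 2, tzinfo=timezone.utc)
    connector = FakeConnector()
    # An async generator is consumed with `async for`, not awaited.
    return [c async for c in connector.fetch_bulk(["a:1", "b:22"], start, end)]


chunks = asyncio.run(collect())
```

A connector with a genuine bulk endpoint would replace the loop with a single request and yield one chunk per station from the response.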
Health
csfs.core.health
Shared connector-health helpers used by both the API and the CLI.
The store's :meth:get_connector_health reports only providers that have
appeared in the stations table or the acquisition log. These helpers layer on
the registered roster (so connectors that have never run still surface as
data_health == "none") and provide the summary / degraded-filtering logic
that the API endpoint and the csfs health command share.
gather_connector_health (async)
gather_connector_health(store: BaseStore, *, stale_after_hours: float = 168.0, include_registered: bool = True) -> list[dict]
Per-provider health, optionally padded with the full registered roster.
Returns the same row shape as :meth:BaseStore.get_connector_health,
sorted by provider slug when include_registered is set.
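Padding with the registered roster can be pictured as a dictionary merge followed by a sort. The sketch below is illustrative only: pad_with_registered is a hypothetical function, the "provider" row key is an assumption about the row shape, and the placeholder row carries only the two fields needed here, whereas real rows presumably hold more.

```python
def pad_with_registered(rows: list[dict], registered: list[str]) -> list[dict]:
    """Add a data_health == "none" row for each slug missing from `rows`."""
    by_provider = {row["provider"]: row for row in rows}
    for slug in registered:
        # Keep the store's row when one exists; otherwise add a placeholder.
        by_provider.setdefault(slug, {"provider": slug, "data_health": "none"})
    # Sort by provider slug, as described for include_registered.
    return [by_provider[slug] for slug in sorted(by_provider)]


stored = [{"provider": "usgs", "data_health": "ok"}]
padded = pad_with_registered(stored, ["usgs", "example"])
```

Here "example" is a made-up slug standing for a connector that is registered but has never run.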
summarize_health
Count connectors per data_health bucket.
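Counting rows per bucket is a natural fit for collections.Counter. The sketch below assumes that each row exposes its bucket under a "data_health" key and that the summary is a plain dict with one entry per bucket. The name summarize_sketch and the return shape are hypothetical, and the real function may report more.

```python
from collections import Counter

BUCKETS = ("none", "empty", "stale", "ok")  # the four data_health values


def summarize_sketch(rows: list[dict]) -> dict[str, int]:
    """Count rows per data_health bucket, reporting empty buckets as 0."""
    counts = Counter(row["data_health"] for row in rows)
    return {bucket: counts[bucket] for bucket in BUCKETS}


summary = summarize_sketch(
    [
        {"provider": "a", "data_health": "ok"},
        {"provider": "b", "data_health": "ok"},
        {"provider": "c", "data_health": "stale"},
    ]
)
```

Listing every bucket, including those with a zero count, gives dashboards and CLI output a stable set of keys.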
is_degraded
is_degraded(row: dict, *, data_health: tuple[str, ...] = DEGRADED_DATA_HEALTH, run_status: tuple[str, ...] = DEGRADED_RUN_STATUS) -> bool
Whether a health row should trip an alert.
A connector is degraded if its stored data is in a flagged data_health
bucket, or its most recent acquisition run ended in a flagged status.
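The rule above is a logical OR of two membership tests. The sketch below illustrates it under loudly stated assumptions: the contents of the two default tuples are guesses (the reference does not list the values of DEGRADED_DATA_HEALTH or DEGRADED_RUN_STATUS), and "last_status" is a hypothetical name for the row field that holds the most recent run's status.

```python
# Assumed defaults for illustration; the real constants are not shown here.
SKETCH_DEGRADED_DATA_HEALTH: tuple[str, ...] = ("empty", "stale")
SKETCH_DEGRADED_RUN_STATUS: tuple[str, ...] = ("failed",)


def is_degraded_sketch(
    row: dict,
    *,
    data_health: tuple[str, ...] = SKETCH_DEGRADED_DATA_HEALTH,
    run_status: tuple[str, ...] = SKETCH_DEGRADED_RUN_STATUS,
) -> bool:
    """True if the stored data or the latest run is in a flagged state."""
    return (
        row.get("data_health") in data_health
        or row.get("last_status") in run_status  # hypothetical key name
    )


rows = [
    {"provider": "a", "data_health": "ok", "last_status": "success"},
    {"provider": "b", "data_health": "stale", "last_status": "success"},
    {"provider": "c", "data_health": "ok", "last_status": "failed"},
]
flagged = [row["provider"] for row in rows if is_degraded_sketch(row)]
```

Because both tuples are keyword-only parameters, a caller can narrow or widen what counts as degraded without touching the defaults.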
degraded_connectors
degraded_connectors(rows: list[dict], *, data_health: tuple[str, ...] = DEGRADED_DATA_HEALTH, run_status: tuple[str, ...] = DEGRADED_RUN_STATUS) -> list[dict]
Subset of rows that are degraded per :func:is_degraded.