SYMFLUENCE Integration
CSFS ships a SYMFLUENCE plugin with two modes:
- Drop-in community backend: CSFS replaces SYMFLUENCE's built-in primary streamflow acquisition for STREAMFLOW_DATA_PROVIDER: USGS / WSC / SMHI with a single new config line (DATA_ACCESS: community). Your existing station-id keys keep working; nothing else changes.
- Generic observation handler: any of CSFS's 86 providers (live agency APIs or a pre-built CSFS store) as an additional observation source via ADDITIONAL_OBSERVATIONS: csfs and namespaced station ids.
Integration tiers (how a request is routed)
Under DATA_ACCESS: community, SYMFLUENCE resolves primary streamflow
through three layered tiers, highest priority first:
- ObservationBackend tier (SYMFLUENCE acquisition-backend protocol, contract 0.2.0): the plugin registers CommunityObservationBackend under R.observation_backends. SYMFLUENCE's selection layer matches the configured provider against the backend's declared capabilities (the parity-graded USGS, WSC, and SMHI, plus the ungated generic CSFS), applies the parity gate, and hands the backend a formal ObservationRequest. The backend reuses the handler classes below internally, then additionally writes a per-station *_obs_v1.csv delivery (datetime, value, quality_flag; UTC, m³/s, trimmed to the half-open [start, end) window) and an acquisition_manifest.json sidecar.
- Registry-handler tier (the original integration): the drop-in keys usgs / wsc / smhi plus the generic csfs key. This tier is the fallthrough when the backend tier declines (for example, when the parity gate refuses the ungated generic provider, or when an older SYMFLUENCE without the backend registry is installed) and remains the only route for ADDITIONAL_OBSERVATIONS: csfs. Redundant under community mode but kept by design.
- Legacy tier: SYMFLUENCE's native in-tree handlers, untouched and the default outside community mode.
All tiers produce the identical processed calibration CSV; the backend tier adds the protocol artifacts on top.
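The half-open [start, end) trimming applied to the backend-tier delivery can be illustrated with a minimal sketch. The function name, the row layout, and the quality flag value "A" are assumptions made for illustration; they are not taken from the plugin.

```python
from datetime import datetime


def trim_half_open(rows, start, end):
    """Keep rows whose timestamp t satisfies start <= t < end.

    Each row is a (datetime, value, quality_flag) tuple, mirroring the
    column order of the delivery described above (illustrative layout).
    """
    return [row for row in rows if start <= row[0] < end]


rows = [
    (datetime(2020, 1, 1), 10.0, "A"),
    (datetime(2020, 1, 2), 11.0, "A"),
    (datetime(2020, 1, 3), 12.0, "A"),  # equals `end`, so it is excluded
]
window = trim_half_open(rows, datetime(2020, 1, 1), datetime(2020, 1, 3))
```

Because the end bound is exclusive, two adjacent windows never share a timestamp, which avoids duplicated records when consecutive windows are concatenated.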
Drop-in community backend (USGS / WSC / SMHI)
Take an existing experiment that calibrates against a USGS, WSC, or SMHI gauge and add one line:
STREAMFLOW_DATA_PROVIDER: USGS # unchanged
STATION_ID: "06191500" # unchanged (or USGS_SITE_CODE / STREAMFLOW_STATION_ID)
DATA_ACCESS: community # <- the only new line
The plugin registers observation handlers under the existing provider names
(usgs, wsc, smhi). SYMFLUENCE's registry-first streamflow dispatch
(process_streamflow_data()) resolves the lowercased provider in the
observation-handler registry and routes acquisition + processing to CSFS
when DATA_ACCESS is community; with the default DATA_ACCESS the native
in-tree handlers run exactly as before (they live under separate registry
keys — usgs_streamflow etc. — so the plugin never shadows them).
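The key selection described above can be sketched as follows. This is not SYMFLUENCE's actual dispatch code: the function name is hypothetical, the `_streamflow` suffix is confirmed by the text only for USGS and is assumed here for the other providers, and any DATA_ACCESS value other than community is treated as the default.

```python
def resolve_handler_key(config):
    """Return the registry key the streamflow dispatch would look up.

    Assumptions: native handlers use `<provider>_streamflow` for every
    provider, and DATA_ACCESS is compared as an exact lowercase string.
    """
    provider = config["STREAMFLOW_DATA_PROVIDER"].lower()
    if config.get("DATA_ACCESS") == "community":
        return provider  # CSFS drop-in handler, e.g. "usgs"
    return f"{provider}_streamflow"  # native in-tree handler


community_key = resolve_handler_key(
    {"STREAMFLOW_DATA_PROVIDER": "USGS", "DATA_ACCESS": "community"}
)
native_key = resolve_handler_key({"STREAMFLOW_DATA_PROVIDER": "USGS"})
```

Keeping the two handler families under distinct keys is what lets the plugin register itself without replacing anything the framework ships.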
Station ids are read from the same config keys the native handlers use, in the same order:
| Provider | Keys (resolution order) | Accepted forms |
|---|---|---|
| USGS | STATION_ID (evaluation.streamflow.station_id), USGS_SITE_CODE, STREAMFLOW_STATION_ID | 06191500, 6191500 (zero-padded to 8 digits like the native handler), usgs:06191500 |
| WSC | STATION_ID | 05BB001, environment_canada:05BB001, wsc:05BB001 |
| SMHI | STATION_ID | 2357, sweden_smhi:2357, smhi:2357 |
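A minimal sketch of how the accepted forms in the table could be normalized to a single namespaced id. The alias map is an assumption inferred from the table (it treats environment_canada and sweden_smhi as the canonical CSFS slugs), and the function name is hypothetical; the plugin's own logic may differ.

```python
# Assumed alias map, inferred from the accepted forms listed in the table.
ALIASES = {
    "usgs": "usgs",
    "wsc": "environment_canada",
    "environment_canada": "environment_canada",
    "smhi": "sweden_smhi",
    "sweden_smhi": "sweden_smhi",
}


def normalize_station_id(provider, raw):
    """Return '<slug>:<native_id>' for a bare or namespaced station id."""
    slug = ALIASES[provider.lower()]
    text = str(raw).strip()
    if ":" in text:
        prefix, native = text.split(":", 1)
        if ALIASES.get(prefix.lower()) != slug:
            raise ValueError(f"{text!r} does not belong to provider {provider!r}")
    else:
        native = text
    if slug == "usgs":
        native = native.zfill(8)  # 6191500 -> 06191500, as in the table
    return f"{slug}:{native}"
```

For example, `normalize_station_id("USGS", "6191500")` and `normalize_station_id("USGS", "usgs:06191500")` both yield `usgs:06191500`.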
acquire() does a direct connector fetch for the experiment window
(EXPERIMENT_TIME_START/END) and writes the raw CSV under
observations/streamflow/raw_data/; process() emits the identical
processed contract as the native handlers (datetime index, tz-naive UTC,
discharge_cms in m³/s, same resample + interpolation).
Parity with the native handlers
Measured native-vs-CSFS on the same station/window (full results in the community-services design notes):
| Provider | Parity | Notes |
|---|---|---|
| USGS | Bit-identical | Same NWIS API. Required two fixes, both landed: SYMFLUENCE parsed NWIS local clock time and ignored tz_cd (fixed in symfluence#221 — native output moved to UTC), and CSFS used a truncated cfs→m³/s factor (now the exact 0.028316846592). |
| WSC | Value-identical | Same GeoMet daily-mean collection; CSFS fetches only the experiment window instead of the full period of record. The native handler's unsorted GeoMet pagination could silently corrupt records on stations with >10k records (duplicates + missing rows, nondeterministic); CSFS's environment_canada connector never had that failure mode. |
| SMHI | Product parity via 15-min | The native handler downloads SMHI hydroobs parameter 2 (15-minute discharge); the CSFS default is the daily product. The drop-in handler therefore pins resolution: "15min" on the connector so both sides see the same product. Timestamps are epoch-ms UTC on both sides. |
The SMHI 15-min pin (and any other connector setting) can be overridden via
CSFS_CONNECTOR_CONFIG (e.g. {resolution: daily}), at the cost of parity
with the native product.
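The exact cfs→m³/s factor quoted in the USGS row follows from the international foot (0.3048 m exactly), cubed. The short check below confirms that and shows how a truncated factor drifts; the truncated value 0.0283 is purely hypothetical, since the factor CSFS previously used is not stated here.

```python
from fractions import Fraction

# The international foot is defined as exactly 0.3048 m.
FOOT_IN_METRES = Fraction(3048, 10000)

# One cubic foot in cubic metres, computed without floating-point rounding.
CFS_TO_CMS = FOOT_IN_METRES ** 3
is_exact = CFS_TO_CMS == Fraction("0.028316846592")

# Hypothetical truncated factor, for illustration only.
TRUNCATED_FACTOR = 0.0283

flow_cfs = 10_000.0
exact_cms = flow_cfs * float(CFS_TO_CMS)
rough_cms = flow_cfs * TRUNCATED_FACTOR
drift_cms = exact_cms - rough_cms  # systematic bias from truncation
```

With this illustrative truncation, a 10,000 cfs flow comes out roughly 0.17 m³/s low, which is enough to break bit-identical parity.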
Generic observation handler (ADDITIONAL_OBSERVATIONS: csfs)
Beyond the three drop-in providers, the generic handler exposes the rest of the CSFS network (80+ additional agencies and archives) as an additional observation source. SYMFLUENCE experiments can calibrate and evaluate against any gauge CSFS can reach, whether through a live agency API or a pre-built CSFS store, with a two-line config change and zero framework modifications.
How discovery works
CSFS registers a hook in the symfluence.plugins entry-point group
(csfs = "csfs.integrations.symfluence:register"). SYMFLUENCE's bootstrap
loads that group on import symfluence and calls the hook, which adds
CSFSStreamflowHandler to the observation-handler registry under the key
csfs, plus the three drop-in handlers under usgs, wsc, and smhi.
There is nothing to import, register, or configure manually:
$ python -c "import symfluence
> from symfluence.core.registries import R
> print([k for k in ('csfs', 'usgs', 'wsc', 'smhi') if k in R.observation_handlers])"
['csfs', 'usgs', 'wsc', 'smhi']
The dependency is strictly one-way: CSFS does not depend on SYMFLUENCE.
csfs.integrations.symfluence imports the framework defensively, so
import csfs keeps working in environments without it (the handler class
simply degrades and refuses to run).
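The defensive-import pattern described above can be sketched generically. The class and helper names are hypothetical stand-ins, not the real CSFSStreamflowHandler, and the error message is illustrative.

```python
import importlib


def optional_import(name):
    """Import a module if it is available, otherwise return None."""
    try:
        return importlib.import_module(name)
    except ImportError:
        return None


class SketchHandler:
    """Hypothetical handler that degrades when the framework is missing."""

    def __init__(self, framework_module="symfluence"):
        # Importing never fails here; the check is deferred until use.
        self._framework = optional_import(framework_module)

    def acquire(self):
        if self._framework is None:
            raise RuntimeError("SYMFLUENCE is not installed; handler cannot run")
        return "acquired"
```

Deferring the failure to the point of use keeps module import side-effect free, so the package stays importable in environments that never touch the integration.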
Install
Install both packages in the same environment, along with the pandas extra (the handler reads and writes CSVs through pandas).
Configuration (generic handler)
Add the handler to your experiment YAML via the standard
ADDITIONAL_OBSERVATIONS mechanism:
# --- streamflow observations via CSFS ---
ADDITIONAL_OBSERVATIONS: csfs
CSFS_STATION_ID: "usgs:01646500" # canonical CSFS id: <provider>:<native_id>
That is the whole live-fetch setup. During the observation step SYMFLUENCE
instantiates the handler, acquire() pulls the series for the experiment
window (EXPERIMENT_TIME_START/END) with csfs.fetch_observations_sync,
and process() writes the framework's standard processed streamflow CSV.
Supported keys:
| Key | Required | Meaning |
|---|---|---|
| CSFS_STATION_ID | yes¹ | One or more CSFS station ids: a single id, a comma-separated string, or a YAML list. The <provider>: prefix selects the connector (csfs providers lists slugs). Multiple stations are averaged per timestep in the processed output. |
| CSFS_CONNECTOR_CONFIG | no | Mapping of provider-specific settings (API keys etc.) passed to the connector on live fetches. |
| CSFS_DB_PATH | no | Path to an existing CSFS DuckDB store. When set, observations are read from the store instead of fetched live. |
¹ Falls back to the shared STATION_ID (evaluation.streamflow.station_id)
when unset — that value must then also be namespaced (usgs:01646500, not
01646500); un-namespaced ids fail with a pointed error.
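The three accepted shapes of CSFS_STATION_ID, and the rejection of un-namespaced ids, can be sketched as below. The function name and error wording are assumptions; the handler's real parsing and messages may differ.

```python
def parse_station_ids(value):
    """Split a config value into (provider, native_id) pairs.

    Accepts a single id, a comma-separated string, or a list of ids,
    and rejects any id lacking the '<provider>:' prefix.
    """
    items = value.split(",") if isinstance(value, str) else list(value)
    parsed = []
    for item in items:
        text = str(item).strip()
        if not text:
            continue  # tolerate trailing commas and blank entries
        provider, sep, native_id = text.partition(":")
        if not sep or not provider or not native_id:
            raise ValueError(
                f"station id {text!r} must be namespaced as <provider>:<native_id>"
            )
        parsed.append((provider, native_id))
    if not parsed:
        raise ValueError("no station id configured")
    return parsed
```

For instance, `parse_station_ids("usgs:01646500")` returns a single pair, while `parse_station_ids("01646500")` raises `ValueError`.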
Live fetch vs. store mode
Live fetch (default). Each station is pulled straight from its provider API for the experiment window. Best for one-off experiments and always-fresh data; requires network access at workflow time.
Store mode (CSFS_DB_PATH). Point the handler at a DuckDB store built
with csfs acquire.
The handler queries the store read-only — fully offline, reproducible, and fast for many-gauge or repeated-calibration workloads where you curate the observation database once.
What gets written
Following SYMFLUENCE's observation conventions under
domain_{NAME}/data/observations/streamflow/:
- raw_data/csfs_{provider}_{native_id}_raw.csv: one per station, the CSFS-native series (timestamp, discharge_m3s, quality). Existing raw files are reused unless FORCE_DOWNLOAD: true.
- preprocessed/{DOMAIN_NAME}_streamflow_processed.csv: the calibration pipeline's contract, identical to that of the USGS/WSC handlers, with a datetime index column and a discharge_cms column, resampled to the configured model timestep (FORCING_TIME_STEP_SIZE) with small gaps interpolated.
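The file layout above can be expressed as two small path builders. This is a sketch: the function names are hypothetical, the domain name "demo" is made up, and it assumes the {NAME} in the directory and the {DOMAIN_NAME} in the processed filename are the same value.

```python
from pathlib import PurePosixPath


def streamflow_dir(domain_name):
    """Base directory for streamflow observations of one domain."""
    return PurePosixPath(f"domain_{domain_name}") / "data" / "observations" / "streamflow"


def raw_csv_path(domain_name, station_id):
    """Per-station raw file, named from the namespaced CSFS station id."""
    provider, native_id = station_id.split(":", 1)
    return streamflow_dir(domain_name) / "raw_data" / f"csfs_{provider}_{native_id}_raw.csv"


def processed_csv_path(domain_name):
    """Single processed file consumed by the calibration pipeline."""
    return streamflow_dir(domain_name) / "preprocessed" / f"{domain_name}_streamflow_processed.csv"


raw = raw_csv_path("demo", "usgs:01646500")
processed = processed_csv_path("demo")
```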
Units and timezone guarantees
CSFS harmonizes every provider to discharge in m³/s and timestamps in
UTC, so the handlers perform no unit conversion. Processed timestamps are
written tz-naive in UTC, matching the convention of SYMFLUENCE's built-in
streamflow handlers. For USGS, the built-in handler itself moved to UTC with
symfluence#221; earlier versions wrote gauge-local clock time with DST
discontinuities. Aligning observations and forcing on UTC is what makes the
calibration comparison meaningful. This applies equally to the drop-in
provider handlers and the generic csfs handler.
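A short sketch of the tz-naive UTC convention, showing why gauge-local clock time is ambiguous across DST. The fixed offsets stand in for US Mountain standard and daylight time and are used only for illustration; the helper name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Fixed offsets standing in for a gauge's local standard and daylight time.
MST = timezone(timedelta(hours=-7))
MDT = timezone(timedelta(hours=-6))


def to_naive_utc(stamp):
    """Convert a timezone-aware timestamp to tz-naive UTC."""
    if stamp.tzinfo is None:
        raise ValueError("expected a timezone-aware timestamp")
    return stamp.astimezone(timezone.utc).replace(tzinfo=None)


# The same local clock reading (12:00) maps to different UTC instants
# depending on whether daylight saving time is in effect.
winter_noon = to_naive_utc(datetime(2020, 1, 15, 12, 0, tzinfo=MST))
summer_noon = to_naive_utc(datetime(2020, 7, 15, 12, 0, tzinfo=MDT))
```

Here winter noon becomes 19:00 UTC and summer noon becomes 18:00 UTC, so a series stored as local clock time would shift by an hour relative to UTC-based forcing twice a year.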