Skip to content

saber.saber

fdc_mapping(sim_df, obs_df)

Bias corrects a dataframe of simulated values using a dataframe of observed values

Parameters:

Name Type Description Default
sim_df pd.DataFrame

A dataframe with a datetime index and a single column of streamflow values

required
obs_df pd.DataFrame

A dataframe with a datetime index and a single column of streamflow values

required

Returns:

Type Description
pd.DataFrame

pandas DataFrame with a datetime index and a single column of streamflow values

Source code in saber/saber.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def fdc_mapping(sim_df: pd.DataFrame, obs_df: pd.DataFrame) -> pd.DataFrame:
    """
    Bias corrects a dataframe of simulated values using a dataframe of observed values

    Corrections are computed per calendar month: simulated flows are mapped to exceedance
    probabilities on the simulated flow duration curve, then those probabilities are mapped
    back to flows on the observed flow duration curve.

    Args:
        sim_df: A dataframe with a datetime index and a single column of streamflow values
        obs_df: A dataframe with a datetime index and a single column of streamflow values

    Returns:
        pandas DataFrame with a datetime index and columns of corrected (COL_QMOD) and
        original simulated (COL_QSIM) streamflow values
    """
    dates = []
    corrected_values = []
    simulated_values = []

    for month in natsorted(sim_df.index.month.unique()):
        # filter historical data to only be current month
        month_sim = sim_df[sim_df.index.month == int(month)].dropna()
        month_obs = obs_df[obs_df.index.month == int(month)].dropna()

        # calculate the flow duration curves
        month_sim_fdc = fdc(month_sim.values)
        month_obs_fdc = fdc(month_obs.values)

        # make interpolator for 1) sim flow to sim prob, and 2) obs prob to obs flow
        to_prob = _make_interpolator(month_sim_fdc.values.flatten(), month_sim_fdc.index)
        to_flow = _make_interpolator(month_obs_fdc.index, month_obs_fdc.values.flatten())

        dates += month_sim.index.to_list()
        corrected_values += to_flow(to_prob(month_sim.values)).tolist()
        # bug fix: collect the simulated values in the same month-grouped, NaN-dropped order
        # as the corrected values. Previously COL_QSIM was filled from sim_df in its original
        # chronological order against a month-grouped index, misaligning flows with dates
        # (and raising a length-mismatch error whenever dropna removed rows).
        simulated_values += month_sim.values.flatten().tolist()

    return pd.DataFrame({
        COL_QMOD: corrected_values,
        COL_QSIM: simulated_values,
    }, index=dates).sort_index()

map_saber(mid, asgn_mid, asgn_gid, hz, gauge_data)

Corrects all streams in the assignment table using the SABER method

Parameters:

Name Type Description Default
mid str

the model id of the stream to be corrected

required
asgn_mid str

the model id of the stream assigned to mid for bias correction

required
asgn_gid str

the gauge id of the stream assigned to mid for bias correction

required
hz str

path to the zarr dataset of hindcast streamflow data, opened with xarray

required
gauge_data str

path to the directory of observed data

required

Returns:

Type Description
pd.DataFrame | tuple | None

pd.DataFrame of corrected streamflow, or None if the correction fails or no gauge is assigned

Source code in saber/saber.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def map_saber(mid: str, asgn_mid: str, asgn_gid: str, hz: str, gauge_data: str) -> pd.DataFrame | tuple | None:
    """
    Corrects a single stream using the SABER method

    Args:
        mid: the model id of the stream to be corrected
        asgn_mid: the model id of the stream assigned to mid for bias correction
        asgn_gid: the gauge id of the stream assigned to mid for bias correction
        hz: path to the zarr dataset of hindcast streamflow data
        gauge_data: path to the directory of observed data

    Returns:
        pd.DataFrame of corrected streamflow, or None when no gauge is assigned, the observed
        data file is missing, or the correction fails
    """
    try:
        if asgn_gid is None or pd.isna(asgn_gid):
            logger.debug(f'No gauge assigned to {mid}')
            return

        # find the observed data to be used for correction
        obs_path = os.path.join(gauge_data, f'{asgn_gid}.csv')
        if not os.path.exists(obs_path):
            logger.debug(f'Observed data "{asgn_gid}" not found. Cannot correct "{mid}".')
            # bug fix: previously fell through to read_csv and raised FileNotFoundError
            return
        obs_df = pd.read_csv(obs_path, index_col=0)
        obs_df.index = pd.to_datetime(obs_df.index)

        # perform corrections
        hz = xarray.open_mfdataset(hz, concat_dim='rivid', combine='nested', parallel=True, engine='zarr')
        rivids = hz.rivid.values
        sim_a = hz['Qout'][:, rivids == int(mid)].values
        sim_a = pd.DataFrame(sim_a, index=hz['time'].values, columns=[COL_QSIM])
        if asgn_mid != mid:
            sim_b = hz['Qout'][:, rivids == int(asgn_mid)].values
            sim_b = pd.DataFrame(sim_b, index=sim_a.index, columns=[COL_QSIM])
            sim_b = sim_b[sim_b.index.year >= 1980]
        sim_a = sim_a[sim_a.index.year >= 1980]
        hz.close()

        if asgn_mid == mid:
            # a gauge exists on this stream: correct directly with FDC mapping
            corrected_df = fdc_mapping(sim_a, obs_df)
        else:
            # correct this stream using the bias relationship at the assigned gauged stream
            corrected_df = sfdc_mapping(
                sim_b, obs_df, sim_a,
                use_log=True,
                drop_outliers=True, outlier_threshold=3,
                fit_gumbel=True, fit_range=(5, 95),
            )

        return corrected_df

    except Exception as e:
        # boundary handler: log and return None so one failed stream does not abort the batch
        logger.error(e)
        logger.debug(f'Failed to correct {mid}')
        return

mp_saber(assign_df, hindcast_zarr, gauge_data, save_dir=None, n_processes=None)

Corrects all streams in the assignment table using the SABER method with a multiprocessing Pool

Parameters:

Name Type Description Default
assign_df pd.DataFrame

the assignment table

required
hindcast_zarr str

string path to the hindcast streamflow dataset in zarr format

required
gauge_data str

path to the directory of observed data

required
save_dir str

path to the directory to save the corrected data

None
n_processes

number of processes to use for multiprocessing, passed to Pool

None

Returns:

Type Description
None

None

Source code in saber/saber.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def mp_saber(assign_df: pd.DataFrame, hindcast_zarr: str, gauge_data: str, save_dir: str = None,
             n_processes: int | None = None) -> None:
    """
    Corrects all streams in the assignment table using the SABER method with a multiprocessing Pool

    Args:
        assign_df: the assignment table
        hindcast_zarr: string path to the hindcast streamflow dataset in zarr format
        gauge_data: path to the directory of observed data
        save_dir: path to the directory to save the corrected data
        n_processes: number of processes to use for multiprocessing, passed to Pool

    Returns:
        None
    """
    logger.info('Starting SABER Bias Correction')

    if save_dir is None:
        save_dir = os.path.join(gauge_data, 'corrected')
    # create the output directory whether it was defaulted or caller-supplied
    os.makedirs(save_dir, exist_ok=True)

    with Pool(n_processes) as p:
        p.starmap(
            map_saber,
            # bug fix: map_saber takes exactly 5 arguments; the extra save_dir argument
            # previously raised TypeError in the workers. np.moveaxis(values, 0, 0) was a
            # no-op and has been removed.
            [(mid, asgn_mid, asgn_gid, hindcast_zarr, gauge_data)
             for mid, asgn_mid, asgn_gid in assign_df[[COL_MID, COL_ASN_MID, COL_GID]].values]
        )
    # NOTE(review): the corrected DataFrames returned by map_saber are discarded here and
    # nothing is written to save_dir -- confirm the intended persistence behavior

    logger.info('Finished SABER Bias Correction')
    return

sfdc_mapping(sim_flow_a, obs_flow_a, sim_flow_b=None, use_log=False, fix_seasonally=True, empty_months='skip', drop_outliers=False, outlier_threshold=2.5, filter_scalar_fdc=False, filter_range=(0, 80), extrapolate='nearest', fill_value=None, fit_gumbel=False, fit_range=(10, 90), metadata=False)

Removes the bias from simulated discharge using the SABER method.

Given simulated and observed discharge at location A, removes bias from simulated data at point A. Given simulated and observed discharge at location A, removes bias from simulated data at point B, if given B.

Parameters:

Name Type Description Default
sim_flow_a pd.DataFrame

simulated hydrograph at point A. should contain a datetime index with daily values and a single column of discharge values.

required
obs_flow_a pd.DataFrame

observed hydrograph at point A. should contain a datetime index with daily values and a single column of discharge values.

required
sim_flow_b pd.DataFrame

(optional) simulated hydrograph at point B to correct using scalar flow duration curve mapping and the bias relationship at point A. should contain a datetime index with daily values and a single column of discharge values.

None
use_log bool

(optional) if True, log10 transform the discharge values before correcting. default is False.

False
fix_seasonally bool

fix on a monthly (True) or annual (False) basis

True
empty_months str

how to handle months in the simulated data where no observed data are available. Options: "skip": ignore simulated data for months without observed data

'skip'
drop_outliers bool

flag to exclude outliers

False
outlier_threshold int or float

number of std deviations from mean to exclude from flow duration curve

2.5
filter_scalar_fdc bool

flag to filter the scalar flow duration curve

False
filter_range tuple

lower and upper bounds of the filter range

(0, 80)
extrapolate str

method to use for extrapolation. Options: nearest, const, linear, average, max, min

'nearest'
fill_value int or float

value to use for extrapolation when extrapolate_method='const'

None
fit_gumbel bool

flag to replace extremely low/high corrected flows with values from Gumbel type 1

False
fit_range tuple

lower and upper bounds of exceedance probabilities to replace with Gumbel values

(10, 90)
metadata bool

flag to return the scalars and metadata about the correction process

False

Returns:

Type Description
pd.DataFrame

pd.DataFrame with a DateTime index and columns with corrected flow, uncorrected flow, the scalar adjustment

pd.DataFrame

factor applied to correct the discharge, and the percentile of the uncorrected flow (in the seasonal grouping,

pd.DataFrame

if applicable).

Source code in saber/saber.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def sfdc_mapping(sim_flow_a: pd.DataFrame, obs_flow_a: pd.DataFrame, sim_flow_b: pd.DataFrame = None,
                 use_log: bool = False,
                 fix_seasonally: bool = True, empty_months: str = 'skip',
                 drop_outliers: bool = False, outlier_threshold: int | float = 2.5,
                 filter_scalar_fdc: bool = False, filter_range: tuple = (0, 80),
                 extrapolate: str = 'nearest', fill_value: int | float = None,
                 fit_gumbel: bool = False, fit_range: tuple = (10, 90),
                 metadata: bool = False, ) -> pd.DataFrame:
    """
    Removes the bias from simulated discharge using the SABER method.

    Given simulated and observed discharge at location A, removes bias from simulated data at point A.
    Given simulated and observed discharge at location A, removes bias from simulated data at point B, if given B.

    Args:
        sim_flow_a (pd.DataFrame): simulated hydrograph at point A. should contain a datetime index with daily values
            and a single column of discharge values.
        obs_flow_a (pd.DataFrame): observed hydrograph at point A. should contain a datetime index with daily values
            and a single column of discharge values.
        sim_flow_b (pd.DataFrame): (optional) simulated hydrograph at point B to correct using scalar flow duration
            curve mapping and the bias relationship at point A. should contain a datetime index with daily values
            and a single column of discharge values. defaults to sim_flow_a when not given.

        use_log (bool): (optional) if True, log10 transform the discharge values before correcting. default is False.

        fix_seasonally (bool): fix on a monthly (True) or annual (False) basis
        empty_months (str): how to handle months in the simulated data where no observed data are available. Options:
            "skip": ignore simulated data for months without observed data

        drop_outliers (bool): flag to exclude outliers
        outlier_threshold (int or float): number of std deviations from mean to exclude from flow duration curve

        filter_scalar_fdc (bool): flag to filter the scalar flow duration curve
        filter_range (tuple): lower and upper bounds of the filter range

        extrapolate (str): method to use for extrapolation. Options: nearest, const, linear, average, max, min
        fill_value (int or float): value to use for extrapolation when extrapolate_method='const'

        fit_gumbel (bool): flag to replace extremely low/high corrected flows with values from Gumbel type 1
        fit_range (tuple): lower and upper bounds of exceedance probabilities to replace with Gumbel values

        metadata (bool): flag to return the scalars and metadata about the correction process

    Returns:
        pd.DataFrame with a DateTime index and columns with corrected flow, uncorrected flow, the scalar adjustment
        factor applied to correct the discharge, and the percentile of the uncorrected flow (in the seasonal grouping,
        if applicable).

    Raises:
        ValueError: if empty_months is not a recognized option, or if no month has observed data.
    """
    # bug fix: sim_flow_b is documented as optional but was dereferenced unconditionally,
    # crashing on None. When B is not given, correct the simulated data at point A itself.
    if sim_flow_b is None:
        sim_flow_b = sim_flow_a

    if fix_seasonally:
        # list of the unique months in the historical simulation. should always be 1->12 but just in case...
        monthly_results = []
        for month in sorted(set(sim_flow_a.index.strftime('%m'))):
            # filter data to current iteration's month
            mon_obs_a = obs_flow_a[obs_flow_a.index.month == int(month)].dropna()

            if mon_obs_a.empty:
                if empty_months == 'skip':
                    continue
                else:
                    raise ValueError(f'Invalid value for argument "empty_months". Given: {empty_months}.')

            mon_sim_a = sim_flow_a[sim_flow_a.index.month == int(month)].dropna()
            mon_sim_b = sim_flow_b[sim_flow_b.index.month == int(month)].dropna()
            monthly_results.append(sfdc_mapping(
                mon_sim_a, mon_obs_a, mon_sim_b,
                # bug fix: use_log and metadata were not forwarded in the recursion, so with
                # the default fix_seasonally=True both options were silently ignored
                use_log=use_log,
                fix_seasonally=False, empty_months=empty_months,
                drop_outliers=drop_outliers, outlier_threshold=outlier_threshold,
                filter_scalar_fdc=filter_scalar_fdc, filter_range=filter_range,
                extrapolate=extrapolate, fill_value=fill_value,
                fit_gumbel=fit_gumbel, fit_range=fit_range,
                metadata=metadata, )
            )
        if not monthly_results:
            # every month was skipped: fail with a clear message instead of pd.concat([])'s
            # opaque "No objects to concatenate" error
            raise ValueError('No months with observed data are available for correction.')
        # combine the results from each monthly into a single dataframe (sorted chronologically) and return it
        return pd.concat(monthly_results).sort_index()

    if use_log:
        sim_flow_a = np.log10(sim_flow_a)
        obs_flow_a = np.log10(obs_flow_a)
        sim_flow_b = np.log10(sim_flow_b)

    # compute the flow duration curves
    if drop_outliers:
        sim_fdc_a = fdc(_drop_outliers_by_zscore(sim_flow_a, threshold=outlier_threshold), col_name=COL_QSIM)
        sim_fdc_b = fdc(_drop_outliers_by_zscore(sim_flow_b, threshold=outlier_threshold), col_name=COL_QSIM)
        obs_fdc = fdc(_drop_outliers_by_zscore(obs_flow_a, threshold=outlier_threshold), col_name=COL_QOBS)
    else:
        sim_fdc_a = fdc(sim_flow_a, col_name=COL_QSIM)
        sim_fdc_b = fdc(sim_flow_b, col_name=COL_QSIM)
        obs_fdc = fdc(obs_flow_a, col_name=COL_QOBS)

    # calculate the scalar flow duration curve (at point A with simulated and observed data)
    scalar_fdc = sfdc(sim_fdc_a[COL_QSIM], obs_fdc[COL_QOBS])
    if filter_scalar_fdc:
        scalar_fdc = scalar_fdc[scalar_fdc['p_exceed'].between(filter_range[0], filter_range[1])]

    logger.debug(f'Min/Max Scalar {scalar_fdc.min()} {scalar_fdc.max()}')

    # make interpolators: Q_b -> p_exceed_b, p_exceed_a -> scalars_a
    # flow at B converted to exceedance probabilities, then matched with the scalar computed at point A
    flow_to_percent = _make_interpolator(sim_fdc_b.values.flatten(),
                                         sim_fdc_b.index,
                                         extrap=extrapolate,
                                         fill_value=fill_value)

    percent_to_scalar = _make_interpolator(scalar_fdc.index,
                                           scalar_fdc.values.flatten(),
                                           extrap=extrapolate,
                                           fill_value=fill_value)

    # apply interpolators to correct flows at B with data from A
    qb_original = sim_flow_b.values.flatten()
    p_exceed = flow_to_percent(qb_original)
    scalars = percent_to_scalar(p_exceed)
    qb_adjusted = qb_original / scalars

    if fit_gumbel:
        qb_adjusted = _fit_extreme_values_to_gumbel(qb_adjusted, p_exceed, fit_range)

    if use_log:
        # undo the log10 transform so returned flows are in the original units
        qb_adjusted = np.power(10, qb_adjusted)
        qb_original = np.power(10, qb_original)

    response = pd.DataFrame(data=np.transpose([qb_adjusted, qb_original]),
                            index=sim_flow_b.index.to_list(),
                            columns=(COL_QMOD, COL_QSIM))
    if metadata:
        response['scalars'] = scalars
        response['p_exceed'] = p_exceed

    return response