Skip to content

saber.assign

assign_gauged(df)

Assigns a gauge for correction to each basin that contains a gauge

Parameters:

Name Type Description Default
df pd.DataFrame

the assignments table dataframe

required

Returns:

Type Description
pd.DataFrame

The input df with assignments made for gauged basins

Source code in saber/assign.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def assign_gauged(df: pd.DataFrame) -> pd.DataFrame:
    """
    Assigns basins a gauge for correction which contain a gauge

    Note: modifies ``df`` in place and returns the same object (the previous
    docstring claimed a copy of a nonexistent ``df1`` was returned).

    Args:
        df: the assignments table dataframe

    Returns:
        df, modified in place, with assignments made for gauged basins
    """
    # rows with a non-null gauge id correspond to basins that contain a gauge
    selector = df[gid_col].notna()
    # a gauged basin is assigned its own model id and its own gauge id
    df.loc[selector, asgn_mid_col] = df[mid_col]
    df.loc[selector, asgn_gid_col] = df[gid_col]
    df.loc[selector, reason_col] = 'gauged'
    return df

assign_propagation(df, df_props)

Merge the assignment table and the propagation assignments

Parameters:

Name Type Description Default
df pd.DataFrame

the assignments table dataframe

required
df_props pd.DataFrame

the combined upstream and downstream propagation assignments dataframe

required

Returns:

Type Description
pd.DataFrame

pd.DataFrame

Source code in saber/assign.py
176
177
178
179
180
181
182
183
184
185
186
187
def assign_propagation(df: pd.DataFrame, df_props: pd.DataFrame) -> pd.DataFrame:
    """
    Merge the assignment table and the propagation assignments

    Args:
        df: the assignments table dataframe
        df_props: the combined upstream and downstream propagation assignments dataframe

    Returns:
        pd.DataFrame
    """
    # keep only the rows of df whose model id did NOT receive a propagation
    # assignment, then append the propagated rows in their place
    propagated_mids = df_props[mid_col]
    unchanged_rows = df[~df[mid_col].isin(propagated_mids)]
    return pd.concat([unchanged_rows, df_props])

generate(workdir, labels_df=None, drain_table=None, gauge_table=None, cache=True)

Joins the drain_table.csv and gauge_table.csv to create the assign_table.csv

Parameters:

Name Type Description Default
workdir str

path to the working directory

required
cache bool

whether to cache the assign table immediately

True
labels_df pd.DataFrame

a dataframe with a column for the assigned cluster label and a column for the model_id

None
drain_table pd.DataFrame

the drain table dataframe

None
gauge_table pd.DataFrame

the gauge table dataframe

None

Returns:

Type Description
pd.DataFrame

pd.DataFrame

Source code in saber/assign.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def generate(workdir: str, labels_df: pd.DataFrame = None, drain_table: pd.DataFrame = None,
             gauge_table: pd.DataFrame = None, cache: bool = True) -> pd.DataFrame:
    """
    Joins the drain_table.csv and gauge_table.csv to create the assign_table.csv

    Args:
        workdir: path to the working directory
        cache: whether to cache the assign table immediately
        labels_df: a dataframe with a column for the assigned cluster label and a column for the model_id
        drain_table: the drain table dataframe
        gauge_table: the gauge table dataframe

    Returns:
        pd.DataFrame
    """
    # fall back to the cached tables for any input not supplied by the caller
    labels_df = read_table(workdir, 'cluster_labels') if labels_df is None else labels_df
    drain_table = read_table(workdir, 'drain_table') if drain_table is None else drain_table
    gauge_table = read_table(workdir, 'gauge_table') if gauge_table is None else gauge_table

    # the id columns must share a dtype (str) for the merges below to match rows
    labels_df[mid_col] = labels_df[mid_col].astype(str)
    drain_table[mid_col] = drain_table[mid_col].astype(str)
    drain_table[down_mid_col] = drain_table[down_mid_col].astype(str)
    gauge_table[mid_col] = gauge_table[mid_col].astype(str)

    # outer-join all three tables on the model id, then order rows by model id
    assign_df = (
        drain_table
        .merge(gauge_table, on=mid_col, how='outer')
        .merge(labels_df, on=mid_col, how='outer')
        .sort_values(by=mid_col)
        .reset_index(drop=True)
    )

    # initialize the assignment columns asgn_mid_col, asgn_gid_col, reason_col
    assign_df[[asgn_mid_col, asgn_gid_col, reason_col]] = ['unassigned', 'unassigned', 'unassigned']

    if cache:
        write_table(assign_df, workdir, 'assign_table')
        write_table(assign_df[[mid_col, ]], workdir, 'mid_list')
        write_table(assign_df[[gid_col, ]], workdir, 'gid_list')
        write_table(assign_df[[mid_col, gid_col]], workdir, 'mid_gid_map')

    return assign_df

map_assign_ungauged(assign_df, gauges_df, mid)

Assigns each possible ungauged basin a gauge that (1) is closer than any other gauge, (2) is of the same stream order as the ungauged basin, and (3) is in the same simulated FDC cluster as the ungauged basin

Parameters:

Name Type Description Default
assign_df pd.DataFrame

the assignments table dataframe

required
gauges_df np.array

a subset of the assignments dataframe containing the gauges

required
mid str

the model_id to assign a gauge for

required

Returns:

Type Description
pd.DataFrame

a new row for the given mid with the assignments made

Source code in saber/assign.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def map_assign_ungauged(assign_df: pd.DataFrame, gauges_df: pd.DataFrame, mid: str) -> pd.DataFrame:
    """
    Assigns all possible ungauged basins a gauge that is
        (1) is closer than any other gauge
        (2) is of same stream order as ungauged basin
        (3) in the same simulated fdc cluster as ungauged basin

    Args:
        assign_df: the assignments table dataframe
        gauges_df: a subset of the assignments dataframe containing the gauges
            (a pd.DataFrame — the previous ``np.array`` annotation was wrong: the
            body uses ``.loc``, column-label indexing, and ``.values``)
        mid: the model_id to assign a gauge for

    Returns:
        a new row for the given mid with the assignments made
    """
    try:
        # find the closest gauge using euclidean distance without accounting for projection/map distortion
        mid_x, mid_y = assign_df.loc[assign_df[mid_col] == mid, [x_col, y_col]].values.flatten()
        row_idx_to_assign = pd.Series(
            np.sqrt(np.power(gauges_df[x_col] - mid_x, 2) + np.power(gauges_df[y_col] - mid_y, 2))
        ).idxmin()

        asgn_mid, asgn_gid = gauges_df.loc[row_idx_to_assign, [asgn_mid_col, asgn_gid_col]]
        # NOTE(review): the reason label uses the FIRST row's cluster label —
        # assumes every row of gauges_df belongs to one cluster; confirm in caller
        asgn_reason = f'cluster-{gauges_df[clbl_col].values[0]}'

        new_row = assign_df[assign_df[mid_col] == mid].copy()
        new_row[[asgn_mid_col, asgn_gid_col, reason_col]] = [asgn_mid, asgn_gid, asgn_reason]
    except Exception as e:
        logger.error(f'Error in map_assign_ungauged: {e}')
        # on failure return an empty frame with matching columns so callers can still concat
        new_row = pd.DataFrame(columns=assign_df.columns)

    return new_row

map_propagate(df, start_mid, direction)

Meant to be mapped over a dataframe to propagate assignments downstream or upstream

Parameters:

Name Type Description Default
df pd.DataFrame

the assignments table dataframe

required
start_mid int

the model_id to start the propagation from

required
direction str

either 'down' or 'up' to indicate the direction of propagation

required

Returns:

Type Description

pd.DataFrame

Source code in saber/assign.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def map_propagate(df: pd.DataFrame, start_mid: int, direction: str) -> pd.DataFrame:
    """
    Meant to be mapped over a dataframe to propagate assignments downstream or upstream

    Args:
        df: the assignments table dataframe
        start_mid: the model_id to start the propagation from
        direction: either 'down' or 'up' to indicate the direction of propagation

    Returns:
        pd.DataFrame of newly assigned rows (empty, with df's columns, if none were made)
    """
    assigned_rows = []
    start_order = df[df[mid_col] == start_mid][order_col].values[0]

    # select the starting row and remember the gauge id being propagated
    stream_row = df[df[mid_col] == start_mid]
    start_gid = stream_row[asgn_gid_col].values[0]
    select_same_order_streams = df[order_col] == start_order

    n_steps = 1

    # repeat as long as the current row is not empty
    try:
        while True:
            # select the next up or downstream
            if direction == 'down':
                id_selector = df[mid_col] == stream_row[down_mid_col].values[0]
            else:  # direction == 'up':
                id_selector = df[down_mid_col] == stream_row[mid_col].values[0]

            # select the next row using the ID and Order selectors
            stream_row = df[np.logical_and(id_selector, select_same_order_streams)]

            # Break the loop if
            # 1. next row is empty -> no upstream/downstream row -> empty stream_row
            # 2. next row stream order not a match -> not picked by select_same_order_streams -> empty stream_row
            # 3. next row already has an assignment made -> reason column is not 'unassigned'
            if stream_row.empty or stream_row[reason_col].values[0] != 'unassigned':
                break

            # copy the row, modify the assignment columns, and append to the list
            new_row = stream_row.copy()
            new_row[[asgn_mid_col, asgn_gid_col, reason_col]] = [start_mid, start_gid, f'prop-{direction}-{n_steps}']
            assigned_rows.append(new_row)

            # increment the steps counter
            n_steps = n_steps + 1

            # Break the loop if
            # 1. the current row is an outlet (down id sentinel -1) -> no downstream row
            #    BUGFIX: the old test was `!= -1`, which broke whenever a downstream
            #    EXISTED (stopping every walk after one step); it also compared an int
            #    against a column cast to str in generate(), so it could never be equal.
            # 2. we have reached the max number of steps (8)
            if str(stream_row[down_mid_col].values[0]) == '-1' or n_steps >= 8:
                break
    except Exception as e:
        logger.error(f'Error in map_propagate: {e}')

    if len(assigned_rows):
        return pd.concat(assigned_rows)
    return pd.DataFrame(columns=df.columns)

map_resolve_propagations(df_props, mid)

Resolves the propagation assignments by choosing the assignment with the fewest steps

Parameters:

Name Type Description Default
df_props pd.DataFrame

the combined upstream and downstream propagation assignments dataframe

required
mid str

the model_id to resolve the propagation assignments for

required

Returns:

Type Description
pd.DataFrame

pd.DataFrame

Source code in saber/assign.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def map_resolve_propagations(df_props: pd.DataFrame, mid: str) -> pd.DataFrame:
    """
    Resolves the propagation assignments by choosing the assignment with the fewest steps

    Args:
        df_props: the combined upstream and downstream propagation assignments dataframe
        mid: the model_id to resolve the propagation assignments for

    Returns:
        pd.DataFrame with the single chosen row for the given mid
    """
    df_mid = df_props[df_props[mid_col] == mid].copy()
    # parse the reason statement ('prop-<direction>-<n_steps>') into its parts.
    # BUGFIX: the old code parsed ALL of df_props (not df_mid) and assigned the
    # result by index alignment against a fresh RangeIndex, attaching the wrong
    # rows' direction/steps. Parse df_mid's own column with index-safe accessors.
    reason_parts = df_mid[reason_col].str.split('-')
    df_mid['direction'] = reason_parts.str[1]
    df_mid['n_steps'] = reason_parts.str[2].astype(float)
    # sort by n_steps then by direction ('down' sorts before 'up')
    df_mid = df_mid.sort_values(['n_steps', 'direction'], ascending=[True, True])
    # return the first row: fewest steps, preferring downstream to upstream
    return df_mid.head(1).drop(columns=['direction', 'n_steps'])