saber.table

init(drain_table=None, gauge_table=None, reg_table=None, cluster_table=None, cache=True)

Joins the drain, gauge, regulatory structure, and cluster tables to create the assign table (assign_table.csv)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| drain_table | pd.DataFrame | the drain table dataframe | None |
| gauge_table | pd.DataFrame | the gauge table dataframe | None |
| reg_table | pd.DataFrame | the regulatory structure table dataframe | None |
| cluster_table | pd.DataFrame | a dataframe with a column for the assigned cluster label and a column for the model_id | None |
| cache | bool | whether to cache the assign table immediately | True |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | the assign table dataframe |
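
As a rough illustration of the cluster_table input, and assuming the module's COL_MID constant corresponds to a 'model_id' column (an assumption, since the constant is defined elsewhere in the package), a minimal cluster_table could be built like this:

import pandas as pd

# hypothetical values; real tables come from earlier steps in the saber workflow
cluster_table = pd.DataFrame({
    'model_id': ['101', '102', '103'],  # assumed name behind COL_MID
    'cluster_label': [0, 1, 0],         # assumed name for the assigned-cluster column
})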

Source code in saber/table.py
def init(drain_table: pd.DataFrame = None,
         gauge_table: pd.DataFrame = None,
         reg_table: pd.DataFrame = None,
         cluster_table: pd.DataFrame = None,
         cache: bool = True) -> pd.DataFrame:
    """
    Joins the drain, gauge, regulatory structure, and cluster tables to create the assign_table.csv

    Args:
        drain_table: the drain table dataframe
        gauge_table: the gauge table dataframe
        reg_table: the regulatory structure table dataframe
        cluster_table: a dataframe with a column for the assigned cluster label and a column for the model_id
        cache: whether to cache the assign table immediately

    Returns:
        pd.DataFrame
    """
    # read the tables if they are not provided
    if drain_table is None:
        try:
            drain_table = read_table('drain_table')
        except FileNotFoundError:
            raise FileNotFoundError('The drain_table must be provided or created first')
    if gauge_table is None:
        try:
            gauge_table = read_table('gauge_table')
        except FileNotFoundError:
            raise FileNotFoundError('The gauge_table must be provided or created first')
    if reg_table is None:
        try:
            reg_table = read_table('regulate_table')
        except FileNotFoundError:
            raise FileNotFoundError('The regulate_table must be provided or created first')
    if cluster_table is None:
        try:
            cluster_table = read_table('cluster_table')
        except FileNotFoundError:
            raise FileNotFoundError('The cluster_table must be provided or created first')

    # enforce correct column data types
    drain_table[COL_MID] = drain_table[COL_MID].astype(str)
    drain_table[COL_MID_DOWN] = drain_table[COL_MID_DOWN].astype(str)
    gauge_table[COL_MID] = gauge_table[COL_MID].astype(str)
    gauge_table[COL_GID] = gauge_table[COL_GID].astype(str)
    reg_table[COL_MID] = reg_table[COL_MID].astype(str)
    reg_table[COL_RID] = reg_table[COL_RID].astype(str)
    cluster_table[COL_MID] = cluster_table[COL_MID].astype(str)

    # merge the drain_table, gauge_table, reg_table, and labels_df on the model_id column
    assign_df = (
        drain_table
        .merge(gauge_table, on=COL_MID, how='outer')
        .merge(reg_table, on=COL_MID, how='outer')
        .merge(cluster_table, on=COL_MID, how='outer')
        .sort_values(by=COL_MID)
        .reset_index(drop=True)
    )

    # create new columns asn_mid_col, asn_gid_col, reason_col
    assign_df[atable_cols] = atable_cols_defaults
    assign_df[COL_MID] = assign_df[COL_MID].astype(float).astype(int).astype(str)

    if not all([col in assign_df.columns for col in all_cols]):
        logger.error('Missing columns in assign table. Check your input tables.')
        logger.debug(f'Have columns: {assign_df.columns}')
        logger.debug(f'Need columns: {all_cols}')
        raise AssertionError('Missing columns in assign table. Check your input tables.')

    # check for and remove duplicate rows
    assign_df = assign_df.drop_duplicates(subset=[COL_MID])

    if cache:
        write_table(assign_df, 'assign_table')

    return assign_df
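
A minimal usage sketch, assuming the four input tables (drain_table, gauge_table, regulate_table, cluster_table) have already been created in the project workspace so that read_table can locate them, or that equivalent dataframes are passed in directly:

import saber

# read the four cached tables, join them, and write assign_table.csv
assign_df = saber.table.init()

# alternatively, pass dataframes built upstream (drain_df, gauge_df, reg_df, and
# cluster_df are hypothetical names here) and skip the cache write
assign_df = saber.table.init(drain_table=drain_df, gauge_table=gauge_df,
                             reg_table=reg_df, cluster_table=cluster_df, cache=False)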

mp_prop_gauges(df, n_processes=None)

Traverses dendritic stream networks to identify upstream and downstream river reaches

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | the assign table dataframe | required |
| n_processes | int | the number of processes to use for multiprocessing | None |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | the assign table dataframe with gauge propagation results |

Source code in saber/table.py
def mp_prop_gauges(df: pd.DataFrame, n_processes: int = None) -> pd.DataFrame:
    """
    Traverses dendritic stream networks to identify upstream and downstream river reaches

    Args:
        df: the assign table dataframe
        n_processes: the number of processes to use for multiprocessing

    Returns:
        pd.DataFrame
    """
    logger.info('Propagating from Gauges')
    gauged_mids = df[df[COL_GID].notna()][COL_MID].values

    with Pool(n_processes) as p:
        logger.info('Finding Downstream')
        df_prop_down = pd.concat(p.starmap(_map_propagate, [(df, x, 'down', COL_GPROP) for x in gauged_mids]))
        logger.info('Finding Upstream')
        df_prop_up = pd.concat(p.starmap(_map_propagate, [(df, x, 'up', COL_GPROP) for x in gauged_mids]))
        logger.info('Resolving Nearest Propagation Neighbor')
        df_prop = pd.concat([df_prop_down, df_prop_up]).reset_index(drop=True)
        df_prop = pd.concat(p.starmap(_map_resolve_props, [(df_prop, x, COL_GPROP) for x in df_prop[COL_MID].unique()]))

    return pd.concat([df[~df[COL_MID].isin(df_prop[COL_MID])], df_prop])
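
A short usage sketch, assuming assign_df is the dataframe returned by init above:

import saber

# propagate gauge IDs up- and downstream with 4 worker processes
assign_df = saber.table.mp_prop_gauges(assign_df, n_processes=4)

# passing n_processes=None lets multiprocessing.Pool choose a default worker count
assign_df = saber.table.mp_prop_gauges(assign_df)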

mp_prop_regulated(df, n_processes=None)

Traverses dendritic stream networks downstream from regulatory structures

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | the assign table dataframe | required |
| n_processes | int | the number of processes to use for multiprocessing | None |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | the assign table dataframe with regulatory propagation results |

Source code in saber/table.py
def mp_prop_regulated(df: pd.DataFrame, n_processes: int = None) -> pd.DataFrame:
    """
    Traverses dendritic stream networks downstream from regulatory structures

    Args:
        df: the assign table dataframe
        n_processes: the number of processes to use for multiprocessing

    Returns:
        pd.DataFrame
    """
    logger.info('Propagating from Regulatory Structures')
    with Pool(n_processes) as p:
        logger.info('Propagating Downstream')
        df_prop = pd.concat(p.starmap(
            _map_propagate,
            [(df, x, 'down', COL_RPROP, False) for x in df[df[COL_RID].notna()][COL_MID].values]
        ))
        logger.info('Resolving Propagation')
        df_prop = pd.concat(p.starmap(_map_resolve_props, [(df_prop, x, COL_RPROP) for x in df_prop[COL_MID].unique()]))

    return pd.concat([df[~df[COL_MID].isin(df_prop[COL_MID])], df_prop])
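
A usage sketch continuing from the previous step; assign_df is assumed to already contain the regulatory structure IDs merged in by init:

import saber

# propagate regulatory structure influence downstream from each structure
assign_df = saber.table.mp_prop_regulated(assign_df, n_processes=4)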