Skip to content

Trends

Trend analysis functions: pairwise comparison (run_pairs), group comparison (run_groups), and annual time series (run_series) with CQTC/QTC decomposition.

wrtds.trends

Trend analysis: generalized flow normalization and trend decomposition (EGRET 3.0).

Implements runPairs, runGroups, and runSeries from the R EGRET package. These functions decompose water-quality trends into a concentration-discharge trend component (CQTC) — change in the C-Q relationship itself — and a discharge trend component (QTC) — change driven by shifts in the discharge distribution.

The key idea is generalized flow normalization: instead of averaging across the entire historical discharge distribution (stationary FN), the discharge distribution is restricted to a sliding window of 2 * window_side + 1 years centred on the target year.

bin_qs_windowed(daily, center_year, window_side)

Group historical log-discharge by day-of-year within a time window.

Like :func:~wrtds.flow_norm.bin_qs but restricted to years within [center_year - window_side, center_year + window_side].

Parameters:

Name Type Description Default
daily

Populated daily DataFrame.

required
center_year

Centre year of the window (integer or decimal year).

required
window_side

Half-width of the window in years.

required

Returns:

Type Description

Dict mapping {day_of_year: np.array of LogQ values}.

Source code in wrtds/trends.py
def bin_qs_windowed(daily, center_year, window_side):
    """Group historical log-discharge by day-of-year within a time window.

    Like :func:`~wrtds.flow_norm.bin_qs` but restricted to years within
    ``[center_year - window_side, center_year + window_side]``.

    Args:
        daily: Populated daily DataFrame.
        center_year: Centre year of the window (integer or decimal year).
        window_side: Half-width of the window in years.

    Returns:
        Dict mapping ``{day_of_year: np.array of LogQ values}``.
    """
    dec_year = daily['DecYear'].values
    mask = (dec_year >= center_year - window_side) & (dec_year <= center_year + window_side)
    return bin_qs(daily.loc[mask])

run_pairs(sample, daily, year1, year2, window_side=7, pa_start=10, pa_long=12, fit_params=None)

Compare flow-normalised values between two specific years.

Estimates separate 1-year surfaces for year1 and year2, then decomposes the total change into a CQTC (concentration-discharge trend component) and a QTC (discharge trend component).

The notation xAB means:

  • A = which year's C-Q surface (1 = year1, 2 = year2)
  • B = which flow distribution (0 = stationary / full record, 1 = year1's window, 2 = year2's window)

Parameters:

Name Type Description Default
sample

Populated sample DataFrame.

required
daily

Populated daily DataFrame.

required
year1

First comparison year.

required
year2

Second comparison year.

required
window_side

Half-window for generalized flow normalisation (years). Use 0 for stationary flow normalisation only.

7
pa_start

Period of analysis start month (10 = water year).

10
pa_long

Period of analysis length in months.

12
fit_params

Dict of regression parameters (window_y, window_q, window_s, min_num_obs, min_num_uncen, edge_adjust).

None

Returns:

Type Description

DataFrame with index ['Conc', 'Flux'] and columns

['TotalChange', 'CQTC', 'QTC', 'x10', 'x11', 'x20', 'x22'].

Concentration in mg/L, flux in 10^6 kg/year.

Source code in wrtds/trends.py
def run_pairs(
    sample,
    daily,
    year1,
    year2,
    window_side=7,
    pa_start=10,
    pa_long=12,
    fit_params=None,
):
    """Compare flow-normalised values between two specific years.

    Estimates separate 1-year surfaces for *year1* and *year2*, then
    decomposes the total change into a **CQTC** (concentration-discharge
    trend component) and a **QTC** (discharge trend component).

    The notation ``xAB`` means:

    - **A** = which year's C-Q surface (1 = year1, 2 = year2)
    - **B** = which flow distribution (0 = stationary / full record,
      1 = year1's window, 2 = year2's window)

    Args:
        sample: Populated sample DataFrame.
        daily: Populated daily DataFrame.
        year1: First comparison year.
        year2: Second comparison year.
        window_side: Half-window for generalized flow normalisation (years).
            Use 0 for stationary flow normalisation only.
        pa_start: Period of analysis start month (10 = water year).
        pa_long: Period of analysis length in months.
        fit_params: Dict of regression parameters
            (``window_y``, ``window_q``, ``window_s``, ``min_num_obs``,
            ``min_num_uncen``, ``edge_adjust``).

    Returns:
        DataFrame with index ``['Conc', 'Flux']`` and columns
        ``['TotalChange', 'CQTC', 'QTC', 'x10', 'x11', 'x20', 'x22']``.
        Concentration in mg/L, flux in 10^6 kg/year.
    """
    if fit_params is None:
        fit_params = {
            'window_y': 7, 'window_q': 2, 'window_s': 0.5,
            'min_num_obs': 100, 'min_num_uncen': 50, 'edge_adjust': True,
        }

    # 1. Water year bounds in decimal years for surface grids
    start1, end1 = _water_year_dates(year1, pa_start, pa_long)
    start2, end2 = _water_year_dates(year2, pa_start, pa_long)

    dec_start1 = decimal_date(pd.Series([start1])).iloc[0]
    dec_end1 = decimal_date(pd.Series([end1])).iloc[0]
    dec_start2 = decimal_date(pd.Series([start2])).iloc[0]
    dec_end2 = decimal_date(pd.Series([end2])).iloc[0]

    # 2. Estimate 1-year surfaces for each year (full sample data, narrow grid)
    si1 = _compute_surface_index_narrow(sample, dec_start1, dec_end1)
    si2 = _compute_surface_index_narrow(sample, dec_start2, dec_end2)

    surfaces1 = estimate_surfaces(sample, si1, **fit_params)
    surfaces2 = estimate_surfaces(sample, si2, **fit_params)

    # 3. Filter daily to each analysis year
    daily_year1 = _filter_daily_to_year(daily, year1, pa_start, pa_long)
    daily_year2 = _filter_daily_to_year(daily, year2, pa_start, pa_long)

    # 4. Q bins: stationary (full record) and windowed
    q_bins_all = bin_qs(daily)

    if window_side > 0:
        q_bins_1 = bin_qs_windowed(daily, year1, window_side)
        q_bins_2 = bin_qs_windowed(daily, year2, window_side)
    else:
        q_bins_1 = q_bins_all
        q_bins_2 = q_bins_all

    # 5. Compute x10, x11, x20, x22
    c10, f10 = _annual_fn_mean(daily_year1, surfaces1, si1, q_bins_all)
    c11, f11 = _annual_fn_mean(daily_year1, surfaces1, si1, q_bins_1)
    c20, f20 = _annual_fn_mean(daily_year2, surfaces2, si2, q_bins_all)
    c22, f22 = _annual_fn_mean(daily_year2, surfaces2, si2, q_bins_2)

    return _build_result_df(c10, f10, c11, f11, c20, f20, c22, f22)

run_groups(daily, surfaces, surface_index, group1_years, group2_years, window_side=7, pa_start=10, pa_long=12)

Compare flow-normalised averages across two groups of years.

Uses the existing full-period surface and averages annual flow-normalised values over each year group.

Parameters:

Name Type Description Default
daily

Populated daily DataFrame.

required
surfaces

3-D surfaces array (from full fit).

required
surface_index

Grid parameters dict.

required
group1_years

(first_year, last_year) for group 1.

required
group2_years

(first_year, last_year) for group 2.

required
window_side

Half-window for generalized flow normalisation.

7
pa_start

Period of analysis start month.

10
pa_long

Period of analysis length in months.

12

Returns:

Type Description

DataFrame with same format as :func:run_pairs.

Source code in wrtds/trends.py
def run_groups(
    daily,
    surfaces,
    surface_index,
    group1_years,
    group2_years,
    window_side=7,
    pa_start=10,
    pa_long=12,
):
    """Compare flow-normalised averages across two groups of years.

    Uses the existing full-period surface and averages annual
    flow-normalised values over each year group.

    Args:
        daily: Populated daily DataFrame.
        surfaces: 3-D surfaces array (from full fit).
        surface_index: Grid parameters dict.
        group1_years: ``(first_year, last_year)`` for group 1.
        group2_years: ``(first_year, last_year)`` for group 2.
        window_side: Half-window for generalized flow normalisation.
        pa_start: Period of analysis start month.
        pa_long: Period of analysis length in months.

    Returns:
        DataFrame with same format as :func:`run_pairs`.
    """
    q_bins_all = bin_qs(daily)

    def _group_means(year_range):
        c_flex, f_flex, c_stat, f_stat = [], [], [], []
        for year in range(year_range[0], year_range[1] + 1):
            daily_year = _filter_daily_to_year(daily, year, pa_start, pa_long)
            if len(daily_year) == 0:
                continue

            # Stationary FN (full Q distribution)
            cs, fs = _annual_fn_mean(daily_year, surfaces, surface_index, q_bins_all)
            c_stat.append(cs)
            f_stat.append(fs)

            # Generalized (windowed) FN
            if window_side > 0:
                q_bins_w = bin_qs_windowed(daily, year, window_side)
            else:
                q_bins_w = q_bins_all
            cg, fg = _annual_fn_mean(daily_year, surfaces, surface_index, q_bins_w)
            c_flex.append(cg)
            f_flex.append(fg)

        return (float(np.mean(c_flex)), float(np.mean(f_flex)),
                float(np.mean(c_stat)), float(np.mean(f_stat)))

    c11, f11, c10, f10 = _group_means(group1_years)
    c22, f22, c20, f20 = _group_means(group2_years)

    return _build_result_df(c10, f10, c11, f11, c20, f20, c22, f22)

run_series(daily, surfaces, surface_index, window_side=7, pa_start=10, pa_long=12)

Compute annual time series of generalized flow-normalised values.

For each year in the record, flow normalisation uses the discharge distribution from a sliding window of 2 * window_side + 1 years centred on the target year.

Parameters:

Name Type Description Default
daily

Populated daily DataFrame (must have Day, DecYear, Date columns).

required
surfaces

3-D surfaces array.

required
surface_index

Grid parameters dict.

required
window_side

Half-window for generalized flow normalisation. Use 0 for standard (stationary) flow normalisation.

7
pa_start

Period of analysis start month.

10
pa_long

Period of analysis length in months.

12

Returns:

Type Description

Daily DataFrame with updated FNConc and FNFlux columns

computed using generalized flow normalisation.

Source code in wrtds/trends.py
def run_series(
    daily,
    surfaces,
    surface_index,
    window_side=7,
    pa_start=10,
    pa_long=12,
):
    """Compute annual time series of generalized flow-normalised values.

    For each year in the record, flow normalisation uses the discharge
    distribution from a sliding window of ``2 * window_side + 1`` years
    centred on the target year.

    Args:
        daily: Populated daily DataFrame (must have ``Day``, ``DecYear``,
            ``Date`` columns).
        surfaces: 3-D surfaces array.
        surface_index: Grid parameters dict.
        window_side: Half-window for generalized flow normalisation.
            Use 0 for standard (stationary) flow normalisation.
        pa_start: Period of analysis start month.
        pa_long: Period of analysis length in months.

    Returns:
        Daily DataFrame with updated ``FNConc`` and ``FNFlux`` columns
        computed using generalized flow normalisation.
    """
    daily = daily.copy()

    # Determine year range from daily data
    dec_year = daily['DecYear'].values
    min_dec = float(dec_year.min())
    max_dec = float(dec_year.max())

    # Find first and last complete analysis years
    first_year = int(np.ceil(min_dec))
    last_year = int(np.floor(max_dec))

    daily['FNConc'] = np.nan
    daily['FNFlux'] = np.nan

    for year in range(first_year, last_year + 1):
        start, end = _water_year_dates(year, pa_start, pa_long)
        year_mask = (daily['Date'] >= start) & (daily['Date'] <= end)
        if not year_mask.any():
            continue

        daily_year = daily.loc[year_mask].copy()

        if window_side > 0:
            q_bins = bin_qs_windowed(daily, year, window_side)
        else:
            q_bins = bin_qs(daily)

        daily_fn = flow_normalize(daily_year, surfaces, surface_index, q_bins)

        daily.loc[year_mask, 'FNConc'] = daily_fn['FNConc'].values
        daily.loc[year_mask, 'FNFlux'] = daily_fn['FNFlux'].values

    return daily