from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, Tuple
import numpy as np
from tad.metrics.avalanches import AvalancheResult
[docs]
@dataclass(frozen=True)
class PowerLawFitResult:
"""
Power-law fit result for discrete avalanche distributions.
Parameters
----------
variable
"size" or "lifetime".
a
Prefactor in P(x) = a * x^b (in probability space).
b
Exponent in P(x) = a * x^b.
rmse
Root mean squared error between empirical P(x) and fitted P_hat(x),
computed over the retained bins (after exclusions).
x_all
All support values (sorted unique x) from the empirical PMF.
p_all
Empirical PMF values corresponding to x_all.
keep_mask
Boolean mask over x_all/p_all indicating which points were used for fit/RMSE.
x_used
x values used for fit.
p_used
empirical P(x) used for fit.
p_hat_used
fitted P_hat(x) evaluated at x_used.
exclude_unit
Whether x=1 was excluded.
tail_fraction
Tail cutoff fraction relative to max(P). Points with P < tail_fraction * max(P)
were excluded.
"""
variable: Literal["size", "lifetime"]
a: float
b: float
rmse: float
x_all: np.ndarray
p_all: np.ndarray
keep_mask: np.ndarray
x_used: np.ndarray
p_used: np.ndarray
p_hat_used: np.ndarray
exclude_unit: bool
tail_fraction: float
def _pmf_from_samples(samples: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute discrete PMF from positive integer samples.
Parameters
----------
samples
1D array of integers (>=1).
Returns
-------
x
Sorted unique support values.
p
Probability mass P(x) on that support.
"""
s = np.asarray(samples, dtype=np.int64).ravel()
s = s[s >= 1]
if s.size == 0:
return np.asarray([], dtype=np.float64), np.asarray([], dtype=np.float64)
x, c = np.unique(s, return_counts=True)
p = c.astype(np.float64) / float(c.sum())
return x.astype(np.float64), p
[docs]
def fit_avalanche_powerlaw(
aval: AvalancheResult,
*,
variable: Literal["size", "lifetime"] = "size",
exclude_unit: bool = True,
tail_fraction: float = 0.01,
min_points: int = 5,
) -> PowerLawFitResult:
"""
Fit a power-law model P(x) = a x^b to an avalanche distribution and compute RMSE.
This implements the *paper-style* protocol used in your references:
- Build a discrete histogram / PMF of sizes or lifetimes
- Exclude unit events (x=1) (first bin)
- Exclude tail bins whose probability is < (tail_fraction * max_probability)
(paper uses 1% of the maximum => tail_fraction=0.01)
- Fit P(x) = a x^b
- Compute RMSE between empirical P(x) and fitted P_hat(x) on retained points
Parameters
----------
aval
AvalancheResult containing extracted avalanche sizes/lifetimes.
variable
Choose which distribution to fit: "size" or "lifetime".
exclude_unit
If True, exclude x=1.
tail_fraction
Tail probability cutoff relative to max(P). Default 0.01.
min_points
Minimum number of retained points required to fit. If fewer, returns NaNs.
Returns
-------
PowerLawFitResult
Fit parameters and diagnostics.
Notes
-----
The fit is performed by linear least squares on log space:
log P = log a + b log x
Then a is recovered by exponentiation.
RMSE is computed in probability space:
RMSE = sqrt(mean((P - P_hat)^2))
over the retained points, matching the “goodness-of-fit via RMSE” style described.
"""
if tail_fraction <= 0.0 or not np.isfinite(tail_fraction):
raise ValueError(f"tail_fraction must be positive and finite, got {tail_fraction!r}.")
if min_points < 3:
raise ValueError("min_points should be >= 3.")
if variable == "size":
samples = aval.sizes
elif variable == "lifetime":
samples = aval.lifetimes
else:
raise ValueError(f"variable must be 'size' or 'lifetime', got {variable!r}.")
x_all, p_all = _pmf_from_samples(samples)
# Default keep mask: all points with positive probability
keep = (p_all > 0.0)
# Exclude unit avalanches (first bin)
if exclude_unit:
keep &= (x_all != 1.0)
# Tail trimming: remove bins with P < tail_fraction * max(P) among currently kept bins
if np.any(keep):
pmax = float(np.max(p_all[keep]))
keep &= (p_all >= tail_fraction * pmax)
x_used = x_all[keep]
p_used = p_all[keep]
# Not enough points: return NaNs but keep artifacts for inspection
if x_used.size < min_points:
nan = float("nan")
return PowerLawFitResult(
variable=variable,
a=nan,
b=nan,
rmse=nan,
x_all=x_all,
p_all=p_all,
keep_mask=keep,
x_used=x_used,
p_used=p_used,
p_hat_used=np.asarray([], dtype=np.float64),
exclude_unit=exclude_unit,
tail_fraction=float(tail_fraction),
)
# Fit in log space: log(P) = log(a) + b log(x)
lx = np.log(x_used)
lp = np.log(p_used)
# Linear regression (least squares)
# lp ≈ intercept + b * lx
b, intercept = np.polyfit(lx, lp, deg=1)
a = float(np.exp(intercept))
p_hat = a * (x_used ** b)
rmse = float(np.sqrt(np.mean((p_used - p_hat) ** 2)))
return PowerLawFitResult(
variable=variable,
a=a,
b=float(b),
rmse=rmse,
x_all=x_all,
p_all=p_all,
keep_mask=keep,
x_used=x_used,
p_used=p_used,
p_hat_used=p_hat,
exclude_unit=exclude_unit,
tail_fraction=float(tail_fraction),
)