Standardized Precipitation Index (SPI)
Rendered from 081-spi-drought-index.ipynb
Standardized Precipitation Index (SPI)
Citations: McKee TB et al. (1993); Edwards DC, McKee TB (1997); WMO (2012) SPI User Guide.
Primal: science.spi_drought_index · Rust: validate_drought_index
Baseline: control/drought_index/drought_index_spi.py · Benchmark: control/drought_index/benchmark_drought_index.json
Theory
For scale $k$ months, accumulate precipitation, fit a gamma distribution to positive amounts (MLE via Thom’s approximation), account for zero-rain mass $q$, map through the gamma CDF, then apply the inverse normal to obtain standard-normal SPI.
WMO-style categories (illustrative): extremely wet (SPI $\ge 2$) … extremely dry (SPI $\le -2$).
import json
import math
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
REPO = Path('/home/eastgate/Development/ecoPrimals/springs/airSpring').resolve()
BENCH = REPO / "control/drought_index/benchmark_drought_index.json"
C_GREEN, C_RED, C_BLUE = "#2ecc71", "#e74c3c", "#3498db"
plt.rcParams.update({"figure.figsize": (9, 4.5), "axes.grid": True})def gamma_mle_fit(data):
positive = [x for x in data if x > 0]
n = len(positive)
if n < 3:
return None
mean_val = sum(positive) / n
log_mean = sum(math.log(x) for x in positive) / n
A = math.log(mean_val) - log_mean
if A <= 0:
return None
alpha = (1.0 / (4.0 * A)) * (1.0 + math.sqrt(1.0 + 4.0 * A / 3.0))
beta = mean_val / alpha
return (alpha, beta)
def gamma_series(a, x):
ap = a
total = 1.0 / a
delta = total
for _ in range(200):
ap += 1
delta *= x / ap
total += delta
if abs(delta) < abs(total) * 1e-14:
break
return total * math.exp(-x + a * math.log(x) - math.lgamma(a))
def gamma_cf(a, x):
b = x + 1 - a
c = 1e30
d = 1.0 / b
h = d
for i in range(1, 200):
an = -i * (i - a)
b += 2
d = an * d + b
if abs(d) < 1e-30:
d = 1e-30
c = b + an / c
if abs(c) < 1e-30:
c = 1e-30
d = 1.0 / d
delta = d * c
h *= delta
if abs(delta - 1.0) < 1e-14:
break
return math.exp(-x + a * math.log(x) - math.lgamma(a)) * h
def regularized_gamma_p(a, x):
if x <= 0:
return 0.0
if x < a + 1:
return gamma_series(a, x)
return 1.0 - gamma_cf(a, x)
def gamma_cdf(x, alpha, beta):
if x <= 0:
return 0.0
return regularized_gamma_p(alpha, x / beta)
def norm_ppf(p):
if p <= 0:
return -8.0
if p >= 1:
return 8.0
if p == 0.5:
return 0.0
if p < 0.5:
t = math.sqrt(-2.0 * math.log(p))
else:
t = math.sqrt(-2.0 * math.log(1.0 - p))
c0, c1, c2 = 2.515517, 0.802853, 0.010328
d1, d2, d3 = 1.432788, 0.189269, 0.001308
x = t - (c0 + c1 * t + c2 * t * t) / (1.0 + d1 * t + d2 * t * t + d3 * t * t * t)
return -x if p < 0.5 else x
def compute_spi(monthly_precip, scale=1):
n = len(monthly_precip)
spi = [float("nan")] * n
accum = []
for i in range(n):
if i < scale - 1:
accum.append(float("nan"))
else:
accum.append(sum(monthly_precip[i - scale + 1 : i + 1]))
valid = [x for x in accum if not math.isnan(x)]
if len(valid) < 3:
return spi
fit = gamma_mle_fit(valid)
if fit is None:
return spi
alpha, beta = fit
q = sum(1 for x in valid if x == 0) / len(valid)
for i in range(n):
if math.isnan(accum[i]):
continue
if accum[i] == 0:
prob = q
else:
prob = q + (1.0 - q) * gamma_cdf(accum[i], alpha, beta)
prob = max(1e-10, min(1.0 - 1e-10, prob))
spi[i] = norm_ppf(prob)
return spi
def classify_spi(spi_value):
if math.isnan(spi_value):
return "insufficient_data"
if spi_value >= 2.0:
return "extremely_wet"
if spi_value >= 1.5:
return "very_wet"
if spi_value >= 1.0:
return "moderately_wet"
if spi_value > -1.0:
return "near_normal"
if spi_value > -1.5:
return "moderately_dry"
if spi_value > -2.0:
return "severely_dry"
return "extremely_dry"
with open(BENCH) as f:
bench = json.load(f)
precip = bench["monthly_precip_mm"]
known = bench["gamma_fit_known"]["data"]
fit = gamma_mle_fit(known)
assert fit is not None
assert abs(fit[0] - bench["gamma_fit_known"]["alpha"]) < 1e-6
assert abs(fit[1] - bench["gamma_fit_known"]["beta"]) < 1e-6
spi1 = compute_spi(precip, 1)
# bench stores null for NaN in JSON — compare non-NaN indices
b1 = bench["spi1"]["values"]
for i, (a, b) in enumerate(zip(spi1, b1)):
if b is None:
assert math.isnan(a)
else:
assert abs(a - b) < 1e-9, i
spi3 = compute_spi(precip, 3)
for a, b in zip(spi3, bench["spi3"]["values"]):
if b is None:
assert math.isnan(a)
else:
assert abs(a - b) < 1e-9
assert bench["spi1"]["n_valid"] == len([x for x in spi1 if not math.isnan(x)])
classes = [classify_spi(v) for v in spi1 if not math.isnan(v)]
from collections import Counter
c = Counter(classes)
for k, v in bench["classification_counts"].items():
assert c[k] == v
print("benchmark_drought_index.json SPI + gamma fit agreement passed.")fig, ax = plt.subplots()
ax.plot(range(1, len(precip) + 1), precip, color=C_BLUE, lw=1.2, label="Monthly $P$")
ax.set_xlabel("Month index")
ax.set_ylabel("mm")
ax.set_title("Benchmark monthly precipitation")
ax2 = ax.twinx()
ax2.plot(range(1, len(spi1) + 1), spi1, color=C_RED, lw=1.2, alpha=0.85, label="SPI-1")
ax.legend(loc="upper left")
ax2.legend(loc="upper right")
plt.tight_layout()
plt.show()
fig2, ax3 = plt.subplots()
months = np.arange(1, len(spi1) + 1)
ax3.plot(months, spi1, color=C_GREEN, label="SPI-1")
ax3.plot(months, spi3, color=C_BLUE, alpha=0.8, label="SPI-3")
ax3.axhline(0, color="gray", lw=1)
ax3.set_xlabel("Month")
ax3.set_ylabel("SPI")
ax3.set_title("SPI scales (benchmark series)")
ax3.legend()
plt.tight_layout()
plt.show()Summary
- Algorithm: Gamma MLE + zero handling + normal quantile transform, matching the control script.
- Benchmark:
benchmark_drought_index.jsonstores the 60-month synthetic series, fitted gamma parameters, SPI vectors, and classification counts. - Provenance: Deterministic generator seed and references in JSON
_provenance.