Skip to content

Commit

Permalink
Merge branch 'main' into feature/a-value
Browse files Browse the repository at this point in the history
  • Loading branch information
aronsho committed Jul 4, 2024
2 parents 9ed063b + ae59690 commit 4a6ea10
Show file tree
Hide file tree
Showing 5 changed files with 10,177 additions and 78 deletions.
128 changes: 99 additions & 29 deletions seismostats/analysis/estimate_mc.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def cdf_discrete_GR(

def empirical_cdf(
sample: np.ndarray | pd.Series,
mc: float = None,
delta_m: float = 1e-16,
weights: np.ndarray | pd.Series | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""
Expand All @@ -52,11 +54,18 @@ def empirical_cdf(
Parameters:
sample: Magnitude sample
mc: Completeness magnitude, if None, the minimum of the sample
is used
delta_m: Magnitude bin size, by default 1e-16. Its recommended to
use the value that the samples are rounded to.
weights: Sample weights, by default None
Returns:
x: x-values of the empirical CDF
y: y-values of the empirical CDF
x: x-values of the empirical CDF (i.e. the unique vector of
magnitudes from mc to the maximum magnitude in the sample,
binned by delta_m)
y: y-values of the empirical CDF (i.e., the empirical
frequency observed in the sample corresponding to the x-values)
"""

try:
Expand All @@ -69,17 +78,52 @@ def empirical_cdf(
except BaseException:
pass

idx = np.argsort(sample)
sample_sorted = sample[idx]
if delta_m == 0:
raise ValueError("delta_m has to be > 0")

if weights is not None:
weights_sorted = weights[idx]
x, y = sample_sorted, np.cumsum(weights_sorted) / weights_sorted.sum()
else:
x, y = sample_sorted, np.arange(1, len(sample) + 1) / len(sample)
if get_option("warnings") is True:
# check if binning is correct
if not np.allclose(sample, bin_to_precision(sample, delta_m)):
warnings.warn(
"Magnitudes are not binned correctly. "
"Test might fail because of this."
)
if delta_m == 1e-16:
warnings.warn(
"delta_m = 1e-16, this might lead to extended computation time")

if mc is None:
mc = np.min(sample)

idx1 = np.argsort(sample)
x = sample[idx1]
x, y_count = np.unique(x, return_counts=True)
y = y[np.cumsum(y_count) - 1]

# add empty bins
for mag_bin in bin_to_precision(
np.arange(mc, np.max(sample) + delta_m, delta_m), delta_m
):
if mag_bin not in x:
x = np.append(x, mag_bin)
y_count = np.append(y_count, 0)
idx2 = np.argsort(x)
x = x[idx2]
y_count = y_count[idx2]

# estimate the CDF
if weights is None:
y = np.cumsum(y_count) / len(sample)
else:
weights_sorted = weights[idx1]
y = np.cumsum(weights_sorted) / weights_sorted.sum()

# make sure that y is zero if there are no samples in the first bins
for ii, y_loop in enumerate(y_count):
if y_loop > 0:
break
leading_zeros = np.zeros(ii)
y = np.append(leading_zeros, y[np.cumsum(y_count[ii:]) - 1])

return x, y


Expand All @@ -89,6 +133,7 @@ def ks_test_gr(
delta_m: float,
beta: float,
n: int = 10000,
ks_ds: list | None = None,
) -> tuple[float, float, list[float]]:
"""
For a given magnitude sample and mc and beta,
Expand All @@ -101,8 +146,12 @@ def ks_test_gr(
mc: Completeness magnitude
delta_m: Magnitude bin size
beta : Beta parameter for the Gutenberg-Richter distribution
n: Number of times the KS distance is calculated for
estimating the p-value, by default 10000
n: Number of times the KS distance is calculated from
synthetic samples with the given parameters, used for
estimating the p-value. By default 10000.
ks_ds: List of KS distances from synthetic data with the given
paramters. If None, they will be estimated here (then, n is
not needed). By default None.
Returns:
p_val: p-value
Expand All @@ -121,23 +170,24 @@ def ks_test_gr(
warnings.warn("sample contains only one value")
return 0, 1, []

ks_ds = []
if ks_ds is None:
ks_ds = []

n_sample = len(sample)
simulated_all = simulate_magnitudes_binned(
n * n_sample, beta, mc, delta_m, b_parameter="beta"
)
n_sample = len(sample)
simulated_all = simulate_magnitudes_binned(
n * n_sample, beta, mc, delta_m, b_parameter="beta"
)

for ii in range(n):
simulated = simulated_all[n_sample * ii: n_sample * (ii + 1)]
_, y_th = cdf_discrete_GR(simulated, mc=mc, delta_m=delta_m, beta=beta)
_, y_emp = empirical_cdf(simulated)
for ii in range(n):
simulated = simulated_all[n_sample * ii: n_sample * (ii + 1)]
x, y_emp = empirical_cdf(simulated, mc, delta_m)
_, y_th = cdf_discrete_GR(x, mc=mc, delta_m=delta_m, beta=beta)

ks_d = np.max(np.abs(y_emp - y_th))
ks_ds.append(ks_d)
ks_d = np.max(np.abs(y_emp - y_th))
ks_ds.append(ks_d)

_, y_th = cdf_discrete_GR(sample, mc=mc, delta_m=delta_m, beta=beta)
_, y_emp = empirical_cdf(sample)
x, y_emp = empirical_cdf(sample, mc, delta_m)
_, y_th = cdf_discrete_GR(x, mc=mc, delta_m=delta_m, beta=beta)

ks_d_obs = np.max(np.abs(y_emp - y_th))
p_val = sum(ks_ds >= ks_d_obs) / len(ks_ds)
Expand All @@ -155,6 +205,7 @@ def mc_ks(
beta: float | None = None,
b_method: str | None = None,
n: int = 10000,
ks_ds_list: list[list] | None = None,
) -> tuple[np.ndarray, list[float], np.ndarray, float | None, float | None]:
"""
Return the completeness magnitude (mc) estimate
Expand Down Expand Up @@ -183,6 +234,10 @@ def mc_ks(
n: Number of number of times the KS distance is
calculated for estimating the p-value,
by default 10000
ks_ds_list: List of list of KS distances from synthetic data
(needed for testing). If None, they will be estimated
in this funciton. By default None
Returns:
mcs_test: tested completeness magnitudes
ks_ds: KS distances
Expand All @@ -201,6 +256,11 @@ def mc_ks(
if not np.all(np.diff(mcs_test) > 0):
warnings.warn("mcs_test are being re-ordered by size.")
mcs_test = np.sort(np.unique(mcs_test))
if not np.allclose(mcs_test, bin_to_precision(mcs_test, delta_m)):
warnings.warn(
"mc_test are not binned correctly,"
"this might affect the test."
)

if get_option("warnings") is True:
# check if binning is correct
Expand Down Expand Up @@ -228,7 +288,7 @@ def mc_ks(
ps = []
betas = []

for mc in mcs_test:
for ii, mc in enumerate(mcs_test):

if verbose:
print("\ntesting mc", mc)
Expand All @@ -247,9 +307,19 @@ def mc_ks(
else:
mc_beta = beta

p, ks_d, _ = ks_test_gr(
mc_sample, mc=mc, delta_m=delta_m, beta=mc_beta, n=n
)
if ks_ds_list is None:
p, ks_d, _ = ks_test_gr(
mc_sample, mc=mc, delta_m=delta_m, beta=mc_beta, n=n
)
else:
p, ks_d, _ = ks_test_gr(
mc_sample,
mc=mc,
delta_m=delta_m,
beta=mc_beta,
n=n,
ks_ds=ks_ds_list[ii],
)

mcs_tested.append(mc)
ks_ds.append(ks_d)
Expand Down
Loading

0 comments on commit 4a6ea10

Please sign in to comment.