Merge pull request #49 from thpap/main
implemented Weichert (1980) method for estimation of GR-parameters
schmidni authored Aug 11, 2023
2 parents 499636f + 723aeda commit 17611c7
Showing 3 changed files with 288 additions and 17 deletions.
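For context: the parameters estimated by this commit are those of the Gutenberg-Richter magnitude-frequency law, with beta the natural-log counterpart of the b-value (the `factor = 1 / np.log(10)` term in the new code converts between the two):

\log_{10} N(\geq M) = a - b\,M, \qquad \beta = b \ln 10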
183 changes: 182 additions & 1 deletion catalog_tools/analysis/estimate_beta.py
@@ -1,8 +1,11 @@
"""This module contains functions for the estimation of beta and the b-value.
"""
from typing import Optional, Tuple, Union, List

import numpy as np
import pandas as pd
from scipy.optimize import minimize
import warnings


def estimate_b_tinti(magnitudes: np.ndarray,
@@ -231,6 +234,184 @@ def estimate_b_laplace(
error=error)


def estimate_b_weichert(
magnitudes: np.ndarray,
dates: List[np.datetime64],
completeness_table: np.ndarray,
mag_max: Union[int, float],
last_year: Optional[Union[int, float]] = None,
delta_m: float = 0.1,
b_parameter: str = 'b_value'
) -> Tuple[float, float, float, float, float]:
""" applies the Weichert (1980) algorithm for estimation of the
Gutenberg-Richter magnitude-frequency distribution parameters in
the case of unequal completeness periods for different magnitude
values.
Source:
Weichert (1980), Estimation of the earthquake recurrence parameters
for unequal observation periods for different magnitudes,
Bulletin of the Seismological Society of America,
Vol 70, No. 4, pp. 1337-1346
Args:
magnitudes: vector of earthquake magnitudes
dates: list of datetime objects of occurrence of each earthquake
completeness_table: Nx2 array, where the first column
contains the leftmost edge of magnitude bins and
the second column the associated year of completeness, i.e.
the year after which all earthquakes larger than the value in
the first column are considered detected. An example is given
below:
np.array([[ 3.95, 1980],
[ 4.95, 1920],
[ 5.95, 1810],
[ 6.95, 1520]])
mag_max: maximum possible magnitude
last_year: last year of observation (the default is None, in which case
            it is set to the latest year among the given dates).
delta_m: magnitude resolution, the default is 0.1.
        b_parameter: either 'b_value', in which case the corresponding value
            of the Gutenberg-Richter law is returned, otherwise 'beta' of
            the exponential distribution [p(M) ∝ exp(-beta*(M - mc))]
    Returns:
b_parameter: maximum likelihood point estimate of 'b-value' or 'beta'
std_b_parameter: standard error of b_parameter
rate_at_lmc: maximum likelihood point estimate of earthquake rate
at the lower magnitude of completeness
std_rate_at_lmc: standard error of rate_at_lmc
a_val: maximum likelihood point estimate of a-value
( =log10(rate at mag=0) ) of Gutenberg-Richter
magnitude frequency distribution
"""
assert len(magnitudes) == len(dates), \
"the magnitudes and years arrays have different lengths"
assert completeness_table.shape[1] == 2
assert np.all(np.ediff1d(completeness_table[:, 0]) >= 0),\
"magnitudes in completeness table not in ascending order"
    # binned magnitudes must sit delta_m / 2 above a completeness-table edge;
    # compare with a floating-point tolerance
    edges = np.arange(completeness_table[0, 0], mag_max + 0.001, delta_m)
    assert np.all([np.isclose(i - delta_m / 2, edges).any()
                   for i in np.unique(magnitudes)
                   if i >= completeness_table[0, 0]]), \
        "magnitude bins not aligned with completeness edges"
if not np.all(magnitudes >= completeness_table[:, 0].min()):
warnings.warn(
"magnitudes below %.2f are not covered by the "
"completeness table and are discarded" %
completeness_table[0, 0])
    assert delta_m > 0, "delta_m must be positive"
assert b_parameter == 'b_value' or b_parameter == 'beta', \
"please choose either 'b_value' or 'beta' as b_parameter"
factor = 1 / np.log(10) if b_parameter == 'b_value' else 1

# convert datetime to integer calendar year
years = np.array(dates).astype('datetime64[Y]').astype(int) + 1970

# get last year of catalogue if last_year not defined
last_year = last_year if last_year else np.max(years)

# Get the magnitudes and completeness years as separate arrays
completeness_table_magnitudes = completeness_table[:, 0]
completeness_table_years = completeness_table[:, 1]

# Obtain the completeness start year for each value in magnitudes
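    # searchsorted gives, for each magnitude, its insertion index in the table:
    # idx == 0 means the magnitude lies below the lowest completeness edge and
    # gets the sentinel -1 (filtered out below); idx == len(table) means it
    # lies at or above the last edge, so the last completeness year applies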
    insertion_indices = np.searchsorted(
        completeness_table_magnitudes, magnitudes)
    completeness_starts = np.array(
        [completeness_table_years[idx - 1] if
         idx not in [0, len(completeness_table_years)]
         else {0: -1,
               len(completeness_table_years): completeness_table_years[-1]}[idx]
         for idx in insertion_indices]
    )

# filter out events outside completeness window and
# get number of "complete" events in each magnitude bin
# and associated year of completeness
idxcomp = ((completeness_starts > 0) & (years - completeness_starts >= 0))
    complete_events = pd.DataFrame(data={
        'mag_left_edge': np.array(
            [i.left for i in pd.cut(magnitudes[idxcomp],
             bins=np.arange(completeness_table_magnitudes[0],
                            mag_max + 0.01, delta_m), right=False)]),
        'completeness_start': completeness_starts[idxcomp]
    }).groupby(by=['mag_left_edge', 'completeness_start'])\
        .size()\
        .to_frame('num')\
        .reset_index()
assert np.all(
complete_events.completeness_start > 0) # should be the case by design

# minimization
beta = np.log(10) # initialization of beta
solution = minimize(_weichert_objective_function,
beta,
args=(last_year, complete_events, delta_m),
method='Nelder-Mead',
options={'maxiter': 5000, 'disp': True},
tol=1e5 * np.finfo(float).eps)
beta = solution.x[0]
b_parameter = beta * factor

# compute rate at lower magnitude of completeness bin
weichert_multiplier = np.sum(
np.exp(-beta * (complete_events.mag_left_edge + delta_m * 0.5)))\
/ np.sum(
(last_year - complete_events.completeness_start.values)
* np.exp(-beta * (complete_events.mag_left_edge + delta_m * 0.5)))
rate_at_lmc = complete_events.num.sum() * weichert_multiplier

# compute a-value ( a_val = log10(rate at M=0) )
a_val = np.log10(rate_at_lmc)\
+ (beta / np.log(10)) * complete_events.mag_left_edge.values[0]

# compute uncertainty in b-parameter according to Weichert (1980)
    numerator = np.sum((last_year - complete_events.completeness_start.values)
                       * np.exp(-beta * (complete_events.mag_left_edge
                                         + delta_m * 0.5)))**2
    denominator_term1 = np.sum(
        (last_year - complete_events.completeness_start.values)
        * (complete_events.mag_left_edge + delta_m * 0.5)
        * np.exp(-beta * (complete_events.mag_left_edge
                          + delta_m * 0.5)))**2
    denominator_term2 = np.sqrt(numerator) * \
        np.sum((last_year - complete_events.completeness_start.values)
               * ((complete_events.mag_left_edge + delta_m * 0.5)**2)
               * np.exp(-beta * (complete_events.mag_left_edge
                                 + delta_m * 0.5))
               )
    var_beta = -(1 / complete_events.num.sum()) * numerator\
        / (denominator_term1 - denominator_term2)
std_b_parameter = np.sqrt(var_beta) * factor

# compute uncertainty in rate at lower magnitude of completeness
std_rate_at_lmc = rate_at_lmc / np.sqrt(complete_events.num.sum())

return b_parameter, std_b_parameter, rate_at_lmc, std_rate_at_lmc, a_val
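
A minimal usage sketch (illustrative values only; assumes the package is importable as catalog_tools):

import numpy as np
from catalog_tools.analysis.estimate_beta import estimate_b_weichert

# five hypothetical events, complete above M3.95 since 1980,
# above M4.95 since 1920 and above M5.95 since 1810
mags = np.array([4.0, 4.3, 5.1, 4.2, 6.0])
dates = [np.datetime64('%d-06-01' % y) for y in (1985, 1990, 1950, 1995, 1930)]
completeness = np.array([[3.95, 1980], [4.95, 1920], [5.95, 1810]])

b, std_b, rate, std_rate, a = estimate_b_weichert(
    magnitudes=mags, dates=dates, completeness_table=completeness,
    mag_max=7.95, delta_m=0.1, b_parameter='b_value')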


def _weichert_objective_function(beta: float,
last_year: Union[float, int],
complete_events: pd.DataFrame,
delta_m: float) -> float:
"""
function to be minimized for estimation of GR parameters as per
Weichert (1980). Used internally within estimate_b_weichert function.
"""
magbins = complete_events.mag_left_edge + delta_m * 0.5
nom = np.sum((last_year - complete_events.completeness_start.values
) * magbins * np.exp(-beta * magbins))
denom = np.sum((last_year - complete_events.completeness_start.values
) * np.exp(-beta * magbins))
left = nom / denom
right = np.sum(complete_events.num.values * magbins)\
/ complete_events.num.sum()
return np.abs(left - right)
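
For reference, the quantity this function drives to zero is Weichert's (1980)
maximum-likelihood condition: the completeness-duration-weighted mean of the
bin-centre magnitudes (the nom / denom term above) must equal the observed
mean magnitude of the complete events,

\frac{\sum_i t_i \, m_i \, e^{-\beta m_i}}{\sum_i t_i \, e^{-\beta m_i}}
    = \frac{\sum_i n_i \, m_i}{\sum_i n_i},

with m_i = mag_left_edge + delta_m / 2, t_i = last_year - completeness_start
and n_i the event count in bin i.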


def shi_bolt_confidence(
magnitudes: np.ndarray,
b_value: Optional[float] = None,
101 changes: 89 additions & 12 deletions catalog_tools/analysis/tests/test_estimate_beta.py
@@ -1,22 +1,27 @@
import numpy as np
import pytest

# import functions from other modules
from catalog_tools.utils.simulate_distributions import simulate_magnitudes
from catalog_tools.utils.binning import bin_to_precision

# import functions to be tested
from catalog_tools.analysis.estimate_beta import (estimate_beta_tinti,
                                                  estimate_b_tinti,
                                                  estimate_b_utsu,
                                                  estimate_b_elst,
                                                  estimate_b_laplace,
                                                  estimate_b_weichert,
                                                  differences,
                                                  shi_bolt_confidence)


def simulate_magnitudes_w_offset(
        n: int, beta: float, mc: float,
        delta_m: float, mag_max: float = None) -> np.ndarray:
    """ Simulates magnitudes with the lower bound shifted down by
    delta_m / 2, so that the binned values are centred on multiples
    of delta_m."""
    mags = simulate_magnitudes(n, beta, mc - delta_m / 2, mag_max)
if delta_m > 0:
mags = bin_to_precision(mags, delta_m)
return mags
@@ -95,6 +100,78 @@ def test_estimate_b_laplace(n: int, b: float, mc: float, delta_m: float,
assert abs(b - b_estimate) / b <= precision


@pytest.mark.parametrize(
"a_val_true,b_val_true,precision",
[(7, 1, 0.01)]
)
def test_estimate_b_weichert(a_val_true: float,
b_val_true: float,
precision: float):

# annual expected rates:
r45 = 10 ** (a_val_true - b_val_true * 3.95)\
- 10 ** (a_val_true - b_val_true * 4.95)
r56 = 10 ** (a_val_true - b_val_true * 4.95)\
- 10 ** (a_val_true - b_val_true * 5.95)
r67 = 10 ** (a_val_true - b_val_true * 5.95)\
- 10 ** (a_val_true - b_val_true * 6.95)
r78 = 10 ** (a_val_true - b_val_true * 6.95)\
- 10 ** (a_val_true - b_val_true * 7.95)

# assume a catalogue from year 1000 to end of 1999
# with completeness as follows:
# 3.95 - 1940 / 4.95 - 1880 / 5.95 - 1500 / 6.95 - 1000

# sample earthquakes over 1,000 year period
n45 = np.random.poisson(r45 * (2000 - 1940))
mags45 = simulate_magnitudes_w_offset(
n=n45, beta=np.log(10), mc=4, delta_m=0.1, mag_max=4.95)
years45 = np.random.randint(1940, 2000, n45)
dates45 = np.array(['%d-06-15' % i for i in years45], dtype='datetime64')

n56 = np.random.poisson(r56 * (2000 - 1880))
mags56 = simulate_magnitudes_w_offset(
n=n56, beta=np.log(10), mc=5, delta_m=0.1, mag_max=5.95)
years56 = np.random.randint(1880, 2000, n56)
dates56 = np.array(['%d-06-15' % i for i in years56], dtype='datetime64')

n67 = np.random.poisson(r67 * (2000 - 1500))
mags67 = simulate_magnitudes_w_offset(
n=n67, beta=np.log(10), mc=6, delta_m=0.1, mag_max=6.95)
years67 = np.random.randint(1500, 2000, n67)
dates67 = np.array(['%d-06-15' % i for i in years67], dtype='datetime64')

n78 = np.random.poisson(r78 * (2000 - 1000))
mags78 = simulate_magnitudes_w_offset(
n=n78, beta=np.log(10), mc=7, delta_m=0.1, mag_max=7.95)
years78 = np.random.randint(1000, 2000, n78)
    dates78 = np.array(['%d-06-15' % i for i in years78], dtype='datetime64')

# add some earthquakes in incomplete years
mags_inc = np.concatenate([
np.random.randint(40, 50, 100) / 10,
np.random.randint(50, 60, 10) / 10,
np.random.randint(60, 70, 1) / 10
])
years_inc = np.concatenate([
np.random.randint(1000, 1940, 100),
np.random.randint(1000, 1880, 10),
np.random.randint(1000, 1500, 1)
])
    dates_inc = np.array(['%d-06-15' % i for i in years_inc],
                         dtype='datetime64')

mags = np.concatenate([mags45, mags56, mags67, mags78, mags_inc])
dates = np.concatenate([dates45, dates56, dates67, dates78, dates_inc])

    b_val, std_b_val, rate_at_mref, std_rate_at_mref, a_val = \
        estimate_b_weichert(
            magnitudes=mags, dates=dates,
            completeness_table=np.array([[3.95, 1940], [4.95, 1880],
                                         [5.95, 1500], [6.95, 1000]]),
            mag_max=7.95, last_year=2000, delta_m=0.1,
            b_parameter='b_value')

assert abs(b_val_true - b_val) / b_val_true <= precision
assert abs(a_val_true - a_val) / a_val_true <= precision


@pytest.mark.parametrize(
"magnitudes,b_value,std_b_value,std_beta",
[(np.array([0.20990507, 0.04077336, 0.27906596, 0.57406287, 0.64256544,
21 changes: 17 additions & 4 deletions catalog_tools/utils/simulate_distributions.py
@@ -1,20 +1,33 @@
import numpy as np
from scipy import stats
from typing import Optional, Tuple, Union


def simulate_magnitudes(n: int, beta: float, mc: float,
                        mag_max: Optional[float] = None) -> np.ndarray:
""" Generates a vector of n elements drawn from an exponential distribution
exp(-beta*M)
Args:
n: number of sample magnitudes
beta: scale factor of the exponential distribution
        mc: cut-off magnitude
mag_max: maximum magnitude. If it is not None, the exponential
distribution is truncated at mag_max.
Returns:
mags: vector of length n of magnitudes drawn from an exponential
distribution
"""

    if mag_max is not None:
quantile1 = stats.expon.cdf(mc, loc=0, scale=1 / beta)
quantile2 = stats.expon.cdf(mag_max, loc=0, scale=1 / beta)
mags = stats.expon.ppf(
np.random.uniform(quantile1, quantile2, size=n),
loc=0,
scale=1 / beta,
)
else:
mags = np.random.exponential(1 / beta, n) + mc

return mags
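
A quick sketch of the truncated branch, which inverts the exponential CDF
between the quantiles of mc and mag_max (illustrative values):

import numpy as np
from catalog_tools.utils.simulate_distributions import simulate_magnitudes

mags = simulate_magnitudes(n=1000, beta=np.log(10), mc=4.0, mag_max=6.0)
assert mags.min() >= 4.0 and mags.max() <= 6.0  # samples stay in [mc, mag_max]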
