From ef8b39fb33a82c5618938817b541dcf330e0acaf Mon Sep 17 00:00:00 2001 From: Jonathan Eisenhamer Date: Wed, 2 Aug 2023 22:18:04 -0400 Subject: [PATCH] Remove romanisim test dependency and protect against complex results --- src/stcal/ramp_fitting/ols_cas22.pyx | 19 +++--- src/stcal/ramp_fitting/ols_cas22_util.py | 27 +++++++++ tests/test_ramp_fitting_cas22.py | 76 +++++++++++++++++------- 3 files changed, 91 insertions(+), 31 deletions(-) diff --git a/src/stcal/ramp_fitting/ols_cas22.pyx b/src/stcal/ramp_fitting/ols_cas22.pyx index db174b4ab..a5dce4971 100644 --- a/src/stcal/ramp_fitting/ols_cas22.pyx +++ b/src/stcal/ramp_fitting/ols_cas22.pyx @@ -77,15 +77,16 @@ cdef inline (float, float, float) fit_one_ramp( if tscale == 0: tscale = 1 cdef float f0 = 0, f1 = 0, f2 = 0 - for i in range(nres): - # Casertano+22, Eq. 45 - ww[i] = ((((1 + weight_power) * nn[start + i]) / - (1 + weight_power * nn[start + i])) * - fabs((tbar[start + i] - tbarmid) / tscale) ** weight_power) - # Casertano+22 Eq. 35 - f0 += ww[i] - f1 += ww[i] * tbar[start + i] - f2 += ww[i] * tbar[start + i]**2 + with cython.cpow(True): # Issue when tbar[] == tbarmid causes exception otherwise + for i in range(nres): + # Casertano+22, Eq. 45 + ww[i] = ((((1 + weight_power) * nn[start + i]) / + (1 + weight_power * nn[start + i])) * + fabs((tbar[start + i] - tbarmid) / tscale) ** weight_power) + # Casertano+22 Eq. 35 + f0 += ww[i] + f1 += ww[i] * tbar[start + i] + f2 += ww[i] * tbar[start + i]**2 # Casertano+22 Eq. 36 cdef float dd = f2 * f0 - f1 ** 2 if dd == 0: diff --git a/src/stcal/ramp_fitting/ols_cas22_util.py b/src/stcal/ramp_fitting/ols_cas22_util.py index a28cbf9ac..63bb9a278 100644 --- a/src/stcal/ramp_fitting/ols_cas22_util.py +++ b/src/stcal/ramp_fitting/ols_cas22_util.py @@ -73,6 +73,33 @@ def ma_table_to_tau(ma_table, read_time): return meantimes - (nreads - 1) * (nreads + 1) * read_time / 6 / nreads +def ma_table_to_tij(ma_table, read_time): + """Get the times of each read going into resultants for a MA table. + + Currently only ma_table_number = 1 is supported, corresponding to a simple + fiducial high latitude imaging MA table. + + This presently uses a hard-coded, somewhat inflexible MA table description + in the parameters file. But that seems like an okay option given that the + current 'official' file is slated for redesign when the format is relaxed. + + Parameters + ---------- + ma_table : list[list] + A list of (first_read, n_reads) tuples going into resultants. + + read_time : float + The time taken for a read-out. For Roman, this is FRAME_TIME. + + Returns + ------- + list[list[float]] + list of list of readout times for each read entering a resultant + """ + tij = [read_time * np.arange(f, f + n) for (f, n) in ma_table] + return tij + + def ma_table_to_tbar(ma_table, read_time): """Construct the mean times for each resultant from an ma_table. diff --git a/tests/test_ramp_fitting_cas22.py b/tests/test_ramp_fitting_cas22.py index 60c58651a..c8363d760 100644 --- a/tests/test_ramp_fitting_cas22.py +++ b/tests/test_ramp_fitting_cas22.py @@ -1,26 +1,11 @@ """ -Unit tests for ramp-fitting functions. Tested routines: -* ma_table_to_tbar -* ma_table_to_tau -* construct_covar -* construct_ramp_fitting_matrices -* construct_ki_and_variances -* ki_and_variance_grid -* RampFitInterpolator - * __init__ - * ki - * variances - * fit_ramps -* resultants_to_differences -* simulate_many_ramps +Unit tests for ramp-fitting functions. """ -import pytest - import numpy as np from stcal.ramp_fitting import ols_cas22_fit as ramp -from stcal.ramp_fitting.ols_cas22_util import matable_to_readpattern, readpattern_to_matable +from stcal.ramp_fitting import ols_cas22_util # Read Time in seconds # For Roman, the read time of the detectors is a fixed value and is currently @@ -34,7 +19,7 @@ def test_matable_to_readpattern(): ma_table = [[1, 1], [2, 2], [4, 1], [5, 4], [9,2], [11,1]] expected = [[1], [2, 3], [4], [5, 6, 7, 8], [9, 10], [11]] - result = matable_to_readpattern(ma_table) + result = ols_cas22_util.matable_to_readpattern(ma_table) assert result == expected @@ -44,16 +29,14 @@ def test_readpattern_to_matable(): pattern = [[1], [2, 3], [4], [5, 6, 7, 8], [9, 10], [11]] expected = [[1, 1], [2, 2], [4, 1], [5, 4], [9,2], [11,1]] - result = readpattern_to_matable(pattern) + result = ols_cas22_util.readpattern_to_matable(pattern) assert result == expected def test_simulated_ramps(): - romanisim_ramp = pytest.importorskip('romanisim.ramp') ntrial = 100000 - ma_table, flux, read_noise, resultants = romanisim_ramp.simulate_many_ramps( - ntrial=ntrial) + ma_table, flux, read_noise, resultants = simulate_many_ramps(ntrial=ntrial) par, var = ramp.fit_ramps_casertano( resultants, resultants * 0, read_noise, ROMAN_READ_TIME, ma_table=ma_table) @@ -73,3 +56,52 @@ def test_simulated_ramps(): assert np.abs(chi2dof_slope - 1) < 0.03 assert np.all(par[~m, 1] == 0) assert np.all(var[~m, 1] == 0) + + +# ######### +# Utilities +# ######### +def simulate_many_ramps(ntrial=100, flux=100, readnoise=5, ma_table=None): + """Simulate many ramps with a particular flux, read noise, and ma_table. + + To test ramp fitting, it's useful to be able to simulate a large number + of ramps that are identical up to noise. This function does that. + + Parameters + ---------- + ntrial : int + number of ramps to simulate + flux : float + flux in electrons / s + read_noise : float + read noise in electrons + ma_table : list[list] (int) + list of lists indicating first read and number of reads in each + resultant + + Returns + ------- + ma_table : list[list] (int) + ma_table used + flux : float + flux used + readnoise : float + read noise used + resultants : np.ndarray[n_resultant, ntrial] (float) + simulated resultants +""" + if ma_table is None: + ma_table = [[1, 4], [5, 1], [6, 3], [9, 10], [19, 3], [22, 15]] + nread = np.array([x[1] for x in ma_table]) + tij = ols_cas22_util.ma_table_to_tij(ma_table, ROMAN_READ_TIME) + resultants = np.zeros((len(ma_table), ntrial), dtype='f4') + buf = np.zeros(ntrial, dtype='i4') + for i, ti in enumerate(tij): + subbuf = np.zeros(ntrial, dtype='i4') + for t0 in ti: + buf += np.random.poisson(ROMAN_READ_TIME * flux, ntrial) + subbuf += buf + resultants[i] = (subbuf / len(ti)).astype('f4') + resultants += np.random.randn(len(ma_table), ntrial) * ( + readnoise / np.sqrt(nread)).reshape(len(ma_table), 1) + return (ma_table, flux, readnoise, resultants)