Skip to content

Commit

Permalink
Remove romanisim test dependency and protect against complex results
Browse files Browse the repository at this point in the history
  • Loading branch information
stscieisenhamer committed Aug 3, 2023
1 parent 7a2e442 commit ef8b39f
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 31 deletions.
19 changes: 10 additions & 9 deletions src/stcal/ramp_fitting/ols_cas22.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ cdef inline (float, float, float) fit_one_ramp(
if tscale == 0:
tscale = 1
cdef float f0 = 0, f1 = 0, f2 = 0
for i in range(nres):
# Casertano+22, Eq. 45
ww[i] = ((((1 + weight_power) * nn[start + i]) /
(1 + weight_power * nn[start + i])) *
fabs((tbar[start + i] - tbarmid) / tscale) ** weight_power)
# Casertano+22 Eq. 35
f0 += ww[i]
f1 += ww[i] * tbar[start + i]
f2 += ww[i] * tbar[start + i]**2
with cython.cpow(True): # Issue when tbar[] == tbarmid causes exception otherwise
for i in range(nres):
# Casertano+22, Eq. 45
ww[i] = ((((1 + weight_power) * nn[start + i]) /
(1 + weight_power * nn[start + i])) *
fabs((tbar[start + i] - tbarmid) / tscale) ** weight_power)
# Casertano+22 Eq. 35
f0 += ww[i]
f1 += ww[i] * tbar[start + i]
f2 += ww[i] * tbar[start + i]**2
# Casertano+22 Eq. 36
cdef float dd = f2 * f0 - f1 ** 2
if dd == 0:
Expand Down
27 changes: 27 additions & 0 deletions src/stcal/ramp_fitting/ols_cas22_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,33 @@ def ma_table_to_tau(ma_table, read_time):
return meantimes - (nreads - 1) * (nreads + 1) * read_time / 6 / nreads


def ma_table_to_tij(ma_table, read_time):
"""Get the times of each read going into resultants for a MA table.
Currently only ma_table_number = 1 is supported, corresponding to a simple
fiducial high latitude imaging MA table.
This presently uses a hard-coded, somewhat inflexible MA table description
in the parameters file. But that seems like an okay option given that the
current 'official' file is slated for redesign when the format is relaxed.
Parameters
----------
ma_table : list[list]
A list of (first_read, n_reads) tuples going into resultants.
read_time : float
The time taken for a read-out. For Roman, this is FRAME_TIME.
Returns
-------
list[list[float]]
list of list of readout times for each read entering a resultant
"""
tij = [read_time * np.arange(f, f + n) for (f, n) in ma_table]
return tij


def ma_table_to_tbar(ma_table, read_time):
"""Construct the mean times for each resultant from an ma_table.
Expand Down
76 changes: 54 additions & 22 deletions tests/test_ramp_fitting_cas22.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,11 @@

"""
Unit tests for ramp-fitting functions. Tested routines:
* ma_table_to_tbar
* ma_table_to_tau
* construct_covar
* construct_ramp_fitting_matrices
* construct_ki_and_variances
* ki_and_variance_grid
* RampFitInterpolator
* __init__
* ki
* variances
* fit_ramps
* resultants_to_differences
* simulate_many_ramps
Unit tests for ramp-fitting functions.
"""
import pytest

import numpy as np

from stcal.ramp_fitting import ols_cas22_fit as ramp
from stcal.ramp_fitting.ols_cas22_util import matable_to_readpattern, readpattern_to_matable
from stcal.ramp_fitting import ols_cas22_util

# Read Time in seconds
# For Roman, the read time of the detectors is a fixed value and is currently
Expand All @@ -34,7 +19,7 @@ def test_matable_to_readpattern():
ma_table = [[1, 1], [2, 2], [4, 1], [5, 4], [9,2], [11,1]]
expected = [[1], [2, 3], [4], [5, 6, 7, 8], [9, 10], [11]]

result = matable_to_readpattern(ma_table)
result = ols_cas22_util.matable_to_readpattern(ma_table)

assert result == expected

Expand All @@ -44,16 +29,14 @@ def test_readpattern_to_matable():
pattern = [[1], [2, 3], [4], [5, 6, 7, 8], [9, 10], [11]]
expected = [[1, 1], [2, 2], [4, 1], [5, 4], [9,2], [11,1]]

result = readpattern_to_matable(pattern)
result = ols_cas22_util.readpattern_to_matable(pattern)

assert result == expected


def test_simulated_ramps():
romanisim_ramp = pytest.importorskip('romanisim.ramp')
ntrial = 100000
ma_table, flux, read_noise, resultants = romanisim_ramp.simulate_many_ramps(
ntrial=ntrial)
ma_table, flux, read_noise, resultants = simulate_many_ramps(ntrial=ntrial)

par, var = ramp.fit_ramps_casertano(
resultants, resultants * 0, read_noise, ROMAN_READ_TIME, ma_table=ma_table)
Expand All @@ -73,3 +56,52 @@ def test_simulated_ramps():
assert np.abs(chi2dof_slope - 1) < 0.03
assert np.all(par[~m, 1] == 0)
assert np.all(var[~m, 1] == 0)


# #########
# Utilities
# #########
def simulate_many_ramps(ntrial=100, flux=100, readnoise=5, ma_table=None):
"""Simulate many ramps with a particular flux, read noise, and ma_table.
To test ramp fitting, it's useful to be able to simulate a large number
of ramps that are identical up to noise. This function does that.
Parameters
----------
ntrial : int
number of ramps to simulate
flux : float
flux in electrons / s
read_noise : float
read noise in electrons
ma_table : list[list] (int)
list of lists indicating first read and number of reads in each
resultant
Returns
-------
ma_table : list[list] (int)
ma_table used
flux : float
flux used
readnoise : float
read noise used
resultants : np.ndarray[n_resultant, ntrial] (float)
simulated resultants
"""
if ma_table is None:
ma_table = [[1, 4], [5, 1], [6, 3], [9, 10], [19, 3], [22, 15]]
nread = np.array([x[1] for x in ma_table])
tij = ols_cas22_util.ma_table_to_tij(ma_table, ROMAN_READ_TIME)
resultants = np.zeros((len(ma_table), ntrial), dtype='f4')
buf = np.zeros(ntrial, dtype='i4')
for i, ti in enumerate(tij):
subbuf = np.zeros(ntrial, dtype='i4')
for t0 in ti:
buf += np.random.poisson(ROMAN_READ_TIME * flux, ntrial)
subbuf += buf
resultants[i] = (subbuf / len(ti)).astype('f4')
resultants += np.random.randn(len(ma_table), ntrial) * (
readnoise / np.sqrt(nread)).reshape(len(ma_table), 1)
return (ma_table, flux, readnoise, resultants)

0 comments on commit ef8b39f

Please sign in to comment.