From 4fc2cea06f713778c4ff97d40169d0c92d799d51 Mon Sep 17 00:00:00 2001
From: Lexy Andati
Date: Wed, 12 May 2021 15:58:26 +0200
Subject: [PATCH] Noxcal (#450)

* partial fix for #93. Should allow for one spw at a time
* fixes #93. Adds workaround for https://github.com/casacore/python-casacore/issues/130
* added future-fstrings
* correctly named future-fstrings
* ...and install it for all pythons
* back to mainstream sharedarray
* lowered message verbosity
* dropping support for python<3.6
* Issues 427 and 429 (#430)
  * Fixes #427
  * Fixes #429
* Update __init__.py
  Bump version
* Update Jenkinsfile.sh (#447)
  Remove python 2 building
* Issue 431 (#432)
  * Beginning of nobeam functionality.
  * Remove duplicated source provider.
  * Change syntax for python 2.7 compatibility.
  * Update montblanc version in setup to avoid installation woes.
  Co-authored-by: JSKenyon
* Remove two uses of future_fstrings.
* make pypi compatible (#446)
  * Update setup.py
  Only support up to v0.6.1 of montblanc
  Co-authored-by: Benjamin Hugo
  Co-authored-by: JSKenyon
* Update __init__.py
* Fix warning formatting

Co-authored-by: Oleg Smirnov
Co-authored-by: Benjamin Hugo
Co-authored-by: JSKenyon
Co-authored-by: Gijs Molenaar
---
 .gitignore                                   |   3 +-
 Jenkinsfile.sh                               |  31 +-
 cubical/__init__.py                          |   2 +-
 cubical/data_handler/MBTiggerSim.py          |   2 +-
 cubical/data_handler/TiggerSourceProvider.py |  28 +-
 cubical/data_handler/ms_data_handler.py      |  63 +++-
 cubical/data_handler/ms_tile.py              | 292 ++++++++++---------
 cubical/degridder/DDFacetSim.py              |   9 +-
 cubical/machines/interval_gain_machine.py    |   7 +-
 setup.py                                     |  22 +-
 10 files changed, 265 insertions(+), 194 deletions(-)

diff --git a/.gitignore b/.gitignore
index 08f90dd5..f79912a0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,6 +44,7 @@ Thumbs.db
 ##################
 .eggs/
 .idea/
+dist/
 cubical.egg-info/

 # Backup files #
@@ -97,6 +98,6 @@ __pycache__
 .venv*/
 .virtualenv*/
 .persistent_history
+venv/
 *~
-.#*
\ No newline at end of file
diff --git a/Jenkinsfile.sh b/Jenkinsfile.sh
index 9ad9a4af..dae1be6e 100644
--- a/Jenkinsfile.sh
+++ b/Jenkinsfile.sh
@@ -7,24 +7,21 @@ TEST_DATA_DIR="$WORKSPACE/../../../test-data"
 mkdir $TEST_OUTPUT_DIR

 # build and testrun
-docker build -f ${WORKSPACE_ROOT}/projects/Cubical/.jenkins/1604.py2.docker -t cubical.1604.py2:${BUILD_NUMBER} ${WORKSPACE_ROOT}/projects/Cubical/
-docker run --rm cubical.1604.py2:${BUILD_NUMBER}
-docker build -f ${WORKSPACE_ROOT}/projects/Cubical/.jenkins/1804.py2.docker -t cubical.1804.py2:${BUILD_NUMBER} ${WORKSPACE_ROOT}/projects/Cubical/
-docker run --rm cubical.1804.py2:${BUILD_NUMBER}
 docker build -f ${WORKSPACE_ROOT}/projects/Cubical/.jenkins/1804.py3.docker -t cubical.1804.py3:${BUILD_NUMBER} ${WORKSPACE_ROOT}/projects/Cubical/
 docker run --rm cubical.1804.py3:${BUILD_NUMBER}

 #run tests
-for img in 1604.py2 1804.py2 1804.py3;
-do
-  docker run --rm -m 100g --cap-add sys_ptrace \
-    --memory-swap=-1 \
-    --shm-size=150g \
-    --rm=true \
-    --name=cubical$BUILD_NUMBER \
-    -v ${TEST_OUTPUT_DIR}:/workspace \
-    -v ${TEST_OUTPUT_DIR}:/root/tmp \
-    --entrypoint /bin/bash \
-    cubical.${img}:${BUILD_NUMBER} \
-    -c "cd /src/cubical && nosetests --with-xunit --xunit-file /workspace/nosetests.xml test"
-done
+
+img=1804.py3
+
+docker run --rm -m 100g --cap-add sys_ptrace \
+    --memory-swap=-1 \
+    --shm-size=150g \
+    --rm=true \
+    --name=cubical$BUILD_NUMBER \
+    -v 
${TEST_OUTPUT_DIR}:/workspace \ + -v ${TEST_OUTPUT_DIR}:/root/tmp \ + --entrypoint /bin/bash \ + cubical.${img}:${BUILD_NUMBER} \ + -c "cd /src/cubical && nosetests --with-xunit --xunit-file /workspace/nosetests.xml test" + diff --git a/cubical/__init__.py b/cubical/__init__.py index 27a35401..b19305f4 100644 --- a/cubical/__init__.py +++ b/cubical/__init__.py @@ -4,4 +4,4 @@ # This code is distributed under the terms of GPLv2, see LICENSE.md for details # update this for each release -VERSION = "1.5.8" +VERSION = "1.5.10" diff --git a/cubical/data_handler/MBTiggerSim.py b/cubical/data_handler/MBTiggerSim.py index 882c9aa1..5c547ef8 100644 --- a/cubical/data_handler/MBTiggerSim.py +++ b/cubical/data_handler/MBTiggerSim.py @@ -234,7 +234,7 @@ def model_vis(self, context): ur = upper + offset lc = ddid_ind*self._chan_per_ddid uc = (ddid_ind+1)*self._chan_per_ddid - self._model[self._dir, 0, lr:ur, :, :] = \ + self._model[self._dir, 0, lr:ur, :, :] += \ context.data[:,:,lc:uc,sel].reshape(-1, self._chan_per_ddid, self._ncorr) def __str__(self): diff --git a/cubical/data_handler/TiggerSourceProvider.py b/cubical/data_handler/TiggerSourceProvider.py index 98882457..0ab16300 100644 --- a/cubical/data_handler/TiggerSourceProvider.py +++ b/cubical/data_handler/TiggerSourceProvider.py @@ -44,18 +44,22 @@ def __init__(self, lsm, phase_center, dde_tag='dE'): self._freqs = None self._clusters = cluster_sources(self._sm, dde_tag) + self._cluster_keys = list(self._clusters.keys()) self._nclus = len(self._cluster_keys) self._target_key = 0 self._target_cluster = self._cluster_keys[self._target_key] + self._target_reqbeam = "beam" - self._pnt_sources = self._clusters[self._target_cluster]["pnt"] + self._pnt_sources = \ + self._clusters[self._target_cluster][self._target_reqbeam]["pnt"] self._npsrc = len(self._pnt_sources) - self._gau_sources = self._clusters[self._target_cluster]["gau"] + self._gau_sources = \ + self._clusters[self._target_cluster][self._target_reqbeam]["gau"] self._ngsrc = len(self._gau_sources) - def set_direction(self, idir): + def set_direction(self, idir, req_beam="beam"): """Sets current direction being simulated. 
        Args:
@@ -65,9 +69,12 @@ def set_direction(self, idir):

        self._target_key = idir
        self._target_cluster = self._cluster_keys[self._target_key]
-       self._pnt_sources = self._clusters[self._target_cluster]["pnt"]
+       self._target_reqbeam = req_beam
+       self._pnt_sources = \
+           self._clusters[self._target_cluster][self._target_reqbeam]["pnt"]
        self._npsrc = len(self._pnt_sources)
-       self._gau_sources = self._clusters[self._target_cluster]["gau"]
+       self._gau_sources = \
+           self._clusters[self._target_cluster][self._target_reqbeam]["gau"]
        self._ngsrc = len(self._gau_sources)

    def set_frequency(self, frequency):
@@ -211,10 +218,10 @@ def updated_dimensions(self):
        return [('npsrc', self._npsrc),
                ('ngsrc', self._ngsrc)]
-
+
    def phase_centre(self, context):
        """ Sets the MB phase direction """
-       radec = np.array([self._phase_center[...,-2],
+       radec = np.array([self._phase_center[...,-2],
                          self._phase_center[...,-1]], context.dtype)
        return radec
@@ -248,9 +255,12 @@ def cluster_sources(sm, dde_tag):
        else:
            dde_cluster = src.getTag('cluster')

-       group = 'pnt' if src.typecode=='pnt' else 'gau'
+       req_beam = 'nobeam' if src.getTag('nobeam') else 'beam'
+       src_type = 'pnt' if src.typecode=='pnt' else 'gau'

-       clus.setdefault(dde_cluster, dict(pnt=[], gau=[]))[group].append(src)
+       clus.setdefault(dde_cluster, dict(beam=dict(pnt=[], gau=[]),
+                                         nobeam=dict(pnt=[], gau=[])))
+       clus[dde_cluster][req_beam][src_type].append(src)

    return clus

diff --git a/cubical/data_handler/ms_data_handler.py b/cubical/data_handler/ms_data_handler.py
index 2e2eeed0..697cb868 100644
--- a/cubical/data_handler/ms_data_handler.py
+++ b/cubical/data_handler/ms_data_handler.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 # CubiCal: a radio interferometric calibration suite
 # (c) 2017 Rhodes University & Jonathan S. Kenyon
 # http://github.com/ratt-ru/CubiCal
@@ -317,7 +318,8 @@ def __init__(self, ms_name, data_column, output_column=None, output_model_column
        assert set(spwtabcols) <= set(
            _spwtab.colnames()), "Measurement set conformance error - keyword table SPECTRAL_WINDOW incomplete. Perhaps disable --out-casa-gaintables or check your MS!"

-       self._spwtabcols = {t: _spwtab.getcol(t) for t in spwtabcols}
+       nrows = _spwtab.nrows()
+       self._spwtabcols = {t: [_spwtab.getcol(t, row, 1) for row in range(nrows)] for t in spwtabcols}

        # read observation details
        obstabcols = ["TIME_RANGE", "LOG", "SCHEDULE", "FLAG_ROW",
@@ -470,18 +472,19 @@
                    " ".join([str(ch) for ch in freqchunks + [nchan]])), file=log(0))

        # now accumulate list of all frequencies, and also see if selected DDIDs have a uniform rebinning and chunking map
-       all_freqs = set(self.chanfreqs[self._ddids[0]])
        self.do_freq_rebin = any([m is not None for m in list(self.rebin_chan_maps.values())])
        self._ddids_unequal = False
        ddid0_map = self.rebin_chan_maps[self._ddids[0]]
        for ddid in self._ddids[1:]:
+           if len(self.chanfreqs[self._ddids[0]]) != len(self.chanfreqs[ddid]):
+               self._ddids_unequal = True
+               break
            map1 = self.rebin_chan_maps[ddid]
            if ddid0_map is None and map1 is None:
                continue
            if (ddid0_map is None and map1 is not None) or (ddid0_map is not None and map1 is None) or \
                    len(ddid0_map) != len(map1) or (ddid0_map!=map1).any():
                self._ddids_unequal = True
-           all_freqs.update(self.chanfreqs[ddid])

        if self._ddids_unequal:
            print("Selected DDIDs have differing channel structure. Processing may be less efficient.", file=log(0,"red"))
@@ -812,6 +815,40 @@ def fetch(self, colname, first_row=0, nrows=-1, subset=None):
        (subset or self.data).getcolnp(str(colname), prealloc, first_row, nrows)
        return prealloc

+   @staticmethod
+   def _get_row_chunk(array):
+       """
+       Establishes the max row chunk that can be used. Workaround for https://github.com/casacore/python-casacore/issues/130
+       array: array to be read or written, of shape (nrows, nfreq, ncorr)
+       """
+       _maxchunk = 2**29  # max number of elements per call, see https://github.com/casacore/python-casacore/issues/130#issuecomment-748150854
+       nrows, nfreq, ncorr = array.shape
+       maxrows = max(1, _maxchunk // (nfreq*ncorr))
+       log(1).print(f"  table I/O request of {nrows} rows: max chunk size is {maxrows} rows")
+       return maxrows, nrows
+
+   @staticmethod
+   def _getcolnp_wrapper(table, column, array, startrow):
+       "Calls table.getcolnp() in chunks of rows. Workaround for https://github.com/casacore/python-casacore/issues/130"
+       maxrows, nrows = MSDataHandler._get_row_chunk(array)
+       for row0 in range(0, nrows, maxrows):
+           table.getcolnp(column, array[row0:row0+maxrows], startrow+row0, maxrows)
+
+   @staticmethod
+   def _getcolslicenp_wrapper(table, column, array, begin, end, incr, startrow):
+       "Calls table.getcolslicenp() in chunks of rows. Workaround for https://github.com/casacore/python-casacore/issues/130"
+       maxrows, nrows = MSDataHandler._get_row_chunk(array)
+       for row0 in range(0, nrows, maxrows):
+           table.getcolslicenp(column, array[row0:row0+maxrows], begin, end, incr, startrow+row0, maxrows)
+
+   @staticmethod
+   def _putcol_wrapper(table, column, array, startrow):
+       "Calls table.putcol() in chunks of rows. Workaround for https://github.com/casacore/python-casacore/issues/130"
+       maxrows, nrows = MSDataHandler._get_row_chunk(array)
+       for row0 in range(0, nrows, maxrows):
+           table.putcol(column, array[row0:row0+maxrows], startrow+row0, maxrows)
+
    def fetchslice(self, column, startrow=0, nrows=-1, subset=None):
        """
        Convenience function similar to fetch(), but assumes a column of NFREQxNCORR shape,
@@ -840,17 +877,17 @@
            shape = tuple([nrows] + [s for s in cell.shape]) if hasattr(cell, "shape") else nrows
            prealloc = np.empty(shape, dtype=dtype)
-           subset.getcolnp(str(column), prealloc, startrow, nrows)
+           self._getcolnp_wrapper(subset, str(column), prealloc, startrow)
            return prealloc

        # ugly hack because getcell returns a different dtype to getcol
-       cell = (subset or self.data).getcol(str(column), startrow, nrow=1)[0, ...]
+       cell = subset.getcol(str(column), startrow, nrow=1)[0, ...]
        dtype = getattr(cell, "dtype", type(cell))
        shape = tuple([nrows] + [len(list(range(l, r + 1, i)))  #inclusive in cc
                                 for l, r, i in zip(self._ms_blc, self._ms_trc, self._ms_incr)])
        prealloc = np.empty(shape, dtype=dtype)
-       subset.getcolslicenp(str(column), prealloc, self._ms_blc, self._ms_trc, self._ms_incr, startrow, nrows)
+       self._getcolslicenp_wrapper(subset, str(column), prealloc, self._ms_blc, self._ms_trc, self._ms_incr, startrow)
        return prealloc

    def fetchslicenp(self, column, data, startrow=0, nrows=-1, subset=None):
        """
@@ -871,8 +908,8 @@ def fetchslicenp(self, column, data, startrow=0, nrows=-1, subset=None):
        """
        subset = subset or self.data
        if self._ms_blc == None:
-           return subset.getcolnp(column, data, startrow, nrows)
-       return subset.getcolslicenp(column, data, self._ms_blc, self._ms_trc, self._ms_incr, startrow, nrows)
+           return self._getcolnp_wrapper(subset, column, data, startrow)
+       return self._getcolslicenp_wrapper(subset, column, data, self._ms_blc, self._ms_trc, self._ms_incr, startrow)

    def putslice(self, column, value, startrow=0, nrows=-1, subset=None):
        """
@@ -897,7 +934,7 @@
        # if no slicing, just use putcol to put the whole thing. This always works,
        # unless the MS is screwed up
        if self._ms_blc == None:
-           return subset.putcol(str(column), value, startrow, nrows)
+           return self._putcol_wrapper(subset, str(column), value, startrow)

        if nrows<0:
            nrows = subset.nrows()
@@ -921,11 +958,13 @@
            value[:] = np.bitwise_or.reduce(value, axis=2)[:,:,np.newaxis]

        if self._channel_slice == slice(None) and self._corr_slice == slice(None):
-           return subset.putcol(column, value, startrow, nrows)
+           return self._putcol_wrapper(subset, column, value, startrow)
        else:
            # for bitflags, we want to preserve flags we haven't touched -- read the column
            if column == "BITFLAG" or column == "FLAG":
-               value0 = subset.getcol(column)
+               value0 = np.empty_like(value)
+               self._getcolnp_wrapper(subset, column, value0, startrow)
+
                # cheekily propagate per-corr flags to all corrs
                value0[:] = np.bitwise_or.reduce(value0, axis=2)[:,:,np.newaxis]
            # otherwise, init empty column
            else:
                shape = (nrows, self._nchan0_orig[ddid], self.nmscorrs)
                value0 = np.zeros(shape, value.dtype)
            value0[:, self._channel_slice, self._corr_slice] = value
-           return subset.putcol(str(column), value0, startrow, nrows)
+           return self._putcol_wrapper(subset, str(column), value0, startrow)

    def define_chunk(self, chunk_time, rebin_time, fdim=1, chunk_by=None, chunk_by_jump=0, chunks_per_tile=4, max_chunks_per_tile=0):
        """
diff --git a/cubical/data_handler/ms_tile.py b/cubical/data_handler/ms_tile.py
index 94632c43..c086e755 100644
--- a/cubical/data_handler/ms_tile.py
+++ b/cubical/data_handler/ms_tile.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 # CubiCal: a radio interferometric calibration suite
 # (c) 2017 Rhodes University & Jonathan S. Kenyon
 # http://github.com/ratt-ru/CubiCal
@@ -60,6 +61,10 @@ def __init__(self, ddid, tchunk, timeslice, rows, rows0):
        """
        self.ddid, self.tchunk, self.timeslice, self.rows, self.rows0 = ddid, tchunk, timeslice, rows, rows0
+       # MSTile.Subset to which this chunk belongs (a subset may contain several chunks)
+       self.subset = None
+       # subset_rows: row indices within a subset corresponding to this chunk
+       self.rows_within_subset = None

 class MSTile(object):
@@ -73,12 +78,25 @@ class Subset(object):
        Subsets are read from the MS in one go, and predicted by Montblanc in one go.
        """
-       def __init__(self, tile, label, datadict, ms_rows, rows0):
+       def __init__(self, tile, label, datadict, ms_rows, ms_rows0):
+           """
+           Creates a subset object.
+           tile: parent MSTile
+           label: a descriptive label for this subset
+           datadict: name of SharedDict corresponding to this subset
+           ms_rows: a slice (or list) of the rebinned MS rows comprising this subset. Row 0 is rebinned MS row 0.
+           ms_rows0: a list of the original MS rows comprising this subset. Row 0 is raw MS row 0.
+           """
            self.tile = tile
            self.datadict = datadict
-           self.rows0 = rows0  # row numbers in original (sorted) MS
+           self.rows0 = ms_rows0  # row numbers in original (sorted) MS
            self.label = label
            self.nants = tile.nants
+           #
+           if type(ms_rows) is slice:
+               log(1).print(f"{tile.label} subset {label} rows {ms_rows}")
+           else:
+               log(1).print(f"{tile.label} subset {label} contains {len(ms_rows)} rows (min {ms_rows[0]} max {ms_rows[-1]})")
            # subsets of rebinned rows
            self.ddid_col = self.tile.dh.ddid_col[ms_rows]
            self.time_col = self.tile.dh.time_col[ms_rows]
@@ -95,7 +113,7 @@
            if label is None:
                self.rebin_row_map = tile.rebin_row_map
            else:
-               self.rebin_row_map = tile.rebin_row_map[rows0]
+               self.rebin_row_map = tile.rebin_row_map[ms_rows0 - tile.first_row0]
                # since the rebinned map is now sparse in the output because we've got a subset of input rows,
                # uniquify it so it becomes contiguous in the output.
                # Note that '-' signs need to be preserved
@@ -111,6 +129,9 @@
            # filled in by self.load_montblanc_models below
            self._mb_measet_src = self._mb_cached_ms_src = None

+           # bitflag column stored here
+           self.bflagcol = self.bflagrow = self._flagcol = self._flagcol_sum = None
+
        # static member will be initialized with FitsBeamSourceProvider if needed
        _mb_arbeam_src = None
@@ -126,8 +147,8 @@ def upsample(self, data):
        def load_ddfacet_models(self, uvwco, loaded_models, model_source, cluster, imod, idir, model_type):
            """
            Invoke DDFacet degridder to compute model visibilities
-           for all directions.
-
+           for all directions.
+
+           Postcondition: loaded_models gains a new entry for the model_source cluster, with all directions predicted.
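[Note on the chunked-I/O workaround added to ms_data_handler.py above: python-casacore issue #130 makes very large getcol/putcol requests fail once a single call touches more than about 2**29 values, so the new _getcolnp/_getcolslicenp/_putcol wrappers split every request into row chunks under that cap. A minimal self-contained sketch of the same pattern; the MS path, column name and probing logic here are illustrative, not lifted from this patch:

    import numpy as np
    from casacore.tables import table

    MAX_ELEMENTS = 2**29   # empirical per-call cap, per python-casacore issue #130

    def read_column_chunked(tab, column):
        """Read a whole column into a preallocated array, in row chunks."""
        nrows = tab.nrows()
        cell = tab.getcol(column, 0, 1)[0, ...]       # probe one row for shape/dtype
        arr = np.empty((nrows,) + cell.shape, cell.dtype)
        # pick the largest row count whose total element count stays under the cap
        maxrows = max(1, MAX_ELEMENTS // max(1, int(np.prod(cell.shape))))
        for row0 in range(0, nrows, maxrows):
            nr = min(maxrows, nrows - row0)
            tab.getcolnp(column, arr[row0:row0 + nr], row0, nr)
        return arr

    # illustrative usage: data = read_column_chunked(table("example.ms"), "DATA")

The same row-chunking loop is applied symmetrically on writes via putcol, which is why the patch routes putslice() through _putcol_wrapper.]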
@@ -152,10 +173,10 @@ def load_ddfacet_models(self, uvwco, loaded_models, model_source, cluster, imod, ddfsim.set_model_provider(model_source) for idir, clus in enumerate(model_source._cluster_keys): ddfsim.set_direction(clus) - model = ddfsim.simulate(self.tile.dh, + model = ddfsim.simulate(self.tile.dh, self.tile, self, - self.tile.dh._poltype, + self.tile.dh._poltype, uvwco, self._freqs, model_type, @@ -217,11 +238,11 @@ def load_montblanc_models(self, uvwco, loaded_models, model_source, cluster, imo ddid_index, uniq_ddids, _ = data_handler.uniquify(self.ddid_col) self._freqs = np.array([self.tile.dh.chanfreqs[ddid] for ddid in uniq_ddids]) - - + + assert dtc.assert_isint(n_bl) assert dtc.assert_isint(ddid_index) - + self._row_identifiers = ddid_index * n_bl * ntime + (self.times - self.times[0]) * n_bl + \ (-0.5 * self.antea ** 2 + (self.nants - 1.5) * self.antea + self.anteb - 1).astype( np.int32) @@ -268,7 +289,7 @@ def load_montblanc_models(self, uvwco, loaded_models, model_source, cluster, imo self._mb_sorted_ind = np.array([current_row_index[idx] for idx in full_index]) self._mb_measet_src = MBTiggerSim.MSSourceProvider(self.tile, time_col, antea, anteb, ddid_index, uvwco, - self._freqs, self._mb_sorted_ind, len(self.time_col), + self._freqs, self._mb_sorted_ind, len(self.time_col), do_pa_rotation=self.tile.dh.pa_rotate_montblanc) if not self.tile.dh.pa_rotate_montblanc: @@ -287,8 +308,6 @@ def load_montblanc_models(self, uvwco, loaded_models, model_source, cluster, imo tigger_source = model_source cached_src = CachedSourceProvider(tigger_source, clear_start=True, clear_stop=True) srcs = [self._mb_cached_ms_src, cached_src] - if self._mb_arbeam_src: - srcs.append(self._mb_arbeam_src) # make a sink with an array to receive visibilities ndirs = model_source._nclus @@ -300,10 +319,18 @@ def load_montblanc_models(self, uvwco, loaded_models, model_source, cluster, imo snks = [column_snk] for direction in range(ndirs): - tigger_source.set_direction(direction) - tigger_source.set_frequency(self._freqs) - column_snk.set_direction(direction) - MBTiggerSim.simulate(srcs, snks, polarisation_type=self.tile.dh._poltype, opts=self.tile.dh.mb_opts) + for req_beam in ["beam", "nobeam"]: + tigger_source.set_direction(direction, req_beam) + tigger_source.set_frequency(self._freqs) + column_snk.set_direction(direction) + if self._mb_arbeam_src and req_beam == "beam": + dir_srcs = srcs + [self._mb_arbeam_src] + else: + dir_srcs = srcs + MBTiggerSim.simulate(dir_srcs, + snks, + polarisation_type=self.tile.dh._poltype, + opts=self.tile.dh.mb_opts) # convert back to float, if double was used if full_model.dtype != self.tile.dh.ctype: @@ -336,7 +363,7 @@ def _column_to_cube(self, column, chunk_tdim, chunk_fdim, rows, freqs, dtype, ze chunk_fdim (int): Frequencies per chunk. rows (np.ndarray): - Row slice (or set of indices). + Row slice (or set of indices) to be applied to column data in order to extract the required chunk freqs (slice): Frequency slice. 
dtype (various): @@ -540,7 +567,8 @@ def finalize(self, label=None): # row rebin map, relative to start (row0) of tile self.rebin_row_map = self.dh.rebin_row_map[self.first_row0:self.last_row0+1] - first_row, last_row = abs(self.rebin_row_map[0]), abs(self.rebin_row_map[-1]) + first_row, last_row = self.first_row, self.last_row = abs(self.rebin_row_map[0]), abs(self.rebin_row_map[-1]) + num_rows = self.last_row - self.first_row + 1 # first rebinned row is also 0 self.rebin_row_map = np.sign(self.rebin_row_map)*(abs(self.rebin_row_map) - first_row) @@ -559,16 +587,20 @@ def finalize(self, label=None): # reverse index: from DDID to its number in the self.ddids list self._ddid_reverse = { ddid:num for num, ddid in enumerate(self.ddids) } - # Create a dict of { chunk_label: rows, chan0, chan1 } for all chunks in this tile. - + # Create a dict of { chunk_label: rowchunk, chan0, chan1 } for all chunks in this tile. self._chunk_dict = OrderedDict() self._chunk_indices = {} + # maps row to chunk. Row 0 is start of tile. + row_chunknum = np.zeros(num_rows, dtype=int) + # maps row to DDID. Row 0 is start of tile. + row_ddid = self.dh.ddid_col[first_row:last_row+1] + # maps row0 (raw MS rows) to DDID. Row 0 is start of tile. + row0_ddid = np.zeros(self.last_row0 - self.first_row0 + 1, dtype=int) - # collect row chunks and freqs, and also row numbers per each DDID - ddid_rows0 = {} - for rowchunk in self.rowchunks: - ddid_rows0.setdefault(rowchunk.ddid, set()).update(rowchunk.rows0) + for i, rowchunk in enumerate(self.rowchunks): + row_chunknum[rowchunk.rows] = i + row0_ddid[rowchunk.rows0 - self.first_row0] = rowchunk.ddid freqchunks = self.dh.freqchunks[rowchunk.ddid] for ifreq,chan0 in enumerate(freqchunks): key = "D{}T{}F{}".format(rowchunk.ddid, rowchunk.tchunk, ifreq) @@ -579,27 +611,41 @@ def finalize(self, label=None): self.nants = self.dh.nants self.ncorr = self.dh.ncorr - # set up per-DDID subsets. These will be inherited by workers. + # Set up per-DDID subsets. These will be inherited by workers. + + # A tile consists of 1+ Subsets. One subset if the DDIDs have the same shape, one per DDID if the shape is ragged. + # A Subset is read from the MS as a single unit (with one getcolnp call) + # A subset can contain one or more rowchunks. - # gives list of subsets to be read in, as a tuple of shared dict name, MS row subset - self._subsets = [] - self._ddid_data_dict = {} + # dict: DDID -> subset. 
For one subset (DDIDs with same shape), None -> subset
+       self._subsets = {}
+       # if spectral windows are ragged, we generate one Subset object per DDID
        if self.dh._ddids_unequal:
+           subset_row_chunknum = {}
            for ddid in self.ddids:
-               rows = np.where(self.dh.ddid_col==ddid)[0]
-               rows0 = np.array(sorted(ddid_rows0[ddid]))
+               #rows = np.array([x[0] for x in ddid_rows[ddid]])
+               #rows0 = np.array(sorted(ddid_rows0[ddid]))
+               rows = np.where(row_ddid == ddid)[0]
+               rows0 = np.where(row0_ddid == ddid)[0] + self.first_row0
                datadict = "{}:D{}".format(self._data_dict_name, ddid)
-               subset = MSTile.Subset(self, "DDID {}".format(ddid), datadict, rows, rows0)
-               self._subsets.append(subset)
-               self._ddid_data_dict[ddid] = subset, slice(None)
+               self._subsets[ddid] = subset = MSTile.Subset(self, "DDID {}".format(ddid), datadict, rows, rows0)
+               # extract chunk numbers for rows of this subset
+               subset_row_chunknum[ddid] = np.array(row_chunknum[rows])
+           # now, for every row chunk, determine which rows of which subset belong to it
+           for i, rowchunk in enumerate(self.rowchunks):
+               rowchunk.subset = self._subsets[rowchunk.ddid]
+               rowchunk.rows_within_subset = np.where(subset_row_chunknum[rowchunk.ddid] == i)[0]
+       # else a single Subset object to read the entire tile at once
        else:
            rows = slice(first_row, last_row + 1)
            rows0 = np.arange(self.first_row0, self.last_row0 + 1)
-           subset = MSTile.Subset(self, None, self._data_dict_name, rows, rows0)
-           self._subsets.append(subset)
-           for ddid in self.ddids:
-               self._ddid_data_dict[ddid] = subset, np.where(subset.ddid_col == ddid)[0]
+           self._subsets[None] = subset = MSTile.Subset(self, None, self._data_dict_name, rows, rows0)
+           # make a reference from each constituent row chunk to this subset
+           for i, rowchunk in enumerate(self.rowchunks):
+               rowchunk.subset = subset
+               # each row is the actual row within the tile
+               rowchunk.rows_within_subset = np.where(row_chunknum == i)[0]

    def get_chunk_indices(self, key):
@@ -635,7 +681,7 @@ def get_chunk_tfs(self, key):
               slice(chan_offset + chan0, chan_offset + chan1)

    def fill_bitflags(self, flagbit):
-       for subset in self._subsets:
+       for subset in self._subsets.values():
            if subset.label is None:
                print("  {}: filling bitflags, MS rows {}~{}".format(self.label, self.first_row0, self.last_row0), file=log(1))
            else:
@@ -679,29 +725,22 @@ def load(self, load_model=True):
           DefaultParset.cfg).
       """

-       # Create a shared dict for the data arrays.
-
-       data0 = shared_dict.create(self._data_dict_name)
-
       # These three flags indicate whether the (corrected) data, flags or weights have been updated.
       # (Gotcha for shared_dict users! The only truly shared objects are arrays.
       # Thus, we create an array for the "updated" flags.)
-       data0['updated'] = np.array([False, False, False])
       self._auto_filled_bitflag = False

       # now run over the subsets of the tile set up above. Each subset is a chunk of rows with the same
       # channel shape. If all DDIDs have the same shape, this will be just the one subset.
-       for subset in self._subsets:
-           if subset.label is None:
-               print("{}: reading MS rows {}~{}".format(self.label, self.first_row0, self.last_row0), file=log(0, "blue"))
-               data = data0
-           else:
-               print("{}: reading MS rows {}~{}, {} ({} rows)".format(self.label, self.first_row0,
-                                                                      self.last_row0, subset.label,
-                                                                      len(subset.rows0)), file=log(0, "blue"))
-               data = shared_dict.create(subset.datadict)
+       for subset in self._subsets.values():
+           print(f"{self.label}{' ' + subset.label if subset.label else ''}: reading MS rows {subset.rows0[0]}~{subset.rows0[-1]}", file=log(0, "blue"))
+
+           # create datadict structure for this subset
+           data = shared_dict.create(subset.datadict)
+           data.addSubdict('solutions')
+           data['updated'] = np.array([False, False, False])

           table_subset = self.dh.data.selectrows(subset.rows0)
           nrows0 = table_subset.nrows()
@@ -770,8 +809,8 @@
           # FLAG/FLAG_ROW only needed if applying them, or auto-filling BITFLAG from them.
           flagcol = flagrow = None
-           self.bflagcol = None
-           self._flagcol_sum = 0
+           subset.bflagcol = None
+           subset._flagcol_sum = 0
           self.dh.flagcounts["TOTAL"] += flag_arr0.size

           if self.dh._apply_flags:
               flagcol[flagrow, :, :] = True
               print("  read FLAG/FLAG_ROW", file=log(2))
               # compute stats
-               self._flagcol_sum = flagcol.sum()
-               self.dh.flagcounts["FLAG"] += self._flagcol_sum
+               subset._flagcol_sum = flagcol.sum()
+               self.dh.flagcounts["FLAG"] += subset._flagcol_sum

               if self.dh._apply_flags:
                   flag_arr0[flagcol] = FL.PRIOR
@@ -813,28 +852,28 @@
           # Form up bitflag array, if needed.
           if self.dh._apply_bitflags or self.dh._save_bitflag:
               # If not explicitly re-initializing, try to read column.
-               self.bflagrow = table_subset.getcol("BITFLAG_ROW")
+               subset.bflagrow = table_subset.getcol("BITFLAG_ROW")
               # If there's an error reading BITFLAG, it must be unfilled. This is a common
               # occurrence so we may as well deal with it. In this case, if auto-fill is set,
               # fill BITFLAG from FLAG/FLAG_ROW.
- self.bflagcol = self.dh.fetchslice("BITFLAG", subset=table_subset) - self.bflagcol[:] = np.bitwise_or.reduce(self.bflagcol, axis=2)[:,:,np.newaxis] + subset.bflagcol = self.dh.fetchslice("BITFLAG", subset=table_subset) + subset.bflagcol[:] = np.bitwise_or.reduce(subset.bflagcol, axis=2)[:,:,np.newaxis] print(" read BITFLAG/BITFLAG_ROW", file=log(2)) # compute stats for flagset, bitmask in self.dh.bitflags.items(): - flagged = self.bflagcol & bitmask != 0 - flagged[self.bflagrow & bitmask != 0, :, :] = True + flagged = subset.bflagcol & bitmask != 0 + flagged[subset.bflagrow & bitmask != 0, :, :] = True self.dh.flagcounts[flagset] += flagged.sum() # apply if self.dh._apply_bitflags: - flag_arr0[(self.bflagcol & self.dh._apply_bitflags) != 0] = FL.PRIOR - flag_arr0[(self.bflagrow & self.dh._apply_bitflags) != 0, :, :] = FL.PRIOR + flag_arr0[(subset.bflagcol & self.dh._apply_bitflags) != 0] = FL.PRIOR + flag_arr0[(subset.bflagrow & self.dh._apply_bitflags) != 0, :, :] = FL.PRIOR # if bitflag column is not kept, yet we need to save flags, keep the flag column - if self.bflagcol is None and self.dh._save_flags: - self._flagcol = flagcol + if subset.bflagcol is None and self.dh._save_flags: + subset._flagcol = flagcol flagged = flag_arr0 != 0 @@ -886,7 +925,7 @@ def load(self, load_model=True): obvis = data.addSharedArray('obvis', [nrows, nchan, self.dh.ncorr], obvis0.dtype) rebin_factor = obvis0.size / float(obvis.size) flag_arr = data.addSharedArray('flags', obvis.shape, FL.dtype) - ### no longer filling with -1 -- see flag logic changes in the kernel + ### no longer filling with -1 -- see flag logic changes in the kernel ## flag_arr.fill(-1) uvwco = data.addSharedArray('uvwco', [nrows, 3], float) # make dummy weight array if not 0 @@ -918,7 +957,7 @@ def load(self, load_model=True): if num_weights: data['weigh'] = weights0 del obvis0, uvw0, weights0 - + # normalize by amplitude, if needed # if self.dh.do_normalize_data: # dataabs = np.abs(obvis) @@ -929,7 +968,7 @@ def load(self, load_model=True): # data['weigh'] *= dataabs[np.newaxis, ...] 
# else: # data['weigh'] = dataabs.reshape([1]+list(dataabs.shape)) - + # compute PA rotation angles, if needed if self.dh.parallactic_machine is not None: angles = self.dh.parallactic_machine.rotation_angles(subset.time_col) @@ -1004,7 +1043,7 @@ def load(self, load_model=True): model = subset.load_montblanc_models(uvwco, loaded_models, model_source, cluster, imod, idir) # montblanc applies the PA rotation matrix internally elif DicoSourceProvider is not None and isinstance(model_source, DicoSourceProvider): - + if self.dh.ncorr == 4: model_type="cplx2x2" elif self.dh.ncorr == 2: @@ -1056,7 +1095,7 @@ def load(self, load_model=True): # round to 1e-16 to avoid underflows (casa setjy, for example, can yield visibilities with a tiny # imaginary part, which cause underflows when squared) movis.round(16, out=movis) - + # check for a null model (all directions) invmodel = (~np.isfinite(movis)).any(axis=(0,1)) invmodel[...,(0,-1)] |= (movis[...,(0,-1)]==0).all(axis=(0,1)) @@ -1070,7 +1109,7 @@ def load(self, load_model=True): self.dh.flagcounts["INVMODEL"] += ninv*rebin_factor print(" {} ({:.2%}) visibilities flagged due to 0/inf/nan model".format( ninv, ninv / float(flagged.size)), file=log(0, "red")) - + # release memory (gc.collect() particularly important), as model visibilities are *THE* major user (especially # in the DD case) del loaded_models, flag_arr0 @@ -1084,11 +1123,6 @@ def load(self, load_model=True): if self.dh.output_weight_column is not None: data.addSharedArray('outweights', data['obvis'].shape, self.dh.wtype) - # Create a placeholder for the gain solutions - data.addSubdict("solutions") - - return data - def get_chunk_cubes(self, key, ctype=np.complex128, allocator=np.empty, flag_allocator=np.empty): """ Produces the CubiCal data cubes corresponding to the specified key. @@ -1117,18 +1151,16 @@ def get_chunk_cubes(self, key, ctype=np.complex128, allocator=np.empty, flag_all n_mod refers to number of models simultaneously fitted. 
""" rowchunk, freq0, freq1 = self._chunk_dict[key] - - subset, _ = self._ddid_data_dict[rowchunk.ddid] + subset, row_index = rowchunk.subset, rowchunk.rows_within_subset data = shared_dict.attach(subset.datadict) t_dim = self.dh.chunk_ntimes[rowchunk.tchunk] f_dim = freq1 - freq0 freq_slice = slice(freq0, freq1) - rows = rowchunk.rows - nants = self.dh.nants - flags_2x2 = subset._column_to_cube(data['flags'], t_dim, f_dim, rows, freq_slice, + log(1).print(f"getting cubes for subset {subset.label}: {len(row_index)} rows, first is {row_index[0]}, last is {row_index[-1]}") + flags_2x2 = subset._column_to_cube(data['flags'], t_dim, f_dim, row_index, freq_slice, FL.dtype, FL.MISSING, allocator=allocator) flags = flag_allocator(flags_2x2.shape[:-2], flags_2x2.dtype) if self.ncorr == 4: @@ -1136,8 +1168,8 @@ def get_chunk_cubes(self, key, ctype=np.complex128, allocator=np.empty, flag_all else: flags[:] = flags_2x2[..., 0, 0] flags |= flags_2x2[..., 1, 1] - - obs_arr = subset._column_to_cube(data['obvis'], t_dim, f_dim, rows, freq_slice, ctype, + + obs_arr = subset._column_to_cube(data['obvis'], t_dim, f_dim, row_index, freq_slice, ctype, reqdims=6, allocator=allocator) if 'movis' in data: ### APPLY ROTATION HERE @@ -1153,7 +1185,7 @@ def get_chunk_cubes(self, key, ctype=np.complex128, allocator=np.empty, flag_all # subset.antea[rows], subset.anteb[rows], # angles=angles) - mod_arr = subset._column_to_cube(data['movis'], t_dim, f_dim, rows, freq_slice, ctype, + mod_arr = subset._column_to_cube(data['movis'], t_dim, f_dim, row_index, freq_slice, ctype, reqdims=8, allocator=allocator) # flag invalid model visibilities flags[(~np.isfinite(mod_arr[0, 0, ...])).any(axis=(-2, -1))] |= FL.INVALID @@ -1161,7 +1193,7 @@ def get_chunk_cubes(self, key, ctype=np.complex128, allocator=np.empty, flag_all mod_arr = None if 'weigh' in data: - wgt_arr = subset._column_to_cube(data['weigh'], t_dim, f_dim, rows, freq_slice, self.dh.wtype, + wgt_arr = subset._column_to_cube(data['weigh'], t_dim, f_dim, row_index, freq_slice, self.dh.wtype, allocator=allocator) # wgt_arr = flag_allocator(wgt_2x2.shape[:-2], wgt_2x2.dtype) # np.mean(wgt_2x2, axis=(-1, -2), out=wgt_arr) @@ -1195,14 +1227,13 @@ def set_chunk_cubes(self, cube, flag_cube, weight_cube, key, column='covis'): The column to which the cube must be copied. 
""" rowchunk, freq0, freq1 = self._chunk_dict[key] - subset, _ = self._ddid_data_dict[rowchunk.ddid] + subset, row_index = rowchunk.subset, rowchunk.rows_within_subset data = shared_dict.attach(subset.datadict) - rows = rowchunk.rows freq_slice = slice(freq0, freq1) if cube is not None: data['updated'][0] = True - subset._cube_to_column(data[column], cube, rows, freq_slice) + subset._cube_to_column(data[column], cube, row_index, freq_slice) ### APPLY DEROTATION HERE if self.dh.derotate_output > 0: @@ -1217,10 +1248,14 @@ def set_chunk_cubes(self, cube, flag_cube, weight_cube, key, column='covis'): angles=data['pa'][rows]) if flag_cube is not None: data['updated'][1] = True - subset._cube_to_column(data['flags'], flag_cube, rows, freq_slice, flags=True) + subset._cube_to_column(data['flags'], flag_cube, row_index, freq_slice, flags=True) if weight_cube is not None: data['updated'][2] = True - subset._cube_to_column(data['outweights'], weight_cube, rows, freq_slice) + subset._cube_to_column(data['outweights'], weight_cube, row_index, freq_slice) + + def _get_chunk_datadict(self, key): + rowchunk, _, _ = self._chunk_dict[key] + return shared_dict.attach(rowchunk.subset.datadict) def create_solutions_chunk_dict(self, key): """ @@ -1234,8 +1269,7 @@ def create_solutions_chunk_dict(self, key): :obj:`~cubical.tools.shared_dict.SharedDict`: Shared dictionary containing gain solutions. """ - - data = shared_dict.attach(self._data_dict_name) + data = self._get_chunk_datadict(key) sd = data['solutions'].addSubdict(key) return sd @@ -1252,11 +1286,11 @@ def iterate_solution_chunks(self): - Time slice (slice) - Frequency slice (slice) """ - - data = shared_dict.attach(self._data_dict_name) - soldict = data['solutions'] - for key in soldict.keys(): - yield soldict[key] + for subset in self._subsets.values(): + data = shared_dict.attach(subset.datadict) + soldict = data['solutions'] + for key in soldict.keys(): + yield soldict[key] def save(self, final=False, only_save=["output", "model", "weight", "flag", "bitflag"]): """ @@ -1269,28 +1303,20 @@ def save(self, final=False, only_save=["output", "model", "weight", "flag", "bit Only saves "output", "model", "weight" "flag" or "bitflag". 
This is useful for predicting """ - data0 = shared_dict.attach(self._data_dict_name) - # insert columns first, if needed, and reopen MS added = False - for subset in self._subsets: - if subset.label is None: - print("{}: saving MS rows {}~{}".format(self.label, self.first_row0, self.last_row0), file=log(0, "blue")) - data = data0 - else: - print("{}: saving MS rows {}~{}, {} ({} rows)".format(self.label, self.first_row0, - self.last_row0, subset.label, - len(subset.rows0)), file=log(0, "blue")) - data = shared_dict.attach(subset.datadict) + for subset in self._subsets.values(): + print(f"{self.label}{' ' + subset.label if subset.label else ''}: saving MS rows {subset.rows0[0]}~{subset.rows0[-1]}", file=log(0, "blue")) + data = shared_dict.attach(subset.datadict) # insert output columns, if needed, and reopen MS if they were actually added if not added: - if self.dh.output_column and data0['updated'][0]: + if self.dh.output_column and data['updated'][0]: added = self.dh._add_column(self.dh.output_column) if self.dh.output_model_column and 'movis' in data: added = self.dh._add_column(self.dh.output_model_column) or added - if self.dh.output_weight_column and data0['updated'][2]: + if self.dh.output_weight_column and data['updated'][2]: added = self.dh._add_column(self.dh.output_weight_column, like_type='float') or added if added: self.dh.reopen() @@ -1298,7 +1324,7 @@ def save(self, final=False, only_save=["output", "model", "weight", "flag", "bit table_subset = self.dh.data.selectrows(subset.rows0) - if self.dh.output_column and data0['updated'][0]: + if self.dh.output_column and data['updated'][0]: if ("output" in only_save): covis = data['covis'] # if self.dh.parallactic_machine is not None: @@ -1325,7 +1351,7 @@ def save(self, final=False, only_save=["output", "model", "weight", "flag", "bit else: print(" skipping {} column per user request".format(self.dh.output_model_column), file=log) #writing outputs weights if any - if self.dh.output_weight_column and data0['updated'][2]: + if self.dh.output_weight_column and data['updated'][2]: if ("weight" in only_save): outweights = subset.upsample(data['outweights']) print(" writing {} weight column".format(self.dh.output_weight_column), file=log) @@ -1337,7 +1363,7 @@ def save(self, final=False, only_save=["output", "model", "weight", "flag", "bit flag_col = None # if not None, FLAG/FLAG_ROW will be saved bflag_col = None # if not None, BITFLAG/BITFLAG_ROW will be saved - if data0['updated'][1]: + if data['updated'][1]: # count new flags newflags = subset.upsample(data['flags'] & ~(FL.PRIOR | FL.SKIPSOL) != 0) nfl = newflags.sum() @@ -1348,20 +1374,20 @@ def save(self, final=False, only_save=["output", "model", "weight", "flag", "bit # write to BITFLAG, if asked to if self.dh._save_bitflag: # clear bitflag column first - self.bflagcol &= ~self.dh._save_bitflag - self.bflagcol[newflags] |= self.dh._save_bitflag + subset.bflagcol &= ~self.dh._save_bitflag + subset.bflagcol[newflags] |= self.dh._save_bitflag bflag_col = True if self.dh._save_flags: print(" {:.2%} visibilities flagged by solver: saving to BITFLAG and FLAG columns".format(ratio), file=log) - flag_col = self.bflagcol != 0 + flag_col = subset.bflagcol != 0 else: print(" {:.2%} visibilities flagged by solver: saving to BITFLAG column only".format(ratio), file=log) # else write to FLAG/FLAG_ROW only, if asked to elif self.dh._save_flags: print(" {:.2%} visibilities flagged by solver: saving to FLAG column".format(ratio), file=log) - self._flagcol[newflags] = True - flag_col = 
self._flagcol + subset._flagcol[newflags] = True + flag_col = subset._flagcol # else just message else: @@ -1370,11 +1396,11 @@ def save(self, final=False, only_save=["output", "model", "weight", "flag", "bit print(" no solver flags were generated", file=log) # bitflag still needs to be cleared though, if we're writing to it if self.dh._save_bitflag: - self.bflagcol &= ~self.dh._save_bitflag + subset.bflagcol &= ~self.dh._save_bitflag bflag_col = True if self.dh._save_flags: print(" no visibilities flagged by solver: saving to BITFLAG and FLAG columns", file=log) - flag_col = self.bflagcol != 0 + flag_col = subset.bflagcol != 0 else: print(" no visibilities flagged by solver: saving to BITFLAG column only", file=log) @@ -1392,22 +1418,22 @@ def save(self, final=False, only_save=["output", "model", "weight", "flag", "bit # this is set if BITFLAG/BITFLAG_ROW is to be written out if bflag_col is not None: if ("bitflag" in only_save): - self.bflagcol[:] = np.bitwise_or.reduce(self.bflagcol, axis=2)[:,:,np.newaxis] - self.dh.putslice("BITFLAG", self.bflagcol, subset=table_subset) - totflags = (self.bflagcol != 0).sum() + subset.bflagcol[:] = np.bitwise_or.reduce(subset.bflagcol, axis=2)[:,:,np.newaxis] + self.dh.putslice("BITFLAG", subset.bflagcol, subset=table_subset) + totflags = (subset.bflagcol != 0).sum() self.dh.flagcounts['OUT'] += totflags - print(" updated BITFLAG column ({:.2%} visibilities flagged)".format(totflags / float(self.bflagcol.size)), file=log) - self.bflagrow = np.bitwise_and.reduce(self.bflagcol, axis=(-1, -2)) - table_subset.putcol("BITFLAG_ROW", self.bflagrow) + print(" updated BITFLAG column ({:.2%} visibilities flagged)".format(totflags / float(subset.bflagcol.size)), file=log) + subset.bflagrow = np.bitwise_and.reduce(subset.bflagcol, axis=(-1, -2)) + table_subset.putcol("BITFLAG_ROW", subset.bflagrow) print(" updated BITFLAG_ROW column ({:.2%} rows flagged)".format( - (self.bflagrow!=0).sum()/float(self.bflagrow.size)), file=log) + (subset.bflagrow!=0).sum()/float(subset.bflagrow.size)), file=log) else: - totflags = (self.bflagcol != 0).sum() - print(" skipping BITFLAG column per user request ({:.2%} visibilities would have been flagged otherwise)".format(totflags / float(self.bflagcol.size)), file=log) - print(" skipping BITFLAG column per user request ({:.2%} visibilities would have been flagged otherwise)".format(totflags / float(self.bflagcol.size)), file=log) + totflags = (subset.bflagcol != 0).sum() + print(" skipping BITFLAG column per user request ({:.2%} visibilities would have been flagged otherwise)".format(totflags / float(subset.bflagcol.size)), file=log) + print(" skipping BITFLAG column per user request ({:.2%} visibilities would have been flagged otherwise)".format(totflags / float(subset.bflagcol.size)), file=log) #prevents memory leak by clearing - self.bflagcol = self.bflagrow = None + subset.bflagcol = subset.bflagrow = None # this is set if FLAG/FLAG_ROW is to be written out if flag_col is not None: @@ -1443,7 +1469,7 @@ def release(self, final=False): data = shared_dict.attach(self._data_dict_name) data.delete() - for subset in self._subsets: + for subset in self._subsets.values(): if subset.label is not None: data = shared_dict.attach(subset.datadict) data.delete() diff --git a/cubical/degridder/DDFacetSim.py b/cubical/degridder/DDFacetSim.py index b069f7b8..2d4b226a 100644 --- a/cubical/degridder/DDFacetSim.py +++ b/cubical/degridder/DDFacetSim.py @@ -359,8 +359,13 @@ def __vis_2_time_map(DicoSols, times): 
jones_matrix["DicoJones_Beam"]["Dirs"]["I"] = I jones_matrix["DicoJones_Beam"]["Dirs"]["Cluster"] = 0 # separate matrix stack for each subregion (facet) sample_times = np.array(beam_provider.getBeamSampleTimes(utc_times)) - dt = (sample_times[1:] - sample_times[:-1]) / 2. if sample_times.size > 1 else \ - np.array([1.0e-6]) # half width of beam sample time + dt = np.zeros_like(sample_times) + if sample_times.size > 1: + # half width of beam sample time + dt[:-1] = (sample_times[1:] - sample_times[:-1]) / 2. + dt[-1] = dt[-2] # equal-sized left and right dt of the last step + else: + dt[...] = 1.0e-6 t0 = sample_times - dt t1 = sample_times + dt E_jones = np.ascontiguousarray([beam_provider.evaluateBeam(t, [ra], [dec]) for t in sample_times], dtype=np.complex64) diff --git a/cubical/machines/interval_gain_machine.py b/cubical/machines/interval_gain_machine.py index 06b183df..07807814 100644 --- a/cubical/machines/interval_gain_machine.py +++ b/cubical/machines/interval_gain_machine.py @@ -467,10 +467,9 @@ def precompute_attributes(self, data_arr, model_arr, flags_arr, inv_var_chan): if invalid_models.any(): self.raise_userwarning( logging.CRITICAL, - "{0:s} {1:s}: {2:d}/{3:d} directions/intervals has an invalid or null model. " + - "This can be caused by bad data, but should have been handled gracefully, so please report this issue!".format( - self.chunk_label, self.jones_label, invalid_models.sum(), invalid_models.size - ), + "{0:s} {1:s}: {2:d}/{3:d} directions/intervals has an invalid or null model. ".format( + self.chunk_label, self.jones_label, invalid_models.sum(), invalid_models.size) + + "This can be caused by bad data, but should have been handled gracefully, so please report this issue!", 90, raise_once="invalid_models", verbosity=2, color="red") with np.errstate(invalid='ignore', divide='ignore'): diff --git a/setup.py b/setup.py index b52a869d..bcd6fa5f 100644 --- a/setup.py +++ b/setup.py @@ -45,25 +45,19 @@ if on_rtd: requirements = ['numpy', - 'futures; python_version <= "2.7"', - 'matplotlib', + 'matplotlib', 'scipy'] else: requirements = ['future', 'numpy', - 'llvmlite==v0.31.0; python_version <= "2.7"', - 'numba==0.47.0; python_version <= "2.7"', - 'numba; python_version >= "3.0"', -# 'python-casacore<=3.0.0; python_version <= "2.7"', + 'numba', 'python-casacore', - 'sharedarray @ git+https://gitlab.com/bennahugo/shared-array.git@master', - 'matplotlib<3.0', + 'sharedarray >= 3.2.1', + 'matplotlib', 'scipy', 'astro-tigger-lsm', 'six', - 'futures; python_version <= "2.7"', - 'astropy<3.0; python_version <= "2.7"', - 'astropy>=3.0; python_version > "2.7"', + 'astropy>=3.0', 'psutil' ] @@ -72,7 +66,7 @@ description='Fast calibration implementation exploiting complex optimisation.', url='https://github.com/ratt-ru/CubiCal', classifiers=[ - "Development Status :: 3 - Alpha", + "Development Status :: 5 - Production/Stable", "Intended Audience :: Science/Research", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: POSIX :: Linux", @@ -84,7 +78,7 @@ long_description=long_description, long_description_content_type='text/markdown', packages=find_packages(), - python_requires='<3.0' if six.PY2 else ">=3.0", #build a py2 or py3 specific wheel depending on environment (due to cython backend) + python_requires=">=3.6", install_requires=requirements, include_package_data=True, zip_safe=False, @@ -93,7 +87,7 @@ 'cubical/bin/plot-gain-solutions'], entry_points={'console_scripts': ['gocubical = cubical.main:main']}, extras_require={ - 'lsm-support': 
['montblanc @git+https://github.com/ska-sa/montblanc.git@0.6.1'], + 'lsm-support': ['montblanc >= 0.6.4'], 'degridder-support': ['ddfacet >= 0.5.0', 'regions >= 0.4', 'meqtrees-cattery >= 1.7.0']
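[Note on the headline behavioural change in this PR: cluster_sources() in TiggerSourceProvider.py now splits each DDE cluster's sources by a 'nobeam' tag, and the Montblanc predict in ms_tile.py visits every direction twice, attaching the beam source provider only for the "beam" group. A minimal sketch of the resulting data layout and loop, with illustrative source names; the real code drives MBTiggerSim.simulate() with proper source providers:

    # Layout produced by cluster_sources(sm, dde_tag) after this patch:
    clusters = {
        "dE0": {
            "beam":   {"pnt": ["src_a"], "gau": ["src_b"]},   # E-Jones (beam) applied
            "nobeam": {"pnt": [],        "gau": ["src_c"]},   # sources tagged 'nobeam'
        },
    }

    # Per-direction predict loop, mirroring load_montblanc_models():
    for direction, cluster in enumerate(clusters):
        for req_beam in ("beam", "nobeam"):
            # set_direction(direction, req_beam) selects the matching source group;
            # the beam source provider is appended to the provider list only when
            # req_beam == "beam", so 'nobeam' sources are predicted without the beam
            sources = clusters[cluster][req_beam]
            print(direction, req_beam, sources["pnt"] + sources["gau"])

Because MBTiggerSim's model sink now accumulates with "+=" rather than "=", the beam and nobeam predictions for the same direction sum into a single per-direction model.]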