Skip to content

Commit

Permalink
Noxcal (#450)
Browse files Browse the repository at this point in the history
* partial fix for #93. Should allow for one spw at a time

* fixes #93. Adds workaround for casacore/python-casacore#130

* added future-fstrings

* correctly named future-fstrings

* ...and install it for all pythons

* back to mainstream sharedarray

* lowered message verbosity

* dropping support for python<3.6

* Issues 427 and 429 (#430)

* Fixes #427

* Fixes #429

* Update __init__.py

Bump version

* Update Jenkinsfile.sh (#447)

* Update Jenkinsfile.sh

Remove python 2 building

* Update Jenkinsfile.sh

* Issue 431 (#432)

* Beginning of nobeam functionality.

* Remove duplicated source provider.

* Change syntax for python 2.7 compatibility.

* Update montblanc version in setup to avoid installation woes.

Co-authored-by: JSKenyon <[email protected]>

* Remove two uses of future_fstrings.

* make pypi compatible (#446)

* make pypi compatible

* error

* Update setup.py

Only support up to v0.6.1 of montblanc

* Update setup.py

Co-authored-by: Benjamin Hugo <[email protected]>
Co-authored-by: JSKenyon <[email protected]>
Co-authored-by: JSKenyon <[email protected]>

* Update __init__.py

* Fix warning formatting

Co-authored-by: Oleg Smirnov <[email protected]>
Co-authored-by: Benjamin Hugo <[email protected]>
Co-authored-by: JSKenyon <[email protected]>
Co-authored-by: JSKenyon <[email protected]>
Co-authored-by: Gijs Molenaar <[email protected]>
Co-authored-by: JSKenyon <[email protected]>
  • Loading branch information
7 people authored May 12, 2021
1 parent c6a8020 commit 4fc2cea
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 194 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Thumbs.db
##################
.eggs/
.idea/
dist/
cubical.egg-info/

# Backup files #
Expand Down Expand Up @@ -97,6 +98,6 @@ __pycache__
.venv*/
.virtualenv*/
.persistent_history
venv/

*~
.#*
31 changes: 14 additions & 17 deletions Jenkinsfile.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,21 @@ TEST_DATA_DIR="$WORKSPACE/../../../test-data"
mkdir $TEST_OUTPUT_DIR

# build and testrun
docker build -f ${WORKSPACE_ROOT}/projects/Cubical/.jenkins/1604.py2.docker -t cubical.1604.py2:${BUILD_NUMBER} ${WORKSPACE_ROOT}/projects/Cubical/
docker run --rm cubical.1604.py2:${BUILD_NUMBER}
docker build -f ${WORKSPACE_ROOT}/projects/Cubical/.jenkins/1804.py2.docker -t cubical.1804.py2:${BUILD_NUMBER} ${WORKSPACE_ROOT}/projects/Cubical/
docker run --rm cubical.1804.py2:${BUILD_NUMBER}
docker build -f ${WORKSPACE_ROOT}/projects/Cubical/.jenkins/1804.py3.docker -t cubical.1804.py3:${BUILD_NUMBER} ${WORKSPACE_ROOT}/projects/Cubical/
docker run --rm cubical.1804.py3:${BUILD_NUMBER}

#run tests
for img in 1604.py2 1804.py2 1804.py3;
do
docker run --rm -m 100g --cap-add sys_ptrace \
--memory-swap=-1 \
--shm-size=150g \
--rm=true \
--name=cubical$BUILD_NUMBER \
-v ${TEST_OUTPUT_DIR}:/workspace \
-v ${TEST_OUTPUT_DIR}:/root/tmp \
--entrypoint /bin/bash \
cubical.${img}:${BUILD_NUMBER} \
-c "cd /src/cubical && nosetests --with-xunit --xunit-file /workspace/nosetests.xml test"
done

img=1804.py3

docker run --rm -m 100g --cap-add sys_ptrace \
--memory-swap=-1 \
--shm-size=150g \
--rm=true \
--name=cubical$BUILD_NUMBER \
-v ${TEST_OUTPUT_DIR}:/workspace \
-v ${TEST_OUTPUT_DIR}:/root/tmp \
--entrypoint /bin/bash \
cubical.${img}:${BUILD_NUMBER} \
-c "cd /src/cubical && nosetests --with-xunit --xunit-file /workspace/nosetests.xml test"

2 changes: 1 addition & 1 deletion cubical/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# This code is distributed under the terms of GPLv2, see LICENSE.md for details

# update this for each release
VERSION = "1.5.8"
VERSION = "1.5.10"
2 changes: 1 addition & 1 deletion cubical/data_handler/MBTiggerSim.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def model_vis(self, context):
ur = upper + offset
lc = ddid_ind*self._chan_per_ddid
uc = (ddid_ind+1)*self._chan_per_ddid
self._model[self._dir, 0, lr:ur, :, :] = \
self._model[self._dir, 0, lr:ur, :, :] += \
context.data[:,:,lc:uc,sel].reshape(-1, self._chan_per_ddid, self._ncorr)

def __str__(self):
Expand Down
28 changes: 19 additions & 9 deletions cubical/data_handler/TiggerSourceProvider.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ def __init__(self, lsm, phase_center, dde_tag='dE'):
self._freqs = None

self._clusters = cluster_sources(self._sm, dde_tag)

self._cluster_keys = list(self._clusters.keys())
self._nclus = len(self._cluster_keys)

self._target_key = 0
self._target_cluster = self._cluster_keys[self._target_key]
self._target_reqbeam = "beam"

self._pnt_sources = self._clusters[self._target_cluster]["pnt"]
self._pnt_sources = \
self._clusters[self._target_cluster][self._target_reqbeam]["pnt"]
self._npsrc = len(self._pnt_sources)
self._gau_sources = self._clusters[self._target_cluster]["gau"]
self._gau_sources = \
self._clusters[self._target_cluster][self._target_reqbeam]["gau"]
self._ngsrc = len(self._gau_sources)

def set_direction(self, idir):
def set_direction(self, idir, req_beam="beam"):
"""Sets current direction being simulated.
Args:
Expand All @@ -65,9 +69,12 @@ def set_direction(self, idir):

self._target_key = idir
self._target_cluster = self._cluster_keys[self._target_key]
self._pnt_sources = self._clusters[self._target_cluster]["pnt"]
self._target_reqbeam = req_beam
self._pnt_sources = \
self._clusters[self._target_cluster][self._target_reqbeam]["pnt"]
self._npsrc = len(self._pnt_sources)
self._gau_sources = self._clusters[self._target_cluster]["gau"]
self._gau_sources = \
self._clusters[self._target_cluster][self._target_reqbeam]["gau"]
self._ngsrc = len(self._gau_sources)

def set_frequency(self, frequency):
Expand Down Expand Up @@ -211,10 +218,10 @@ def updated_dimensions(self):

return [('npsrc', self._npsrc),
('ngsrc', self._ngsrc)]

def phase_centre(self, context):
""" Sets the MB phase direction """
radec = np.array([self._phase_center[...,-2],
radec = np.array([self._phase_center[...,-2],
self._phase_center[...,-1]], context.dtype)
return radec

Expand Down Expand Up @@ -248,9 +255,12 @@ def cluster_sources(sm, dde_tag):
else:
dde_cluster = src.getTag('cluster')

group = 'pnt' if src.typecode=='pnt' else 'gau'
req_beam = 'nobeam' if src.getTag('nobeam') else 'beam'
src_type = 'pnt' if src.typecode=='pnt' else 'gau'

clus.setdefault(dde_cluster, dict(pnt=[], gau=[]))[group].append(src)
clus.setdefault(dde_cluster, dict(beam=dict(pnt=[], gau=[]),
nobeam=dict(pnt=[], gau=[])))
clus[dde_cluster][req_beam][src_type].append(src)

return clus

Expand Down
63 changes: 51 additions & 12 deletions cubical/data_handler/ms_data_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
# CubiCal: a radio interferometric calibration suite
# (c) 2017 Rhodes University & Jonathan S. Kenyon
# http://github.com/ratt-ru/CubiCal
Expand Down Expand Up @@ -317,7 +318,8 @@ def __init__(self, ms_name, data_column, output_column=None, output_model_column

assert set(spwtabcols) <= set(
_spwtab.colnames()), "Measurement set conformance error - keyword table SPECTRAL_WINDOW incomplete. Perhaps disable --out-casa-gaintables or check your MS!"
self._spwtabcols = {t: _spwtab.getcol(t) for t in spwtabcols}
nrows = _spwtab.nrows()
self._spwtabcols = {t: [_spwtab.getcol(t, row, 1) for row in range(nrows)] for t in spwtabcols}

# read observation details
obstabcols = ["TIME_RANGE", "LOG", "SCHEDULE", "FLAG_ROW",
Expand Down Expand Up @@ -470,18 +472,19 @@ def __init__(self, ms_name, data_column, output_column=None, output_model_column
" ".join([str(ch) for ch in freqchunks + [nchan]])), file=log(0))

# now accumulate list of all frequencies, and also see if selected DDIDs have a uniform rebinning and chunking map
all_freqs = set(self.chanfreqs[self._ddids[0]])
self.do_freq_rebin = any([m is not None for m in list(self.rebin_chan_maps.values())])
self._ddids_unequal = False
ddid0_map = self.rebin_chan_maps[self._ddids[0]]
for ddid in self._ddids[1:]:
if len(self.chanfreqs[0]) != len(self.chanfreqs[ddid]):
self._ddids_unequal = True
break
map1 = self.rebin_chan_maps[ddid]
if ddid0_map is None and map1 is None:
continue
if (ddid0_map is None and map1 is not None) or (ddid0_map is not None and map1 is None) or \
len(ddid0_map) != len(map1) or (ddid0_map!=map1).any():
self._ddids_unequal = True
all_freqs.update(self.chanfreqs[ddid])

if self._ddids_unequal:
print("Selected DDIDs have differing channel structure. Processing may be less efficient.", file=log(0,"red"))
Expand Down Expand Up @@ -812,6 +815,40 @@ def fetch(self, colname, first_row=0, nrows=-1, subset=None):
(subset or self.data).getcolnp(str(colname), prealloc, first_row, nrows)
return prealloc

@staticmethod
def _get_row_chunk(array):
    """
    Establishes the maximum row chunk that can be used for a single casacore
    table I/O call. Workaround for https://github.com/casacore/python-casacore/issues/130

    Args:
        array (numpy.ndarray):
            Array to be read or written, of shape (nrows, nfreq, ncorr).

    Returns:
        tuple:
            (maxrows, nrows) -- the largest number of rows that may be passed to a
            single getcol/putcol call, and the total number of rows in ``array``.
    """
    # Cap on the number of elements per I/O request, per
    # https://github.com/casacore/python-casacore/issues/130#issuecomment-748150854
    _maxchunk = 2**29
    nrows, nfreq, ncorr = array.shape
    # Always allow at least one row per call, even if a single row exceeds _maxchunk.
    maxrows = max(1, _maxchunk // (nfreq*ncorr))
    log(1).print(f"  table I/O request of {nrows} rows: max chunk size is {maxrows} rows")
    return maxrows, nrows

@staticmethod
def _getcolnp_wrapper(table, column, array, startrow):
    """
    Reads ``column`` of ``table`` into the preallocated ``array``, issuing one
    getcolnp() call per row chunk rather than a single monolithic call.
    Workaround for https://github.com/casacore/python-casacore/issues/130
    """
    maxrows, nrows = MSDataHandler._get_row_chunk(array)
    offset = 0
    while offset < nrows:
        table.getcolnp(column, array[offset:offset + maxrows], startrow + offset, maxrows)
        offset += maxrows

@staticmethod
def _getcolslicenp_wrapper(table, column, array, begin, end, incr, startrow):
    """
    Calls table.getcolslicenp() in chunks of rows. Workaround for
    https://github.com/casacore/python-casacore/issues/130

    ``begin``/``end``/``incr`` define the per-cell slice (blc/trc/increment, as
    passed through by fetchslice); ``array`` is the preallocated destination.
    """
    maxrows, nrows = MSDataHandler._get_row_chunk(array)
    for row0 in range(0, nrows, maxrows):
        table.getcolslicenp(column, array[row0:row0+maxrows], begin, end, incr, startrow+row0, maxrows)

@staticmethod
def _putcol_wrapper(table, column, array, startrow):
    """
    Writes ``array`` to ``column`` of ``table``, one putcol() call per row chunk.
    Workaround for https://github.com/casacore/python-casacore/issues/130
    """
    maxrows, nrows = MSDataHandler._get_row_chunk(array)
    for offset in range(0, nrows, maxrows):
        chunk = array[offset:offset + maxrows]
        table.putcol(column, chunk, startrow + offset, maxrows)

def fetchslice(self, column, startrow=0, nrows=-1, subset=None):
"""
Convenience function similar to fetch(), but assumes a column of NFREQxNCORR shape,
Expand Down Expand Up @@ -840,17 +877,17 @@ def fetchslice(self, column, startrow=0, nrows=-1, subset=None):
shape = tuple([nrows] + [s for s in cell.shape]) if hasattr(cell, "shape") else nrows

prealloc = np.empty(shape, dtype=dtype)
subset.getcolnp(str(column), prealloc, startrow, nrows)
self._getcolnp_wrapper(subset, str(column), prealloc, startrow)
return prealloc

# ugly hack because getcell returns a different dtype to getcol
cell = (subset or self.data).getcol(str(column), startrow, nrow=1)[0, ...]
cell = subset.getcol(str(column), startrow, nrow=1)[0, ...]
dtype = getattr(cell, "dtype", type(cell))

shape = tuple([nrows] + [len(list(range(l, r + 1, i))) #inclusive in cc
for l, r, i in zip(self._ms_blc, self._ms_trc, self._ms_incr)])
prealloc = np.empty(shape, dtype=dtype)
subset.getcolslicenp(str(column), prealloc, self._ms_blc, self._ms_trc, self._ms_incr, startrow, nrows)
self._getcolslicenp_wrapper(subset, str(column), prealloc, self._ms_blc, self._ms_trc, self._ms_incr, startrow)
return prealloc

def fetchslicenp(self, column, data, startrow=0, nrows=-1, subset=None):
Expand All @@ -871,8 +908,8 @@ def fetchslicenp(self, column, data, startrow=0, nrows=-1, subset=None):
"""
subset = subset or self.data
if self._ms_blc == None:
return subset.getcolnp(column, data, startrow, nrows)
return subset.getcolslicenp(column, data, self._ms_blc, self._ms_trc, self._ms_incr, startrow, nrows)
return self._getcolnp_wrapper(subset, column, data, startrow)
return self._getcolslicenp_wrapper(subset, column, data, self._ms_blc, self._ms_trc, self._ms_incr, startrow)

def putslice(self, column, value, startrow=0, nrows=-1, subset=None):
"""
Expand All @@ -897,7 +934,7 @@ def putslice(self, column, value, startrow=0, nrows=-1, subset=None):
# if no slicing, just use putcol to put the whole thing. This always works,
# unless the MS is screwed up
if self._ms_blc == None:
return subset.putcol(str(column), value, startrow, nrows)
return self._putcol_wrapper(subset, str(column), value, startrow)
if nrows<0:
nrows = subset.nrows()

Expand All @@ -921,19 +958,21 @@ def putslice(self, column, value, startrow=0, nrows=-1, subset=None):
value[:] = np.bitwise_or.reduce(value, axis=2)[:,:,np.newaxis]

if self._channel_slice == slice(None) and self._corr_slice == slice(None):
return subset.putcol(column, value, startrow, nrows)
return self._putcol_wrapper(subset, value, startrow)
else:
# for bitflags, we want to preserve flags we haven't touched -- read the column
if column == "BITFLAG" or column == "FLAG":
value0 = subset.getcol(column)
value0 = np.empty_like(value)
self._getcolnp_wrapper(subset, column, value0, startrow)
# cheekily propagate per-corr flags to all corrs
value0[:] = np.bitwise_or.reduce(value0, axis=2)[:,:,np.newaxis]
# otherwise, init empty column
else:
ddid = subset.getcol("DATA_DESC_ID", 0, 1)[0]
shape = (nrows, self._nchan0_orig[ddid], self.nmscorrs)
value0 = np.zeros(shape, value.dtype)
value0[:, self._channel_slice, self._corr_slice] = value
return subset.putcol(str(column), value0, startrow, nrows)
return self._putcol_wrapper(subset, str(column), value0, startrow, nrows)

def define_chunk(self, chunk_time, rebin_time, fdim=1, chunk_by=None, chunk_by_jump=0, chunks_per_tile=4, max_chunks_per_tile=0):
"""
Expand Down
Loading

0 comments on commit 4fc2cea

Please sign in to comment.