From a703bc4c55f8bf70069c66886d1d191c70a77f71 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Wed, 24 Jan 2024 13:35:41 -0500 Subject: [PATCH 01/16] ENH: add 'catalog' parameter to 'make_hdf' --- pyxrf/model/load_data_from_db.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py index 99f994af..dad312b7 100644 --- a/pyxrf/model/load_data_from_db.py +++ b/pyxrf/model/load_data_from_db.py @@ -290,6 +290,7 @@ def make_hdf( save_scaler=True, num_end_lines_excluded=None, skip_scan_types=None, + catalog=None, ): """ Load data from database and save it in HDF5 files. @@ -402,6 +403,9 @@ def make_hdf( The list of plan types (e.g. ['FlyPlan1D']) that should cause the loader to raise an exception. The parameter is used to allow scripts to ignore certain plan types when downloading data using ranges of scans IDs. (Supported only at HXN.) + catalog: str or None + Name of the catalog (e.g. `"srx"`). The function attempts to determine the catalog + name automatically if the parameter is not specified or `None`. """ if wd: @@ -2331,6 +2335,7 @@ def _get_row_len(row_data): m, n_pt_max, missing_rows = 0, -1, [] # m - index try: while True: + print("1") ## try: while True: name, doc = next(docs_primary) @@ -2344,8 +2349,10 @@ def _get_row_len(row_data): except StopIteration: break # All events are processed, exit the loop + print("2") ## if is_filled: data = doc["data"][detector_field] + print(f"data={data}") ## data_det1 = np.array(data[:, 0, :], dtype=np.float32) # The following is the fix for the case when data has corrupt row (smaller number of data points). 
From f581c9c398eac67f1c2387ed49fd799c34c2cfd3 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Wed, 24 Jan 2024 17:18:32 -0500 Subject: [PATCH 02/16] ENH: recognizing Tiled and passing parameters - work in progess --- pyxrf/db_config/srx_db_config.py | 99 ++--- pyxrf/model/load_data_from_db.py | 614 +++++++++++++++++++++++++++++-- 2 files changed, 637 insertions(+), 76 deletions(-) diff --git a/pyxrf/db_config/srx_db_config.py b/pyxrf/db_config/srx_db_config.py index 87f2fad9..f80f27a5 100644 --- a/pyxrf/db_config/srx_db_config.py +++ b/pyxrf/db_config/srx_db_config.py @@ -1,76 +1,77 @@ -import h5py -try: - from databroker.v1 import Broker -except ModuleNotFoundError: - from databroker import Broker +# import h5py -import logging +# try: +# from databroker.v1 import Broker +# except ModuleNotFoundError: +# from databroker import Broker -from databroker._core import register_builtin_handlers +# import logging -# srx detector, to be moved to filestore -# from databroker.assets.handlers import Xspress3HDF5Handler -from databroker.assets.handlers import HandlerBase +# from databroker._core import register_builtin_handlers -logger = logging.getLogger(__name__) +# # srx detector, to be moved to filestore +# # from databroker.assets.handlers import Xspress3HDF5Handler +# from databroker.assets.handlers import HandlerBase -db = Broker.named("srx") -try: - register_builtin_handlers(db.reg) -except Exception as ex: - logger.error(f"Error while registering default SRX handlers: {ex}") +# logger = logging.getLogger(__name__) +# db = Broker.named("srx") +# try: +# register_builtin_handlers(db.reg) +# except Exception as ex: +# logger.error(f"Error while registering default SRX handlers: {ex}") -class BulkXSPRESS(HandlerBase): - HANDLER_NAME = "XPS3_FLY" - def __init__(self, resource_fn): - self._handle = h5py.File(resource_fn, "r") +# class BulkXSPRESS(HandlerBase): +# HANDLER_NAME = "XPS3_FLY" - def __call__(self): - return self._handle["entry/instrument/detector/data"][:] 
+# def __init__(self, resource_fn): +# self._handle = h5py.File(resource_fn, "r") +# def __call__(self): +# return self._handle["entry/instrument/detector/data"][:] -class ZebraHDF5Handler(HandlerBase): - HANDLER_NAME = "ZEBRA_HDF51" - def __init__(self, resource_fn): - self._handle = h5py.File(resource_fn, "r") +# class ZebraHDF5Handler(HandlerBase): +# HANDLER_NAME = "ZEBRA_HDF51" - def __call__(self, column): - return self._handle[column][:] +# def __init__(self, resource_fn): +# self._handle = h5py.File(resource_fn, "r") +# def __call__(self, column): +# return self._handle[column][:] -class SISHDF5Handler(HandlerBase): - HANDLER_NAME = "SIS_HDF51" - def __init__(self, resource_fn): - self._handle = h5py.File(resource_fn, "r") +# class SISHDF5Handler(HandlerBase): +# HANDLER_NAME = "SIS_HDF51" - def __call__(self, column): - return self._handle[column][:] +# def __init__(self, resource_fn): +# self._handle = h5py.File(resource_fn, "r") +# def __call__(self, column): +# return self._handle[column][:] -class BulkMerlin(BulkXSPRESS): - HANDLER_NAME = "MERLIN_FLY_STREAM_V1" - def __call__(self): - return self._handle["entry/instrument/detector/data"][:] +# class BulkMerlin(BulkXSPRESS): +# HANDLER_NAME = "MERLIN_FLY_STREAM_V1" +# def __call__(self): +# return self._handle["entry/instrument/detector/data"][:] -class BulkDexela(HandlerBase): - HANDLER_NAME = "DEXELA_FLY_V1" - def __init__(self, resource_fn): - self._handle = h5py.File(resource_fn, "r") +# class BulkDexela(HandlerBase): +# HANDLER_NAME = "DEXELA_FLY_V1" - def __call__(self): - return self._handle["entry/instrument/detector/data"][:] +# def __init__(self, resource_fn): +# self._handle = h5py.File(resource_fn, "r") +# def __call__(self): +# return self._handle["entry/instrument/detector/data"][:] -db.reg.register_handler(BulkXSPRESS.HANDLER_NAME, BulkXSPRESS, overwrite=True) -db.reg.register_handler("SIS_HDF51", SISHDF5Handler, overwrite=True) -db.reg.register_handler("ZEBRA_HDF51", ZebraHDF5Handler, 
overwrite=True) -db.reg.register_handler(BulkMerlin.HANDLER_NAME, BulkMerlin, overwrite=True) -db.reg.register_handler(BulkDexela.HANDLER_NAME, BulkDexela, overwrite=True) + +# db.reg.register_handler(BulkXSPRESS.HANDLER_NAME, BulkXSPRESS, overwrite=True) +# db.reg.register_handler("SIS_HDF51", SISHDF5Handler, overwrite=True) +# db.reg.register_handler("ZEBRA_HDF51", ZebraHDF5Handler, overwrite=True) +# db.reg.register_handler(BulkMerlin.HANDLER_NAME, BulkMerlin, overwrite=True) +# db.reg.register_handler(BulkDexela.HANDLER_NAME, BulkDexela, overwrite=True) diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py index dad312b7..88da0d0a 100644 --- a/pyxrf/model/load_data_from_db.py +++ b/pyxrf/model/load_data_from_db.py @@ -34,8 +34,15 @@ sep_v = os.sep + +def get_catalog(catalog_name): + from tiled.client import from_uri + c = from_uri('https://tiled.nsls2.bnl.gov') + return c[catalog_name.lower()]['raw'] + + try: - beamline_name = None + catalog_name = None # Attempt to find the configuration file first config_path = "/etc/pyxrf/pyxrf.json" @@ -43,39 +50,39 @@ try: with open(config_path, "r") as beamline_pyxrf: beamline_config_pyxrf = json.load(beamline_pyxrf) - beamline_name = beamline_config_pyxrf["beamline_name"] + catalog_name = beamline_config_pyxrf["beamline_name"] except Exception as ex: raise IOError(f"Error while opening configuration file {config_path!r}") from ex else: # Otherwise try to identify the beamline using host name hostname = platform.node() - beamline_names = { + catalog_names = { "xf03id": "HXN", "xf05id": "SRX", "xf08bm": "TES", "xf04bm": "XFM", } - for k, v in beamline_names.items(): + for k, v in catalog_names.items(): if hostname.startswith(k): - beamline_name = v + catalog_name = v - if beamline_name is None: + if catalog_name is None: raise Exception("Beamline is not identified") - if beamline_name == "HXN": + if catalog_name.upper() == "HXN": from pyxrf.db_config.hxn_db_config import db - elif beamline_name 
== "SRX": - from pyxrf.db_config.srx_db_config import db - elif beamline_name == "XFM": + elif catalog_name.upper() == "SRX": + db = get_catalog('srx') + elif catalog_name.upper() == "XFM": from pyxrf.db_config.xfm_db_config import db - elif beamline_name == "TES": + elif catalog_name.upper() == "TES": from pyxrf.db_config.tes_db_config import db else: db = None db_analysis = None - print(f"Beamline Database is not used in pyxrf: unknown beamline {beamline_name!r}") + print(f"Beamline Database is not used in pyxrf: unknown catalog {catalog_name!r}") except Exception as ex: db = None @@ -121,7 +128,7 @@ def flip_data(input_data, subscan_dims=None): return new_data -def fetch_run_info(run_id_uid): +def fetch_run_info(run_id_uid, catalog_name=None): """ Fetches key data from start document of the selected run @@ -129,6 +136,9 @@ def fetch_run_info(run_id_uid): ---------- run_id_uid: int or str Run ID (positive or negative int) or UID (str, full or short) of the run. + catalog_name: str or None + Name of the catalog (e.g. `"srx"`). The function attempts to determine the catalog + name automatically if the parameter is not specified or `None`. Returns ------- @@ -142,7 +152,14 @@ def fetch_run_info(run_id_uid): failed to fetch the run from Databroker """ try: - hdr = db[run_id_uid] + + if catalog_name: + catalog = get_catalog(catalog_name) + else: + catalog = db + hdr = catalog[run_id_uid] + + hdr = catalog[run_id_uid] run_id = hdr.start["scan_id"] run_uid = hdr.start["uid"] except Exception: @@ -166,6 +183,7 @@ def fetch_data_from_db( save_scaler=True, num_end_lines_excluded=None, skip_scan_types=None, + catalog_name=None, ): """ Read data from databroker. @@ -213,15 +231,21 @@ def fetch_data_from_db( remove the last few bad lines skip_scan_types: list(str) or None list of plan type names to ignore, e.g. ['FlyPlan1D']. (Supported only at HXN.) 
- + catalog_name: str or None + Returns ------- dict of data in 2D format matching x,y scanning positions """ - hdr = db[-1] + if catalog_name: + catalog = get_catalog(catalog_name) + else: + catalog = db + hdr = catalog[run_id_uid] + beamline_id = hdr.start["beamline_id"] print("Loading data from database.") - if hdr.start.beamline_id == "HXN": + if beamline_id == "HXN": data = map_data2D_hxn( run_id_uid, fpath, @@ -233,7 +257,7 @@ def fetch_data_from_db( output_to_file=output_to_file, skip_scan_types=skip_scan_types, ) - elif hdr.start.beamline_id == "xf05id" or hdr.start.beamline_id == "SRX": + elif beamline_id == "xf05id" or beamline_id == "SRX": data = map_data2D_srx( run_id_uid, fpath, @@ -245,8 +269,9 @@ def fetch_data_from_db( output_to_file=output_to_file, save_scaler=save_scaler, num_end_lines_excluded=num_end_lines_excluded, + catalog=catalog, ) - elif hdr.start.beamline_id == "XFM": + elif beamline_id == "XFM": data = map_data2D_xfm( run_id_uid, fpath, @@ -257,7 +282,7 @@ def fetch_data_from_db( file_overwrite_existing=file_overwrite_existing, output_to_file=output_to_file, ) - elif hdr.start.beamline_id == "TES": + elif beamline_id == "TES": data = map_data2D_tes( run_id_uid, fpath, @@ -290,7 +315,7 @@ def make_hdf( save_scaler=True, num_end_lines_excluded=None, skip_scan_types=None, - catalog=None, + catalog_name=None, ): """ Load data from database and save it in HDF5 files. @@ -403,7 +428,7 @@ def make_hdf( The list of plan types (e.g. ['FlyPlan1D']) that should cause the loader to raise an exception. The parameter is used to allow scripts to ignore certain plan types when downloading data using ranges of scans IDs. (Supported only at HXN.) - catalog: str or None + catalog_name: str or None Name of the catalog (e.g. `"srx"`). The function attempts to determine the catalog name automatically if the parameter is not specified or `None`. 
""" @@ -420,7 +445,7 @@ def make_hdf( if end is not None: raise ValueError(r"Parameter 'end' must be None if run is loaded by UID") - run_id, run_uid = fetch_run_info(start) # This may raise RuntimeException + run_id, run_uid = fetch_run_info(start, catalog_name) # This may raise RuntimeException # Load one scan with ID specified by ``start`` # If there is a problem while reading the scan, the exception is raised. @@ -440,6 +465,7 @@ def make_hdf( save_scaler=save_scaler, num_end_lines_excluded=num_end_lines_excluded, skip_scan_types=skip_scan_types, + catalog_name=catalog_name, ) else: # Both ``start`` and ``end`` are specified. Convert the scans in the range @@ -463,6 +489,7 @@ def make_hdf( save_scaler=save_scaler, num_end_lines_excluded=num_end_lines_excluded, skip_scan_types=skip_scan_types, + catalog_name=catalog_name, ) print(f"Scan #{v}: Conversion completed.\n") except Exception as ex: @@ -980,6 +1007,7 @@ def map_data2D_srx( output_to_file=True, save_scaler=True, num_end_lines_excluded=None, + catalog=None, ): """ Transfer the data from databroker into a correct format following the @@ -1024,16 +1052,38 @@ def map_data2D_srx( choose to save scaler data or not for srx beamline, test purpose only. 
num_end_lines_excluded : int, optional remove the last few bad lines + catalog + reference to databroker catalog Returns ------- dict of data in 2D format matching x,y scanning positions """ - hdr = db[run_id_uid] - start_doc = hdr["start"] + catalog = catalog or db + + using_tiled = "CatalogOfBlueskyRuns" in str(type(catalog)) + + hdr = catalog[run_id_uid] + start_doc = hdr.start use_new_format = "md_version" in start_doc - if use_new_format: + if using_tiled and use_new_format: + return map_data2D_srx_new_tiled( + run_id_uid=run_id_uid, + fpath=fpath, + create_each_det=create_each_det, + fname_add_version=fname_add_version, + completed_scans_only=completed_scans_only, + successful_scans_only=successful_scans_only, + file_overwrite_existing=file_overwrite_existing, + output_to_file=output_to_file, + save_scaler=save_scaler, + num_end_lines_excluded=num_end_lines_excluded, + catalog=catalog, + ) + elif using_tiled and not use_new_format: + print(f"Using Tiled and Old Format ...") + elif use_new_format: return map_data2D_srx_new( run_id_uid=run_id_uid, fpath=fpath, @@ -2116,6 +2166,516 @@ def swap_axes(): return data_output +def map_data2D_srx_new_tiled( + run_id_uid, + fpath, + create_each_det=False, + fname_add_version=False, + completed_scans_only=False, + successful_scans_only=False, + file_overwrite_existing=False, + output_to_file=True, + save_scaler=True, + num_end_lines_excluded=None, + catalog=None, +): + if num_end_lines_excluded: + logger.warning( + "The data loading function for new SRX format does not support the parameter " + "'num_end_lines_excluded' ({num_end_lines_excluded}). All available data will " + "be included in the output file." 
+ ) + + hdr = catalog[run_id_uid] + start_doc = hdr.start + runid = start_doc["scan_id"] # Replace with the true value (runid may be relative, such as -2) + + print("**********************************************************") + print(f"Loading scan #{runid}") + print(f"Scan metadata format: version {start_doc['md_version']}") + + if completed_scans_only and not _is_scan_complete(hdr): + raise Exception("Scan is incomplete. Only completed scans are currently processed.") + if successful_scans_only and not _is_scan_successful(hdr): + raise Exception( + "Scan is not successfully completed. Only successfully completed scans are currently processed." + ) + + scan_doc = start_doc["scan"] + stop_doc = hdr.stop + + # The following scan parameters are used to compute positions for some motors. + # (Positions of course stages are not saved during the scan) + fast_start, fast_stop, fast_pts, slow_start, slow_stop, slow_pts = scan_doc["scan_input"][:6] + fast_step = (fast_stop - fast_start) / fast_pts + slow_step = (slow_stop - slow_start) / slow_pts + + snaking_enabled = scan_doc["snake"] == 1 + + print(f"Scan type: {scan_doc['type']}") + + # Check for detectors + dets = [] + try: + md_dets = hdr.start["scan"]["detectors"] + for d in md_dets: + if d in ("xs", "xs2", "xs4"): + dets.append(d) + except KeyError: + # AMK forgot to add detectors to step scans + # This is fixed, but left in for those scans + if scan_doc["type"] == "XRF_STEP": + dets.append("xs") + + if not (dets): + raise IOError("No detectors found!") + + # Get metadata + mdata = _extract_metadata_from_header(hdr) + + v = _get_metadata_value_from_descriptor_document(hdr, data_key="ring_current", stream_name="baseline") + if v is not None: + mdata["instrument_beam_current"] = v + + for ax in ["X", "Y", "Z"]: + v = _get_metadata_all_from_descriptor_document( + hdr, data_key=f"nanoKB_interferometer_pos{ax}", stream_name="baseline" + ) + if v is not None: + mdata[f"param_interferometer_pos{ax}"] = v + + # Get 
position data from scan + n_scan_fast, n_scan_slow = hdr.start["scan"]["shape"] + n_scan_fast, n_scan_slow = int(n_scan_fast), int(n_scan_slow) + + # =================================================================== + # NEW SRX FLY SCAN + # =================================================================== + stages_no_data = ("nano_stage_x", "nano_stage_y", "nano_stage_z", "nano_stage_topx", "nano_stage_topz") + + if scan_doc["type"] == "XRF_FLY": + fast_motor = scan_doc["fast_axis"]["motor_name"] + if fast_motor == "nano_stage_sx": + fast_key = "enc1" + elif fast_motor == "nano_stage_sy": + fast_key = "enc2" + elif fast_motor == "nano_stage_sz": + fast_key = "enc3" + elif fast_motor in stages_no_data: + fast_key = "fast_gen" # No positions are saved. Generate positions based on scan parameters. + else: + raise IOError(f"{fast_motor} not found!") + + slow_motor = scan_doc["slow_axis"]["motor_name"] + if slow_motor == "nano_stage_sx": + slow_key = "enc1" + elif slow_motor == "nano_stage_sy": + slow_key = "enc2" + elif slow_motor == "nano_stage_sz": + slow_key = "enc3" + elif slow_motor in stages_no_data: + slow_key = "slow_gen" # No positions are saved. Generate positions based on scan parameters. + else: + slow_key = slow_motor + + # Let's get the data using the events! Yay! + filler = Filler(db.reg.handler_reg, inplace=True) + docs_stream0 = hdr.documents("stream0", fill=False) + docs_primary = hdr.documents("primary", fill=False) + d_xs, d_xs_sum, N_xs = [], [], 0 + d_xs2, d_xs2_sum, N_xs2 = [], [], 0 + sclr_list = ["i0", "i0_time", "time", "im", "it"] + sclr_dict = {} + fast_pos, slow_pos = [], [] + + n_recorded_events = 0 + + try: + m = 0 + while True: + try: + while True: + name, doc = next(docs_stream0) + try: + filler(name, doc) + except Exception: + pass # The document can not be filled. Leave it unfilled. 
+ if name == "event": + break + except StopIteration: + break # All events are processed, exit the loop + + try: + while True: + name_p, doc_p = next(docs_primary) + try: + filler(name_p, doc_p) + except Exception: + pass # The document can not be filled. Leave it unfilled. + if name == "event": + break + except StopIteration: + raise RuntimeError(f"Matching event #{m} was not found in 'primary' stream") + + def data_or_empty_array(v): + """ + If data is a string, then return an empty array, otherwise return the data as numpy array. + """ + if isinstance(v, str): + v = [] + return np.asarray(v) + + v, vp = doc, doc_p + if "xs" in dets or "xs4" in dets: + event_data = data_or_empty_array(v["data"]["fluor"]) + if event_data.size: + event_data = np.asarray(event_data, dtype=np.float32) + N_xs = max(N_xs, event_data.shape[1]) + d_xs_sum.append(np.sum(event_data, axis=1)) + if create_each_det: + d_xs.append(event_data) + else: + # Unfilled document + d_xs_sum.append(event_data) + if create_each_det: + d_xs.append(event_data) + + if "xs2" in dets: + event_data = data_or_empty_array(v["data"]["fluor_xs2"]) + if event_data.size: + event_data = np.asarray(event_data, dtype=np.float32) + N_xs2 = max(N_xs2, event_data.shape[1]) + d_xs2_sum.append(np.sum(event_data, axis=1)) + if create_each_det: + d_xs2.append(event_data) + else: + # Unfilled document + d_xs2_sum.append(event_data) + if create_each_det: + d_xs2.append(event_data) + + keys = v["data"].keys() + for s in sclr_list: + if s in keys: + tmp = data_or_empty_array(v["data"][s]) + if s not in sclr_dict: + sclr_dict[s] = [tmp] + else: + sclr_dict[s].append(tmp) + + if fast_key == "fast_gen": + # Generate positions + row_pos_fast = np.arange(fast_pts) * fast_step + fast_start + if snaking_enabled and (n_recorded_events % 2): + row_pos_fast = np.flip(row_pos_fast) + else: + row_pos_fast = data_or_empty_array(v["data"][fast_key]) + fast_pos.append(row_pos_fast) + + if slow_key == "slow_gen": + # Generate positions + 
row_pos_slow = np.ones(fast_pts) * (n_recorded_events * slow_step + slow_start) + elif "enc" not in slow_key: + # vp = next(ep) + # filler("event", vp) + tmp = data_or_empty_array(vp["data"][slow_key]) + row_pos_slow = np.array([tmp] * n_scan_fast) + else: + row_pos_slow = np.array(data_or_empty_array(v["data"][slow_key])) + slow_pos.append(row_pos_slow) + + n_recorded_events = m + 1 + + if m > 0 and not (m % 10): + print(f"Processed lines: {m}") + + # Delete filled data (it will not be used anymore). This prevents memory leak. + if "data" in doc: + doc["data"].clear() + if "data" in doc_p: + doc_p["data"].clear() + filler.clear_handler_cache() + + m += 1 + + except Exception as ex: + logger.error(f"Error occurred while reading data: {ex}. Trying to retrieve available data ...") + + def repair_set(dset_list, n_row_pts, msg): + """ + Replaces corrupt rows (incorrect number of points) with closest 'good' row. This allows to load + and use data from corrupt scans. The function will have no effect on 'good' scans. + If there are no rows with correct number of points (unlikely case), then the array remains unchanged. + """ + missed_rows = [] + n_last_good_row = -1 + for n in range(len(dset_list)): + d = dset_list[n] + n_pts = d.shape[0] + if n_pts != n_row_pts: + print( + f"WARNING: ({msg}) Row #{n + 1} has {n_pts} data points. {n_row_pts} points are expected." 
+ ) + if n_last_good_row == -1: + missed_rows.append(n) + else: + dset_list[n] = np.array(dset_list[n_last_good_row]) + print(f"({msg}) Data in row #{n + 1} is replaced by data from row #{n_last_good_row}") + else: + n_last_good_row = n + if missed_rows: + for nr in missed_rows: + dset_list[nr] = np.array(dset_list[n_last_good_row]) + print(f"({msg}) Data in row #{nr + 1} is replaced by data from row #{n_last_good_row}") + missed_rows = [] + + sclr_name = list(sclr_dict.keys()) + + repair_set(d_xs_sum, n_scan_fast, "XS_sum") + repair_set(d_xs, n_scan_fast, "XS") + repair_set(d_xs2_sum, n_scan_fast, "XS2_sum") + repair_set(d_xs2, n_scan_fast, "XS2") + repair_set(fast_pos, n_scan_fast, "fast pos") + repair_set(slow_pos, n_scan_fast, "slow pos") + for sc in sclr_dict.values(): + repair_set(sc, n_scan_fast, "sclr") + + pos_pos = np.zeros((2, n_recorded_events, n_scan_fast)) + if "x" in slow_key: + pos_pos[1, :, :] = fast_pos + pos_pos[0, :, :] = slow_pos + else: + pos_pos[0, :, :] = fast_pos + pos_pos[1, :, :] = slow_pos + pos_name = ["x_pos", "y_pos"] + + if n_recorded_events != n_scan_slow: + logger.error( + "The number of recorded events (%d) is not equal to the expected number of events (%d): " + "The scan is incomplete.", + n_recorded_events, + n_scan_slow, + ) + + # The following arrays may be empty if 'create_each_det == False' or the detector is not used. 
+ d_xs = np.asarray(d_xs) + d_xs_sum = np.asarray(d_xs_sum) + d_xs2 = np.asarray(d_xs2) + d_xs2_sum = np.asarray(d_xs2_sum) + + sclr = np.zeros((n_recorded_events, n_scan_fast, len(sclr_name))) + for n, sname in enumerate(sclr_name): + sclr[:, :, n] = np.asarray(sclr_dict[sname]) + + # =================================================================== + # NEW SRX STEP SCAN + # =================================================================== + if scan_doc["type"] == "XRF_STEP": + # Define keys for motor data + fast_motor = scan_doc["fast_axis"]["motor_name"] + fast_key = fast_motor + "_user_setpoint" + slow_motor = scan_doc["slow_axis"]["motor_name"] + slow_key = slow_motor + "_user_setpoint" + + # Collect motor positions + fast_pos = hdr.data(fast_key, stream_name="primary", fill=True) + fast_pos = np.array(list(fast_pos)) + slow_pos = hdr.data(slow_key, stream_name="primary", fill=True) + slow_pos = np.array(list(slow_pos)) + + # Reshape motor positions + num_events = stop_doc["num_events"]["primary"] + n_scan_slow, n_scan_fast = scan_doc["shape"] + if num_events != (n_scan_slow * n_scan_fast): + num_rows = num_events // n_scan_fast + 1 # number of rows + fast_pos = np.zeros((num_rows, n_scan_fast)) + slow_pos = np.zeros((num_rows, n_scan_fast)) + for i in range(num_rows): + for j in range(n_scan_fast): + fast_pos[i, j] = fast_pos[i * n_scan_fast + j] + slow_pos[i, j] = slow_pos[i * n_scan_fast + j] + else: + num_rows = n_scan_slow + fast_pos = np.reshape(fast_pos, (n_scan_slow, n_scan_fast)) + slow_pos = np.reshape(slow_pos, (n_scan_slow, n_scan_fast)) + + # Put into one array for h5 file + pos_pos = np.zeros((2, num_rows, n_scan_fast)) + if "x" in slow_key: + pos_pos[1, :, :] = fast_pos + pos_pos[0, :, :] = slow_pos + else: + pos_pos[0, :, :] = fast_pos + pos_pos[1, :, :] = slow_pos + pos_name = ["x_pos", "y_pos"] + + # Get detector data + keys = hdr.table().keys() + MAX_DET_ELEMENTS = 8 + N_xs, det_name_prefix, ndigits = None, None, 1 + for i in 
np.arange(1, MAX_DET_ELEMENTS + 1): + if f"xs_channel{i}" in keys: + N_xs, det_name_prefix, ndigits = i, "xs_channel", 1 + elif f"xs_channel{i:02d}" in keys: + N_xs, det_name_prefix, ndigits = i, "xs_channel", 2 + elif f"xs_channels_channel{i:02d}" in keys: + N_xs, det_name_prefix, ndigits = i, "xs_channels_channel", 2 + else: + break + N_pts = num_events + N_bins = 4096 + if "xs" in dets or "xs4" in dets: + d_xs = np.empty((N_xs, N_pts, N_bins)) + for i in np.arange(0, N_xs): + chnum = f"{i + 1}" if ndigits == 1 else f"{i + 1:02d}" + dname = det_name_prefix + chnum + + d = hdr.data(dname, fill=True) + d = np.array(list(d)) + d_xs[i, :, :] = np.copy(d) + del d + # Reshape data + if num_events != (n_scan_slow * n_scan_fast): + tmp = np.zeros((N_xs, num_rows, n_scan_fast, N_bins)) + for i in range(num_rows): + for j in range(n_scan_fast): + tmp[:, i, j, :] = fast_pos[:, i * n_scan_fast + j, :] + d_xs = np.copy(tmp) + del tmp + else: + d_xs = np.reshape(d_xs, (N_xs, n_scan_slow, n_scan_fast, N_bins)) + # Sum data + d_xs_sum = np.squeeze(np.sum(d_xs, axis=0)) + + # Scaler list + sclr_list = ["sclr_i0", "sclr_im", "sclr_it"] + sclr_name = [] + for s in sclr_list: + if s in keys: + sclr_name.append(s) + sclr = np.array(hdr.table()[sclr_name].values) + # Reshape data + if num_events != (n_scan_slow * n_scan_fast): + tmp = np.zeros((num_rows, n_scan_fast)) + for i in range(num_rows): + for j in range(n_scan_fast): + tmp[i, j] = fast_pos[i * n_scan_fast + j] + sclr = np.copy(tmp) + del tmp + else: + sclr = np.reshape(sclr, (n_scan_slow, n_scan_fast, len(sclr_name))) + + # Consider snake + # pos_pos, d_xs, d_xs_sum, sclr + if snaking_enabled: + pos_pos[:, 1::2, :] = pos_pos[:, 1::2, ::-1] + if "xs" in dets or "xs4" in dets: + if d_xs.size: + d_xs[:, 1::2, :, :] = d_xs[:, 1::2, ::-1, :] + if d_xs_sum.size: + d_xs_sum[1::2, :, :] = d_xs_sum[1::2, ::-1, :] + if "xs2" in dets: + if d_xs2.size: + d_xs2[:, 1::2, :, :] = d_xs2[:, 1::2, ::-1, :] + if d_xs2_sum.size: + 
d_xs2_sum[1::2, :, :] = d_xs2_sum[1::2, ::-1, :] + sclr[1::2, :, :] = sclr[1::2, ::-1, :] + + def swap_axes(): + nonlocal pos_name, pos_pos, d_xs, d_xs_sum, d_xs2, d_xs2_sum, sclr + # Need to swapaxes on pos_pos, d_xs, d_xs_sum, sclr + pos_name = pos_name[::-1] + pos_pos = np.swapaxes(pos_pos, 1, 2) + if "xs" in dets or "xs4" in dets: + if d_xs.size: + d_xs = np.swapaxes(d_xs, 0, 1) + if d_xs_sum.size: + d_xs_sum = np.swapaxes(d_xs_sum, 0, 1) + if "xs2" in dets: + if d_xs2.size: + d_xs2 = np.swapaxes(d_xs2, 0, 1) + if d_xs2_sum.size: + d_xs2_sum = np.swapaxes(d_xs2_sum, 0, 1) + sclr = np.swapaxes(sclr, 0, 1) + + if scan_doc["type"] == "XRF_FLY": + if fast_motor in ("nano_stage_sy", "nano_stage_y"): + swap_axes() + elif scan_doc["type"] == "XRF_STEP": + if "xs" in dets or "xs4" in dets: + d_xs = np.swapaxes(d_xs, 0, 1) + d_xs = np.swapaxes(d_xs, 1, 2) + if "xs2" in dets: + d_xs2 = np.swapaxes(d_xs2, 0, 1) + d_xs2 = np.swapaxes(d_xs2, 1, 2) + if fast_motor not in ("nano_stage_sy", "nano_stage_y"): + swap_axes() + pos_name = pos_name[::-1] # Swap the positions back + else: + pos_name = pos_name[::-1] # Swap the positions back + + print("Data is loaded successfully. 
Preparing to save data ...") + + data_output = [] + + for detector_name in dets: + if detector_name in ("xs", "xs4"): + tmp_data = d_xs + tmp_data_sum = d_xs_sum + num_det = N_xs + elif detector_name == "xs2": + tmp_data = d_xs2 + tmp_data_sum = d_xs2_sum + num_det = N_xs2 + + loaded_data = {} + loaded_data["det_sum"] = tmp_data_sum + if create_each_det: + for i in range(num_det): + loaded_data["det" + str(i + 1)] = np.squeeze(tmp_data[:, :, i, :]) + + if save_scaler: + loaded_data["scaler_data"] = sclr + loaded_data["scaler_names"] = sclr_name + + loaded_data["pos_data"] = pos_pos + loaded_data["pos_names"] = pos_name + + # Generate the default file name for the scan + if fpath is None: + fpath = f"scan2D_{runid}.h5" + + # Modify file name (path) to include data on how many channels are included in the file and how many + # channels are used for sum calculation + root, ext = os.path.splitext(fpath) + s = f"_{detector_name}_sum{num_det}ch" + if create_each_det: + s += f"+{num_det}ch" + fpath_out = f"{root}{s}{ext}" + + if output_to_file: + # output to file + print(f"Saving data to hdf file '{fpath_out}': Detector: {detector_name}.") + fpath_out = save_data_to_hdf5( + fpath_out, + loaded_data, + metadata=mdata, + fname_add_version=fname_add_version, + file_overwrite_existing=file_overwrite_existing, + create_each_det=create_each_det, + ) + + d_dict = { + "dataset": loaded_data, + "file_name": fpath_out, + "detector_name": detector_name, + "metadata": mdata, + } + data_output.append(d_dict) + + return data_output + + def map_data2D_tes( run_id_uid, fpath, @@ -3189,7 +3749,7 @@ def free_memory_from_handler(): """ # The following check is redundant: Data Broker prior to version 1.0.0 always has '_handler_cache'. # In later versions of databroker the attribute may still be present if 'databroker.v0' is used. 
- if (LooseVersion(databroker.__version__) < LooseVersion("1.0.0")) or hasattr(db.fs, "_handler_cache"): + if (LooseVersion(databroker.__version__) < LooseVersion("1.0.0")) or hasattr(db, "fs") and hasattr(db.fs, "_handler_cache"): for h in db.fs._handler_cache.values(): setattr(h, "_dataset", None) print("Memory is released (Databroker v0).") From 5535e32f6d49d9b89f2ae5b5e9df40d38ccd96c9 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Sun, 28 Jan 2024 18:35:39 -0500 Subject: [PATCH 03/16] ENH: reading data from tiled --- pyxrf/core/map_processing.py | 2 +- pyxrf/model/load_data_from_db.py | 525 +++++++++++++++++++------------ 2 files changed, 318 insertions(+), 209 deletions(-) diff --git a/pyxrf/core/map_processing.py b/pyxrf/core/map_processing.py index 2bcaa7ed..3fdb53c9 100644 --- a/pyxrf/core/map_processing.py +++ b/pyxrf/core/map_processing.py @@ -1238,7 +1238,7 @@ def snip_method_numba( else: iter_num = _default_iter_num_bin - # np.array(spectrum) is not supported by numba so we have to us this: + # np.array(spectrum) is not supported by numba so we have to use this: background = np.asarray(spectrum).copy() n_background = background.size diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py index 88da0d0a..06851908 100644 --- a/pyxrf/model/load_data_from_db.py +++ b/pyxrf/model/load_data_from_db.py @@ -16,6 +16,7 @@ import numpy as np import pandas as pd from event_model import Filler +import dask.array as da try: import databroker @@ -677,6 +678,34 @@ def _get_metadata_all_from_descriptor_document(hdr, *, data_key, stream_name="ba return value +def _get_metadata_value_from_descriptor_document_tiled(hdr, *, data_key, stream_name="baseline"): + """ + Returns the first occurrence of the variable with the name ``data_key`` in + specified document stream. 
Returns ``None`` if the variable is not found + """ + value = None + try: + value = list(hdr[stream_name][data_key])[0] + except Exception: + pass + + return value + + +def _get_metadata_all_from_descriptor_document_tiled(hdr, *, data_key, stream_name="baseline"): + """ + Returns the list of the recorded values of variables with the name ``data_key`` in + specified document stream. Returns ``None`` if the variable is not found + """ + value = [] + try: + value = list(hdr[stream_name][data_key]) + except Exception: + pass + + return value or None # Replace [] with None + + def map_data2D_hxn( run_id_uid, fpath, @@ -2201,16 +2230,20 @@ def map_data2D_srx_new_tiled( "Scan is not successfully completed. Only successfully completed scans are currently processed." ) + md_version = start_doc["md_version"] scan_doc = start_doc["scan"] stop_doc = hdr.stop + import pprint ## + print(f"=== start_doc={pprint.pformat(start_doc)}") ## + # The following scan parameters are used to compute positions for some motors. 
# (Positions of course stages are not saved during the scan) fast_start, fast_stop, fast_pts, slow_start, slow_stop, slow_pts = scan_doc["scan_input"][:6] fast_step = (fast_stop - fast_start) / fast_pts slow_step = (slow_stop - slow_start) / slow_pts - snaking_enabled = scan_doc["snake"] == 1 + snaking_enabled = bool(scan_doc["snake"]) print(f"Scan type: {scan_doc['type']}") @@ -2233,12 +2266,12 @@ def map_data2D_srx_new_tiled( # Get metadata mdata = _extract_metadata_from_header(hdr) - v = _get_metadata_value_from_descriptor_document(hdr, data_key="ring_current", stream_name="baseline") + v = _get_metadata_value_from_descriptor_document_tiled(hdr, data_key="ring_current", stream_name="baseline") if v is not None: mdata["instrument_beam_current"] = v for ax in ["X", "Y", "Z"]: - v = _get_metadata_all_from_descriptor_document( + v = _get_metadata_all_from_descriptor_document_tiled( hdr, data_key=f"nanoKB_interferometer_pos{ax}", stream_name="baseline" ) if v is not None: @@ -2278,169 +2311,57 @@ def map_data2D_srx_new_tiled( else: slow_key = slow_motor - # Let's get the data using the events! Yay! - filler = Filler(db.reg.handler_reg, inplace=True) - docs_stream0 = hdr.documents("stream0", fill=False) - docs_primary = hdr.documents("primary", fill=False) - d_xs, d_xs_sum, N_xs = [], [], 0 - d_xs2, d_xs2_sum, N_xs2 = [], [], 0 - sclr_list = ["i0", "i0_time", "time", "im", "it"] - sclr_dict = {} - fast_pos, slow_pos = [], [] - - n_recorded_events = 0 - - try: - m = 0 - while True: - try: - while True: - name, doc = next(docs_stream0) - try: - filler(name, doc) - except Exception: - pass # The document can not be filled. Leave it unfilled. - if name == "event": - break - except StopIteration: - break # All events are processed, exit the loop - - try: - while True: - name_p, doc_p = next(docs_primary) - try: - filler(name_p, doc_p) - except Exception: - pass # The document can not be filled. Leave it unfilled. 
- if name == "event": - break - except StopIteration: - raise RuntimeError(f"Matching event #{m} was not found in 'primary' stream") - - def data_or_empty_array(v): - """ - If data is a string, then return an empty array, otherwise return the data as numpy array. - """ - if isinstance(v, str): - v = [] - return np.asarray(v) - - v, vp = doc, doc_p - if "xs" in dets or "xs4" in dets: - event_data = data_or_empty_array(v["data"]["fluor"]) - if event_data.size: - event_data = np.asarray(event_data, dtype=np.float32) - N_xs = max(N_xs, event_data.shape[1]) - d_xs_sum.append(np.sum(event_data, axis=1)) - if create_each_det: - d_xs.append(event_data) - else: - # Unfilled document - d_xs_sum.append(event_data) - if create_each_det: - d_xs.append(event_data) - - if "xs2" in dets: - event_data = data_or_empty_array(v["data"]["fluor_xs2"]) - if event_data.size: - event_data = np.asarray(event_data, dtype=np.float32) - N_xs2 = max(N_xs2, event_data.shape[1]) - d_xs2_sum.append(np.sum(event_data, axis=1)) - if create_each_det: - d_xs2.append(event_data) - else: - # Unfilled document - d_xs2_sum.append(event_data) - if create_each_det: - d_xs2.append(event_data) - - keys = v["data"].keys() - for s in sclr_list: - if s in keys: - tmp = data_or_empty_array(v["data"][s]) - if s not in sclr_dict: - sclr_dict[s] = [tmp] - else: - sclr_dict[s].append(tmp) - - if fast_key == "fast_gen": - # Generate positions - row_pos_fast = np.arange(fast_pts) * fast_step + fast_start - if snaking_enabled and (n_recorded_events % 2): - row_pos_fast = np.flip(row_pos_fast) - else: - row_pos_fast = data_or_empty_array(v["data"][fast_key]) - fast_pos.append(row_pos_fast) - - if slow_key == "slow_gen": - # Generate positions - row_pos_slow = np.ones(fast_pts) * (n_recorded_events * slow_step + slow_start) - elif "enc" not in slow_key: - # vp = next(ep) - # filler("event", vp) - tmp = data_or_empty_array(vp["data"][slow_key]) - row_pos_slow = np.array([tmp] * n_scan_fast) - else: - row_pos_slow = 
np.array(data_or_empty_array(v["data"][slow_key])) - slow_pos.append(row_pos_slow) - - n_recorded_events = m + 1 - - if m > 0 and not (m % 10): - print(f"Processed lines: {m}") - - # Delete filled data (it will not be used anymore). This prevents memory leak. - if "data" in doc: - doc["data"].clear() - if "data" in doc_p: - doc_p["data"].clear() - filler.clear_handler_cache() - - m += 1 - - except Exception as ex: - logger.error(f"Error occurred while reading data: {ex}. Trying to retrieve available data ...") - - def repair_set(dset_list, n_row_pts, msg): - """ - Replaces corrupt rows (incorrect number of points) with closest 'good' row. This allows to load - and use data from corrupt scans. The function will have no effect on 'good' scans. - If there are no rows with correct number of points (unlikely case), then the array remains unchanged. - """ - missed_rows = [] - n_last_good_row = -1 - for n in range(len(dset_list)): - d = dset_list[n] - n_pts = d.shape[0] - if n_pts != n_row_pts: - print( - f"WARNING: ({msg}) Row #{n + 1} has {n_pts} data points. {n_row_pts} points are expected." 
- ) - if n_last_good_row == -1: - missed_rows.append(n) - else: - dset_list[n] = np.array(dset_list[n_last_good_row]) - print(f"({msg}) Data in row #{n + 1} is replaced by data from row #{n_last_good_row}") + # data_primary = hdr.primary["data"] + data_stream0 = hdr.stream0["data"] + + d_xs, d_xs_sum, N_xs, d_xs2, d_xs2_sum, N_xs2 = None, None, 0, None, None, 0 + if "xs_fluor" in data_stream0: + d_xs = data_stream0["xs_fluor"] + d_xs_sum = da.sum(d_xs, 2) + N_xs = d_xs.shape[2] + elif "fluor" in data_stream0: # Old format + d_xs = data_stream0["fluor"] + d_xs_sum = da.sum(d_xs, 2) + N_xs = d_xs.shape[2] + + if "xs_fluor_xs2" in data_stream0: + d_xs2 = data_stream0["xs_fluor_xs2"] + d_xs2_sum = da.sum(d_xs2, 2) + N_xs2 = d_xs2.shape[2] + elif "fluor_xs2" in data_stream0: # Old format + d_xs2 = data_stream0["fluor_xs2"] + d_xs2_sum = da.sum(d_xs2, 2) + N_xs2 = d_xs2.shape[2] + + if not create_each_det: + d_xs, d_xs2 = None, None + + if d_xs_sum is None and d_xs2_sum is None: + raise ValueError(f"No fluorescence data is found for the experiment {run_id_uid!r}") + + sclr_list = ["i0", "i0_time", "im", "it"] + sclr_dict = dict() + for k in sclr_list: + if k in data_stream0: + if k not in sclr_dict: + sclr_dict[k] = [data_stream0[k]] else: - n_last_good_row = n - if missed_rows: - for nr in missed_rows: - dset_list[nr] = np.array(dset_list[n_last_good_row]) - print(f"({msg}) Data in row #{nr + 1} is replaced by data from row #{n_last_good_row}") - missed_rows = [] + sclr_dict[k].append(data_stream0[k]) - sclr_name = list(sclr_dict.keys()) - - repair_set(d_xs_sum, n_scan_fast, "XS_sum") - repair_set(d_xs, n_scan_fast, "XS") - repair_set(d_xs2_sum, n_scan_fast, "XS2_sum") - repair_set(d_xs2, n_scan_fast, "XS2") - repair_set(fast_pos, n_scan_fast, "fast pos") - repair_set(slow_pos, n_scan_fast, "slow pos") - for sc in sclr_dict.values(): - repair_set(sc, n_scan_fast, "sclr") - - pos_pos = np.zeros((2, n_recorded_events, n_scan_fast)) + if fast_key != "fast_gen": + fast_pos 
= data_stream0[fast_key]
+        slow_pos = data_stream0[slow_key]
+    else:
+        row_pos_fast = da.arange(fast_pts) * fast_step + fast_start
+        fast_pos = da.broadcast_to(row_pos_fast, (slow_pts, fast_pts))
+        if snaking_enabled:
+            rows_to_flip = list(range(1, slow_pts, 2))  # Flip every other row starting with row 1 (0-based)
+            fast_pos[rows_to_flip, :] = da.fliplr(fast_pos[rows_to_flip, :])
+        col_pos_slow = da.arange(slow_pts) * slow_step + slow_start
+        slow_pos = da.transpose(da.broadcast_to(col_pos_slow, (fast_pts, slow_pts)))
+
+    n_events, n_points = fast_pos.shape[0], fast_pos.shape[1]
+    pos_pos = da.zeros((2, n_events, n_points), dtype=np.float32)
     if "x" in slow_key:
         pos_pos[1, :, :] = fast_pos
         pos_pos[0, :, :] = slow_pos
@@ -2449,23 +2370,210 @@ def repair_set(dset_list, n_row_pts, msg):
         pos_pos[1, :, :] = slow_pos
         pos_name = ["x_pos", "y_pos"]
 
-    if n_recorded_events != n_scan_slow:
-        logger.error(
-            "The number of recorded events (%d) is not equal to the expected number of events (%d): "
-            "The scan is incomplete.",
-            n_recorded_events,
-            n_scan_slow,
-        )
-
-    # The following arrays may be empty if 'create_each_det == False' or the detector is not used.
-    d_xs = np.asarray(d_xs)
-    d_xs_sum = np.asarray(d_xs_sum)
-    d_xs2 = np.asarray(d_xs2)
-    d_xs2_sum = np.asarray(d_xs2_sum)
-
-    sclr = np.zeros((n_recorded_events, n_scan_fast, len(sclr_name)))
-    for n, sname in enumerate(sclr_name):
-        sclr[:, :, n] = np.asarray(sclr_dict[sname])
+    sclr_names = list(sclr_dict.keys())
+    sclr = da.zeros((n_events, n_points, len(sclr_names)), dtype=np.float32)
+    for n, sname in enumerate(sclr_names):
+        print(f"n={n} sname={sname}") ##
+        sclr[:, :, n] = da.asarray(sclr_dict[sname])
+
+    # # The following arrays may be empty if 'create_each_det == False' or the detector is not used.
+ # d_xs = np.asarray(d_xs) + # d_xs_sum = np.asarray(d_xs_sum) + # d_xs2 = np.asarray(d_xs2) + # d_xs2_sum = np.asarray(d_xs2_sum) + + # sclr = np.zeros((n_recorded_events, n_scan_fast, len(sclr_name))) + # for n, sname in enumerate(sclr_name): + # sclr[:, :, n] = np.asarray(sclr_dict[sname]) + + # # Let's get the data using the events! Yay! + # filler = Filler(db.reg.handler_reg, inplace=True) + # docs_stream0 = hdr.documents("stream0", fill=False) + # docs_primary = hdr.documents("primary", fill=False) + # d_xs, d_xs_sum, N_xs = [], [], 0 + # d_xs2, d_xs2_sum, N_xs2 = [], [], 0 + # sclr_list = ["i0", "i0_time", "time", "im", "it"] + # sclr_dict = {} + # fast_pos, slow_pos = [], [] + + # n_recorded_events = 0 + + # try: + # m = 0 + # while True: + # try: + # while True: + # name, doc = next(docs_stream0) + # try: + # filler(name, doc) + # except Exception: + # pass # The document can not be filled. Leave it unfilled. + # if name == "event": + # break + # except StopIteration: + # break # All events are processed, exit the loop + + # try: + # while True: + # name_p, doc_p = next(docs_primary) + # try: + # filler(name_p, doc_p) + # except Exception: + # pass # The document can not be filled. Leave it unfilled. + # if name == "event": + # break + # except StopIteration: + # raise RuntimeError(f"Matching event #{m} was not found in 'primary' stream") + + # def data_or_empty_array(v): + # """ + # If data is a string, then return an empty array, otherwise return the data as numpy array. 
+ # """ + # if isinstance(v, str): + # v = [] + # return np.asarray(v) + + # v, vp = doc, doc_p + # if "xs" in dets or "xs4" in dets: + # event_data = data_or_empty_array(v["data"]["fluor"]) + # if event_data.size: + # event_data = np.asarray(event_data, dtype=np.float32) + # N_xs = max(N_xs, event_data.shape[1]) + # d_xs_sum.append(np.sum(event_data, axis=1)) + # if create_each_det: + # d_xs.append(event_data) + # else: + # # Unfilled document + # d_xs_sum.append(event_data) + # if create_each_det: + # d_xs.append(event_data) + + # if "xs2" in dets: + # event_data = data_or_empty_array(v["data"]["fluor_xs2"]) + # if event_data.size: + # event_data = np.asarray(event_data, dtype=np.float32) + # N_xs2 = max(N_xs2, event_data.shape[1]) + # d_xs2_sum.append(np.sum(event_data, axis=1)) + # if create_each_det: + # d_xs2.append(event_data) + # else: + # # Unfilled document + # d_xs2_sum.append(event_data) + # if create_each_det: + # d_xs2.append(event_data) + + # keys = v["data"].keys() + # for s in sclr_list: + # if s in keys: + # tmp = data_or_empty_array(v["data"][s]) + # if s not in sclr_dict: + # sclr_dict[s] = [tmp] + # else: + # sclr_dict[s].append(tmp) + + # if fast_key == "fast_gen": + # # Generate positions + # row_pos_fast = np.arange(fast_pts) * fast_step + fast_start + # if snaking_enabled and (n_recorded_events % 2): + # row_pos_fast = np.flip(row_pos_fast) + # else: + # row_pos_fast = data_or_empty_array(v["data"][fast_key]) + # fast_pos.append(row_pos_fast) + + # if slow_key == "slow_gen": + # # Generate positions + # row_pos_slow = np.ones(fast_pts) * (n_recorded_events * slow_step + slow_start) + # elif "enc" not in slow_key: + # # vp = next(ep) + # # filler("event", vp) + # tmp = data_or_empty_array(vp["data"][slow_key]) + # row_pos_slow = np.array([tmp] * n_scan_fast) + # else: + # row_pos_slow = np.array(data_or_empty_array(v["data"][slow_key])) + # slow_pos.append(row_pos_slow) + + # n_recorded_events = m + 1 + + # if m > 0 and not (m % 10): + # 
print(f"Processed lines: {m}") + + # # Delete filled data (it will not be used anymore). This prevents memory leak. + # if "data" in doc: + # doc["data"].clear() + # if "data" in doc_p: + # doc_p["data"].clear() + # filler.clear_handler_cache() + + # m += 1 + + # except Exception as ex: + # logger.error(f"Error occurred while reading data: {ex}. Trying to retrieve available data ...") + + # def repair_set(dset_list, n_row_pts, msg): + # """ + # Replaces corrupt rows (incorrect number of points) with closest 'good' row. This allows to load + # and use data from corrupt scans. The function will have no effect on 'good' scans. + # If there are no rows with correct number of points (unlikely case), then the array remains unchanged. + # """ + # missed_rows = [] + # n_last_good_row = -1 + # for n in range(len(dset_list)): + # d = dset_list[n] + # n_pts = d.shape[0] + # if n_pts != n_row_pts: + # print( + # f"WARNING: ({msg}) Row #{n + 1} has {n_pts} data points. {n_row_pts} points are expected." 
+ # ) + # if n_last_good_row == -1: + # missed_rows.append(n) + # else: + # dset_list[n] = np.array(dset_list[n_last_good_row]) + # print(f"({msg}) Data in row #{n + 1} is replaced by data from row #{n_last_good_row}") + # else: + # n_last_good_row = n + # if missed_rows: + # for nr in missed_rows: + # dset_list[nr] = np.array(dset_list[n_last_good_row]) + # print(f"({msg}) Data in row #{nr + 1} is replaced by data from row #{n_last_good_row}") + # missed_rows = [] + + # sclr_name = list(sclr_dict.keys()) + + # repair_set(d_xs_sum, n_scan_fast, "XS_sum") + # repair_set(d_xs, n_scan_fast, "XS") + # repair_set(d_xs2_sum, n_scan_fast, "XS2_sum") + # repair_set(d_xs2, n_scan_fast, "XS2") + # repair_set(fast_pos, n_scan_fast, "fast pos") + # repair_set(slow_pos, n_scan_fast, "slow pos") + # for sc in sclr_dict.values(): + # repair_set(sc, n_scan_fast, "sclr") + + # pos_pos = np.zeros((2, n_recorded_events, n_scan_fast)) + # if "x" in slow_key: + # pos_pos[1, :, :] = fast_pos + # pos_pos[0, :, :] = slow_pos + # else: + # pos_pos[0, :, :] = fast_pos + # pos_pos[1, :, :] = slow_pos + # pos_name = ["x_pos", "y_pos"] + + # if n_recorded_events != n_scan_slow: + # logger.error( + # "The number of recorded events (%d) is not equal to the expected number of events (%d): " + # "The scan is incomplete.", + # n_recorded_events, + # n_scan_slow, + # ) + + # # The following arrays may be empty if 'create_each_det == False' or the detector is not used. 
+ # d_xs = np.asarray(d_xs) + # d_xs_sum = np.asarray(d_xs_sum) + # d_xs2 = np.asarray(d_xs2) + # d_xs2_sum = np.asarray(d_xs2_sum) + + # sclr = np.zeros((n_recorded_events, n_scan_fast, len(sclr_name))) + # for n, sname in enumerate(sclr_name): + # sclr[:, :, n] = np.asarray(sclr_dict[sname]) # =================================================================== # NEW SRX STEP SCAN @@ -2570,14 +2678,14 @@ def repair_set(dset_list, n_row_pts, msg): if snaking_enabled: pos_pos[:, 1::2, :] = pos_pos[:, 1::2, ::-1] if "xs" in dets or "xs4" in dets: - if d_xs.size: + if d_xs is not None: d_xs[:, 1::2, :, :] = d_xs[:, 1::2, ::-1, :] - if d_xs_sum.size: + if d_xs_sum is not None: d_xs_sum[1::2, :, :] = d_xs_sum[1::2, ::-1, :] if "xs2" in dets: - if d_xs2.size: + if d_xs2 is not None: d_xs2[:, 1::2, :, :] = d_xs2[:, 1::2, ::-1, :] - if d_xs2_sum.size: + if d_xs2_sum is not None: d_xs2_sum[1::2, :, :] = d_xs2_sum[1::2, ::-1, :] sclr[1::2, :, :] = sclr[1::2, ::-1, :] @@ -2585,18 +2693,18 @@ def swap_axes(): nonlocal pos_name, pos_pos, d_xs, d_xs_sum, d_xs2, d_xs2_sum, sclr # Need to swapaxes on pos_pos, d_xs, d_xs_sum, sclr pos_name = pos_name[::-1] - pos_pos = np.swapaxes(pos_pos, 1, 2) + pos_pos = da.swapaxes(pos_pos, 1, 2) if "xs" in dets or "xs4" in dets: - if d_xs.size: - d_xs = np.swapaxes(d_xs, 0, 1) - if d_xs_sum.size: - d_xs_sum = np.swapaxes(d_xs_sum, 0, 1) + if d_xs is not None: + d_xs = da.swapaxes(d_xs, 0, 1) + if d_xs_sum is not None: + d_xs_sum = da.swapaxes(d_xs_sum, 0, 1) if "xs2" in dets: - if d_xs2.size: - d_xs2 = np.swapaxes(d_xs2, 0, 1) - if d_xs2_sum.size: - d_xs2_sum = np.swapaxes(d_xs2_sum, 0, 1) - sclr = np.swapaxes(sclr, 0, 1) + if d_xs2 is not None: + d_xs2 = da.swapaxes(d_xs2, 0, 1) + if d_xs2_sum is not None: + d_xs2_sum = da.swapaxes(d_xs2_sum, 0, 1) + sclr = da.swapaxes(sclr, 0, 1) if scan_doc["type"] == "XRF_FLY": if fast_motor in ("nano_stage_sy", "nano_stage_y"): @@ -2617,28 +2725,29 @@ def swap_axes(): print("Data is loaded successfully. 
Preparing to save data ...") data_output = [] - - for detector_name in dets: - if detector_name in ("xs", "xs4"): - tmp_data = d_xs - tmp_data_sum = d_xs_sum + + for tmp_data, tmp_data_sum in ((d_xs, d_xs_sum), (d_xs2, d_xs2_sum)): + if tmp_data_sum is None: + continue + + if tmp_data_sum is d_xs_sum: + detector_name = "xs4" if "xs4" in dets else "xs" num_det = N_xs - elif detector_name == "xs2": - tmp_data = d_xs2 - tmp_data_sum = d_xs2_sum + else: + detector_name = "xs2" num_det = N_xs2 loaded_data = {} - loaded_data["det_sum"] = tmp_data_sum + loaded_data["det_sum"] = tmp_data_sum.compute() if create_each_det: for i in range(num_det): - loaded_data["det" + str(i + 1)] = np.squeeze(tmp_data[:, :, i, :]) + loaded_data["det" + str(i + 1)] = da.squeeze(tmp_data[:, :, i, :]).compute() if save_scaler: - loaded_data["scaler_data"] = sclr - loaded_data["scaler_names"] = sclr_name + loaded_data["scaler_data"] = sclr.compute() + loaded_data["scaler_names"] = sclr_names - loaded_data["pos_data"] = pos_pos + loaded_data["pos_data"] = pos_pos.compute() loaded_data["pos_names"] = pos_name # Generate the default file name for the scan From ea07f88ad9fb2ac12a3f7bbb00ab4a6eed530534 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Sun, 28 Jan 2024 19:08:11 -0500 Subject: [PATCH 04/16] ENH: keep pos and sclr as np.float64 --- pyxrf/model/load_data_from_db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py index 06851908..1b2809ce 100644 --- a/pyxrf/model/load_data_from_db.py +++ b/pyxrf/model/load_data_from_db.py @@ -2361,7 +2361,7 @@ def map_data2D_srx_new_tiled( slow_pos = da.transpose(da.broadcast_to(col_pos_slow, (fast_pts, slow_pts))) n_events, n_points = fast_pos.shape[0], fast_pos.shape[1] - pos_pos = da.zeros((2, n_events, n_points), dtype=np.float32) + pos_pos = da.zeros((2, n_events, n_points)) if "x" in slow_key: pos_pos[1, :, :] = fast_pos pos_pos[0, :, :] = slow_pos @@ 
-2371,7 +2371,7 @@ def map_data2D_srx_new_tiled( pos_name = ["x_pos", "y_pos"] sclr_names = list(sclr_dict.keys()) - sclr = da.zeros((n_events, n_points, len(sclr_names)), dtype=np.float32) + sclr = da.zeros((n_events, n_points, len(sclr_names))) for n, sname in enumerate(sclr_names): print(f"n={n} sname={sname}") ## sclr[:, :, n] = da.asarray(sclr_dict[sname]) From 8341a16aecae0e9eebb0f9efea1d764f9884670d Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Mon, 29 Jan 2024 15:03:49 -0500 Subject: [PATCH 05/16] ENH: do not use 'np.asarray' in the function compiled by Numba --- pyxrf/core/map_processing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyxrf/core/map_processing.py b/pyxrf/core/map_processing.py index 3fdb53c9..ead0782e 100644 --- a/pyxrf/core/map_processing.py +++ b/pyxrf/core/map_processing.py @@ -1239,7 +1239,8 @@ def snip_method_numba( iter_num = _default_iter_num_bin # np.array(spectrum) is not supported by numba so we have to use this: - background = np.asarray(spectrum).copy() + #background = np.asarray(spectrum).copy() # Also a problem (since Jan. 2024) + background = spectrum.copy() n_background = background.size energy = np.arange(n_background, dtype=np.float64) From 68f9f3a81a6dd59630228b3009943464c85de3bf Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Mon, 29 Jan 2024 15:09:48 -0500 Subject: [PATCH 06/16] ENH: remove commented code --- pyxrf/core/map_processing.py | 2 +- pyxrf/model/load_data_from_db.py | 201 +------------------------------ 2 files changed, 2 insertions(+), 201 deletions(-) diff --git a/pyxrf/core/map_processing.py b/pyxrf/core/map_processing.py index ead0782e..5401b599 100644 --- a/pyxrf/core/map_processing.py +++ b/pyxrf/core/map_processing.py @@ -1239,7 +1239,7 @@ def snip_method_numba( iter_num = _default_iter_num_bin # np.array(spectrum) is not supported by numba so we have to use this: - #background = np.asarray(spectrum).copy() # Also a problem (since Jan. 
2024) + # background = np.asarray(spectrum).copy() # Also a problem (since Jan. 2024) background = spectrum.copy() n_background = background.size diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py index 1b2809ce..d77284bb 100644 --- a/pyxrf/model/load_data_from_db.py +++ b/pyxrf/model/load_data_from_db.py @@ -1111,7 +1111,7 @@ def map_data2D_srx( catalog=catalog, ) elif using_tiled and not use_new_format: - print(f"Using Tiled and Old Format ...") + raise Exception("Loading data in 'Old' format using Tiled is not yet supported ...") elif use_new_format: return map_data2D_srx_new( run_id_uid=run_id_uid, @@ -2376,205 +2376,6 @@ def map_data2D_srx_new_tiled( print(f"n={n} sname={sname}") ## sclr[:, :, n] = da.asarray(sclr_dict[sname]) - # # The following arrays may be empty if 'create_each_det == False' or the detector is not used. - # d_xs = np.asarray(d_xs) - # d_xs_sum = np.asarray(d_xs_sum) - # d_xs2 = np.asarray(d_xs2) - # d_xs2_sum = np.asarray(d_xs2_sum) - - # sclr = np.zeros((n_recorded_events, n_scan_fast, len(sclr_name))) - # for n, sname in enumerate(sclr_name): - # sclr[:, :, n] = np.asarray(sclr_dict[sname]) - - # # Let's get the data using the events! Yay! - # filler = Filler(db.reg.handler_reg, inplace=True) - # docs_stream0 = hdr.documents("stream0", fill=False) - # docs_primary = hdr.documents("primary", fill=False) - # d_xs, d_xs_sum, N_xs = [], [], 0 - # d_xs2, d_xs2_sum, N_xs2 = [], [], 0 - # sclr_list = ["i0", "i0_time", "time", "im", "it"] - # sclr_dict = {} - # fast_pos, slow_pos = [], [] - - # n_recorded_events = 0 - - # try: - # m = 0 - # while True: - # try: - # while True: - # name, doc = next(docs_stream0) - # try: - # filler(name, doc) - # except Exception: - # pass # The document can not be filled. Leave it unfilled. 
- # if name == "event": - # break - # except StopIteration: - # break # All events are processed, exit the loop - - # try: - # while True: - # name_p, doc_p = next(docs_primary) - # try: - # filler(name_p, doc_p) - # except Exception: - # pass # The document can not be filled. Leave it unfilled. - # if name == "event": - # break - # except StopIteration: - # raise RuntimeError(f"Matching event #{m} was not found in 'primary' stream") - - # def data_or_empty_array(v): - # """ - # If data is a string, then return an empty array, otherwise return the data as numpy array. - # """ - # if isinstance(v, str): - # v = [] - # return np.asarray(v) - - # v, vp = doc, doc_p - # if "xs" in dets or "xs4" in dets: - # event_data = data_or_empty_array(v["data"]["fluor"]) - # if event_data.size: - # event_data = np.asarray(event_data, dtype=np.float32) - # N_xs = max(N_xs, event_data.shape[1]) - # d_xs_sum.append(np.sum(event_data, axis=1)) - # if create_each_det: - # d_xs.append(event_data) - # else: - # # Unfilled document - # d_xs_sum.append(event_data) - # if create_each_det: - # d_xs.append(event_data) - - # if "xs2" in dets: - # event_data = data_or_empty_array(v["data"]["fluor_xs2"]) - # if event_data.size: - # event_data = np.asarray(event_data, dtype=np.float32) - # N_xs2 = max(N_xs2, event_data.shape[1]) - # d_xs2_sum.append(np.sum(event_data, axis=1)) - # if create_each_det: - # d_xs2.append(event_data) - # else: - # # Unfilled document - # d_xs2_sum.append(event_data) - # if create_each_det: - # d_xs2.append(event_data) - - # keys = v["data"].keys() - # for s in sclr_list: - # if s in keys: - # tmp = data_or_empty_array(v["data"][s]) - # if s not in sclr_dict: - # sclr_dict[s] = [tmp] - # else: - # sclr_dict[s].append(tmp) - - # if fast_key == "fast_gen": - # # Generate positions - # row_pos_fast = np.arange(fast_pts) * fast_step + fast_start - # if snaking_enabled and (n_recorded_events % 2): - # row_pos_fast = np.flip(row_pos_fast) - # else: - # row_pos_fast = 
data_or_empty_array(v["data"][fast_key]) - # fast_pos.append(row_pos_fast) - - # if slow_key == "slow_gen": - # # Generate positions - # row_pos_slow = np.ones(fast_pts) * (n_recorded_events * slow_step + slow_start) - # elif "enc" not in slow_key: - # # vp = next(ep) - # # filler("event", vp) - # tmp = data_or_empty_array(vp["data"][slow_key]) - # row_pos_slow = np.array([tmp] * n_scan_fast) - # else: - # row_pos_slow = np.array(data_or_empty_array(v["data"][slow_key])) - # slow_pos.append(row_pos_slow) - - # n_recorded_events = m + 1 - - # if m > 0 and not (m % 10): - # print(f"Processed lines: {m}") - - # # Delete filled data (it will not be used anymore). This prevents memory leak. - # if "data" in doc: - # doc["data"].clear() - # if "data" in doc_p: - # doc_p["data"].clear() - # filler.clear_handler_cache() - - # m += 1 - - # except Exception as ex: - # logger.error(f"Error occurred while reading data: {ex}. Trying to retrieve available data ...") - - # def repair_set(dset_list, n_row_pts, msg): - # """ - # Replaces corrupt rows (incorrect number of points) with closest 'good' row. This allows to load - # and use data from corrupt scans. The function will have no effect on 'good' scans. - # If there are no rows with correct number of points (unlikely case), then the array remains unchanged. - # """ - # missed_rows = [] - # n_last_good_row = -1 - # for n in range(len(dset_list)): - # d = dset_list[n] - # n_pts = d.shape[0] - # if n_pts != n_row_pts: - # print( - # f"WARNING: ({msg}) Row #{n + 1} has {n_pts} data points. {n_row_pts} points are expected." 
- # ) - # if n_last_good_row == -1: - # missed_rows.append(n) - # else: - # dset_list[n] = np.array(dset_list[n_last_good_row]) - # print(f"({msg}) Data in row #{n + 1} is replaced by data from row #{n_last_good_row}") - # else: - # n_last_good_row = n - # if missed_rows: - # for nr in missed_rows: - # dset_list[nr] = np.array(dset_list[n_last_good_row]) - # print(f"({msg}) Data in row #{nr + 1} is replaced by data from row #{n_last_good_row}") - # missed_rows = [] - - # sclr_name = list(sclr_dict.keys()) - - # repair_set(d_xs_sum, n_scan_fast, "XS_sum") - # repair_set(d_xs, n_scan_fast, "XS") - # repair_set(d_xs2_sum, n_scan_fast, "XS2_sum") - # repair_set(d_xs2, n_scan_fast, "XS2") - # repair_set(fast_pos, n_scan_fast, "fast pos") - # repair_set(slow_pos, n_scan_fast, "slow pos") - # for sc in sclr_dict.values(): - # repair_set(sc, n_scan_fast, "sclr") - - # pos_pos = np.zeros((2, n_recorded_events, n_scan_fast)) - # if "x" in slow_key: - # pos_pos[1, :, :] = fast_pos - # pos_pos[0, :, :] = slow_pos - # else: - # pos_pos[0, :, :] = fast_pos - # pos_pos[1, :, :] = slow_pos - # pos_name = ["x_pos", "y_pos"] - - # if n_recorded_events != n_scan_slow: - # logger.error( - # "The number of recorded events (%d) is not equal to the expected number of events (%d): " - # "The scan is incomplete.", - # n_recorded_events, - # n_scan_slow, - # ) - - # # The following arrays may be empty if 'create_each_det == False' or the detector is not used. 
- # d_xs = np.asarray(d_xs) - # d_xs_sum = np.asarray(d_xs_sum) - # d_xs2 = np.asarray(d_xs2) - # d_xs2_sum = np.asarray(d_xs2_sum) - - # sclr = np.zeros((n_recorded_events, n_scan_fast, len(sclr_name))) - # for n, sname in enumerate(sclr_name): - # sclr[:, :, n] = np.asarray(sclr_dict[sname]) - # =================================================================== # NEW SRX STEP SCAN # =================================================================== From dac3cf397223212c734868f75ec99524d37277f3 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Wed, 31 Jan 2024 16:27:56 -0500 Subject: [PATCH 07/16] ENH: reading valid data works --- pyxrf/db_config/srx_db_config.py | 1 - pyxrf/model/catalog_management.py | 20 +++++ pyxrf/model/load_data_from_db.py | 137 ++++++++++++++---------------- pyxrf/pyxrf_run.py | 19 ++++- 4 files changed, 98 insertions(+), 79 deletions(-) create mode 100644 pyxrf/model/catalog_management.py diff --git a/pyxrf/db_config/srx_db_config.py b/pyxrf/db_config/srx_db_config.py index f80f27a5..2e826fcc 100644 --- a/pyxrf/db_config/srx_db_config.py +++ b/pyxrf/db_config/srx_db_config.py @@ -1,4 +1,3 @@ - # import h5py # try: diff --git a/pyxrf/model/catalog_management.py b/pyxrf/model/catalog_management.py new file mode 100644 index 00000000..cf025b9a --- /dev/null +++ b/pyxrf/model/catalog_management.py @@ -0,0 +1,20 @@ +class CatalogInfo: + def __init__(self): + self._name = None + + @property + def name(self): + return self._name + + def set_name(self, name): + self._name = name + + +catalog_info = CatalogInfo() + + +def get_catalog(catalog_name): + from tiled.client import from_uri + + c = from_uri("https://tiled.nsls2.bnl.gov") + return c[catalog_name.lower()]["raw"] diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py index d77284bb..a6590c17 100644 --- a/pyxrf/model/load_data_from_db.py +++ b/pyxrf/model/load_data_from_db.py @@ -7,16 +7,17 @@ import multiprocessing import os import platform +import pprint 
import re import time as ttime import warnings from distutils.version import LooseVersion +import dask.array as da import h5py import numpy as np import pandas as pd from event_model import Filler -import dask.array as da try: import databroker @@ -26,6 +27,7 @@ import pyxrf from ..core.utils import convert_time_to_nexus_string +from .catalog_management import catalog_info, get_catalog from .scan_metadata import ScanMetadataXRF pyxrf_version = pyxrf.__version__ @@ -35,55 +37,48 @@ sep_v = os.sep - -def get_catalog(catalog_name): - from tiled.client import from_uri - c = from_uri('https://tiled.nsls2.bnl.gov') - return c[catalog_name.lower()]['raw'] - - try: - catalog_name = None - - # Attempt to find the configuration file first - config_path = "/etc/pyxrf/pyxrf.json" - if os.path.isfile(config_path): - try: - with open(config_path, "r") as beamline_pyxrf: - beamline_config_pyxrf = json.load(beamline_pyxrf) - catalog_name = beamline_config_pyxrf["beamline_name"] - except Exception as ex: - raise IOError(f"Error while opening configuration file {config_path!r}") from ex + logger.info(f"Opening catalog: {catalog_info.name!r}") + if not catalog_info.name: + # Attempt to find the configuration file first + config_path = "/etc/pyxrf/pyxrf.json" + if os.path.isfile(config_path): + try: + with open(config_path, "r") as beamline_pyxrf: + beamline_config_pyxrf = json.load(beamline_pyxrf) + catalog_info.set_name(beamline_config_pyxrf["beamline_name"]) + except Exception as ex: + raise IOError(f"Error while opening configuration file {config_path!r}") from ex - else: - # Otherwise try to identify the beamline using host name - hostname = platform.node() - catalog_names = { - "xf03id": "HXN", - "xf05id": "SRX", - "xf08bm": "TES", - "xf04bm": "XFM", - } + else: + # Otherwise try to identify the beamline using host name + hostname = platform.node() + catalog_names = { + "xf03id": "HXN", + "xf05id": "SRX", + "xf08bm": "TES", + "xf04bm": "XFM", + } - for k, v in 
catalog_names.items(): - if hostname.startswith(k): - catalog_name = v + for k, v in catalog_names.items(): + if hostname.startswith(k): + catalog_info.set_name(v) - if catalog_name is None: + if not catalog_info.name: raise Exception("Beamline is not identified") - if catalog_name.upper() == "HXN": + if catalog_info.name.upper() == "HXN": from pyxrf.db_config.hxn_db_config import db - elif catalog_name.upper() == "SRX": - db = get_catalog('srx') - elif catalog_name.upper() == "XFM": + elif catalog_info.name.upper() == "SRX": + db = get_catalog("srx") + elif catalog_info.name.upper() == "XFM": from pyxrf.db_config.xfm_db_config import db - elif catalog_name.upper() == "TES": + elif catalog_info.name.upper() == "TES": from pyxrf.db_config.tes_db_config import db else: db = None db_analysis = None - print(f"Beamline Database is not used in pyxrf: unknown catalog {catalog_name!r}") + print(f"Beamline Database is not used in pyxrf: unknown catalog {catalog_info.name!r}") except Exception as ex: db = None @@ -153,7 +148,6 @@ def fetch_run_info(run_id_uid, catalog_name=None): failed to fetch the run from Databroker """ try: - if catalog_name: catalog = get_catalog(catalog_name) else: @@ -233,7 +227,7 @@ def fetch_data_from_db( skip_scan_types: list(str) or None list of plan type names to ignore, e.g. ['FlyPlan1D']. (Supported only at HXN.) 
catalog_name: str or None - + Returns ------- dict of data in 2D format matching x,y scanning positions @@ -1082,7 +1076,7 @@ def map_data2D_srx( num_end_lines_excluded : int, optional remove the last few bad lines catalog - reference to databroker catalog + reference to databroker catalog Returns ------- @@ -2218,10 +2212,10 @@ def map_data2D_srx_new_tiled( hdr = catalog[run_id_uid] start_doc = hdr.start runid = start_doc["scan_id"] # Replace with the true value (runid may be relative, such as -2) + md_version = start_doc["md_version"] - print("**********************************************************") - print(f"Loading scan #{runid}") - print(f"Scan metadata format: version {start_doc['md_version']}") + logger.info("Loading scan %r", runid) + logger.info("Metadata version: %r", md_version) if completed_scans_only and not _is_scan_complete(hdr): raise Exception("Scan is incomplete. Only completed scans are currently processed.") @@ -2230,12 +2224,10 @@ def map_data2D_srx_new_tiled( "Scan is not successfully completed. Only successfully completed scans are currently processed." ) - md_version = start_doc["md_version"] scan_doc = start_doc["scan"] stop_doc = hdr.stop - import pprint ## - print(f"=== start_doc={pprint.pformat(start_doc)}") ## + logger.info("Start document:\n%s", pprint.pformat(start_doc)) # The following scan parameters are used to compute positions for some motors. 
# (Positions of course stages are not saved during the scan) @@ -2356,9 +2348,9 @@ def map_data2D_srx_new_tiled( fast_pos = da.broadcast_to(row_pos_fast, (slow_pts, fast_pts)) if snaking_enabled: rows_to_flip = list(range(1, 2, slow_pts)) # Flip 'even' rows (numbers are 0-based) - fast_pos[rows_to_flip, :] = da.fliplr(fast_pos[rows_to_flip, :]) + fast_pos[rows_to_flip, :] = da.fliplr(fast_pos[rows_to_flip, :]) col_pos_slow = da.arange(slow_pts) * slow_step + slow_start - slow_pos = da.transpose(da.broadcast_to(col_pos_slow, (fast_pts, slow_pts))) + slow_pos = da.transpose(da.broadcast_to(col_pos_slow, (fast_pts, slow_pts))) n_events, n_points = fast_pos.shape[0], fast_pos.shape[1] pos_pos = da.zeros((2, n_events, n_points)) @@ -2373,13 +2365,14 @@ def map_data2D_srx_new_tiled( sclr_names = list(sclr_dict.keys()) sclr = da.zeros((n_events, n_points, len(sclr_names))) for n, sname in enumerate(sclr_names): - print(f"n={n} sname={sname}") ## sclr[:, :, n] = da.asarray(sclr_dict[sname]) # =================================================================== # NEW SRX STEP SCAN # =================================================================== if scan_doc["type"] == "XRF_STEP": + data_primary = hdr.primary["data"] + # Define keys for motor data fast_motor = scan_doc["fast_axis"]["motor_name"] fast_key = fast_motor + "_user_setpoint" @@ -2387,29 +2380,21 @@ def map_data2D_srx_new_tiled( slow_key = slow_motor + "_user_setpoint" # Collect motor positions - fast_pos = hdr.data(fast_key, stream_name="primary", fill=True) - fast_pos = np.array(list(fast_pos)) - slow_pos = hdr.data(slow_key, stream_name="primary", fill=True) - slow_pos = np.array(list(slow_pos)) + fast_pos = data_primary[fast_key] + slow_pos = data_primary[fast_key] # Reshape motor positions num_events = stop_doc["num_events"]["primary"] - n_scan_slow, n_scan_fast = scan_doc["shape"] - if num_events != (n_scan_slow * n_scan_fast): - num_rows = num_events // n_scan_fast + 1 # number of rows - fast_pos = 
np.zeros((num_rows, n_scan_fast)) - slow_pos = np.zeros((num_rows, n_scan_fast)) - for i in range(num_rows): - for j in range(n_scan_fast): - fast_pos[i, j] = fast_pos[i * n_scan_fast + j] - slow_pos[i, j] = slow_pos[i * n_scan_fast + j] - else: - num_rows = n_scan_slow - fast_pos = np.reshape(fast_pos, (n_scan_slow, n_scan_fast)) - slow_pos = np.reshape(slow_pos, (n_scan_slow, n_scan_fast)) + _, n_scan_fast = scan_doc["shape"] + num_rows = len(fast_pos) / n_scan_fast + n_scan_total = n_scan_fast * num_rows + fast_pos = fast_pos[:, n_scan_total] + slow_pos = slow_pos[:, n_scan_total] + fast_pos = da.reshape(fast_pos, (n_scan_slow, n_scan_fast)) + slow_pos = da.reshape(slow_pos, (n_scan_slow, n_scan_fast)) # Put into one array for h5 file - pos_pos = np.zeros((2, num_rows, n_scan_fast)) + pos_pos = da.zeros((2, num_rows, n_scan_fast)) if "x" in slow_key: pos_pos[1, :, :] = fast_pos pos_pos[0, :, :] = slow_pos @@ -2419,8 +2404,9 @@ def map_data2D_srx_new_tiled( pos_name = ["x_pos", "y_pos"] # Get detector data - keys = hdr.table().keys() + keys = list(data_primary) MAX_DET_ELEMENTS = 8 + # !!!!!! The following code for stepscan needs to be revised (still the old code) N_xs, det_name_prefix, ndigits = None, None, 1 for i in np.arange(1, MAX_DET_ELEMENTS + 1): if f"xs_channel{i}" in keys: @@ -2526,11 +2512,11 @@ def swap_axes(): print("Data is loaded successfully. 
Preparing to save data ...") data_output = [] - + for tmp_data, tmp_data_sum in ((d_xs, d_xs_sum), (d_xs2, d_xs2_sum)): if tmp_data_sum is None: continue - + if tmp_data_sum is d_xs_sum: detector_name = "xs4" if "xs4" in dets else "xs" num_det = N_xs @@ -2805,7 +2791,6 @@ def _get_row_len(row_data): m, n_pt_max, missing_rows = 0, -1, [] # m - index try: while True: - print("1") ## try: while True: name, doc = next(docs_primary) @@ -2819,10 +2804,8 @@ def _get_row_len(row_data): except StopIteration: break # All events are processed, exit the loop - print("2") ## if is_filled: data = doc["data"][detector_field] - print(f"data={data}") ## data_det1 = np.array(data[:, 0, :], dtype=np.float32) # The following is the fix for the case when data has corrupt row (smaller number of data points). @@ -3659,7 +3642,11 @@ def free_memory_from_handler(): """ # The following check is redundant: Data Broker prior to version 1.0.0 always has '_handler_cache'. # In later versions of databroker the attribute may still be present if 'databroker.v0' is used. 
- if (LooseVersion(databroker.__version__) < LooseVersion("1.0.0")) or hasattr(db, "fs") and hasattr(db.fs, "_handler_cache"): + if ( + (LooseVersion(databroker.__version__) < LooseVersion("1.0.0")) + or hasattr(db, "fs") + and hasattr(db.fs, "_handler_cache") + ): for h in db.fs._handler_cache.values(): setattr(h, "_dataset", None) print("Memory is released (Databroker v0).") diff --git a/pyxrf/pyxrf_run.py b/pyxrf/pyxrf_run.py index 14d20b13..62b70522 100644 --- a/pyxrf/pyxrf_run.py +++ b/pyxrf/pyxrf_run.py @@ -7,9 +7,6 @@ from PyQt5.QtGui import QColor, QFontDatabase, QPalette from PyQt5.QtWidgets import QApplication, QStyleFactory -from .gui_module.main_window import MainWindow -from .gui_support.gpc_class import GlobalProcessingClasses - logger = logging.getLogger("pyxrf") # if hasattr(Qt, 'AA_EnableHighDpiScaling'): @@ -25,6 +22,14 @@ def run(): # faulthandler.enable() parser = argparse.ArgumentParser(prog="pyxrf", description="Command line arguments") + parser.add_argument( + "-c", + "--catalog-name", + default=None, + type=str, + dest="catalog_name", + help="Name of the Databroker catalog to open, e.g. 
'srx'", + ) parser.add_argument( "-l", "--loglevel", @@ -45,6 +50,14 @@ def run(): ) args = parser.parse_args() + from .model.catalog_management import catalog_info + + if args.catalog_name: + catalog_info.set_name(args.catalog_name) + + from .gui_module.main_window import MainWindow + from .gui_support.gpc_class import GlobalProcessingClasses + # Setup the Logger logger.setLevel(args.loglevel) formatter = logging.Formatter(fmt="%(asctime)s : %(levelname)s : %(message)s") From 290cf2512821595d8da7bde304b1fdf405c55005 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Thu, 1 Feb 2024 14:27:00 -0500 Subject: [PATCH 08/16] ENH: remove NaNs from corrupt data --- pyxrf/model/load_data_from_db.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py index a6590c17..6e7618de 100644 --- a/pyxrf/model/load_data_from_db.py +++ b/pyxrf/model/load_data_from_db.py @@ -2524,16 +2524,22 @@ def swap_axes(): detector_name = "xs2" num_det = N_xs2 + # Replace NaNs with 0s (in corrupt data rows). 
loaded_data = {} - loaded_data["det_sum"] = tmp_data_sum.compute() + loaded_data["det_sum"] = np.nan_to_num(tmp_data_sum.compute()) if create_each_det: for i in range(num_det): - loaded_data["det" + str(i + 1)] = da.squeeze(tmp_data[:, :, i, :]).compute() + loaded_data["det" + str(i + 1)] = np.nan_to_num(da.squeeze(tmp_data[:, :, i, :]).compute()) if save_scaler: loaded_data["scaler_data"] = sclr.compute() loaded_data["scaler_names"] = sclr_names + # Replace NaNs for each scaler with median values + for n in range(loaded_data["scaler_data"].shape[-1]): + d = loaded_data["scaler_data"][:, :, n] + loaded_data["scaler_data"][:, :, n] = np.nan_to_num(d, nan=np.nanmedian(d)) + loaded_data["pos_data"] = pos_pos.compute() loaded_data["pos_names"] = pos_name From 9097a7413e7a2bef7d4e124f1eccd69f99b88030 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Fri, 2 Feb 2024 07:12:20 -0500 Subject: [PATCH 09/16] ENH: try to use DB1 if Tiled is not installed --- pyxrf/db_config/srx_db_config.py | 98 ++++++++++++++++---------------- pyxrf/model/load_data_from_db.py | 16 +++++- 2 files changed, 62 insertions(+), 52 deletions(-) diff --git a/pyxrf/db_config/srx_db_config.py b/pyxrf/db_config/srx_db_config.py index 2e826fcc..87f2fad9 100644 --- a/pyxrf/db_config/srx_db_config.py +++ b/pyxrf/db_config/srx_db_config.py @@ -1,76 +1,76 @@ -# import h5py +import h5py -# try: -# from databroker.v1 import Broker -# except ModuleNotFoundError: -# from databroker import Broker +try: + from databroker.v1 import Broker +except ModuleNotFoundError: + from databroker import Broker -# import logging +import logging -# from databroker._core import register_builtin_handlers +from databroker._core import register_builtin_handlers -# # srx detector, to be moved to filestore -# # from databroker.assets.handlers import Xspress3HDF5Handler -# from databroker.assets.handlers import HandlerBase +# srx detector, to be moved to filestore +# from databroker.assets.handlers import Xspress3HDF5Handler +from 
databroker.assets.handlers import HandlerBase -# logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) -# db = Broker.named("srx") -# try: -# register_builtin_handlers(db.reg) -# except Exception as ex: -# logger.error(f"Error while registering default SRX handlers: {ex}") +db = Broker.named("srx") +try: + register_builtin_handlers(db.reg) +except Exception as ex: + logger.error(f"Error while registering default SRX handlers: {ex}") -# class BulkXSPRESS(HandlerBase): -# HANDLER_NAME = "XPS3_FLY" +class BulkXSPRESS(HandlerBase): + HANDLER_NAME = "XPS3_FLY" -# def __init__(self, resource_fn): -# self._handle = h5py.File(resource_fn, "r") + def __init__(self, resource_fn): + self._handle = h5py.File(resource_fn, "r") -# def __call__(self): -# return self._handle["entry/instrument/detector/data"][:] + def __call__(self): + return self._handle["entry/instrument/detector/data"][:] -# class ZebraHDF5Handler(HandlerBase): -# HANDLER_NAME = "ZEBRA_HDF51" +class ZebraHDF5Handler(HandlerBase): + HANDLER_NAME = "ZEBRA_HDF51" -# def __init__(self, resource_fn): -# self._handle = h5py.File(resource_fn, "r") + def __init__(self, resource_fn): + self._handle = h5py.File(resource_fn, "r") -# def __call__(self, column): -# return self._handle[column][:] + def __call__(self, column): + return self._handle[column][:] -# class SISHDF5Handler(HandlerBase): -# HANDLER_NAME = "SIS_HDF51" +class SISHDF5Handler(HandlerBase): + HANDLER_NAME = "SIS_HDF51" -# def __init__(self, resource_fn): -# self._handle = h5py.File(resource_fn, "r") + def __init__(self, resource_fn): + self._handle = h5py.File(resource_fn, "r") -# def __call__(self, column): -# return self._handle[column][:] + def __call__(self, column): + return self._handle[column][:] -# class BulkMerlin(BulkXSPRESS): -# HANDLER_NAME = "MERLIN_FLY_STREAM_V1" +class BulkMerlin(BulkXSPRESS): + HANDLER_NAME = "MERLIN_FLY_STREAM_V1" -# def __call__(self): -# return self._handle["entry/instrument/detector/data"][:] + def 
__call__(self): + return self._handle["entry/instrument/detector/data"][:] -# class BulkDexela(HandlerBase): -# HANDLER_NAME = "DEXELA_FLY_V1" +class BulkDexela(HandlerBase): + HANDLER_NAME = "DEXELA_FLY_V1" -# def __init__(self, resource_fn): -# self._handle = h5py.File(resource_fn, "r") + def __init__(self, resource_fn): + self._handle = h5py.File(resource_fn, "r") -# def __call__(self): -# return self._handle["entry/instrument/detector/data"][:] + def __call__(self): + return self._handle["entry/instrument/detector/data"][:] -# db.reg.register_handler(BulkXSPRESS.HANDLER_NAME, BulkXSPRESS, overwrite=True) -# db.reg.register_handler("SIS_HDF51", SISHDF5Handler, overwrite=True) -# db.reg.register_handler("ZEBRA_HDF51", ZebraHDF5Handler, overwrite=True) -# db.reg.register_handler(BulkMerlin.HANDLER_NAME, BulkMerlin, overwrite=True) -# db.reg.register_handler(BulkDexela.HANDLER_NAME, BulkDexela, overwrite=True) +db.reg.register_handler(BulkXSPRESS.HANDLER_NAME, BulkXSPRESS, overwrite=True) +db.reg.register_handler("SIS_HDF51", SISHDF5Handler, overwrite=True) +db.reg.register_handler("ZEBRA_HDF51", ZebraHDF5Handler, overwrite=True) +db.reg.register_handler(BulkMerlin.HANDLER_NAME, BulkMerlin, overwrite=True) +db.reg.register_handler(BulkDexela.HANDLER_NAME, BulkDexela, overwrite=True) diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py index 6e7618de..5cd91962 100644 --- a/pyxrf/model/load_data_from_db.py +++ b/pyxrf/model/load_data_from_db.py @@ -70,7 +70,15 @@ if catalog_info.name.upper() == "HXN": from pyxrf.db_config.hxn_db_config import db elif catalog_info.name.upper() == "SRX": - db = get_catalog("srx") + _failed = False + try: + db = get_catalog("srx") + except Exception as ex: + logger.error("Failed to load Tiled catalog: %s", str(ex)) + _failed = True + if _failed: + logger.info("Attempting to open databroker ...") + from pyxrf.db_config.srx_db_config import db elif catalog_info.name.upper() == "XFM": from 
pyxrf.db_config.xfm_db_config import db elif catalog_info.name.upper() == "TES": @@ -1840,7 +1848,8 @@ def data_or_empty_array(v): v, vp = doc, doc_p if "xs" in dets or "xs4" in dets: - event_data = data_or_empty_array(v["data"]["fluor"]) + k = "xs_fluor" if "xs_fluor" in v["data"] else "fluor" + event_data = data_or_empty_array(v["data"][k]) if event_data.size: event_data = np.asarray(event_data, dtype=np.float32) N_xs = max(N_xs, event_data.shape[1]) @@ -1854,7 +1863,8 @@ def data_or_empty_array(v): d_xs.append(event_data) if "xs2" in dets: - event_data = data_or_empty_array(v["data"]["fluor_xs2"]) + k = "xs_fluor_xs2" if "xs_fluor_xs2" in v["data"] else "fluor_xs2" + event_data = data_or_empty_array(v["data"][k]) if event_data.size: event_data = np.asarray(event_data, dtype=np.float32) N_xs2 = max(N_xs2, event_data.shape[1]) From 1549bf429aafae0ecb56d908101de80501d8a62b Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Fri, 2 Feb 2024 07:34:02 -0500 Subject: [PATCH 10/16] ENH: Better way to detect Databroker V2 --- pyxrf/model/catalog_management.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pyxrf/model/catalog_management.py b/pyxrf/model/catalog_management.py index cf025b9a..ff01475b 100644 --- a/pyxrf/model/catalog_management.py +++ b/pyxrf/model/catalog_management.py @@ -14,6 +14,11 @@ def set_name(self, name): def get_catalog(catalog_name): + from packaging import version + import databroker + if version.parse(databroker.__version__).major == 1: + raise ValueError("Non-tiled version of Databroker is installed") + from tiled.client import from_uri c = from_uri("https://tiled.nsls2.bnl.gov") From d0f91b45068f70c7da709d4781dff8bbed9d85cb Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Fri, 2 Feb 2024 07:43:17 -0500 Subject: [PATCH 11/16] ENH: use separate variable for major version --- pyxrf/model/catalog_management.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyxrf/model/catalog_management.py 
b/pyxrf/model/catalog_management.py index ff01475b..312ccb0a 100644 --- a/pyxrf/model/catalog_management.py +++ b/pyxrf/model/catalog_management.py @@ -16,7 +16,8 @@ def set_name(self, name): def get_catalog(catalog_name): from packaging import version import databroker - if version.parse(databroker.__version__).major == 1: + db_version_major = version.parse(databroker.__version__).major == 1 + if db_version_major == 1: raise ValueError("Non-tiled version of Databroker is installed") from tiled.client import from_uri From aacd02d0d526c837cd897258f36dba5be5993fbd Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Fri, 2 Feb 2024 07:47:54 -0500 Subject: [PATCH 12/16] ENH: major version --- pyxrf/model/catalog_management.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyxrf/model/catalog_management.py b/pyxrf/model/catalog_management.py index 312ccb0a..7c219b16 100644 --- a/pyxrf/model/catalog_management.py +++ b/pyxrf/model/catalog_management.py @@ -17,6 +17,7 @@ def get_catalog(catalog_name): from packaging import version import databroker db_version_major = version.parse(databroker.__version__).major == 1 + print(f"Databroker major version: {db_version_major}") if db_version_major == 1: raise ValueError("Non-tiled version of Databroker is installed") From 5fbe6466853bb09d45ccb1144cdedf24b68ed348 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Fri, 2 Feb 2024 09:35:14 -0500 Subject: [PATCH 13/16] ENH: remove print statement --- pyxrf/model/catalog_management.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyxrf/model/catalog_management.py b/pyxrf/model/catalog_management.py index 7c219b16..312ccb0a 100644 --- a/pyxrf/model/catalog_management.py +++ b/pyxrf/model/catalog_management.py @@ -17,7 +17,6 @@ def get_catalog(catalog_name): from packaging import version import databroker db_version_major = version.parse(databroker.__version__).major == 1 - print(f"Databroker major version: {db_version_major}") if db_version_major == 1: raise 
ValueError("Non-tiled version of Databroker is installed") From 4a3ac3827d58cbde296fdcca368597d0442430d3 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Tue, 13 Feb 2024 15:27:17 -0500 Subject: [PATCH 14/16] FIX: loading of corrupt files at TES --- .flake8 | 2 +- pyxrf/core/tests/test_quant_analysis.py | 2 +- pyxrf/gui_module/wnd_detailed_fitting_params.py | 3 +-- pyxrf/model/catalog_management.py | 3 ++- pyxrf/model/load_data_from_db.py | 9 ++++----- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/.flake8 b/.flake8 index 60b358db..ea09a7e8 100644 --- a/.flake8 +++ b/.flake8 @@ -8,5 +8,5 @@ exclude = pyxrf/_version.py, docs/conf.py # There are some errors produced by 'black', therefore unavoidable -ignore = E203, W503 +ignore = E203, W503, E701, E704 max-line-length = 115 diff --git a/pyxrf/core/tests/test_quant_analysis.py b/pyxrf/core/tests/test_quant_analysis.py index ff21e797..02f7b54c 100644 --- a/pyxrf/core/tests/test_quant_analysis.py +++ b/pyxrf/core/tests/test_quant_analysis.py @@ -41,7 +41,7 @@ "name": "Test Micromatter 411640", "serial": "411640", "description": "CeF3 21.1 / Au 20.6", - "compounds": {"CeF3": 21.1, "Au": 20.6} + "compounds": {"CeF3": 21.1, "Au": 20.6}, # Missing optional 'density' field }, ] diff --git a/pyxrf/gui_module/wnd_detailed_fitting_params.py b/pyxrf/gui_module/wnd_detailed_fitting_params.py index e859985d..7c6de65e 100644 --- a/pyxrf/gui_module/wnd_detailed_fitting_params.py +++ b/pyxrf/gui_module/wnd_detailed_fitting_params.py @@ -393,8 +393,7 @@ def _validate_all(self): self.pb_cancel.setEnabled(self._data_changed and not self._auto_update) self.cb_auto_update.setChecked(Qt.Checked if self._auto_update else Qt.Unchecked) - def _load_dialog_data(self): - ... + def _load_dialog_data(self): ... 
def _save_dialog_data_function(self): raise NotImplementedError() diff --git a/pyxrf/model/catalog_management.py b/pyxrf/model/catalog_management.py index 312ccb0a..c6f96797 100644 --- a/pyxrf/model/catalog_management.py +++ b/pyxrf/model/catalog_management.py @@ -16,10 +16,11 @@ def set_name(self, name): def get_catalog(catalog_name): from packaging import version import databroker + db_version_major = version.parse(databroker.__version__).major == 1 if db_version_major == 1: raise ValueError("Non-tiled version of Databroker is installed") - + from tiled.client import from_uri c = from_uri("https://tiled.nsls2.bnl.gov") diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py index 5cd91962..8cd982df 100644 --- a/pyxrf/model/load_data_from_db.py +++ b/pyxrf/model/load_data_from_db.py @@ -2804,7 +2804,10 @@ def _get_row_len(row_data): filler = Filler(db.reg.handler_reg, inplace=True) docs_primary = hdr.documents("primary", fill=False) - m, n_pt_max, missing_rows = 0, -1, [] # m - index + # Assume that the number of positions reflect the size of the row + n_pt_max = pos_data.shape[2] + + m, missing_rows = 0, [] # m - index try: while True: try: @@ -2823,10 +2826,6 @@ def _get_row_len(row_data): if is_filled: data = doc["data"][detector_field] data_det1 = np.array(data[:, 0, :], dtype=np.float32) - - # The following is the fix for the case when data has corrupt row (smaller number of data points). - # It will not work if the first row is corrupt. 
- n_pt_max = max(data_det1.shape[0], n_pt_max) data_det1_adjusted = np.zeros([n_pt_max, data_det1.shape[1]]) data_det1_adjusted[: data_det1.shape[0], :] = data_det1 From 12f1c8372fd83555e888aaa6d1138c9aa31c5302 Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Sun, 7 Apr 2024 17:06:22 -0400 Subject: [PATCH 15/16] CI: fix formatting --- pyxrf/gui_module/wnd_detailed_fitting_params.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyxrf/gui_module/wnd_detailed_fitting_params.py b/pyxrf/gui_module/wnd_detailed_fitting_params.py index 7c6de65e..e859985d 100644 --- a/pyxrf/gui_module/wnd_detailed_fitting_params.py +++ b/pyxrf/gui_module/wnd_detailed_fitting_params.py @@ -393,7 +393,8 @@ def _validate_all(self): self.pb_cancel.setEnabled(self._data_changed and not self._auto_update) self.cb_auto_update.setChecked(Qt.Checked if self._auto_update else Qt.Unchecked) - def _load_dialog_data(self): ... + def _load_dialog_data(self): + ... def _save_dialog_data_function(self): raise NotImplementedError() From 59a1a8c3647a4f263fcac14dffa6f275429afafa Mon Sep 17 00:00:00 2001 From: Dmitri Gavrilov Date: Sun, 7 Apr 2024 17:09:15 -0400 Subject: [PATCH 16/16] CI: fixed formatting --- pyxrf/gui_module/wnd_detailed_fitting_params.py | 3 +-- pyxrf/model/catalog_management.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pyxrf/gui_module/wnd_detailed_fitting_params.py b/pyxrf/gui_module/wnd_detailed_fitting_params.py index e859985d..7c6de65e 100644 --- a/pyxrf/gui_module/wnd_detailed_fitting_params.py +++ b/pyxrf/gui_module/wnd_detailed_fitting_params.py @@ -393,8 +393,7 @@ def _validate_all(self): self.pb_cancel.setEnabled(self._data_changed and not self._auto_update) self.cb_auto_update.setChecked(Qt.Checked if self._auto_update else Qt.Unchecked) - def _load_dialog_data(self): - ... + def _load_dialog_data(self): ... 
def _save_dialog_data_function(self): raise NotImplementedError() diff --git a/pyxrf/model/catalog_management.py b/pyxrf/model/catalog_management.py index c6f96797..343552b4 100644 --- a/pyxrf/model/catalog_management.py +++ b/pyxrf/model/catalog_management.py @@ -14,8 +14,8 @@ def set_name(self, name): def get_catalog(catalog_name): - from packaging import version import databroker + from packaging import version db_version_major = version.parse(databroker.__version__).major == 1 if db_version_major == 1: