diff --git a/pyxrf/model/load_data_from_db.py b/pyxrf/model/load_data_from_db.py
index 1b4eaf4b..e87fd4d0 100644
--- a/pyxrf/model/load_data_from_db.py
+++ b/pyxrf/model/load_data_from_db.py
@@ -82,7 +82,15 @@
 elif catalog_info.name.upper() == "XFM":
     from pyxrf.db_config.xfm_db_config import db
 elif catalog_info.name.upper() == "TES":
-    from pyxrf.db_config.tes_db_config import db
+    _failed = False
+    try:
+        db = get_catalog("tes")
+    except Exception as ex:
+        logger.error("Failed to load Tiled catalog: %s", str(ex))
+        _failed = True
+    if _failed:
+        logger.info("Attempting to open databroker ...")
+        from pyxrf.db_config.tes_db_config import db
 else:
     db = None
     db_analysis = None
@@ -295,6 +303,7 @@ def fetch_data_from_db(
             successful_scans_only=successful_scans_only,
             file_overwrite_existing=file_overwrite_existing,
             output_to_file=output_to_file,
+            catalog=catalog,
         )
     else:
         print("Databroker is not setup for this beamline")
@@ -687,7 +696,7 @@ def _get_metadata_value_from_descriptor_document_tiled(hdr, *, data_key, stream_
     """
     value = None
     try:
-        value = list(hdr[stream_name][data_key])[0]
+        value = hdr[stream_name]["data"][data_key].compute()[0]
     except Exception:
         pass

@@ -701,7 +710,7 @@ def _get_metadata_all_from_descriptor_document_tiled(hdr, *, data_key, stream_na
     """
     value = []
     try:
-        value = list(hdr[stream_name][data_key])
+        value = list(hdr[stream_name]["data"][data_key].compute())
     except Exception:
         pass

@@ -2600,6 +2609,369 @@ def map_data2D_tes(
     file_overwrite_existing=False,
     output_to_file=True,
     save_scaler=True,
+    catalog=None,
+):
+    """
+    Transfer the data from databroker into a correct format following the
+    shape of the 2D scan.
+    This function is used at the TES beamline for fly scans.
+    Save the new data dictionary to an hdf5 file if needed.
+    Depending on the type of ``catalog``, the data is loaded using Tiled
+    or Databroker.
+
+    .. note::
+
+        It is recommended to read data from databroker into memory
+        directly, instead of saving to files. This is ongoing work.
+
+    Parameters
+    ----------
+    run_id_uid : int
+        ID or UID of a run
+    fpath : str
+        path to save hdf file
+    create_each_det : bool, optional
+        If False, do not create a separate dataset for each detector channel;
+        this is recommended when the data size is large, since saving
+        per-detector data slows down the creation of the hdf5 file.
+    fname_add_version : bool
+        True: if file already exists, then file version is added to the file name
+        so that it becomes unique in the current directory. The version is
+        added to .h5 in the form _(1).h5, _(2).h5, etc.
+        False: the conversion fails if the file already exists.
+    completed_scans_only : bool
+        True: process only completed scans (for which the ``stop`` document exists in
+        the database). Failed scans for which the ``stop`` document exists are considered
+        completed, even if the whole image was not scanned. If an incomplete scan is
+        encountered, an exception is raised.
+        False: the feature is disabled, incomplete scans will be processed.
+    successful_scans_only : bool
+        True: process only scans that were completed successfully (the ``stop``
+        document exists and reports success); otherwise an exception is raised.
+        False: the feature is disabled.
+    file_overwrite_existing : bool, keyword parameter
+        This option should be used if the existing file should be deleted and replaced
+        with the new file with the same name. This option should be used with caution,
+        since the existing file may contain processed data, which will be permanently deleted.
+        True: overwrite existing files if needed. Note that if ``fname_add_version`` is ``True``,
+        then new versions of the existing file will always be created.
+        False: do not overwrite existing files. If the file already exists, then an exception
+        is raised.
+    output_to_file : bool, optional
+        save data to hdf5 file if True
+    catalog
+        reference to a databroker or Tiled catalog; if None, the default
+        catalog ``db`` is used
+
+    Returns
+    -------
+    list of dict
+        data in 2D format matching x,y scanning positions (one dictionary
+        per detector)
+    """
+    catalog = catalog or db
+
+    using_tiled = "CatalogOfBlueskyRuns" in str(type(catalog))
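+    # The class name is checked as a string (rather than with isinstance) so the
+    # check works whether or not databroker v2/Tiled is installed. A sketch of the
+    # explicit equivalent, assuming databroker v2 provides the client class:
+    #     from databroker.client import CatalogOfBlueskyRuns
+    #     using_tiled = isinstance(catalog, CatalogOfBlueskyRuns)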
+
+    if using_tiled:
+        return map_data2D_tes_tiled(
+            run_id_uid=run_id_uid,
+            fpath=fpath,
+            create_each_det=create_each_det,
+            fname_add_version=fname_add_version,
+            completed_scans_only=completed_scans_only,
+            successful_scans_only=successful_scans_only,
+            file_overwrite_existing=file_overwrite_existing,
+            output_to_file=output_to_file,
+            save_scaler=save_scaler,
+            catalog=catalog,
+        )
+    else:
+        return map_data2D_tes_databroker(
+            run_id_uid=run_id_uid,
+            fpath=fpath,
+            create_each_det=create_each_det,
+            fname_add_version=fname_add_version,
+            completed_scans_only=completed_scans_only,
+            successful_scans_only=successful_scans_only,
+            file_overwrite_existing=file_overwrite_existing,
+            output_to_file=output_to_file,
+            save_scaler=save_scaler,
+        )
+
+
+def map_data2D_tes_tiled(
+    run_id_uid,
+    fpath,
+    create_each_det=False,
+    fname_add_version=False,
+    completed_scans_only=False,
+    successful_scans_only=False,
+    file_overwrite_existing=False,
+    output_to_file=True,
+    save_scaler=True,
+    catalog=None,
+):
+    """
+    Transfer the data from a Tiled catalog into a correct format following the
+    shape of the 2D scan.
+    This function is used at the TES beamline for fly scans.
+    Save the new data dictionary to an hdf5 file if needed.
+
+    .. note::
+
+        It is recommended to read data from databroker into memory
+        directly, instead of saving to files. This is ongoing work.
+
+    Parameters
+    ----------
+    run_id_uid : int
+        ID or UID of a run
+    fpath : str
+        path to save hdf file
+    create_each_det : bool, optional
+        If False, do not create a separate dataset for each detector channel;
+        this is recommended when the data size is large, since saving
+        per-detector data slows down the creation of the hdf5 file.
+    fname_add_version : bool
+        True: if file already exists, then file version is added to the file name
+        so that it becomes unique in the current directory. The version is
+        added to .h5 in the form _(1).h5, _(2).h5, etc.
+        False: the conversion fails if the file already exists.
+    completed_scans_only : bool
+        True: process only completed scans (for which the ``stop`` document exists in
+        the database). Failed scans for which the ``stop`` document exists are considered
+        completed, even if the whole image was not scanned. If an incomplete scan is
+        encountered, an exception is raised.
+        False: the feature is disabled, incomplete scans will be processed.
+    successful_scans_only : bool
+        True: process only scans that were completed successfully (the ``stop``
+        document exists and reports success); otherwise an exception is raised.
+        False: the feature is disabled.
+    file_overwrite_existing : bool, keyword parameter
+        This option should be used if the existing file should be deleted and replaced
+        with the new file with the same name. This option should be used with caution,
+        since the existing file may contain processed data, which will be permanently deleted.
+        True: overwrite existing files if needed. Note that if ``fname_add_version`` is ``True``,
+        then new versions of the existing file will always be created.
+        False: do not overwrite existing files. If the file already exists, then an exception
+        is raised.
+    output_to_file : bool, optional
+        save data to hdf5 file if True
+    catalog
+        reference to a Tiled catalog
+
+    Returns
+    -------
+    list of dict
+        data in 2D format matching x,y scanning positions (one dictionary
+        per detector)
+    """
+
+    hdr = catalog[run_id_uid]
+    runid = hdr.start["scan_id"]  # The true scan ID ('run_id_uid' may be relative, e.g. -2)
+
+    # The dictionary holding scan metadata
+    mdata = _extract_metadata_from_header(hdr)
+    # Some metadata is located at specific places in the descriptor documents.
+    # Search through the descriptor documents for the metadata.
+    v = _get_metadata_value_from_descriptor_document_tiled(hdr, data_key="mono_energy", stream_name="baseline")
+    # Incident energy in the descriptor document is expected to be more accurate, so
+    # overwrite the value if it already exists
+
+    if v is not None:
+        mdata["instrument_mono_incident_energy"] = v / 1000.0  # eV to keV
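+    # For reference: with Tiled, stream data is addressed as
+    # hdr[<stream>]["data"][<data_key>] and is loaded lazily, so scalar metadata
+    # is materialized with .compute(). Hypothetical example for a baseline PV:
+    #     ring_current = hdr["baseline"]["data"]["ring_current"].compute()[0]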
+
+    if completed_scans_only and not _is_scan_complete(hdr):
+        raise Exception("Scan is incomplete. Only completed scans are currently processed.")
+    if successful_scans_only and not _is_scan_successful(hdr):
+        raise Exception(
+            "Scan is not successfully completed. Only successfully completed scans are currently processed."
+        )
+
+    # Generate the default file name for the scan
+    if fpath is None:
+        fpath = f"scan2D_{runid}.h5"
+
+    # Load the configuration file
+    current_dir = os.path.dirname(os.path.realpath(__file__))
+    config_file = "tes_pv_config.json"
+    config_path = sep_v.join(current_dir.split(sep_v)[:-2] + ["configs", config_file])
+    with open(config_path, "r") as json_data:
+        config_data = json.load(json_data)
+
+    # NOTE:
+    #   The currently implemented algorithm works only with the following fly scan:
+    #   flyscanning along the X-axis, stepping along the Y-axis (the function has
+    #   to be modified to support other combinations of axes).
+    #   Each document contains full data for a single line of an N-point fly scan:
+    #     N-element arrays with values for the X and Y axes
+    #     N-element arrays with values for each scaler
+    #     N fluorescence spectra (each spectrum is 4096 points, saved by Xspress3
+    #     into a separate file on GPFS; the document contains the path to the file)
+
+    print()
+    print("****************************************")
+    print("         Loading TES fly scan           ")
+    print("****************************************")
+
+    xpos_name = "x_centers"  # For now, we always fly on stage_x (fast axis)
+    ypos_name = "y_centers"
+
+    # The dictionary of fields that are used to store data from different detectors (for fly scan only):
+    #   key - the name of the field used to store data read from the detector
+    #   value - the detector name (a short abbreviation, attached to the created file name
+    #           so that the detector can be identified)
+    # A separate data file is created for each detector.
+
+    # The following list will be used if the function is modified to work with multiple detectors:
+    # detector_field_dict = config_data['xrf_flyscan_detector_fields']
+
+    # spectrum_len = 4096  # It is typically fixed
+
+    # Output data is the list of data structures for all available detectors
+    data_output = []
+
+    # The dictionary that will contain the data extracted from scan data.
+    # This data will be saved to file and/or loaded into processing software.
+    new_data = {}
+
+    data_primary = hdr.primary["data"]
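+    # 'data_primary' maps column names to lazily loaded arrays: .read() returns
+    # a Dask-backed xarray.DataArray, and no data is transferred until .compute()
+    # is called on it. Hypothetical sketch (the column name is illustrative):
+    #     i0_rows = data_primary["i0"].read().compute()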
+ """ + if row_data is None: + return True + elif isinstance(row_data, np.ndarray) and sum(np.isnan(row_data)): + # Tiled is expected to replace missing data with NaNs, but we don't want NaNs or 0s in scalers + return True + elif isinstance(row_data, np.ndarray) and (row_data.size == 1) and (row_data == np.array(None)): + # This is returned by databroker.v0 + return True + elif not len(row_data): + return True + else: + return False + + def _get_row_len(row_data): + if _is_row_missing(row_data): + return 0 + else: + return len(row_data) + + # Typically the scalers are saved + if save_scaler is True: + # Read the scalers + scaler_names = config_data["scaler_list"] + + # Save all scaler names using lowercase letters + scaler_names_lower = scaler_names.copy() + for n in range(len(scaler_names)): + scaler_names_lower[n] = scaler_names_lower[n].lower() + new_data["scaler_names"] = scaler_names_lower + + n_scalers = len(config_data["scaler_list"]) + scaler_data = None + data_shape = None + + for n, name in enumerate(scaler_names): + s_data = data_primary[name].read() + # Convert DAsk array to a list of ndarrays (.to_numpy()) + # and then stack the arrays into a single 2D array + s_data = s_data.compute() + + # Find maximum number of points in a row. + n_max_points = -1 # Maximum number of points in the row + for row_data in s_data: + n_max_points = max(n_max_points, _get_row_len(row_data)) + + # Fix for the issue: 'empty' rows in scaler data. Fill 'empty' row + # with the nearest (preceding) row. + # TODO: investigate the issue of 'empty' scaler ('dwell_time') rows at TES + n_full = -1 + + for _n in range(len(s_data)): + if _is_row_missing(s_data[_n]) or (len(s_data[_n]) < n_max_points): + n_full = _n + break + for _n in range(len(s_data)): + # Data for the missing row is replaced by data from the previous 'good' row + if _is_row_missing(s_data[_n]) or (len(s_data[_n]) < n_max_points): + s_data[_n] = np.copy(s_data[n_full]) + logger.error( + f"Scaler '{name}': row #{_n} is corrupt or contains no data. " + f"Replaced by data from row #{n_full}" + ) + else: + n_full = _n + + s_data = np.vstack(s_data) + if scaler_data is None: + data_shape = s_data.shape + scaler_data = np.zeros(shape=data_shape + (n_scalers,), dtype=float) + scaler_data[:, :, n] = s_data + new_data["scaler_data"] = scaler_data + + # Read x-y coordinates + + data_x = data_primary[xpos_name].read().compute() + data_y = data_primary[ypos_name].read().compute() + new_data["pos_names"] = ["x_pos", "y_pos"] + pos_data = np.zeros(shape=(2,) + data_shape, dtype=float) + # Convert pandas dataframes to 2D ndarrays + pos_data[0, :, :] = np.vstack(data_x) + pos_data[1, :, :] = np.vstack(data_y) + new_data["pos_data"] = pos_data + + detector_field = "fluor" + + detector_data = data_primary[detector_field].read()[:, :, 0, :] + # n_events_found = detector_data.shape[0] + + # if n_events_found < n_events: + # print("The number of lines is less than expected. The experiment may be incomplete") + + # if n_events_found != n_events: + # # This will happen if data is corrupt, for example the experiment is interrupted prematurely. + # n_events_min = min(n_events_found, n_events) + # print(f"The map is resized: data for only {n_events_min} rows is available") + # detector_data = detector_data[:n_events_min, :, :] + # new_data["scaler_data"] = new_data["scaler_data"][:n_events_min, :, :] + # new_data["pos_data"] = new_data["pos_data"][:, :n_events_min, :] + + # Note: the following code assumes that the detector has only one channel. 
+
+    # Note: the following code assumes that the detector has only one channel.
+    #   If the detector is upgraded, the following code will have to be rewritten,
+    #   but the rest of the data loading procedure will have to be modified anyway.
+    if create_each_det:
+        new_data["det1"] = detector_data
+    else:
+        new_data["det_sum"] = detector_data
+
+    num_det = 1
+    detector_name = "xs"
+    n_detectors_found = 1
+
+    # Modify the file name (path) to include data on how many channels are included
+    #   in the file and how many channels are used for the sum calculation
+    root, ext = os.path.splitext(fpath)
+    s = f"_{detector_name}_sum{num_det}ch"
+    if create_each_det:
+        s += f"+{num_det}ch"
+    fpath_out = f"{root}{s}{ext}"
+
+    if output_to_file:
+        # Output to file
+        print(f"Saving data to hdf file #{n_detectors_found}: Detector: {detector_name}.")
+        fpath_out = save_data_to_hdf5(
+            fpath_out,
+            new_data,
+            metadata=mdata,
+            fname_add_version=fname_add_version,
+            file_overwrite_existing=file_overwrite_existing,
+            create_each_det=create_each_det,
+        )
+
+    d_dict = {"dataset": new_data, "file_name": fpath_out, "detector_name": detector_name, "metadata": mdata}
+    data_output.append(d_dict)
+
+    return data_output
+
+
+def map_data2D_tes_databroker(
+    run_id_uid,
+    fpath,
+    create_each_det=False,
+    fname_add_version=False,
+    completed_scans_only=False,
+    successful_scans_only=False,
+    file_overwrite_existing=False,
+    output_to_file=True,
+    save_scaler=True,
 ):
     """
     Transfer the data from databroker into a correct format following the
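
A minimal usage sketch for the new dispatch path (illustrative only; the run ID and
file name below are arbitrary):

    from pyxrf.model.load_data_from_db import map_data2D_tes

    # A Tiled catalog (databroker v2 "CatalogOfBlueskyRuns") routes the call to
    # map_data2D_tes_tiled(); any other catalog, or catalog=None (which falls
    # back to the module-level db), routes to map_data2D_tes_databroker().
    data = map_data2D_tes(
        run_id_uid=-1,  # the most recent run
        fpath="scan2D_test.h5",
        create_each_det=False,
        output_to_file=True,
        catalog=None,
    )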