diff --git a/docs/images/data_bobfile.png b/docs/images/data_bobfile.png new file mode 100644 index 00000000..4f4747c9 Binary files /dev/null and b/docs/images/data_bobfile.png differ diff --git a/docs/user/how-to/capture-hdf.rst b/docs/user/how-to/capture-hdf.rst new file mode 100644 index 00000000..e966340e --- /dev/null +++ b/docs/user/how-to/capture-hdf.rst @@ -0,0 +1,38 @@ + +Capture data +============ + +The ``:DATA`` PVs are used to capture data from the panda. +These can be viewed from the DATA screen. + +.. image:: /images/data_bobfile.png + :alt: The data screen + :align: center + + +* The file directory and name are chosen with ``:DATA:HDFDirectory`` and ``:DATA:HDFFileName``. +* ``:DATA:NumCapture`` is the number of frames to capture in the file. +* ``:DATA:NumCaptured`` is the number of frames written to file. +* ``:DATA:NumReceived`` is the number of frames received from the panda. +* ``:DATA:FlushPeriod`` is the frequency that the data is flushed into frames in the client. +* ``:DATA:Capture`` will begin capturing data. +* ``:DATA:CaptureMode`` is one of the three capture modes listed below. + + +First N mode +------------ + +Begin capturing data and writing it to file as soon as it is received. Stop capturing once ``NumCapture`` +frames have been written or the panda has been disarmed. + + +Last N mode +----------- + +Begin capturing data in a buffer, once capturing has finished write the last ``NumCapture`` frames to disk. + + +Forever mode +------------ + +Keep capturing and writing frames. Once the panda has been disarmed wait for it to be armed again and continue writing. \ No newline at end of file diff --git a/docs/user/index.rst b/docs/user/index.rst index 2c94a0c0..31da93ae 100644 --- a/docs/user/index.rst +++ b/docs/user/index.rst @@ -26,6 +26,7 @@ side-bar. :maxdepth: 1 how-to/run-container + how-to/capture-hdf +++ diff --git a/pyproject.toml b/pyproject.toml index 576799c3..c44d9067 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "click", "h5py", "softioc>=4.4.0", - "pandablocks>=0.5.3", + "pandablocks~=0.7.0", "pvi~=0.7.0", ] # Add project dependencies here, e.g. ["click", "numpy"] dynamic = ["version"] diff --git a/src/pandablocks_ioc/_hdf_ioc.py b/src/pandablocks_ioc/_hdf_ioc.py index f3543039..034adae0 100644 --- a/src/pandablocks_ioc/_hdf_ioc.py +++ b/src/pandablocks_ioc/_hdf_ioc.py @@ -2,8 +2,10 @@ import logging import os from asyncio import CancelledError +from collections import deque +from enum import Enum from importlib.util import find_spec -from typing import List, Optional +from typing import Callable, Deque, Optional, Union from pandablocks.asyncio import AsyncioClient from pandablocks.hdf import ( @@ -18,26 +20,305 @@ from softioc import alarm, builder from softioc.pythonSoftIoc import RecordWrapper -from ._pvi import PviGroup, add_pvi_info +from ._pvi import PviGroup, add_automatic_pvi_info, add_data_capture_pvi_info from ._types import ONAM_STR, ZNAM_STR, EpicsName +HDFReceived = Union[ReadyData, StartData, FrameData, EndData] + + +class CaptureMode(Enum): + """ + The mode which the circular buffer will use to flush + """ + + #: Wait till N frames are recieved then write them + #: and finish capture + FIRST_N = 0 + + #: On EndData write the last N frames + LAST_N = 1 + + #: Write data as received until Capture set to 0 + FOREVER = 2 + + +class NumCapturedSetter(Pipeline): + def __init__(self, number_captured_setter: Callable) -> None: + self.number_captured_setter = number_captured_setter + self.number_captured_setter(0) + super().__init__() + + self.what_to_do = {int: self.set_record} + + def set_record(self, value: int): + self.number_captured_setter(value) + + +class HDF5Buffer: + _buffer_index = None + start_data = None + number_of_received_rows = 0 + finish_capturing = False + number_of_rows_in_circular_buffer = 0 + + def __init__( + self, + capture_mode: CaptureMode, + filepath: str, + number_of_rows_to_capture: int, + status_message_setter: Callable, + number_received_setter: Callable, + number_captured_setter_pipeline: NumCapturedSetter, + ): + # Only one filename - user must stop capture and set new FileName/FilePath + # for new files + + self.circular_buffer: Deque[FrameData] = deque() + self.capture_mode = capture_mode + + match capture_mode: + case CaptureMode.FIRST_N: + self._handle_FrameData = self._capture_first_n + case CaptureMode.LAST_N: + self._handle_FrameData = self._capture_last_n + case CaptureMode.FOREVER: + self._handle_FrameData = self._capture_forever + case _: + raise RuntimeError("Invalid capture mode") + + self.filepath = filepath + self.number_of_rows_to_capture = number_of_rows_to_capture + self.status_message_setter = status_message_setter + self.number_received_setter = number_received_setter + self.number_captured_setter_pipeline = number_captured_setter_pipeline + self.number_captured_setter_pipeline.number_captured_setter(0) + + if ( + self.capture_mode == CaptureMode.LAST_N + and self.number_of_rows_to_capture <= 0 + ): + raise RuntimeError("Number of rows to capture must be > 0 on LAST_N mode") + + self.start_pipeline() + + def __del__(self): + if self.pipeline[0].is_alive(): + stop_pipeline(self.pipeline) + + def put_data_to_file(self, data: HDFReceived): + try: + self.pipeline[0].queue.put_nowait(data) + except Exception as ex: + logging.exception(f"Failed to save the data to HDF5 file: {ex}") + + def start_pipeline(self): + self.pipeline = create_default_pipeline( + iter([self.filepath]), self.number_captured_setter_pipeline + ) + + def _handle_StartData(self, data: StartData): + if self.start_data and data != self.start_data: + # PandA was disarmed, had config changed, and rearmed. + # Cannot process to the same file with different start data. + logging.error( + "New start data detected, differs from previous start " + "data for this file. Aborting HDF5 data capture." + ) + + self.status_message_setter( + "Mismatched StartData packet for file", + severity=alarm.MAJOR_ALARM, + alarm=alarm.STATE_ALARM, + ) + self.put_data_to_file( + EndData(self.number_of_received_rows, EndReason.START_DATA_MISMATCH) + ) + + self.finish_capturing = True + + # Only pass StartData to pipeline if we haven't previously + else: + # In LAST_N mode, wait till the end of capture to write + # the StartData to file. + # In FOREVER mode write the StartData to file if it's the first received. + if ( + self.capture_mode == CaptureMode.FIRST_N + or self.capture_mode == CaptureMode.FOREVER + and not self.start_data + ): + self.put_data_to_file(data) + + self.start_data = data + + def _capture_first_n(self, data: FrameData): + """ + Capture framedata as it comes in. Stop when number of frames exceeds + number_of_rows_to_capture, and cut off the data so that it's length + number_of_rows_to_capture. + """ + self.number_of_received_rows += len(data.data) + + if ( + self.number_of_rows_to_capture > 0 + and self.number_of_received_rows > self.number_of_rows_to_capture + ): + # Discard extra collected data points if necessary + data.data = data.data[ + : self.number_of_rows_to_capture - self.number_of_received_rows + ].copy() + self.number_of_received_rows = self.number_of_rows_to_capture + + self.put_data_to_file(data) + self.number_received_setter(self.number_of_received_rows) + + if ( + self.number_of_rows_to_capture > 0 + and self.number_of_received_rows == self.number_of_rows_to_capture + ): + # Reached configured capture limit, stop the file + logging.info( + f"Requested number of frames ({self.number_of_rows_to_capture}) " + "captured, disabling Capture." + ) + self.status_message_setter("Requested number of frames captured") + self.put_data_to_file(EndData(self.number_of_received_rows, EndReason.OK)) + self.finish_capturing = True + + def _capture_forever(self, data: FrameData): + self.put_data_to_file(data) + self.number_of_received_rows += len(data.data) + self.number_received_setter(self.number_of_received_rows) + + def _capture_last_n(self, data: FrameData): + """ + Append every FrameData to a buffer until the number of rows equals + `:NumCapture`. Then rewrite the data circularly. + + Only write the data once PCAP is received. + """ + self.circular_buffer.append(data) + self.number_of_received_rows += len(data.data) + self.number_of_rows_in_circular_buffer += len(data.data) + + if self.number_of_rows_in_circular_buffer > self.number_of_rows_to_capture: + self.status_message_setter( + "NumCapture received, rewriting first frames received" + ) + + else: + self.status_message_setter("Filling buffer to NumReceived") + + while self.number_of_rows_in_circular_buffer > self.number_of_rows_to_capture: + first_frame_data = self.circular_buffer.popleft() + first_frame_data_length = len(first_frame_data.data) + + if first_frame_data_length > self.number_of_rows_to_capture: + # More data than we want to capture, all in a single FrameData + # We can just slice with the NumCapture since this has to be the + # only FrameData in the buffer at this point + assert len(self.circular_buffer) == 0 + shrinked_data = first_frame_data.data[ + -self.number_of_rows_to_capture : + ].copy() + first_frame_data.data = shrinked_data + self.circular_buffer.appendleft(first_frame_data) + self.number_of_rows_in_circular_buffer = self.number_of_rows_to_capture + elif ( + first_frame_data_length + > self.number_of_rows_in_circular_buffer + - self.number_of_rows_to_capture + ): + # We can slice from the beginning of the FrameData to have the desired + # number of rows + indices_to_discard = ( + self.number_of_rows_in_circular_buffer + - self.number_of_rows_to_capture + ) + shrinked_data = first_frame_data.data[indices_to_discard:].copy() + first_frame_data.data = shrinked_data + self.circular_buffer.appendleft(first_frame_data) + self.number_of_rows_in_circular_buffer -= indices_to_discard + assert ( + self.number_of_rows_in_circular_buffer + == self.number_of_rows_to_capture + ) + else: + # If we remove the enire first frame data then the buffer will still + # be too big, or it will be exactly the number of rows we want + self.number_of_rows_in_circular_buffer -= first_frame_data_length + + self.number_received_setter(self.number_of_received_rows) + + def _handle_EndData(self, data: EndData): + match self.capture_mode: + case CaptureMode.LAST_N: + # In LAST_N only write FrameData if the EndReason is OK + if data.reason not in (EndReason.OK, EndReason.MANUALLY_STOPPED): + self.status_message_setter( + f"Stopped capturing with reason {data.reason}, " + "skipping writing of buffered frames" + ) + self.finish_capturing = True + return + + self.status_message_setter( + "Finishing capture, writing buffered frames to file" + ) + self.put_data_to_file(self.start_data) + for frame_data in self.circular_buffer: + self.put_data_to_file(frame_data) + + case CaptureMode.FOREVER: + if data.reason != EndReason.MANUALLY_STOPPED: + self.status_message_setter( + "Finished capture, waiting for next ReadyData" + ) + return + + case CaptureMode.FIRST_N: + pass # Frames will have already been written in FirstN + + case _: + raise RuntimeError("Unknown capture mode") + + self.status_message_setter("Finished capture") + self.finish_capturing = True + self.put_data_to_file(data) + + def handle_data(self, data: HDFReceived): + match data: + case ReadyData(): + pass + case StartData(): + self.status_message_setter("Starting capture") + self._handle_StartData(data) + case FrameData(): + self._handle_FrameData(data) + case EndData(): + self._handle_EndData(data) + case _: + raise RuntimeError( + f"Data was recieved that was of type {type(data)}, not" + "StartData, EndData, ReadyData, or FrameData" + ) + class HDF5RecordController: """Class to create and control the records that handle HDF5 processing""" - _HDF5_PREFIX = "HDF5" + _DATA_PREFIX = "DATA" _client: AsyncioClient - _file_path_record: RecordWrapper + _directory_record: RecordWrapper _file_name_record: RecordWrapper _file_number_record: RecordWrapper _file_format_record: RecordWrapper _num_capture_record: RecordWrapper + _num_captured_record: RecordWrapper _flush_period_record: RecordWrapper _capture_control_record: RecordWrapper # Turn capture on/off _status_message_record: RecordWrapper # Reports status and error messages - _currently_capturing_record: RecordWrapper # If HDF5 file currently being written _handle_hdf5_data_task: Optional[asyncio.Task] = None @@ -53,32 +334,34 @@ def __init__(self, client: AsyncioClient, record_prefix: str): # Create the records, including an uppercase alias for each # Naming convention and settings (mostly) copied from FSCN2 HDF5 records - file_path_record_name = EpicsName(self._HDF5_PREFIX + ":FilePath") - self._file_path_record = builder.longStringOut( - file_path_record_name, + directory_record_name = EpicsName(self._DATA_PREFIX + ":HDFDirectory") + self._directory_record = builder.longStringOut( + directory_record_name, length=path_length, DESC="File path for HDF5 files", validate=self._parameter_validate, + on_update=self._update_full_file_path, ) - add_pvi_info( - PviGroup.INPUTS, - self._file_path_record, - file_path_record_name, + add_automatic_pvi_info( + PviGroup.HDF, + self._directory_record, + directory_record_name, builder.longStringOut, ) - self._file_path_record.add_alias( - record_prefix + ":" + file_path_record_name.upper() + self._directory_record.add_alias( + record_prefix + ":" + directory_record_name.upper() ) - file_name_record_name = EpicsName(self._HDF5_PREFIX + ":FileName") + file_name_record_name = EpicsName(self._DATA_PREFIX + ":HDFFileName") self._file_name_record = builder.longStringOut( file_name_record_name, length=filename_length, DESC="File name prefix for HDF5 files", validate=self._parameter_validate, + on_update=self._update_full_file_path, ) - add_pvi_info( - PviGroup.INPUTS, + add_automatic_pvi_info( + PviGroup.HDF, self._file_name_record, file_name_record_name, builder.longStringOut, @@ -87,7 +370,23 @@ def __init__(self, client: AsyncioClient, record_prefix: str): record_prefix + ":" + file_name_record_name.upper() ) - num_capture_record_name = EpicsName(self._HDF5_PREFIX + ":NumCapture") + full_file_path_record_name = EpicsName(self._DATA_PREFIX + ":HDFFullFilePath") + self._full_file_path_record = builder.longStringIn( + full_file_path_record_name, + length=path_length + 1 + filename_length, + DESC="Full HDF5 file name with directory", + ) + add_automatic_pvi_info( + PviGroup.HDF, + self._full_file_path_record, + full_file_path_record_name, + builder.longStringIn, + ) + self._file_name_record.add_alias( + record_prefix + ":" + full_file_path_record_name.upper() + ) + + num_capture_record_name = EpicsName(self._DATA_PREFIX + ":NumCapture") self._num_capture_record = builder.longOut( num_capture_record_name, initial_value=0, # Infinite capture @@ -95,8 +394,8 @@ def __init__(self, client: AsyncioClient, record_prefix: str): DRVL=0, ) - add_pvi_info( - PviGroup.INPUTS, + add_automatic_pvi_info( + PviGroup.CAPTURE, self._num_capture_record, num_capture_record_name, builder.longOut, @@ -106,14 +405,49 @@ def __init__(self, client: AsyncioClient, record_prefix: str): record_prefix + ":" + num_capture_record_name.upper() ) - flush_period_record_name = EpicsName(self._HDF5_PREFIX + ":FlushPeriod") + num_captured_record_name = EpicsName(self._DATA_PREFIX + ":NumCaptured") + self._num_captured_record = builder.longIn( + num_captured_record_name, + initial_value=0, + DESC="Number of frames written to file.", + ) + + add_automatic_pvi_info( + PviGroup.CAPTURE, + self._num_captured_record, + num_captured_record_name, + builder.longIn, + ) + self._num_captured_record.add_alias( + record_prefix + ":" + num_captured_record_name.upper() + ) + + num_received_record_name = EpicsName(self._DATA_PREFIX + ":NumReceived") + self._num_received_record = builder.longIn( + num_received_record_name, + initial_value=0, + DESC="Number of frames received from panda.", + ) + + add_automatic_pvi_info( + PviGroup.CAPTURE, + self._num_received_record, + num_received_record_name, + builder.longIn, + ) + self._num_received_record.add_alias( + record_prefix + ":" + num_received_record_name.upper() + ) + + flush_period_record_name = EpicsName(self._DATA_PREFIX + ":FlushPeriod") self._flush_period_record = builder.aOut( flush_period_record_name, initial_value=1.0, DESC="Frequency that data is flushed (seconds)", + EGU="s", ) - add_pvi_info( - PviGroup.INPUTS, + add_automatic_pvi_info( + PviGroup.CAPTURE, self._flush_period_record, flush_period_record_name, builder.aOut, @@ -122,7 +456,7 @@ def __init__(self, client: AsyncioClient, record_prefix: str): record_prefix + ":" + flush_period_record_name.upper() ) - capture_control_record_name = EpicsName(self._HDF5_PREFIX + ":Capture") + capture_control_record_name = EpicsName(self._DATA_PREFIX + ":Capture") self._capture_control_record = builder.boolOut( capture_control_record_name, ZNAM=ZNAM_STR, @@ -131,23 +465,40 @@ def __init__(self, client: AsyncioClient, record_prefix: str): validate=self._capture_validate, DESC="Start/stop HDF5 capture", ) - add_pvi_info( - PviGroup.INPUTS, - self._capture_control_record, + add_data_capture_pvi_info( + PviGroup.CAPTURE, capture_control_record_name, - builder.boolOut, + self._capture_control_record, ) self._capture_control_record.add_alias( record_prefix + ":" + capture_control_record_name.upper() ) - status_message_record_name = EpicsName(self._HDF5_PREFIX + ":Status") - self._status_message_record = builder.stringIn( + capture_mode_record_name = EpicsName(self._DATA_PREFIX + ":CaptureMode") + self._capture_mode_record = builder.mbbOut( + capture_mode_record_name, + *[capture_mode.name for capture_mode in CaptureMode], + initial_value=0, + DESC="Choose how to hdf writer flushes", + ) + add_automatic_pvi_info( + PviGroup.CAPTURE, + self._capture_mode_record, + capture_mode_record_name, + builder.mbbOut, + ) + self._capture_mode_record.add_alias( + record_prefix + ":" + capture_mode_record_name.upper() + ) + + status_message_record_name = EpicsName(self._DATA_PREFIX + ":Status") + self._status_message_record = builder.longStringIn( status_message_record_name, initial_value="OK", + length=200, DESC="Reports current status of HDF5 capture", ) - add_pvi_info( + add_automatic_pvi_info( PviGroup.OUTPUTS, self._status_message_record, status_message_record_name, @@ -157,23 +508,6 @@ def __init__(self, client: AsyncioClient, record_prefix: str): record_prefix + ":" + status_message_record_name.upper() ) - currently_capturing_record_name = EpicsName(self._HDF5_PREFIX + ":Capturing") - self._currently_capturing_record = builder.boolIn( - currently_capturing_record_name, - ZNAM=ZNAM_STR, - ONAM=ONAM_STR, - DESC="If HDF5 file is currently being written", - ) - add_pvi_info( - PviGroup.OUTPUTS, - self._currently_capturing_record, - currently_capturing_record_name, - builder.boolIn, - ) - self._currently_capturing_record.add_alias( - record_prefix + ":" + currently_capturing_record_name.upper() - ) - def _parameter_validate(self, record: RecordWrapper, new_val) -> bool: """Control when values can be written to parameter records (file name etc.) based on capturing record's value""" @@ -187,101 +521,49 @@ def _parameter_validate(self, record: RecordWrapper, new_val) -> bool: return False return True + async def _update_full_file_path(self, new_val) -> None: + self._full_file_path_record.set(self._get_filepath()) + async def _handle_hdf5_data(self) -> None: """Handles writing HDF5 data from the PandA to file, based on configuration in the various HDF5 records. This method expects to be run as an asyncio Task.""" try: - # Keep the start data around to compare against, for the case where a new - # capture, and thus new StartData, is sent without Capture ever being - # disabled - start_data: Optional[StartData] = None - captured_frames: int = 0 - # Only one filename - user must stop capture and set new FileName/FilePath - # for new files - pipeline: List[Pipeline] = create_default_pipeline( - iter([self._get_filename()]) + # Set up the hdf buffer + num_capture: int = self._num_capture_record.get() + capture_mode: CaptureMode = CaptureMode(self._capture_mode_record.get()) + filepath = self._get_filepath() + + number_captured_setter_pipeline = NumCapturedSetter( + self._num_captured_record.set + ) + buffer = HDF5Buffer( + capture_mode, + filepath, + num_capture, + self._status_message_record.set, + self._num_received_record.set, + number_captured_setter_pipeline, ) flush_period: float = self._flush_period_record.get() - async for data in self._client.data( scaled=False, flush_period=flush_period ): logging.debug(f"Received data packet: {data}") - if isinstance(data, ReadyData): - self._currently_capturing_record.set(1) - self._status_message_record.set("Starting capture") - elif isinstance(data, StartData): - if start_data and data != start_data: - # PandA was disarmed, had config changed, and rearmed. - # Cannot process to the same file with different start data. - logging.error( - "New start data detected, differs from previous start " - "data for this file. Aborting HDF5 data capture." - ) - - self._status_message_record.set( - "Mismatched StartData packet for file", - severity=alarm.MAJOR_ALARM, - alarm=alarm.STATE_ALARM, - ) - pipeline[0].queue.put_nowait( - EndData(captured_frames, EndReason.START_DATA_MISMATCH) - ) - - break - if start_data is None: - # Only pass StartData to pipeline if we haven't previously - # - if we have there will already be an in-progress HDF file - # that we should just append data to - start_data = data - pipeline[0].queue.put_nowait(data) - - elif isinstance(data, FrameData): - captured_frames += len(data.data) - - num_frames_to_capture: int = self._num_capture_record.get() - if ( - num_frames_to_capture > 0 - and captured_frames > num_frames_to_capture - ): - # Discard extra collected data points if necessary - data.data = data.data[: num_frames_to_capture - captured_frames] - captured_frames = num_frames_to_capture - - pipeline[0].queue.put_nowait(data) - - if ( - num_frames_to_capture > 0 - and captured_frames >= num_frames_to_capture - ): - # Reached configured capture limit, stop the file - logging.info( - f"Requested number of frames ({num_frames_to_capture}) " - "captured, disabling Capture." - ) - self._status_message_record.set( - "Requested number of frames captured" - ) - pipeline[0].queue.put_nowait( - EndData(captured_frames, EndReason.OK) - ) - break - elif not isinstance(data, EndData): - raise RuntimeError( - f"Data was recieved that was of type {type(data)}, not" - "StartData, EndData, ReadyData or FrameData" - ) - # Ignore EndData - handle terminating capture with the Capture - # record or when we capture the requested number of frames + + buffer.handle_data(data) + if buffer.finish_capturing: + break except CancelledError: logging.info("Capturing task cancelled, closing HDF5 file") self._status_message_record.set("Capturing disabled") # Only send EndData if we know the file was opened - could be cancelled # before PandA has actually send any data - if start_data: - pipeline[0].queue.put_nowait(EndData(captured_frames, EndReason.OK)) + if buffer.capture_mode != CaptureMode.LAST_N: + buffer.put_data_to_file( + EndData(buffer.number_of_received_rows, EndReason.MANUALLY_STOPPED) + ) except Exception: logging.exception("HDF5 data capture terminated due to unexpected error") @@ -292,22 +574,21 @@ async def _handle_hdf5_data(self) -> None: ) # Only send EndData if we know the file was opened - exception could happen # before file was opened - if start_data: - pipeline[0].queue.put_nowait( - EndData(captured_frames, EndReason.UNKNOWN_EXCEPTION) + if buffer.start_data and buffer.capture_mode != CaptureMode.LAST_N: + buffer.put_data_to_file( + EndData(buffer.number_of_received_rows, EndReason.UNKNOWN_EXCEPTION) ) finally: logging.debug("Finishing processing HDF5 PandA data") - stop_pipeline(pipeline) + self._num_received_record.set(buffer.number_of_received_rows) self._capture_control_record.set(0) - self._currently_capturing_record.set(0) - def _get_filename(self) -> str: + def _get_filepath(self) -> str: """Create the file path for the HDF5 file from the relevant records""" return "/".join( ( - self._file_path_record.get(), + self._directory_record.get(), self._file_name_record.get(), ) ) @@ -330,7 +611,7 @@ def _capture_validate(self, record: RecordWrapper, new_val: int) -> bool: """Check the required records have been set before allowing Capture=1""" if new_val: try: - self._get_filename() + self._get_filepath() except ValueError: logging.exception("At least 1 required record had no value") return False diff --git a/src/pandablocks_ioc/_pvi.py b/src/pandablocks_ioc/_pvi.py index ce74d379..7ddf8bab 100644 --- a/src/pandablocks_ioc/_pvi.py +++ b/src/pandablocks_ioc/_pvi.py @@ -39,6 +39,7 @@ class PviGroup(Enum): READBACKS = "Readbacks" OUTPUTS = "Outputs" CAPTURE = "Capture" + HDF = "HDF" TABLE = "Table" # TODO: May not need this anymore @@ -53,13 +54,64 @@ class PviInfo: component: Component -def add_pvi_info( +def add_pvi_info_to_record( + record: RecordWrapper, + record_name: EpicsName, + access: str, +): + block, field = record_name.split(":", maxsplit=1) + block_name_suffixed = f"pvi.{field.lower().replace(':', '_')}.{access}" + record.add_info( + "Q:group", + { + RecordName(f"{block}:PVI"): { + block_name_suffixed: { + "+channel": "NAME", + "+type": "plain", + "+trigger": block_name_suffixed, + } + } + }, + ) + + +def add_data_capture_pvi_info( + group: PviGroup, + data_capture_record_name: EpicsName, + data_capture_pvi_record: RecordWrapper, +): + component = SignalRW( + name=epics_to_pvi_name(data_capture_record_name), + pv=data_capture_record_name, + widget=ButtonPanel(actions=dict(Start="1", Stop="0")), + read_widget=LED(), + ) + add_pvi_info_to_record(data_capture_pvi_record, data_capture_record_name, "rw") + Pvi.add_pvi_info( + record_name=data_capture_record_name, group=group, component=component + ) + + +def add_pcap_arm_pvi_info(group: PviGroup, pcap_arm_pvi_record: RecordWrapper): + pcap_arm_record_name = EpicsName("PCAP:ARM") + component = SignalRW( + name=epics_to_pvi_name(pcap_arm_record_name), + pv=pcap_arm_record_name, + widget=ButtonPanel(actions=dict(Arm="1", Disarm="0")), + read_widget=LED(), + ) + add_pvi_info_to_record(pcap_arm_pvi_record, pcap_arm_record_name, "rw") + Pvi.add_pvi_info(record_name=pcap_arm_record_name, group=group, component=component) + + +def add_automatic_pvi_info( group: PviGroup, record: RecordWrapper, record_name: EpicsName, record_creation_func: Callable, ) -> None: - """Create the most common forms of the `PviInfo` structure""" + """Create the most common forms of the `PviInfo` structure. + Generates generic components from""" component: Component writeable: bool = record_creation_func in OUT_RECORD_FUNCTIONS useComboBox: bool = record_creation_func == builder.mbbOut @@ -83,7 +135,10 @@ def add_pvi_info( if useComboBox: widget = ComboBox() else: - if record_creation_func in (builder.longStringOut, builder.stringOut): + if record_creation_func in ( + builder.longStringOut, + builder.stringOut, + ): widget = TextWrite(format=TextFormat.string) else: widget = TextWrite(format=None) @@ -91,22 +146,18 @@ def add_pvi_info( component = SignalRW(name=pvi_name, pv=record_name, widget=widget) access = "rw" else: + if record_creation_func in ( + builder.longStringIn, + builder.stringIn, + ): + widget = TextRead(format=TextFormat.string) + else: + widget = TextRead(format=None) + component = SignalR(name=pvi_name, pv=record_name, widget=TextRead()) access = "r" - block, field = record_name.split(":", maxsplit=1) - block_name_suffixed = f"pvi.{field.lower().replace(':', '_')}.{access}" - record.add_info( - "Q:group", - { - RecordName(f"{block}:PVI"): { - block_name_suffixed: { - "+channel": "NAME", - "+type": "plain", - "+trigger": block_name_suffixed, - } - } - }, - ) + + add_pvi_info_to_record(record, record_name, access) Pvi.add_pvi_info(record_name=record_name, group=group, component=component) diff --git a/src/pandablocks_ioc/ioc.py b/src/pandablocks_ioc/ioc.py index 148488f5..f65625a6 100644 --- a/src/pandablocks_ioc/ioc.py +++ b/src/pandablocks_ioc/ioc.py @@ -41,7 +41,13 @@ from softioc.pythonSoftIoc import RecordWrapper from ._hdf_ioc import HDF5RecordController -from ._pvi import Pvi, PviGroup, add_positions_table_row, add_pvi_info +from ._pvi import ( + Pvi, + PviGroup, + add_automatic_pvi_info, + add_pcap_arm_pvi_info, + add_positions_table_row, +) from ._tables import TableRecordWrapper, TableUpdater from ._types import ( ONAM_STR, @@ -652,7 +658,7 @@ def _create_record_info( record_name, *labels, *args, **extra_kwargs, **kwargs ) - add_pvi_info( + add_automatic_pvi_info( group=group, record=record, record_name=record_name, @@ -1727,9 +1733,7 @@ def create_block_records( DESC="Arm/Disarm the PandA", ) - add_pvi_info( - PviGroup.INPUTS, pcap_arm_record, EpicsName("PCAP:ARM"), builder.Action - ) + add_pcap_arm_pvi_info(PviGroup.INPUTS, pcap_arm_record) HDF5RecordController(self._client, self._record_prefix) diff --git a/tests/fixtures/mocked_panda.py b/tests/fixtures/mocked_panda.py index 495e2944..7c2b9c0c 100644 --- a/tests/fixtures/mocked_panda.py +++ b/tests/fixtures/mocked_panda.py @@ -227,15 +227,12 @@ async def data( flush_every_frame = flush_period is None conn = DataConnection() conn.connect(scaled) - try: - f = open(Path(__file__).parent.parent / "raw_dump.txt", "rb") + with open(Path(__file__).parent.parent / "raw_dump.txt", "rb") as f: for raw in chunked_read(f, 200000): for data in conn.receive_bytes( raw, flush_every_frame=flush_every_frame ): yield data - finally: - f.close() def get_multiprocessing_context(): diff --git a/tests/test-bobfiles/HDF5.bob b/tests/test-bobfiles/DATA.bob similarity index 50% rename from tests/test-bobfiles/HDF5.bob rename to tests/test-bobfiles/DATA.bob index 51d60d04..1cabfce4 100644 --- a/tests/test-bobfiles/HDF5.bob +++ b/tests/test-bobfiles/DATA.bob @@ -3,13 +3,13 @@ 0 0 426 - 277 + 413 4 4 Title TITLE - HDF5 - TEST_PREFIX: + DATA - TEST_PREFIX: 0 0 426 @@ -26,15 +26,15 @@ 1 - INPUTS + HDF 5 30 416 - 156 + 106 true Label - Filepath + Hdfdirectory 0 0 250 @@ -42,7 +42,7 @@ TextEntry - TEST_PREFIX:HDF5:FilePath + TEST_PREFIX:DATA:HDFDirectory 255 0 125 @@ -52,7 +52,7 @@ Label - Filename + Hdffilename 0 25 250 @@ -60,7 +60,7 @@ TextEntry - TEST_PREFIX:HDF5:FileName + TEST_PREFIX:DATA:HDFFileName 255 25 125 @@ -70,76 +70,84 @@ Label - Numcapture + Hdffullfilepath 0 50 250 20 - - TextEntry - TEST_PREFIX:HDF5:NumCapture + + TextUpdate + TEST_PREFIX:DATA:HDFFullFilePath 255 50 125 20 + + + + 1 + + + CAPTURE + 5 + 141 + 416 + 206 + true Label - Flushperiod + Numcapture 0 - 75 + 0 250 20 TextEntry - TEST_PREFIX:HDF5:FlushPeriod + TEST_PREFIX:DATA:NumCapture 255 - 75 + 0 125 20 1 Label - Capture + Numcaptured 0 - 100 + 25 250 20 - - TextEntry - TEST_PREFIX:HDF5:Capture + + TextUpdate + TEST_PREFIX:DATA:NumCaptured 255 - 100 + 25 125 20 + + + + 1 - - - OUTPUTS - 5 - 191 - 416 - 81 - true Label - Status + Numreceived 0 - 0 + 50 250 20 TextUpdate - TEST_PREFIX:HDF5:Status + TEST_PREFIX:DATA:NumReceived 255 - 0 + 50 125 20 @@ -150,17 +158,132 @@ Label - Capturing + Flushperiod 0 - 25 + 75 + 250 + 20 + + + TextEntry + TEST_PREFIX:DATA:FlushPeriod + 255 + 75 + 125 + 20 + 1 + + + Label + Capture + 0 + 100 + 250 + 20 + + + WritePV + TEST_PREFIX:DATA:Capture + + + $(pv_name) + 1 + $(name) + + + Start + 255 + 100 + 38 + 20 + $(actions) + + + WritePV + TEST_PREFIX:DATA:Capture + + + $(pv_name) + 0 + $(name) + + + Stop + 298 + 100 + 38 + 20 + $(actions) + + + LED + TEST_PREFIX:DATA:Capture + 350 + 100 + 20 + 20 + + + Label + Capturemode + 0 + 125 + 250 + 20 + + + ComboBox + TEST_PREFIX:DATA:CaptureMode + 255 + 125 + 125 + 20 + + + Label + All Postion Capture Parameters + 0 + 150 + 250 + 20 + + + OpenDisplay + + + PandA_PositionsTable.bob + tab + Open Display + + + All Postion Capture Parameters + 255 + 150 + 125 + 20 + $(actions) + + + + OUTPUTS + 5 + 352 + 416 + 56 + true + + Label + Status + 0 + 0 250 20 TextUpdate - TEST_PREFIX:HDF5:Capturing + TEST_PREFIX:DATA:Status 255 - 25 + 0 125 20 diff --git a/tests/test-bobfiles/index.bob b/tests/test-bobfiles/index.bob index 77167918..7f9a10ae 100644 --- a/tests/test-bobfiles/index.bob +++ b/tests/test-bobfiles/index.bob @@ -51,7 +51,7 @@ Label - HDF5 + DATA 23 55 250 @@ -61,12 +61,12 @@ OpenDisplay - HDF5.bob + DATA.bob tab Open Display - HDF5 + DATA 278 55 125 diff --git a/tests/test_hdf_ioc.py b/tests/test_hdf_ioc.py index 36ef4912..1d6d8e6a 100644 --- a/tests/test_hdf_ioc.py +++ b/tests/test_hdf_ioc.py @@ -3,6 +3,7 @@ import asyncio import logging from asyncio import CancelledError +from collections import deque from multiprocessing.connection import Connection from pathlib import Path from typing import AsyncGenerator, Generator @@ -11,7 +12,7 @@ import numpy import pytest import pytest_asyncio -from aioca import caget, camonitor, caput +from aioca import DBR_CHAR_STR, CANothing, caget, caput from fixtures.mocked_panda import ( TIMEOUT, MockedAsyncioClient, @@ -34,7 +35,12 @@ ) from softioc import asyncio_dispatcher, builder, softioc -from pandablocks_ioc._hdf_ioc import HDF5RecordController +from pandablocks_ioc._hdf_ioc import ( + CaptureMode, + HDF5Buffer, + HDF5RecordController, + NumCapturedSetter, +) NAMESPACE_PREFIX = "HDF-RECORD-PREFIX" @@ -42,7 +48,7 @@ @pytest.fixture def new_random_hdf5_prefix(): test_prefix = append_random_uppercase(NAMESPACE_PREFIX) - hdf5_test_prefix = test_prefix + ":HDF5" + hdf5_test_prefix = test_prefix + ":DATA" return test_prefix, hdf5_test_prefix @@ -207,7 +213,7 @@ def fast_dump_expected(): [8, 58, 58, 174, 0.570000056, 58, 116], ) ), - EndData(58, EndReason.DISARMED), + EndData(58, EndReason.OK), ] @@ -318,14 +324,14 @@ async def test_hdf5_ioc(hdf5_subprocess_ioc): test_prefix, hdf5_test_prefix = hdf5_subprocess_ioc - val = await caget(hdf5_test_prefix + ":FilePath") + val = await caget(hdf5_test_prefix + ":HDFDirectory", datatype=DBR_CHAR_STR) # Default value of longStringOut is an array of a single NULL byte - assert val.size == 1 + assert val == "" # Mix and match between CamelCase and UPPERCASE to check aliases work - val = await caget(hdf5_test_prefix + ":FILENAME") - assert val.size == 1 # As above for longStringOut + val = await caget(hdf5_test_prefix + ":HDFFILENAME", datatype=DBR_CHAR_STR) + assert val == "" val = await caget(hdf5_test_prefix + ":NumCapture") assert val == 0 @@ -336,20 +342,13 @@ async def test_hdf5_ioc(hdf5_subprocess_ioc): val = await caget(hdf5_test_prefix + ":CAPTURE") assert val == 0 - val = await caget(hdf5_test_prefix + ":Status") + val = await caget(hdf5_test_prefix + ":Status", datatype=DBR_CHAR_STR) assert val == "OK" - val = await caget(hdf5_test_prefix + ":Capturing") - assert val == 0 - - -def _string_to_buffer(string: str): - """Convert a python string into a numpy buffer suitable for caput'ing to a Waveform - record""" - return numpy.frombuffer(string.encode(), dtype=numpy.uint8) - -async def test_hdf5_ioc_parameter_validate_works(hdf5_subprocess_ioc_no_logging_check): +async def test_hdf5_ioc_parameter_validate_works( + hdf5_subprocess_ioc_no_logging_check, tmp_path +): """Run the HDF5 module as its own IOC and check the _parameter_validate method does not stop updates, then stops when capture record is changed""" @@ -357,52 +356,155 @@ async def test_hdf5_ioc_parameter_validate_works(hdf5_subprocess_ioc_no_logging_ # EPICS bug means caputs always appear to succeed, so do a caget to prove it worked await caput( - hdf5_test_prefix + ":FilePath", _string_to_buffer("/new/path"), wait=True + hdf5_test_prefix + ":HDFDirectory", + str(tmp_path), + datatype=DBR_CHAR_STR, + wait=True, ) - val = await caget(hdf5_test_prefix + ":FilePath") - assert val.tobytes().decode() == "/new/path" + val = await caget(hdf5_test_prefix + ":HDFDirectory", datatype=DBR_CHAR_STR) + assert val == str(tmp_path) - await caput(hdf5_test_prefix + ":FileName", _string_to_buffer("name.h5"), wait=True) - val = await caget(hdf5_test_prefix + ":FileName") - assert val.tobytes().decode() == "name.h5" + await caput( + hdf5_test_prefix + ":HDFFileName", "name.h5", wait=True, datatype=DBR_CHAR_STR + ) + val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR) + assert val == "name.h5" await caput(hdf5_test_prefix + ":Capture", 1, wait=True) assert await caget(hdf5_test_prefix + ":Capture") == 1 + with pytest.raises(CANothing): + await caput( + hdf5_test_prefix + ":HDFFullFilePath", + "/second/path/name.h5", + wait=True, + datatype=DBR_CHAR_STR, + ) + val = await caget(hdf5_test_prefix + ":HDFFullFilePath", datatype=DBR_CHAR_STR) + assert val == str(tmp_path) + "/name.h5" # put should have been stopped + + +@pytest.mark.parametrize("num_capture", [1, 1000, 10000]) +async def test_hdf5_file_writing_first_n( + hdf5_subprocess_ioc, tmp_path: Path, caplog, num_capture +): + """Test that an HDF5 file is written when Capture is enabled""" + + test_prefix, hdf5_test_prefix = hdf5_subprocess_ioc + + val = await caget(hdf5_test_prefix + ":CaptureMode") + assert val == CaptureMode.FIRST_N.value + + test_dir = tmp_path + test_filename = "test.h5" await caput( - hdf5_test_prefix + ":FilePath", _string_to_buffer("/second/path"), wait=True + hdf5_test_prefix + ":HDFDirectory", + str(test_dir), + wait=True, + datatype=DBR_CHAR_STR, + ) + val = await caget(hdf5_test_prefix + ":HDFDirectory", datatype=DBR_CHAR_STR) + assert val == str(test_dir) + + await caput( + hdf5_test_prefix + ":HDFFileName", "name.h5", wait=True, datatype=DBR_CHAR_STR + ) + val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR) + assert val == "name.h5" + + await caput( + hdf5_test_prefix + ":HDFFileName", + test_filename, + wait=True, + timeout=TIMEOUT, + datatype=DBR_CHAR_STR, + ) + val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR) + assert val == test_filename + + val = await caget(hdf5_test_prefix + ":HDFFullFilePath", datatype=DBR_CHAR_STR) + assert val == "/".join([str(tmp_path), test_filename]) + + # Only a single FrameData in the example data + assert await caget(hdf5_test_prefix + ":NumCapture") == 0 + await caput( + hdf5_test_prefix + ":NumCapture", num_capture, wait=True, timeout=TIMEOUT + ) + assert await caget(hdf5_test_prefix + ":NumCapture") == num_capture + + await caput(hdf5_test_prefix + ":Capture", 1, wait=True, timeout=TIMEOUT) + assert await caget(hdf5_test_prefix + ":NumReceived") <= num_capture + + await asyncio.sleep(1) + # Capture should have closed by itself + assert await caget(hdf5_test_prefix + ":Capture") == 0 + + assert await caget(hdf5_test_prefix + ":NumReceived") == num_capture + assert await caget(hdf5_test_prefix + ":NumCaptured") == num_capture + # Confirm file contains data we expect + with h5py.File(tmp_path / test_filename, "r") as hdf_file: + assert list(hdf_file) == [ + "COUNTER1.OUT.Max", + "COUNTER1.OUT.Mean", + "COUNTER1.OUT.Min", + "COUNTER2.OUT.Mean", + "COUNTER3.OUT.Value", + "PCAP.BITS2.Value", + "PCAP.SAMPLES.Value", + "PCAP.TS_START.Value", + ] + + assert len(hdf_file["/COUNTER1.OUT.Max"]) == num_capture + + assert ( + await caget(hdf5_test_prefix + ":Status", datatype=DBR_CHAR_STR) + == "Requested number of frames captured" ) - val = await caget(hdf5_test_prefix + ":FilePath") - assert val.tobytes().decode() == "/new/path" # put should have been stopped @pytest.mark.parametrize("num_capture", [1, 1000, 10000]) -async def test_hdf5_file_writing( +async def test_hdf5_file_writing_last_n_endreason_not_ok( hdf5_subprocess_ioc, tmp_path: Path, caplog, num_capture ): """Test that an HDF5 file is written when Capture is enabled""" test_prefix, hdf5_test_prefix = hdf5_subprocess_ioc - test_dir = str(tmp_path) + "\0" - test_filename = "test.h5\0" + val = await caget(hdf5_test_prefix + ":CaptureMode") + assert val == CaptureMode.FIRST_N.value + await caput(hdf5_test_prefix + ":CaptureMode", 1, wait=True) + val = await caget(hdf5_test_prefix + ":CaptureMode") + assert val == CaptureMode.LAST_N.value + + test_dir = tmp_path + test_filename = "test.h5" await caput( - hdf5_test_prefix + ":FilePath", - _string_to_buffer(str(test_dir)), + hdf5_test_prefix + ":HDFDirectory", + str(test_dir), wait=True, - timeout=TIMEOUT, + datatype=DBR_CHAR_STR, + ) + val = await caget(hdf5_test_prefix + ":HDFDirectory", datatype=DBR_CHAR_STR) + assert val == str(test_dir) + + await caput( + hdf5_test_prefix + ":HDFFileName", "name.h5", wait=True, datatype=DBR_CHAR_STR ) - val = await caget(hdf5_test_prefix + ":FilePath") - assert val.tobytes().decode() == test_dir + val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR) + assert val == "name.h5" await caput( - hdf5_test_prefix + ":FileName", - _string_to_buffer(test_filename), + hdf5_test_prefix + ":HDFFileName", + test_filename, wait=True, timeout=TIMEOUT, + datatype=DBR_CHAR_STR, ) - val = await caget(hdf5_test_prefix + ":FileName") - assert val.tobytes().decode() == test_filename + val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR) + assert val == test_filename + + val = await caget(hdf5_test_prefix + ":HDFFullFilePath", datatype=DBR_CHAR_STR) + assert val == "/".join([str(tmp_path), test_filename]) # Only a single FrameData in the example data assert await caget(hdf5_test_prefix + ":NumCapture") == 0 @@ -411,44 +513,301 @@ async def test_hdf5_file_writing( ) assert await caget(hdf5_test_prefix + ":NumCapture") == num_capture - # The queue expects to see Capturing go 0 -> 1 -> 0 as Capture is enabled - # and subsequently finishes - capturing_queue: asyncio.Queue = asyncio.Queue() - m = camonitor( - hdf5_test_prefix + ":Capturing", - capturing_queue.put, + # Initially Status should be "OK" + val = await caget(hdf5_test_prefix + ":Status", datatype=DBR_CHAR_STR) + assert val == "OK" + + await caput(hdf5_test_prefix + ":Capture", 1, wait=True, timeout=TIMEOUT) + + await asyncio.sleep(1) + # Capture should have closed by itself + assert await caget(hdf5_test_prefix + ":Capture") == 0 + + val = await caget(hdf5_test_prefix + ":Status", datatype=DBR_CHAR_STR) + assert ( + val == "Stopped capturing with reason EndReason.DISARMED, " + "skipping writing of buffered frames" ) - # Initially Capturing should be 0 - assert await capturing_queue.get() == 0 + # We received all 10000 frames even if we asked to capture fewer. + assert await caget(hdf5_test_prefix + ":NumReceived") == 10000 - await caput(hdf5_test_prefix + ":Capture", 1, wait=True, timeout=TIMEOUT) + # We didn't write any frames since the endreason was `EndReason.DISARMED`, + # not endreason `EndReason.OK` + assert await caget(hdf5_test_prefix + ":NumCaptured") == 0 + + # Confirm no data was written + assert not (tmp_path / test_filename).exists() + + +@pytest_asyncio.fixture +def differently_sized_framedata(): + yield [ + ReadyData(), + StartData(DUMP_FIELDS, 0, "Scaled", "Framed", 52), + FrameData( + numpy.array( + [ + [0, 1, 1, 3, 5.6e-08, 1, 2], + [0, 2, 2, 6, 0.010000056, 2, 4], + [8, 3, 3, 9, 0.020000056, 3, 6], + [8, 4, 4, 12, 0.030000056, 4, 8], + [8, 5, 5, 15, 0.040000056, 5, 10], + [8, 6, 6, 18, 0.050000056, 6, 12], + [8, 7, 7, 21, 0.060000056, 7, 14], + [8, 8, 8, 24, 0.070000056, 8, 16], + [8, 9, 9, 27, 0.080000056, 9, 18], + [8, 10, 10, 30, 0.090000056, 10, 20], + ] + ) + ), + FrameData( + numpy.array( + [ + [0, 11, 11, 33, 0.100000056, 11, 22], + [8, 12, 12, 36, 0.110000056, 12, 24], + [8, 13, 13, 39, 0.120000056, 13, 26], + [8, 14, 14, 42, 0.130000056, 14, 28], + [8, 15, 15, 45, 0.140000056, 15, 30], + [8, 16, 16, 48, 0.150000056, 16, 32], + [8, 17, 17, 51, 0.160000056, 17, 34], + [8, 18, 18, 54, 0.170000056, 18, 36], + [8, 19, 19, 57, 0.180000056, 19, 38], + [0, 20, 20, 60, 0.190000056, 20, 40], + [8, 21, 21, 63, 0.200000056, 21, 42], + ] + ) + ), + FrameData( + numpy.array( + [ + [8, 22, 22, 66, 0.210000056, 22, 44], + [8, 23, 23, 69, 0.220000056, 23, 46], + [8, 24, 24, 72, 0.230000056, 24, 48], + [8, 25, 25, 75, 0.240000056, 25, 50], + [8, 26, 26, 78, 0.250000056, 26, 52], + [8, 27, 27, 81, 0.260000056, 27, 54], + [8, 28, 28, 84, 0.270000056, 28, 56], + [0, 29, 29, 87, 0.280000056, 29, 58], + [8, 30, 30, 90, 0.290000056, 30, 60], + [8, 31, 31, 93, 0.300000056, 31, 62], + ] + ) + ), + FrameData( + numpy.array( + [ + [8, 32, 32, 96, 0.310000056, 32, 64], + [8, 33, 33, 99, 0.320000056, 33, 66], + [8, 34, 34, 102, 0.330000056, 34, 68], + [8, 35, 35, 105, 0.340000056, 35, 70], + [8, 36, 36, 108, 0.350000056, 36, 72], + [8, 37, 37, 111, 0.360000056, 37, 74], + [0, 38, 38, 114, 0.370000056, 38, 76], + [8, 39, 39, 117, 0.380000056, 39, 78], + [8, 40, 40, 120, 0.390000056, 40, 80], + [8, 41, 41, 123, 0.400000056, 41, 82], + ] + ) + ), + FrameData( + numpy.array( + [ + [8, 42, 42, 126, 0.410000056, 42, 84], + [8, 43, 43, 129, 0.420000056, 43, 86], + [8, 44, 44, 132, 0.430000056, 44, 88], + [8, 45, 45, 135, 0.440000056, 45, 90], + [8, 46, 46, 138, 0.450000056, 46, 92], + [0, 47, 47, 141, 0.460000056, 47, 94], + [8, 48, 48, 144, 0.470000056, 48, 96], + [8, 49, 49, 147, 0.480000056, 49, 98], + [8, 50, 50, 150, 0.490000056, 50, 100], + [8, 51, 51, 153, 0.500000056, 51, 102], + ] + ) + ), + FrameData( + numpy.array( + [ + [8, 52, 52, 156, 0.510000056, 52, 104], + [8, 53, 53, 159, 0.520000056, 53, 106], + [8, 54, 54, 162, 0.530000056, 54, 108], + [8, 55, 55, 165, 0.540000056, 55, 110], + [0, 56, 56, 168, 0.550000056, 56, 112], + [8, 57, 57, 171, 0.560000056, 57, 114], + [8, 58, 58, 174, 0.570000056, 58, 116], + ] + ) + ), + EndData(58, EndReason.OK), + ] - assert await capturing_queue.get() == 1 - # The HDF5 data will be processed, and when it's done Capturing is set to 0 - assert await asyncio.wait_for(capturing_queue.get(), timeout=TIMEOUT) == 0 +def test_hdf_buffer_forever(differently_sized_framedata, tmp_path): + filepath = str(tmp_path / "test_file.h5") + status_output = [] + num_received_output = [] + num_captured_output = [] + frames_written_to_file = [] + num_captured_output = [] + num_captured_setter_pipeline = NumCapturedSetter(num_captured_output.append) + buffer = HDF5Buffer( + CaptureMode.FOREVER, + filepath, + 21, + status_output.append, + num_received_output.append, + num_captured_setter_pipeline, + ) + buffer.put_data_to_file = frames_written_to_file.append + + for data in differently_sized_framedata: + buffer.handle_data(data) + + assert buffer.number_of_received_rows == 58 + assert not buffer.finish_capturing + + differently_sized_framedata[-1] = EndData(58, EndReason.MANUALLY_STOPPED) + + for data in differently_sized_framedata: + buffer.handle_data(data) + + assert buffer.number_of_received_rows == 116 + assert buffer.finish_capturing + + assert len(frames_written_to_file) == 14 + sum( + len(frame.data) + for frame in frames_written_to_file + if isinstance(frame, FrameData) + ) == 116 + + +def test_hdf_buffer_last_n(differently_sized_framedata, tmp_path): + filepath = str(tmp_path / "test_file.h5") + status_output = [] + num_received_output = [] + num_captured_output = [] + frames_written_to_file = [] + num_captured_output = [] + num_captured_setter_pipeline = NumCapturedSetter(num_captured_output.append) + buffer = HDF5Buffer( + CaptureMode.LAST_N, + filepath, + 21, + status_output.append, + num_received_output.append, + num_captured_setter_pipeline, + ) + buffer.put_data_to_file = frames_written_to_file.append + + for data in differently_sized_framedata: + buffer.handle_data(data) + + assert buffer.number_of_received_rows == 58 + assert buffer.number_of_rows_in_circular_buffer == 21 + + expected_cut_off_data = deque( + [ + FrameData( + numpy.array( + [ + [0, 38, 38, 114, 0.370000056, 38, 76], + [8, 39, 39, 117, 0.380000056, 39, 78], + [8, 40, 40, 120, 0.390000056, 40, 80], + [8, 41, 41, 123, 0.400000056, 41, 82], + ] + ) + ), + FrameData( + numpy.array( + [ + [8, 42, 42, 126, 0.410000056, 42, 84], + [8, 43, 43, 129, 0.420000056, 43, 86], + [8, 44, 44, 132, 0.430000056, 44, 88], + [8, 45, 45, 135, 0.440000056, 45, 90], + [8, 46, 46, 138, 0.450000056, 46, 92], + [0, 47, 47, 141, 0.460000056, 47, 94], + [8, 48, 48, 144, 0.470000056, 48, 96], + [8, 49, 49, 147, 0.480000056, 49, 98], + [8, 50, 50, 150, 0.490000056, 50, 100], + [8, 51, 51, 153, 0.500000056, 51, 102], + ] + ) + ), + FrameData( + numpy.array( + [ + [8, 52, 52, 156, 0.510000056, 52, 104], + [8, 53, 53, 159, 0.520000056, 53, 106], + [8, 54, 54, 162, 0.530000056, 54, 108], + [8, 55, 55, 165, 0.540000056, 55, 110], + [0, 56, 56, 168, 0.550000056, 56, 112], + [8, 57, 57, 171, 0.560000056, 57, 114], + [8, 58, 58, 174, 0.570000056, 58, 116], + ] + ) + ), + ] + ) - m.close() + output_frames = [ + frame_data + for frame_data in frames_written_to_file + if isinstance(frame_data, FrameData) + ] + for expected_frame, output_frame in zip(expected_cut_off_data, output_frames): + numpy.testing.assert_array_equal(expected_frame.data, output_frame.data) + + +def test_hdf_buffer_last_n_large_data(tmp_path): + filepath = str(tmp_path / "test_file.h5") + status_output = [] + num_received_output = [] + num_captured_output = [] + frames_written_to_file = [] + num_captured_setter_pipeline = NumCapturedSetter(num_captured_output.append) + buffer = HDF5Buffer( + CaptureMode.LAST_N, + filepath, + 25000, + status_output.append, + num_received_output.append, + num_captured_setter_pipeline, + ) + buffer.put_data_to_file = frames_written_to_file.append - # Close capture, thus closing hdf5 file - await caput(hdf5_test_prefix + ":Capture", 0, wait=True) - assert await caget(hdf5_test_prefix + ":Capture") == 0 + large_data = [ + ReadyData(), + StartData([], 0, "Scaled", "Framed", 52), + FrameData(numpy.zeros((25000))), + FrameData(numpy.zeros((25000))), + FrameData(numpy.zeros((25000))), + FrameData(numpy.zeros((25000))), + FrameData(numpy.zeros((25000))), + FrameData(numpy.append(numpy.zeros((15000)), numpy.arange(1, 10001))), + EndData(150000, EndReason.OK), + ] - # Confirm file contains data we expect - hdf_file = h5py.File(tmp_path / test_filename[:-1], "r") - assert list(hdf_file) == [ - "COUNTER1.OUT.Max", - "COUNTER1.OUT.Mean", - "COUNTER1.OUT.Min", - "COUNTER2.OUT.Mean", - "COUNTER3.OUT.Value", - "PCAP.BITS2.Value", - "PCAP.SAMPLES.Value", - "PCAP.TS_START.Value", + for data in large_data: + buffer.handle_data(data) + + assert buffer.number_of_received_rows == 150000 + assert buffer.number_of_rows_in_circular_buffer == 25000 + + expected_output = [ + StartData([], 0, "Scaled", "Framed", 52), + FrameData(numpy.append(numpy.zeros((15000)), numpy.arange(1, 10001))), + EndData(150000, EndReason.OK), ] - assert len(hdf_file["/COUNTER1.OUT.Max"]) == num_capture + output_frames = [ + frame_data + for frame_data in frames_written_to_file + if isinstance(frame_data, FrameData) + ] + assert len(output_frames) == 1 + numpy.testing.assert_array_equal(output_frames[0].data, expected_output[1].data) def test_hdf_parameter_validate_not_capturing(hdf5_controller: HDF5RecordController): @@ -490,7 +849,7 @@ async def mock_data(scaled, flush_period): yield item # Set up all the mocks - hdf5_controller._get_filename = MagicMock( # type: ignore + hdf5_controller._get_filepath = MagicMock( # type: ignore return_value="Some/Filepath" ) hdf5_controller._client.data = mock_data # type: ignore @@ -510,8 +869,6 @@ async def mock_data(scaled, flush_period): assert pipeline_mock[0].queue.put_nowait.call_count == 7 pipeline_mock[0].queue.put_nowait.assert_called_with(EndData(5, EndReason.OK)) - mock_stop_pipeline.assert_called_once() - @patch("pandablocks_ioc._hdf_ioc.stop_pipeline") @patch("pandablocks_ioc._hdf_ioc.create_default_pipeline") @@ -530,7 +887,7 @@ async def mock_data(scaled, flush_period): yield item # Set up all the mocks - hdf5_controller._get_filename = MagicMock( # type: ignore + hdf5_controller._get_filepath = MagicMock( # type: ignore return_value="Some/Filepath" ) hdf5_controller._client.data = mock_data # type: ignore @@ -547,12 +904,10 @@ async def mock_data(scaled, flush_period): hdf5_controller._status_message_record.get() == "Requested number of frames captured" ) - # len 12 as ReadyData isn't pushed to pipeline, only Start and Frame data. - assert pipeline_mock[0].queue.put_nowait.call_count == 12 + # len 13 for 2 StartData, 10 FrameData and 1 EndData + assert pipeline_mock[0].queue.put_nowait.call_count == 13 pipeline_mock[0].queue.put_nowait.assert_called_with(EndData(10, EndReason.OK)) - mock_stop_pipeline.assert_called_once() - @patch("pandablocks_ioc._hdf_ioc.stop_pipeline") @patch("pandablocks_ioc._hdf_ioc.create_default_pipeline") @@ -599,7 +954,7 @@ async def mock_data(scaled, flush_period): yield item # Set up all the mocks - hdf5_controller._get_filename = MagicMock( # type: ignore + hdf5_controller._get_filepath = MagicMock( # type: ignore return_value="Some/Filepath" ) hdf5_controller._client.data = mock_data # type: ignore @@ -622,8 +977,6 @@ async def mock_data(scaled, flush_period): EndData(1, EndReason.START_DATA_MISMATCH) ) - mock_stop_pipeline.assert_called_once() - @patch("pandablocks_ioc._hdf_ioc.stop_pipeline") @patch("pandablocks_ioc._hdf_ioc.create_default_pipeline") @@ -661,7 +1014,7 @@ async def mock_data(scaled, flush_period): raise CancelledError # Set up all the mocks - hdf5_controller._get_filename = MagicMock( # type: ignore + hdf5_controller._get_filepath = MagicMock( # type: ignore return_value="Some/Filepath" ) hdf5_controller._client.data = mock_data # type: ignore @@ -675,9 +1028,9 @@ async def mock_data(scaled, flush_period): assert hdf5_controller._status_message_record.get() == "Capturing disabled" # len 2 - one StartData, one EndData assert pipeline_mock[0].queue.put_nowait.call_count == 2 - pipeline_mock[0].queue.put_nowait.assert_called_with(EndData(0, EndReason.OK)) - - mock_stop_pipeline.assert_called_once() + pipeline_mock[0].queue.put_nowait.assert_called_with( + EndData(0, EndReason.MANUALLY_STOPPED) + ) @patch("pandablocks_ioc._hdf_ioc.stop_pipeline") @@ -716,7 +1069,7 @@ async def mock_data(scaled, flush_period): raise Exception("Test exception") # Set up all the mocks - hdf5_controller._get_filename = MagicMock( # type: ignore + hdf5_controller._get_filepath = MagicMock( # type: ignore return_value="Some/Filepath" ) hdf5_controller._client.data = mock_data # type: ignore @@ -737,8 +1090,6 @@ async def mock_data(scaled, flush_period): EndData(0, EndReason.UNKNOWN_EXCEPTION) ) - mock_stop_pipeline.assert_called_once() - async def test_capture_on_update( hdf5_controller: HDF5RecordController, @@ -781,13 +1132,13 @@ async def test_capture_on_update_cancel_unexpected_task( task_mock.cancel.assert_called_once() -def test_hdf_get_filename( +def test_hdf_get_filepath( hdf5_controller: HDF5RecordController, ): - """Test _get_filename works when all records have valid values""" + """Test _get_filepath works when all records have valid values""" - hdf5_controller._file_path_record = MagicMock() - hdf5_controller._file_path_record.get = MagicMock( # type: ignore + hdf5_controller._directory_record = MagicMock() + hdf5_controller._directory_record.get = MagicMock( # type: ignore return_value="/some/path" ) @@ -796,14 +1147,14 @@ def test_hdf_get_filename( return_value="some_filename" ) - assert hdf5_controller._get_filename() == "/some/path/some_filename" + assert hdf5_controller._get_filepath() == "/some/path/some_filename" def test_hdf_capture_validate_valid_filename( hdf5_controller: HDF5RecordController, ): """Test _capture_validate passes when a valid filename is given""" - hdf5_controller._get_filename = MagicMock( # type: ignore + hdf5_controller._get_filepath = MagicMock( # type: ignore return_value="/valid/file.h5" ) @@ -821,7 +1172,7 @@ def test_hdf_capture_validate_invalid_filename( hdf5_controller: HDF5RecordController, ): """Test _capture_validate fails when filename cannot be created""" - hdf5_controller._get_filename = MagicMock( # type: ignore + hdf5_controller._get_filepath = MagicMock( # type: ignore side_effect=ValueError("Mocked value error") ) @@ -832,7 +1183,7 @@ def test_hdf_capture_validate_exception( hdf5_controller: HDF5RecordController, ): """Test _capture_validate fails due to other exceptions""" - hdf5_controller._get_filename = MagicMock( # type: ignore + hdf5_controller._get_filepath = MagicMock( # type: ignore side_effect=Exception("Mocked error") ) diff --git a/tests/test_tables.py b/tests/test_tables.py index adf105c7..b1fac61f 100644 --- a/tests/test_tables.py +++ b/tests/test_tables.py @@ -363,9 +363,9 @@ async def test_table_updater_update_mode_submit_exception_data_error( assert isinstance(table_updater.client.send, AsyncMock) table_updater.client.send.side_effect = Exception("Mocked exception") - table_updater.all_values_dict[ - EpicsName(EPICS_FORMAT_TABLE_NAME) - ] = InErrorException("Mocked in error exception") + table_updater.all_values_dict[EpicsName(EPICS_FORMAT_TABLE_NAME)] = ( + InErrorException("Mocked in error exception") + ) await table_updater.update_mode(TableModeEnum.SUBMIT.value)