diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py index 68ca57e4..4ee09225 100644 --- a/wfdb/io/_signal.py +++ b/wfdb/io/_signal.py @@ -1066,6 +1066,7 @@ def _rd_segment( ignore_skew, no_file=False, sig_data=None, + sig_stream=None, return_res=64, ): """ @@ -1211,6 +1212,7 @@ def _rd_segment( sampto=sampto, no_file=no_file, sig_data=sig_data, + sig_stream=sig_stream, ) # Copy over the wanted signals @@ -1235,6 +1237,7 @@ def _rd_dat_signals( sampto, no_file=False, sig_data=None, + sig_stream=None, ): """ Read all signals from a WFDB dat file. @@ -1324,20 +1327,31 @@ def _rd_dat_signals( if no_file: data_to_read = sig_data elif fmt in COMPRESSED_FMTS: - data_to_read = _rd_compressed_file( - file_name=file_name, - dir_name=dir_name, - pn_dir=pn_dir, - fmt=fmt, - sample_offset=byte_offset, - n_sig=n_sig, - samps_per_frame=samps_per_frame, - start_frame=sampfrom, - end_frame=sampto, - ) + if sig_stream is not None: + data_to_read = _rd_compressed_stream( + fp=sig_stream, + fmt=fmt, + sample_offset=byte_offset, + n_sig=n_sig, + samps_per_frame=samps_per_frame, + start_frame=sampfrom, + end_frame=sampto, + ) + else: + data_to_read = _rd_compressed_file( + file_name=file_name, + dir_name=dir_name, + pn_dir=pn_dir, + fmt=fmt, + sample_offset=byte_offset, + n_sig=n_sig, + samps_per_frame=samps_per_frame, + start_frame=sampfrom, + end_frame=sampto, + ) else: data_to_read = _rd_dat_file( - file_name, dir_name, pn_dir, fmt, start_byte, n_read_samples + file_name, dir_name, pn_dir, fmt, start_byte, n_read_samples, sig_stream ) if extra_flat_samples: @@ -1577,7 +1591,7 @@ def _required_byte_num(mode, fmt, n_samp): return int(n_bytes) -def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp): +def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp, sig_stream): """ Read data from a dat file, either local or remote, into a 1d numpy array. @@ -1635,8 +1649,14 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp): element_count = n_samp byte_count = n_samp * BYTES_PER_SAMPLE[fmt] + # Memory Stream + if sig_stream is not None: + sig_stream.seek(start_byte) + sig_data = np.frombuffer( + sig_stream.read(), dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count + ) # Local dat file - if pn_dir is None: + elif pn_dir is None: with open(os.path.join(dir_name, file_name), "rb") as fp: fp.seek(start_byte) sig_data = np.fromfile( @@ -1651,7 +1671,6 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp): return sig_data - def _blocks_to_samples(sig_data, n_samp, fmt): """ Convert uint8 blocks into signal samples for unaligned dat formats. @@ -1770,6 +1789,123 @@ def _blocks_to_samples(sig_data, n_samp, fmt): return sig +def _rd_compressed_stream( + fp, + fmt, + sample_offset, + n_sig, + samps_per_frame, + start_frame, + end_frame, +): + signature = fp.read(4) + if signature != b"fLaC": + raise ValueError(f"{fp.name} is not a FLAC file") + fp.seek(0) + + with soundfile.SoundFile(fp) as sf: + # Determine the actual resolution of the FLAC stream and the + # data type will use when reading it. Note that soundfile + # doesn't support int8. + if sf.subtype == "PCM_S8": + format_bits = 8 + read_dtype = "int16" + elif sf.subtype == "PCM_16": + format_bits = 16 + read_dtype = "int16" + elif sf.subtype == "PCM_24": + format_bits = 24 + read_dtype = "int32" + else: + raise ValueError(f"unknown subtype in {fp.name} ({sf.subtype})") + + max_bits = int(fmt) - 500 + if format_bits > max_bits: + raise ValueError( + f"wrong resolution in {fp.name} " + f"({format_bits}, expected <= {max_bits})" + ) + + if sf.channels != n_sig: + raise ValueError( + f"wrong number of channels in {fp.name} " + f"({sf.channels}, expected {n_sig})" + ) + + # Read the samples. + start_samp = start_frame * samps_per_frame[0] + end_samp = end_frame * samps_per_frame[0] + sf.seek(start_samp + sample_offset) + + # We could do this: + # sig_data = sf.read(end_samp - start_samp, dtype=read_dtype) + # However, sf.read fails for huge blocks (over 2**24 total + # samples) due to a bug in libsndfile: + # https://github.com/libsndfile/libsndfile/issues/431 + # So read the data in chunks instead. + n_samp = end_samp - start_samp + sig_data = np.empty((n_samp, n_sig), dtype=read_dtype) + CHUNK_SIZE = 1024 * 1024 + for chunk_start in range(0, n_samp, CHUNK_SIZE): + chunk_end = chunk_start + CHUNK_SIZE + chunk_data = sf.read(out=sig_data[chunk_start:chunk_end]) + samples_read = chunk_data.shape[0] + if samples_read != CHUNK_SIZE: + sig_data = sig_data[: chunk_start + samples_read] + break + + # If we read an 8-bit stream as int16 or a 24-bit stream as + # int32, soundfile shifts each sample left by 8 bits. We + # want to undo this shift (and, in the case of 8-bit data, + # convert to an int8 array.) + if format_bits == 8: + # np.right_shift(sig_data, 8, dtype='int8') doesn't work. + # This seems wrong, but the numpy documentation is unclear. + sig_data2 = np.empty(sig_data.shape, dtype="int8") + sig_data = np.right_shift(sig_data, 8, out=sig_data2) + elif format_bits == 24: + # Shift 32-bit array in-place. + np.right_shift(sig_data, 8, out=sig_data) + + # Suppose we have 3 channels and 2 samples per frame. The array + # returned by sf.read looks like this: + # + # channel 0 channel 1 channel 2 + # time 0 [0,0] [0,1] [0,2] + # time 1 [1,0] [1,1] [1,2] + # time 2 [2,0] [2,1] [2,2] + # time 3 [3,0] [3,1] [3,2] + # + # We reshape this first into the following: + # + # channel 0 channel 1 channel 2 + # time 0 [0,0,0] [0,0,1] [0,0,2] + # time 1 [0,1,0] [0,1,1] [0,1,2] + # time 2 [1,0,0] [1,0,1] [1,0,2] + # time 3 [1,1,0] [1,1,1] [1,1,2] + # + # Then we transpose axes 1 and 2: + # + # channel 0 channel 1 channel 2 + # time 0 [0,0,0] [0,1,0] [0,2,0] + # time 1 [0,0,1] [0,1,1] [0,2,1] + # time 2 [1,0,0] [1,1,0] [1,2,0] + # time 3 [1,0,1] [1,1,1] [1,2,1] + # + # Then when we reshape the array to 1D, the result is in dat file + # order: + # + # channel 0 channel 1 channel 2 + # time 0 [0] [2] [4] + # time 1 [1] [3] [5] + # time 2 [6] [8] [10] + # time 3 [7] [9] [11] + + sig_data = sig_data.reshape(-1, samps_per_frame[0], n_sig) + sig_data = sig_data.transpose(0, 2, 1) + return sig_data.reshape(-1) + + def _rd_compressed_file( file_name, dir_name, @@ -1834,112 +1970,7 @@ def _rd_compressed_file( file_name = os.path.join(dir_name, file_name) with _coreio._open_file(pn_dir, file_name, "rb") as fp: - signature = fp.read(4) - if signature != b"fLaC": - raise ValueError(f"{fp.name} is not a FLAC file") - fp.seek(0) - - with soundfile.SoundFile(fp) as sf: - # Determine the actual resolution of the FLAC stream and the - # data type will use when reading it. Note that soundfile - # doesn't support int8. - if sf.subtype == "PCM_S8": - format_bits = 8 - read_dtype = "int16" - elif sf.subtype == "PCM_16": - format_bits = 16 - read_dtype = "int16" - elif sf.subtype == "PCM_24": - format_bits = 24 - read_dtype = "int32" - else: - raise ValueError(f"unknown subtype in {fp.name} ({sf.subtype})") - - max_bits = int(fmt) - 500 - if format_bits > max_bits: - raise ValueError( - f"wrong resolution in {fp.name} " - f"({format_bits}, expected <= {max_bits})" - ) - - if sf.channels != n_sig: - raise ValueError( - f"wrong number of channels in {fp.name} " - f"({sf.channels}, expected {n_sig})" - ) - - # Read the samples. - start_samp = start_frame * samps_per_frame[0] - end_samp = end_frame * samps_per_frame[0] - sf.seek(start_samp + sample_offset) - - # We could do this: - # sig_data = sf.read(end_samp - start_samp, dtype=read_dtype) - # However, sf.read fails for huge blocks (over 2**24 total - # samples) due to a bug in libsndfile: - # https://github.com/libsndfile/libsndfile/issues/431 - # So read the data in chunks instead. - n_samp = end_samp - start_samp - sig_data = np.empty((n_samp, n_sig), dtype=read_dtype) - CHUNK_SIZE = 1024 * 1024 - for chunk_start in range(0, n_samp, CHUNK_SIZE): - chunk_end = chunk_start + CHUNK_SIZE - chunk_data = sf.read(out=sig_data[chunk_start:chunk_end]) - samples_read = chunk_data.shape[0] - if samples_read != CHUNK_SIZE: - sig_data = sig_data[: chunk_start + samples_read] - break - - # If we read an 8-bit stream as int16 or a 24-bit stream as - # int32, soundfile shifts each sample left by 8 bits. We - # want to undo this shift (and, in the case of 8-bit data, - # convert to an int8 array.) - if format_bits == 8: - # np.right_shift(sig_data, 8, dtype='int8') doesn't work. - # This seems wrong, but the numpy documentation is unclear. - sig_data2 = np.empty(sig_data.shape, dtype="int8") - sig_data = np.right_shift(sig_data, 8, out=sig_data2) - elif format_bits == 24: - # Shift 32-bit array in-place. - np.right_shift(sig_data, 8, out=sig_data) - - # Suppose we have 3 channels and 2 samples per frame. The array - # returned by sf.read looks like this: - # - # channel 0 channel 1 channel 2 - # time 0 [0,0] [0,1] [0,2] - # time 1 [1,0] [1,1] [1,2] - # time 2 [2,0] [2,1] [2,2] - # time 3 [3,0] [3,1] [3,2] - # - # We reshape this first into the following: - # - # channel 0 channel 1 channel 2 - # time 0 [0,0,0] [0,0,1] [0,0,2] - # time 1 [0,1,0] [0,1,1] [0,1,2] - # time 2 [1,0,0] [1,0,1] [1,0,2] - # time 3 [1,1,0] [1,1,1] [1,1,2] - # - # Then we transpose axes 1 and 2: - # - # channel 0 channel 1 channel 2 - # time 0 [0,0,0] [0,1,0] [0,2,0] - # time 1 [0,0,1] [0,1,1] [0,2,1] - # time 2 [1,0,0] [1,1,0] [1,2,0] - # time 3 [1,0,1] [1,1,1] [1,2,1] - # - # Then when we reshape the array to 1D, the result is in dat file - # order: - # - # channel 0 channel 1 channel 2 - # time 0 [0] [2] [4] - # time 1 [1] [3] [5] - # time 2 [6] [8] [10] - # time 3 [7] [9] [11] - - sig_data = sig_data.reshape(-1, samps_per_frame[0], n_sig) - sig_data = sig_data.transpose(0, 2, 1) - return sig_data.reshape(-1) + return _rd_compressed_stream(fp, fmt, sample_offset, n_sig, samps_per_frame, start_frame, end_frame) def _skew_sig( diff --git a/wfdb/io/record.py b/wfdb/io/record.py index 09496396..e40c68ac 100644 --- a/wfdb/io/record.py +++ b/wfdb/io/record.py @@ -1788,7 +1788,7 @@ def check_np_array(item, field_name, ndim, parent_class, channel_num=None): # ------------------------- Reading Records --------------------------- # -def rdheader(record_name, pn_dir=None, rd_segments=False): +def rdheader(record_name, pn_dir=None, rd_segments=False, hea_stream=None): """ Read a WFDB header file and return a `Record` or `MultiRecord` object with the record descriptors as attributes. @@ -1826,25 +1826,28 @@ def rdheader(record_name, pn_dir=None, rd_segments=False): dir_name, base_record_name = os.path.split(record_name) dir_name = os.path.abspath(dir_name) - # Construct the download path using the database version - if (pn_dir is not None) and ("." not in pn_dir): - dir_list = pn_dir.split("/") - pn_dir = posixpath.join( - dir_list[0], download.get_version(dir_list[0]), *dir_list[1:] - ) - - # Read the local or remote header file. - file_name = f"{base_record_name}.hea" - if pn_dir is None: - with open( - os.path.join(dir_name, file_name), - "r", - encoding="ascii", - errors="ignore", - ) as f: - header_content = f.read() + if hea_stream is not None: + header_content = hea_stream.read().decode('ascii') else: - header_content = download._stream_header(file_name, pn_dir) + # Construct the download path using the database version + if (pn_dir is not None) and ("." not in pn_dir): + dir_list = pn_dir.split("/") + pn_dir = posixpath.join( + dir_list[0], download.get_version(dir_list[0]), *dir_list[1:] + ) + + # Read the local or remote header file. + file_name = f"{base_record_name}.hea" + if pn_dir is None: + with open( + os.path.join(dir_name, file_name), + "r", + encoding="ascii", + errors="ignore", + ) as f: + header_content = f.read() + else: + header_content = download._stream_header(file_name, pn_dir) # Separate comment and non-comment lines header_lines, comment_lines = header.parse_header_content(header_content) @@ -1925,6 +1928,8 @@ def rdrecord( force_channels=True, channel_names=None, warn_empty=False, + hea_stream=None, + sig_stream=None, ): """ Read a WFDB record and return the signal and record descriptors as @@ -2026,7 +2031,7 @@ def rdrecord( dir_list[0], download.get_version(dir_list[0]), *dir_list[1:] ) - record = rdheader(record_name, pn_dir=pn_dir, rd_segments=False) + record = rdheader(record_name, pn_dir=pn_dir, rd_segments=False, hea_stream=hea_stream) # Set defaults for sampto and channels input variables if sampto is None: @@ -2127,6 +2132,7 @@ def rdrecord( ignore_skew=ignore_skew, no_file=no_file, sig_data=sig_data, + sig_stream=sig_stream, return_res=return_res, )