From 8b2e560813b02f0d0d492ab2a09959d6fc64fcd4 Mon Sep 17 00:00:00 2001 From: Maddie Dawson Date: Thu, 2 Nov 2023 15:09:29 -0700 Subject: [PATCH] Update --- streaming/base/converters/dataframe_to_mds.py | 21 ++++++++----------- streaming/base/format/mds/writer.py | 4 +++- streaming/base/util.py | 4 +++- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/streaming/base/converters/dataframe_to_mds.py b/streaming/base/converters/dataframe_to_mds.py index 1adf07eee..c9a59a704 100644 --- a/streaming/base/converters/dataframe_to_mds.py +++ b/streaming/base/converters/dataframe_to_mds.py @@ -280,12 +280,10 @@ def write_mds(iterator: Iterable): def merge_and_log(df: DataFrame, batch_id: int): partitions = df.collect() if len(partitions) == 0: + logger.warning(f'[Batch #{batch_id}] No records to write') return if merge_index: - index_files = [ - (row['mds_path_local'], row['mds_path_remote']) for row in partitions - ] lock_file_path = os.path.join(out, '.merge.lock') # Acquire the lock. while True: @@ -295,20 +293,19 @@ def merge_and_log(df: DataFrame, batch_id: int): time.sleep(1) # File already exists, wait and try again else: break - do_merge_index(index_files, out, keep_local=keep_local, download_timeout=60) + do_merge_index(out, keep_local=keep_local, download_timeout=60) # Release the lock. os.close(fd) + os.remove(lock_file_path) - sum_fail_count = 0 for row in partitions: - sum_fail_count += row['fail_count'] + logger.warning(f"[Batch #{batch_id}] {row['fail_count']} failed record(s) for {row['mds_path_local']}") - if sum_fail_count > 0: - logger.warning( - f'[Batch #{batch_id}] Total failed records = {sum_fail_count}\nOverall records {dataframe.count()}' - ) - - mapped_df.writeStream.foreachBatch(merge_and_log).start() + mapped_df \ + .writeStream \ + .foreachBatch(merge_and_log) \ + .start() \ + .awaitTermination() return None, 0 else: partitions = mapped_df.collect() diff --git a/streaming/base/format/mds/writer.py b/streaming/base/format/mds/writer.py index e82fc02a8..2517703ad 100644 --- a/streaming/base/format/mds/writer.py +++ b/streaming/base/format/mds/writer.py @@ -4,6 +4,7 @@ """:class:`MDSWriter` writes samples to ``.mds`` files that can be read by :class:`MDSReader`.""" import json +import time from typing import Any, Dict, List, Optional, Tuple, Union import numpy as np @@ -123,7 +124,8 @@ def get_config(self) -> Dict[str, Any]: obj.update({ 'column_names': self.column_names, 'column_encodings': self.column_encodings, - 'column_sizes': self.column_sizes + 'column_sizes': self.column_sizes, + 'write_timestamp': time.time(), }) return obj diff --git a/streaming/base/util.py b/streaming/base/util.py index d8042eda0..4151a6a0e 100644 --- a/streaming/base/util.py +++ b/streaming/base/util.py @@ -350,7 +350,9 @@ def _merge_index_from_list(index_file_urls: List[Union[str, Tuple[str, str]]], # Move merged index from temp path to local part in out # Upload merged index to remote if out has remote part - shutil.move(merged_index_path, cu.local) + dst_index_path = os.path.join(cu.local, os.path.basename(merged_index_path)) + shutil.copy(merged_index_path, dst_index_path) + os.remove(merged_index_path) if cu.remote is not None: cu.upload_file(index_basename)