From d16d78ed40c877e9278918b90454382140393fd5 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Fri, 27 Sep 2024 08:59:40 +0100 Subject: [PATCH 1/7] move files from native to local raw dir --- satip/eumetsat.py | 41 ++++++++++++++++++++++++++--------------- satip/utils.py | 25 +++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/satip/eumetsat.py b/satip/eumetsat.py index fd07bc94..e7e591d4 100644 --- a/satip/eumetsat.py +++ b/satip/eumetsat.py @@ -335,6 +335,7 @@ def download_date_range( datasets = identify_available_datasets(start_date, end_date, product_id=product_id) self.download_datasets(datasets, product_id=product_id) + def download_datasets(self, datasets, product_id="EO:EUM:DAT:MSG:MSG15-RSS"): """Downloads a product-id- and date-range-specific dataset from the EUMETSAT API @@ -356,26 +357,36 @@ def download_datasets(self, datasets, product_id="EO:EUM:DAT:MSG:MSG15-RSS"): for dataset_id in dataset_ids: log.debug(f"Downloading: {dataset_id}", parent="DownloadManager") - dataset_link = dataset_id_to_link( - product_id, dataset_id, access_token=self.access_token - ) - # Download the raw data - try: - self.download_single_dataset(dataset_link) - except HTTPError: - log.debug("The EUMETSAT access token has been refreshed", parent="DownloadManager") - self.request_access_token() + + files = utils.move_files(dataset_id=dataset_id, + data_dir_from=self.native_file_dir, + data_dir_to=self.data_dir) + if len(files) == 0: + dataset_link = dataset_id_to_link( product_id, dataset_id, access_token=self.access_token ) - self.download_single_dataset(dataset_link) - except Exception as e: - log.error( - f"Error downloading dataset with id {dataset_id}: {e}", - exc_info=True, - parent="DownloadManager", + # Download the raw data + try: + self.download_single_dataset(dataset_link) + except HTTPError: + log.debug("The EUMETSAT access token has been refreshed", parent="DownloadManager") + self.request_access_token() + dataset_link = dataset_id_to_link( + product_id, dataset_id, access_token=self.access_token + ) + self.download_single_dataset(dataset_link) + except Exception as e: + log.error( + f"Error downloading dataset with id {dataset_id}: {e}", + exc_info=True, + parent="DownloadManager", ) + utils.move_files(dataset_id=dataset_id, + data_dir_from=self.data_dir, + data_dir_to=self.native_file_dir) + def download_tailored_date_range( self, start_date: str, diff --git a/satip/utils.py b/satip/utils.py index 695c3c3e..33b8bef5 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -1069,3 +1069,28 @@ def get_memory() -> str: Gets memory of process as a string """ return f"{psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2} MB" + + +def move_files(dataset_id: str, data_dir_from, data_dir_to): + """ Move files for dataset_id + + Args: + dataset_id: The dataset_id to move files for + data_dir_from: The directory to move files from + data_dir_to: The directory to move files to + + Returns: + files: List of files moved + """ + + data_store_filename_remote = f"{data_dir_from}/{dataset_id}*" + # get list of all files that match data_store_filename_remote + fs = fsspec.open(data_store_filename_remote).fs + files = fs.glob(data_store_filename_remote) + if len(files) > 0: + # download the files to data_dir in + log.info(f'Copying files ({len(files)}) from native file store ({data_dir_from}) ' + f'to data directory ({data_dir_to})') + fs.cp(files, data_dir_to) + + return files \ No newline at end of file From 8d4aca57d0ec1c2b56ba309409afc3a3f7a09973 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Fri, 27 Sep 2024 11:14:49 +0100 Subject: [PATCH 2/7] fix for getting fs --- satip/utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/satip/utils.py b/satip/utils.py index 33b8bef5..3fa56fc4 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -1084,13 +1084,17 @@ def move_files(dataset_id: str, data_dir_from, data_dir_to): """ data_store_filename_remote = f"{data_dir_from}/{dataset_id}*" + # get list of all files that match data_store_filename_remote - fs = fsspec.open(data_store_filename_remote).fs + fs = fsspec.open(data_dir_from).fs files = fs.glob(data_store_filename_remote) + if len(files) > 0: # download the files to data_dir in log.info(f'Copying files ({len(files)}) from native file store ({data_dir_from}) ' f'to data directory ({data_dir_to})') fs.cp(files, data_dir_to) + else: + log.error(f'No files found for dataset_id {dataset_id} in {data_dir_from}') return files \ No newline at end of file From 8c4918968c4c66bbaca8d75bd8ca20e0253a0ed3 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Fri, 27 Sep 2024 12:54:13 +0100 Subject: [PATCH 3/7] fix for uploading file --- satip/utils.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/satip/utils.py b/satip/utils.py index 3fa56fc4..c0331baf 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -1083,17 +1083,23 @@ def move_files(dataset_id: str, data_dir_from, data_dir_to): files: List of files moved """ - data_store_filename_remote = f"{data_dir_from}/{dataset_id}*" + data_store_filename_from = f"{data_dir_from}/{dataset_id}*" # get list of all files that match data_store_filename_remote - fs = fsspec.open(data_dir_from).fs - files = fs.glob(data_store_filename_remote) + fs_from = fsspec.open(data_dir_from).fs + fs_to = fsspec.open(data_dir_to).fs + files = fs_from.glob(data_store_filename_from) if len(files) > 0: # download the files to data_dir in log.info(f'Copying files ({len(files)}) from native file store ({data_dir_from}) ' f'to data directory ({data_dir_to})') - fs.cp(files, data_dir_to) + for file in files: + # get file name + file_name = file.split('/')[-1] + + # copy file + fs_to.put(file, data_dir_to + '/' + file_name) else: log.error(f'No files found for dataset_id {dataset_id} in {data_dir_from}') From 2894eaf25d3bd7946857fcfbf55604be1a1ca69d Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Fri, 27 Sep 2024 13:08:50 +0100 Subject: [PATCH 4/7] update, split case, if local or remote --- satip/utils.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/satip/utils.py b/satip/utils.py index c0331baf..3d822f00 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -1098,8 +1098,12 @@ def move_files(dataset_id: str, data_dir_from, data_dir_to): # get file name file_name = file.split('/')[-1] - # copy file - fs_to.put(file, data_dir_to + '/' + file_name) + if hasattr(fs_to,'local_file'): + # copy file from remote to local + fs_from.get(file, data_dir_to + '/' + file_name) + else: + # copy file from local to remote + fs_to.put(file, data_dir_to + '/' + file_name) else: log.error(f'No files found for dataset_id {dataset_id} in {data_dir_from}') From 91851866ae1acf7d27e3825be45261b44ebb52d6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 27 Sep 2024 12:49:13 +0000 Subject: [PATCH 5/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- satip/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/satip/utils.py b/satip/utils.py index 3d822f00..0733227d 100644 --- a/satip/utils.py +++ b/satip/utils.py @@ -1107,4 +1107,4 @@ def move_files(dataset_id: str, data_dir_from, data_dir_to): else: log.error(f'No files found for dataset_id {dataset_id} in {data_dir_from}') - return files \ No newline at end of file + return files From 0a0b32979373dfa3b98385d12466b16230b367f6 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Fri, 27 Sep 2024 13:51:23 +0100 Subject: [PATCH 6/7] lint --- satip/eumetsat.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/satip/eumetsat.py b/satip/eumetsat.py index e7e591d4..c0938eb8 100644 --- a/satip/eumetsat.py +++ b/satip/eumetsat.py @@ -370,7 +370,8 @@ def download_datasets(self, datasets, product_id="EO:EUM:DAT:MSG:MSG15-RSS"): try: self.download_single_dataset(dataset_link) except HTTPError: - log.debug("The EUMETSAT access token has been refreshed", parent="DownloadManager") + log.debug("The EUMETSAT access token has been refreshed", + parent="DownloadManager") self.request_access_token() dataset_link = dataset_id_to_link( product_id, dataset_id, access_token=self.access_token From 062c8468c91ca5fd238bad82a2693138fcc41dd1 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Fri, 27 Sep 2024 13:52:39 +0100 Subject: [PATCH 7/7] add comments --- satip/eumetsat.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/satip/eumetsat.py b/satip/eumetsat.py index c0938eb8..afc4656d 100644 --- a/satip/eumetsat.py +++ b/satip/eumetsat.py @@ -358,6 +358,7 @@ def download_datasets(self, datasets, product_id="EO:EUM:DAT:MSG:MSG15-RSS"): for dataset_id in dataset_ids: log.debug(f"Downloading: {dataset_id}", parent="DownloadManager") + # get raw files from s3, if there files = utils.move_files(dataset_id=dataset_id, data_dir_from=self.native_file_dir, data_dir_to=self.data_dir) @@ -384,6 +385,7 @@ def download_datasets(self, datasets, product_id="EO:EUM:DAT:MSG:MSG15-RSS"): parent="DownloadManager", ) + # save raw files to s3 utils.move_files(dataset_id=dataset_id, data_dir_from=self.data_dir, data_dir_to=self.native_file_dir)