Merge pull request #300 from openclimatefix/issue/cp-native-files
Issue/cp native files
peterdudfield authored Sep 27, 2024
2 parents 57178b0 + 062c846 commit 89a7e00
Showing 2 changed files with 68 additions and 15 deletions.
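In short: download_datasets now checks a native-file store (an s3 directory held in self.native_file_dir) for previously downloaded raw files before calling the EUMETSAT API, and copies freshly downloaded files back to that store afterwards. The copying itself is done by a new utils.move_files helper built on fsspec.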
44 changes: 29 additions & 15 deletions satip/eumetsat.py
@@ -335,6 +335,7 @@ def download_date_range(
         datasets = identify_available_datasets(start_date, end_date, product_id=product_id)
         self.download_datasets(datasets, product_id=product_id)
 
+
     def download_datasets(self, datasets, product_id="EO:EUM:DAT:MSG:MSG15-RSS"):
         """Downloads a product-id- and date-range-specific dataset from the EUMETSAT API
@@ -356,26 +357,39 @@ def download_datasets(self, datasets, product_id="EO:EUM:DAT:MSG:MSG15-RSS"):
 
         for dataset_id in dataset_ids:
             log.debug(f"Downloading: {dataset_id}", parent="DownloadManager")
-            dataset_link = dataset_id_to_link(
-                product_id, dataset_id, access_token=self.access_token
-            )
-            # Download the raw data
-            try:
-                self.download_single_dataset(dataset_link)
-            except HTTPError:
-                log.debug("The EUMETSAT access token has been refreshed", parent="DownloadManager")
-                self.request_access_token()
+
+            # get raw files from s3, if there
+            files = utils.move_files(dataset_id=dataset_id,
+                                     data_dir_from=self.native_file_dir,
+                                     data_dir_to=self.data_dir)
+            if len(files) == 0:
+
                 dataset_link = dataset_id_to_link(
                     product_id, dataset_id, access_token=self.access_token
                 )
-                self.download_single_dataset(dataset_link)
-            except Exception as e:
-                log.error(
-                    f"Error downloading dataset with id {dataset_id}: {e}",
-                    exc_info=True,
-                    parent="DownloadManager",
-                )
+                # Download the raw data
+                try:
+                    self.download_single_dataset(dataset_link)
+                except HTTPError:
+                    log.debug("The EUMETSAT access token has been refreshed",
+                              parent="DownloadManager")
+                    self.request_access_token()
+                    dataset_link = dataset_id_to_link(
+                        product_id, dataset_id, access_token=self.access_token
+                    )
+                    self.download_single_dataset(dataset_link)
+                except Exception as e:
+                    log.error(
+                        f"Error downloading dataset with id {dataset_id}: {e}",
+                        exc_info=True,
+                        parent="DownloadManager",
+                    )
+
+                # save raw files to s3
+                utils.move_files(dataset_id=dataset_id,
+                                 data_dir_from=self.data_dir,
+                                 data_dir_to=self.native_file_dir)
 
     def download_tailored_date_range(
         self,
         start_date: str,
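The net effect on download_datasets, as a minimal sketch (the DownloadManager constructor arguments and the s3 path here are illustrative, not taken from this diff):

    from satip.eumetsat import DownloadManager, identify_available_datasets

    # hypothetical setup: data_dir is local scratch space, native_file_dir is an
    # s3 cache of raw files (names follow the attributes used in the diff above)
    manager = DownloadManager(
        user_key="<eumetsat-key>",
        user_secret="<eumetsat-secret>",
        data_dir="./data",
        native_file_dir="s3://my-bucket/native",  # illustrative bucket
    )

    datasets = identify_available_datasets("2024-09-27 00:00", "2024-09-27 06:00")

    # for each dataset id: try the s3 cache first; only when no cached files are
    # found is the dataset downloaded from EUMETSAT and then copied back to s3
    manager.download_datasets(datasets)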
39 changes: 39 additions & 0 deletions satip/utils.py
@@ -1069,3 +1069,42 @@ def get_memory() -> str:
     Gets memory of process as a string
     """
     return f"{psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2} MB"
+
+
+def move_files(dataset_id: str, data_dir_from, data_dir_to):
+    """ Move files for dataset_id
+
+    Args:
+        dataset_id: The dataset_id to move files for
+        data_dir_from: The directory to move files from
+        data_dir_to: The directory to move files to
+
+    Returns:
+        files: List of files moved
+    """
+
+    data_store_filename_from = f"{data_dir_from}/{dataset_id}*"
+
+    # get list of all files that match data_store_filename_remote
+    fs_from = fsspec.open(data_dir_from).fs
+    fs_to = fsspec.open(data_dir_to).fs
+    files = fs_from.glob(data_store_filename_from)
+
+    if len(files) > 0:
+        # download the files to data_dir in
+        log.info(f'Copying files ({len(files)}) from native file store ({data_dir_from}) '
+                 f'to data directory ({data_dir_to})')
+        for file in files:
+            # get file name
+            file_name = file.split('/')[-1]
+
+            if hasattr(fs_to, 'local_file'):
+                # copy file from remote to local
+                fs_from.get(file, data_dir_to + '/' + file_name)
+            else:
+                # copy file from local to remote
+                fs_to.put(file, data_dir_to + '/' + file_name)
+    else:
+        log.error(f'No files found for dataset_id {dataset_id} in {data_dir_from}')
+
+    return files
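Two notes on move_files. First, despite the name it copies rather than moves: nothing is deleted from data_dir_from. Second, the hasattr(fs_to, 'local_file') test works because fsspec's LocalFileSystem defines a local_file attribute, so the function can tell a local destination (pull with fs_from.get) from a remote one (push with fs_to.put). A minimal usage sketch, assuming s3fs is installed and using an illustrative bucket and dataset id:

    from satip import utils

    # pull any cached native files for one dataset id from s3 to a local directory
    files = utils.move_files(
        dataset_id="MSG4-SEVI-MSG15-0100-NA-20240927",  # illustrative id
        data_dir_from="s3://my-bucket/native",          # illustrative bucket
        data_dir_to="./data",
    )
    if len(files) == 0:
        print("nothing cached for this dataset id; a fresh download is needed")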
