From 9092bb2c9c8f099eb49b9672911e53f130e628b6 Mon Sep 17 00:00:00 2001 From: mattjbr123 Date: Wed, 28 Aug 2024 14:58:56 +0100 Subject: [PATCH] rearrange order of files read in and improved parallelisation selection --- scripts/convert_GEAR_beam.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/scripts/convert_GEAR_beam.py b/scripts/convert_GEAR_beam.py index 50959e2..9a99e96 100644 --- a/scripts/convert_GEAR_beam.py +++ b/scripts/convert_GEAR_beam.py @@ -7,6 +7,7 @@ # and be adapted for your own datasets. import os +import pdb import apache_beam as beam from pangeo_forge_recipes.patterns import ConcatDim, FilePattern from apache_beam.options.pipeline_options import PipelineOptions @@ -27,7 +28,7 @@ td = "/work/scratch-pw2/mattjbr" tn = "gear_1hrly_fulloutput_yearly_100km_chunks.zarr" target_chunks = {"time": int(365.25*24), "y": 100, "x": 100, "bnds": 2} -#nprocs = 64 +nprocs = 8 prune = 12 # no. of files to process, set to 0 to use all if not os.path.exists(td): @@ -39,7 +40,7 @@ def make_path(time): years = list(range(startyear, endyear + 1)) months = list(range(1, 13)) -ymonths = [f"{year}{month:02d}" for month in months for year in years] +ymonths = [f"{year}{month:02d}" for year in years for month in months] time_concat_dim = ConcatDim("time", ymonths) pattern = FilePattern(make_path, time_concat_dim) @@ -96,11 +97,12 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: | ConsolidateMetadata() ) -#beam_options = PipelineOptions( -# direct_num_workers=nprocs, direct_running_mode="multi_processing" -#) -#with beam.Pipeline(options=beam_options) as p: -# p | recipe - -with beam.Pipeline() as p: - p | recipe +if nprocs > 1: + beam_options = PipelineOptions( + direct_num_workers=nprocs, direct_running_mode="multi_processing" + ) + with beam.Pipeline(options=beam_options) as p: + p | recipe +else: + with beam.Pipeline() as p: + p | recipe