Skip to content

Commit

Permalink
Rearrange order of files read in and improve parallelisation selection
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjbr123 committed Aug 28, 2024
1 parent b55a161 commit 9092bb2
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions scripts/convert_GEAR_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# and be adapted for your own datasets.

import os
import pdb  # NOTE(review): debug-only import — appears unused; remove before production
import apache_beam as beam
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from apache_beam.options.pipeline_options import PipelineOptions
Expand All @@ -27,7 +28,7 @@
# Output Zarr store location and chunking configuration.
td = "/work/scratch-pw2/mattjbr"  # target directory for the output store
tn = "gear_1hrly_fulloutput_yearly_100km_chunks.zarr"  # target store name
# One (average) year of hourly steps per time chunk; 100x100 spatial tiles.
target_chunks = {"time": int(365.25*24), "y": 100, "x": 100, "bnds": 2}
nprocs = 8  # DirectRunner worker processes; 1 disables multiprocessing
prune = 12 # no. of files to process, set to 0 to use all

if not os.path.exists(td):
Expand All @@ -39,7 +40,7 @@ def make_path(time):

# Build the list of yyyymm labels covering the requested period.
# Ordered year-major so the files concatenate in chronological order.
# (The previous month-major ordering was a dead assignment, immediately
# overwritten — removed here.)
years = list(range(startyear, endyear + 1))
months = list(range(1, 13))
ymonths = [f"{year}{month:02d}" for year in years for month in months]
# Single concat dimension over time, keyed by the yyyymm labels.
time_concat_dim = ConcatDim("time", ymonths)

pattern = FilePattern(make_path, time_concat_dim)
Expand Down Expand Up @@ -96,11 +97,12 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
| ConsolidateMetadata()
)

# Run the recipe. When more than one worker is requested, use Beam's
# DirectRunner in multi-processing mode so file reads/writes happen in
# parallel; otherwise fall back to the default single-process pipeline.
# (Previous commented-out option block and the unconditional single-process
# invocation were dead code — removed.)
if nprocs > 1:
    beam_options = PipelineOptions(
        direct_num_workers=nprocs, direct_running_mode="multi_processing"
    )
    with beam.Pipeline(options=beam_options) as p:
        p | recipe
else:
    with beam.Pipeline() as p:
        p | recipe

0 comments on commit 9092bb2

Please sign in to comment.