Skip to content

Commit

Permalink
Unkey after grouping to keep code cleaner
Browse files (browse the repository at this point in the history)
  • Loading branch information
moradology committed Jun 3, 2024
1 parent f18c2b1 commit fa048f0
Showing 1 changed file with 2 additions and 3 deletions.
5 changes: 2 additions & 3 deletions pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
@@ -154,9 +154,7 @@ class TransferFilesWithConcurrency(beam.DoFn):
secrets: Optional[Dict] = None
open_kwargs: Optional[Dict] = None

def process(self, element):
# key here is assigned solely to limit number of workers; we drop it immediately
key, indexed_urls = element
def process(self, indexed_urls):
with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
futures = {
executor.submit(self.transfer_file, index, url): (index, url)
@@ -213,6 +211,7 @@ def expand(self, pcoll):
pcoll
| "Assign Executor Grouping Key" >> beam.Map(self.assign_keys)
| "Group per-executor work" >> beam.GroupByKey()
| "Unkey after grouping" >> beam.Values()
| "Limited concurrency file transfer"
>> beam.ParDo(
TransferFilesWithConcurrency(
Expand Down

0 comments on commit fa048f0

Please sign in to comment.