Merge pull request apache#27823: Nest all transform-specific arguments in an explicit 'config' parameter.
robertwb authored Aug 3, 2023
2 parents 966bb2f + a606c53 commit b817b5e
Showing 7 changed files with 373 additions and 230 deletions.
158 changes: 105 additions & 53 deletions sdks/python/apache_beam/yaml/README.md
@@ -56,9 +56,11 @@ writes it out in json format.
 pipeline:
   transforms:
     - type: ReadFromCsv
-      path: /path/to/input*.csv
+      config:
+        path: /path/to/input*.csv
     - type: WriteToJson
-      path: /path/to/output.json
+      config:
+        path: /path/to/output.json
       input: ReadFromCsv
 ```
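To make the new shape concrete: after this change, a transform's own arguments live under an explicit `config` key, while structural keys such as `type`, `name`, and `input` stay at the top level of the transform. A minimal sketch of how the updated example parses (assumes PyYAML is installed):

```python
# A minimal sketch: parse the new-style spec and show where transform
# arguments now live. Assumes PyYAML ('pip install pyyaml') is available.
import yaml

spec = yaml.safe_load("""
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: ReadFromCsv
""")

for transform in spec['pipeline']['transforms']:
  # 'config' isolates transform-specific arguments, so they can never
  # collide with structural keys like 'type', 'name', or 'input'.
  print(transform['type'], transform.get('config', {}))
```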

@@ -68,12 +70,15 @@ We can also add a transformation
 pipeline:
   transforms:
     - type: ReadFromCsv
-      path: /path/to/input*.csv
+      config:
+        path: /path/to/input*.csv
     - type: PyFilter
-      keep: "lambda x: x.col3 > 100"
+      config:
+        keep: "lambda x: x.col3 > 100"
       input: ReadFromCsv
     - type: WriteToJson
-      path: /path/to/output.json
+      config:
+        path: /path/to/output.json
       input: PyFilter
 ```

@@ -83,16 +88,20 @@ or two.
 pipeline:
   transforms:
     - type: ReadFromCsv
-      path: /path/to/input*.csv
+      config:
+        path: /path/to/input*.csv
     - type: PyFilter
-      keep: "lambda x: x.col3 > 100"
+      config:
+        keep: "lambda x: x.col3 > 100"
       input: ReadFromCsv
     - type: Sql
       name: MySqlTransform
-      query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
+      config:
+        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
       input: PyFilter
     - type: WriteToJson
-      path: /path/to/output.json
+      config:
+        path: /path/to/output.json
       input: MySqlTransform
 ```

@@ -105,14 +114,18 @@ pipeline:
   transforms:
     - type: ReadFromCsv
-      path: /path/to/input*.csv
+      config:
+        path: /path/to/input*.csv
     - type: PyFilter
-      keep: "lambda x: x.col3 > 100"
+      config:
+        keep: "lambda x: x.col3 > 100"
     - type: Sql
       name: MySqlTransform
-      query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
+      config:
+        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
     - type: WriteToJson
-      path: /path/to/output.json
+      config:
+        path: /path/to/output.json
 ```

As syntactic sugar, we can name the first and last transforms in our pipeline
@@ -124,19 +137,23 @@ pipeline:
   source:
     type: ReadFromCsv
-    path: /path/to/input*.csv
+    config:
+      path: /path/to/input*.csv
   transforms:
     - type: PyFilter
-      keep: "lambda x: x.col3 > 100"
+      config:
+        keep: "lambda x: x.col3 > 100"
     - type: Sql
       name: MySqlTransform
-      query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
+      config:
+        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
   sink:
     type: WriteToJson
-    path: /path/to/output.json
+    config:
+      path: /path/to/output.json
 ```

Arbitrary non-linear pipelines are supported as well, though in this case
@@ -147,32 +164,38 @@ Here we read two sources, join them, and write two outputs.
 pipeline:
   - type: ReadFromCsv
     name: ReadLeft
-    path: /path/to/left*.csv
+    config:
+      path: /path/to/left*.csv
   - type: ReadFromCsv
     name: ReadRight
-    path: /path/to/right*.csv
+    config:
+      path: /path/to/right*.csv
   - type: Sql
-    query: select left.col1, right.col2 from left join right using (col3)
+    config:
+      query: select left.col1, right.col2 from left join right using (col3)
     input:
       left: ReadLeft
       right: ReadRight
   - type: WriteToJson
     name: WriteAll
     input: Sql
-    path: /path/to/all.json
+    config:
+      path: /path/to/all.json
   - type: PyFilter
     name: FilterToBig
     input: Sql
-    keep: "lambda x: x.col2 > 100"
+    config:
+      keep: "lambda x: x.col2 > 100"
   - type: WriteToCsv
     name: WriteBig
     input: FilterToBig
-    path: /path/to/big.csv
+    config:
+      path: /path/to/big.csv
 ```
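Every example in this README is updated by the same mechanical rewrite: move each transform-specific argument under `config`, leaving structural keys in place. A hedged sketch of that rewrite as code (`nest_config` and its key list are hypothetical illustrations, not part of this PR):

```python
# Hypothetical helper illustrating the rewrite applied throughout this diff:
# nest all non-structural keys of a transform under 'config'.
STRUCTURAL_KEYS = {
    'type', 'name', 'input', 'output', 'windowing', 'transforms',
    'source', 'sink', 'config'
}

def nest_config(transform):
  structural = {k: v for k, v in transform.items() if k in STRUCTURAL_KEYS}
  args = {k: v for k, v in transform.items() if k not in STRUCTURAL_KEYS}
  if args:
    structural['config'] = {**transform.get('config', {}), **args}
  return structural

old = {'type': 'PyFilter', 'input': 'Sql', 'keep': 'lambda x: x.col2 > 100'}
assert nest_config(old) == {
    'type': 'PyFilter',
    'input': 'Sql',
    'config': {'keep': 'lambda x: x.col2 > 100'},
}
```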

One can, however, nest `chains` within a non-linear pipeline.
@@ -183,36 +206,44 @@ that has a single input and contains its own sink.
 pipeline:
   - type: ReadFromCsv
     name: ReadLeft
-    path: /path/to/left*.csv
+    config:
+      path: /path/to/left*.csv
   - type: ReadFromCsv
     name: ReadRight
-    path: /path/to/right*.csv
+    config:
+      path: /path/to/right*.csv
   - type: Sql
-    query: select left.col1, right.col2 from left join right using (col3)
+    config:
+      query: select left.col1, right.col2 from left join right using (col3)
     input:
       left: ReadLeft
       right: ReadRight
   - type: WriteToJson
     name: WriteAll
     input: Sql
-    path: /path/to/all.json
+    config:
+      path: /path/to/all.json
   - type: chain
     name: ExtraProcessingForBigRows
     input: Sql
     transforms:
       - type: PyFilter
-        keep: "lambda x: x.col2 > 100"
+        config:
+          keep: "lambda x: x.col2 > 100"
       - type: PyFilter
-        keep: "lambda x: len(x.col1) > 10"
+        config:
+          keep: "lambda x: len(x.col1) > 10"
       - type: PyFilter
-        keep: "lambda x: x.col1 > 'z'"
+        config:
+          keep: "lambda x: x.col1 > 'z'"
     sink:
       type: WriteToCsv
-      path: /path/to/big.csv
+      config:
+        path: /path/to/big.csv
 ```

## Windowing
@@ -230,14 +261,16 @@ pipeline:
   type: chain
   transforms:
     - type: ReadFromPubSub
-      topic: myPubSubTopic
+      config:
+        topic: myPubSubTopic
     - type: WindowInto
       windowing:
         type: fixed
         size: 60
     - type: SomeAggregation
     - type: WriteToPubSub
-      topic: anotherPubSubTopic
+      config:
+        topic: anotherPubSubTopic
 ```
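For readers who know the Python SDK, `windowing: {type: fixed, size: 60}` presumably corresponds to `beam.WindowInto(window.FixedWindows(60))`. A runnable sketch of that semantics on the direct runner (the data and timestamps below are made up for illustration):

```python
# Sketch: elements fall into non-overlapping 60-second windows, and any
# downstream aggregation happens once per window, not once globally.
import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
  (p
   | beam.Create([('user1', 5.0), ('user1', 65.0), ('user2', 70.0)])
   # Use the second tuple field as a fake event timestamp.
   | 'Stamp' >> beam.Map(lambda kv: window.TimestampedValue(kv, kv[1]))
   | 'Fixed60s' >> beam.WindowInto(window.FixedWindows(60))
   | beam.combiners.Count.PerKey()
   | beam.Map(print))  # user1 is counted once in [0, 60) and once in [60, 120)
```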

Rather than using an explicit `WindowInto` operation, one may instead tag a
@@ -249,14 +282,16 @@ pipeline:
   type: chain
   transforms:
     - type: ReadFromPubSub
-      topic: myPubSubTopic
+      config:
+        topic: myPubSubTopic
     - type: SomeAggregation
       windowing:
         type: sliding
         size: 60
         period: 10
     - type: WriteToPubSub
-      topic: anotherPubSubTopic
+      config:
+        topic: anotherPubSubTopic
 ```

Note that the `Sql` operation itself is often a form of aggregation, and
@@ -268,14 +303,17 @@ pipeline:
   type: chain
   transforms:
     - type: ReadFromPubSub
-      topic: myPubSubTopic
+      config:
+        topic: myPubSubTopic
     - type: Sql
-      query: "select col1, count(*) as c from PCOLLECTION"
+      config:
+        query: "select col1, count(*) as c from PCOLLECTION"
       windowing:
         type: sessions
         gap: 60
     - type: WriteToPubSub
-      topic: anotherPubSubTopic
+      config:
+        topic: anotherPubSubTopic
 ```
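The `sliding` and `sessions` specs above have natural Python SDK counterparts in `SlidingWindows(size, period)` and `Sessions(gap_size)`; a brief sketch of the presumed mapping:

```python
# Presumed SDK counterparts of the YAML windowing specs above (a sketch).
import apache_beam as beam
from apache_beam.transforms import window

# windowing: {type: sliding, size: 60, period: 10}
sliding = beam.WindowInto(window.SlidingWindows(size=60, period=10))

# windowing: {type: sessions, gap: 60}
sessions = beam.WindowInto(window.Sessions(gap_size=60))
```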

The specified windowing is applied to all inputs, in this case resulting in
@@ -285,14 +323,17 @@ a join per window.
 pipeline:
   - type: ReadFromPubSub
     name: ReadLeft
-    topic: leftTopic
+    config:
+      topic: leftTopic
   - type: ReadFromPubSub
     name: ReadRight
-    topic: rightTopic
+    config:
+      topic: rightTopic
   - type: Sql
-    query: select left.col1, right.col2 from left join right using (col3)
+    config:
+      query: select left.col1, right.col2 from left join right using (col3)
     input:
       left: ReadLeft
       right: ReadRight
@@ -310,14 +351,17 @@ pipeline:
   type: chain
   transforms:
     - type: ReadFromPubSub
-      topic: myPubSubTopic
+      config:
+        topic: myPubSubTopic
       windowing:
         type: fixed
         size: 60
     - type: Sql
-      query: "select col1, count(*) as c from PCOLLECTION"
+      config:
+        query: "select col1, count(*) as c from PCOLLECTION"
     - type: WriteToPubSub
-      topic: anotherPubSubTopic
+      config:
+        topic: anotherPubSubTopic
 ```

One can also specify windowing at the top level of a pipeline (or composite),
@@ -330,11 +374,14 @@ pipeline:
   type: chain
   transforms:
     - type: ReadFromPubSub
-      topic: myPubSubTopic
+      config:
+        topic: myPubSubTopic
     - type: Sql
-      query: "select col1, count(*) as c from PCOLLECTION"
+      config:
+        query: "select col1, count(*) as c from PCOLLECTION"
     - type: WriteToPubSub
-      topic: anotherPubSubTopic
+      config:
+        topic: anotherPubSubTopic
   windowing:
     type: fixed
     size: 60
@@ -349,18 +396,21 @@ pipeline:
   source:
     type: ReadFromPubSub
-    topic: myPubSubTopic
+    config:
+      topic: myPubSubTopic
     windowing:
       type: fixed
       size: 10
   transforms:
     - type: Sql
-      query: "select col1, count(*) as c from PCOLLECTION"
+      config:
+        query: "select col1, count(*) as c from PCOLLECTION"
   sink:
     type: WriteToCsv
-    path: /path/to/output.json
+    config:
+      path: /path/to/output.json
     windowing:
       type: fixed
       size: 300
@@ -384,16 +434,18 @@ pipeline:
   type: chain
   source:
     type: ReadFromCsv
-    path: /path/to/input*.csv
+    config:
+      path: /path/to/input*.csv
   transforms:
     - type: MyCustomTransform
-      args:
+      config:
         arg: whatever
   sink:
     type: WriteToJson
-    path: /path/to/output.json
+    config:
+      path: /path/to/output.json

 providers:
   - type: javaJar
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/yaml/readme_test.py
@@ -162,7 +162,7 @@ def replace_recursive(spec, transform_type, arg_name, arg_value):
         for (key, value) in spec.items()
     }
     if spec.get('type', None) == transform_type:
-      spec[arg_name] = arg_value
+      spec['config'][arg_name] = arg_value
     return spec
   elif isinstance(spec, list):
     return [
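For context, the full helper reads roughly as follows; only the fragment above is verbatim from the diff, the rest is reconstructed and approximate:

```python
# Reconstructed sketch of the test helper: recursively walk a pipeline spec
# and override one config argument on every transform of a given type.
def replace_recursive(spec, transform_type, arg_name, arg_value):
  if isinstance(spec, dict):
    spec = {
        key: replace_recursive(value, transform_type, arg_name, arg_value)
        for (key, value) in spec.items()
    }
    if spec.get('type', None) == transform_type:
      # After this PR, transform arguments live under 'config'.
      spec['config'][arg_name] = arg_value
    return spec
  elif isinstance(spec, list):
    return [
        replace_recursive(value, transform_type, arg_name, arg_value)
        for value in spec
    ]
  else:
    return spec

spec = {'transforms': [{'type': 'ReadFromCsv', 'config': {'path': 'whatever'}}]}
assert replace_recursive(spec, 'ReadFromCsv', 'path', '/tmp/in.csv') == {
    'transforms': [{'type': 'ReadFromCsv', 'config': {'path': '/tmp/in.csv'}}]}
```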
@@ -225,7 +225,8 @@ def parse_test_methods(markdown_lines):
                 '  type: chain',
                 '  transforms:',
                 '    - type: ReadFromCsv',
-                '      path: whatever',
+                '      config:',
+                '        path: whatever',
             ] + ['    ' + line for line in code_lines]
           if code_lines[0] == 'pipeline:':
             yaml_pipeline = '\n'.join(code_lines)
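The effect of this second change: when a README snippet starts mid-pipeline (with `- type:`), the test harness prepends a synthetic `ReadFromCsv` source, and that source now also uses the nested `config` form. Roughly (a behavioral sketch, not verbatim test code):

```python
# Sketch: complete a mid-pipeline README fragment into a full, parseable spec.
code_lines = [
    '- type: PyFilter',
    '  config:',
    '    keep: "lambda x: x.col3 > 100"',
]
preamble = [
    'pipeline:',
    '  type: chain',
    '  transforms:',
    '    - type: ReadFromCsv',
    '      config:',
    '        path: whatever',
]
# Indent the fragment so it lines up under 'transforms:'.
yaml_pipeline = '\n'.join(preamble + ['    ' + line for line in code_lines])
print(yaml_pipeline)
```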