Skip to content

Commit

Permalink
Support string FQN as a way to add lineage information (#32613)
Browse files Browse the repository at this point in the history
* Support string FQN as a way to add lineage information

* clarify the two use case of lineage.add
  • Loading branch information
Abacn authored Oct 1, 2024
1 parent eaf53e5 commit 00445ad
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
26 changes: 25 additions & 1 deletion sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,31 @@ def get_fq_name(

def add(
self, system: str, *segments: str, subtype: Optional[str] = None) -> None:
self.metric.add(self.get_fq_name(system, *segments, subtype=subtype))
"""
Adds the given details as Lineage.
For asset level lineage the resource location should be specified as
Dataplex FQN, see
https://cloud.google.com/data-catalog/docs/fully-qualified-names
Example of adding FQN components:
- `add("system", "segment1", "segment2")`
- `add("system", "segment1", "segment2", subtype="subtype")`
Example of adding a FQN:
- `add("system:segment1.segment2")`
- `add("system:subtype:segment1.segment2")`
The first positional argument serves as system, if full segments are
provided, or the full FQN if it is provided as a single argument.
"""
system_or_details = system
if len(segments) == 0 and subtype is None:
self.metric.add(system_or_details)
else:
self.metric.add(self.get_fq_name(system, *segments, subtype=subtype))

@staticmethod
def query(results: MetricResults, label: str) -> Set[str]:
Expand Down
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,17 @@ def test_fq_name(self):
"apache:beam:" + v + '.' + v,
Lineage.get_fq_name("apache", k, k, subtype="beam"))

def test_add(self):
lineage = Lineage(Lineage.SOURCE)
stringset = set()
# override
lineage.metric = stringset
lineage.add("s", "1", "2")
lineage.add("s:3.4")
lineage.add("s", "5", "6.7")
lineage.add("s", "1", "2", subtype="t")
self.assertSetEqual(stringset, {"s:1.2", "s:3.4", "s:t:1.2", "s:5.`6.7`"})


if __name__ == '__main__':
unittest.main()

0 comments on commit 00445ad

Please sign in to comment.