From 448184e80e60c3774bcfc3aac138d64ccf7444a7 Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Mon, 21 Aug 2023 20:17:35 +0000 Subject: [PATCH] MLTransform transform catalog snippets (#27709) * add snippets * Draft * add test * fix imports in test * Add doc ref to other places * Add doc * Apply suggestions from code review Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com> * fix website snippets * Fix test name * fix artifact location * address comments * Apply suggestions from code review Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com> * Update website/www/site/content/en/documentation/transforms/python/elementwise/mltransform.md Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com> --------- Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com> --- .../transforms/elementwise/mltransform.py | 121 ++++++++++++++++++ .../elementwise/mltransform_test.py | 90 +++++++++++++ .../python/elementwise/mltransform.md | 119 +++++++++++++++++ .../transforms/python/overview.md | 2 + .../section-menu/en/documentation.html | 1 + 5 files changed, 333 insertions(+) create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py create mode 100644 website/www/site/content/en/documentation/transforms/python/elementwise/mltransform.md diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py new file mode 100644 index 000000000000..63ce448e69d9 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py @@ -0,0 +1,121 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint: disable=reimported +# pylint:disable=line-too-long + + +def mltransform_scale_to_0_1(test=None): + # [START mltransform_scale_to_0_1] + import apache_beam as beam + from apache_beam.ml.transforms.base import MLTransform + from apache_beam.ml.transforms.tft import ScaleTo01 + import tempfile + + data = [ + { + 'x': [1, 5, 3] + }, + { + 'x': [4, 2, 8] + }, + ] + + artifact_location = tempfile.mkdtemp() + scale_to_0_1_fn = ScaleTo01(columns=['x']) + + with beam.Pipeline() as p: + transformed_data = ( + p + | beam.Create(data) + | MLTransform(write_artifact_location=artifact_location).with_transform( + scale_to_0_1_fn) + | beam.Map(print)) + # [END mltransform_scale_to_0_1] + if test: + test(transformed_data) + + +def mltransform_compute_and_apply_vocabulary(test=None): + # [START mltransform_compute_and_apply_vocabulary] + import apache_beam as beam + from apache_beam.ml.transforms.base import MLTransform + from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary + import tempfile + + artifact_location = tempfile.mkdtemp() + data = [ + { + 'x': ['I', 'love', 'Beam'] + }, + { + 'x': ['Beam', 'is', 'awesome'] + }, + ] + compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x']) + with beam.Pipeline() as p: + transformed_data = ( + p + | beam.Create(data) + | MLTransform(write_artifact_location=artifact_location).with_transform( + compute_and_apply_vocabulary_fn) + | beam.Map(print)) + # [END mltransform_compute_and_apply_vocabulary] + if test: + test(transformed_data) + + +def mltransform_compute_and_apply_vocabulary_with_scalar(test=None): + # [START mltransform_compute_and_apply_vocabulary_with_scalar] + import apache_beam as beam + from apache_beam.ml.transforms.base import MLTransform + from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary + import tempfile + data = [ + { + 'x': 'I' + }, + { + 'x': 'love' + }, + { + 'x': 'Beam' + }, + { + 'x': 'Beam' + }, + { + 'x': 'is' + }, + { + 'x': 'awesome' + }, + ] + artifact_location = tempfile.mkdtemp() + compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x']) + with beam.Pipeline() as p: + transformed_data = ( + p + | beam.Create(data) + | MLTransform(write_artifact_location=artifact_location).with_transform( + compute_and_apply_vocabulary_fn) + | beam.Map(print)) + # [END mltransform_compute_and_apply_vocabulary_with_scalar] + if test: + test(transformed_data) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py new file mode 100644 index 000000000000..1d2197e35e4e --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py @@ -0,0 +1,90 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint: disable=ungrouped-imports + +import unittest +from io import StringIO + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline + +try: + # fail when tft is not installed. + import tensorflow_transform as tft # pylint: disable=unused-import + from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_scale_to_0_1 + from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary + from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_non_columnar_data +except ImportError: + raise unittest.SkipTest('tensorflow_transform is not installed.') + + +def check_mltransform_compute_and_apply_vocab(): + expected = '''[START mltransform_compute_and_apply_vocab] +Row(x=array([4, 1, 0])) +Row(x=array([0, 2, 3])) + [END mltransform_compute_and_apply_vocab] '''.splitlines()[1:-1] + return expected + + +def check_mltransform_scale_to_0_1(): + expected = '''[START mltransform_scale_to_0_1] +Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32)) +Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32)) + [END mltransform_scale_to_0_1] '''.splitlines()[1:-1] + return expected + + +def check_mltransform_compute_and_apply_vocabulary_with_scalar(): + expected = '''[START mltransform_compute_and_apply_vocabulary_with_scalar] +Row(x=array([4])) +Row(x=array([1])) +Row(x=array([0])) +Row(x=array([2])) +Row(x=array([3])) + [END mltransform_compute_and_apply_vocabulary_with_scalar] '''.splitlines( + )[1:-1] + return expected + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +@mock.patch('sys.stdout', new_callable=StringIO) +class MLTransformStdOutTest(unittest.TestCase): + def test_mltransform_compute_and_apply_vocab(self, mock_stdout): + mltransform_compute_and_apply_vocabulary() + predicted = mock_stdout.getvalue().splitlines() + expected = check_mltransform_compute_and_apply_vocab() + self.assertEqual(predicted, expected) + + def test_mltransform_scale_to_0_1(self, mock_stdout): + mltransform_scale_to_0_1() + predicted = mock_stdout.getvalue().splitlines() + expected = check_mltransform_scale_to_0_1() + self.assertEqual(predicted, expected) + + def test_mltransform_compute_and_apply_vocab_scalar(self, mock_stdout): + mltransform_compute_and_apply_vocabulary_with_non_columnar_data() + predicted = mock_stdout.getvalue().splitlines() + expected = check_mltransform_compute_and_apply_vocabulary_with_scalar() + self.assertEqual(predicted, expected) + + +if __name__ == '__main__': + unittest.main() diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/mltransform.md b/website/www/site/content/en/documentation/transforms/python/elementwise/mltransform.md new file mode 100644 index 000000000000..964d90f1510b --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/mltransform.md @@ -0,0 +1,119 @@ +--- +title: "MLTransform" +--- + + +# MLTransform for data processing + +{{< localstorage language language-py >}} + + + + + + +
+ + {{< button-pydoc path="apache_beam.ml.transforms" class="MLTransform" >}} + +
+ + +Use `MLTransform` to apply common machine learning (ML) processing tasks on keyed data. Apache Beam provides ML data processing transformations that you can use with `MLTransform`. For the full list of available data +processing transformations, see the [tft.py file](https://github.com/apache/beam/blob/ab93fb1988051baac6c3b9dd1031f4d68bd9a149/sdks/python/apache_beam/ml/transforms/tft.py#L52) in GitHub. + + +To define a data processing transformation by using `MLTransform`, create instances of data processing transforms with `columns` as input parameters. The data in the specified `columns` is transformed and outputted to the `beam.Row` object. + +The following example demonstrates how to use `MLTransform` to normalize your data between 0 and 1 by using the minimum and maximum values from your entire dataset. `MLTransform` uses the `ScaleTo01` transformation. + + +``` +scale_to_z_score_transform = ScaleToZScore(columns=['x', 'y']) +with beam.Pipeline() as p: + (data | MLTransform(write_artifact_location=artifact_location).with_transform(scale_to_z_score_transform)) +``` + +In this example, `MLTransform` receives a value for `write_artifact_location`. `MLTransform` then uses this location value to write artifacts generated by the transform. To pass the data processing transform, you can use either the `with_transform` method of `MLTransform` or a list. + +``` +MLTransform(transforms=transforms, write_artifact_location=write_artifact_location) +``` + +The transforms passed to `MLTransform` are applied sequentially on the dataset. `MLTransform` expects a dictionary and returns a transformed row object with NumPy arrays. +## Examples + +The following examples demonstrate how to to create pipelines that use `MLTransform` to preprocess data. + +`MLTransform` can do a full pass on the dataset, which is useful when you need to transform a single element only after analyzing the entire dataset. +The first two examples require a full pass over the dataset to complete the data transformation. + +* For the `ComputeAndApplyVocabulary` transform, the transform needs access to all of the unique words in the dataset. +* For the `ScaleTo01` transform, the transform needs to know the minimum and maximum values in the dataset. + +### Example 1 + +This example creates a pipeline that uses `MLTransform` to scale data between 0 and 1. +The example takes a list of integers and converts them into the range of 0 to 1 using the transform `ScaleTo01`. + +{{< highlight language="py" file="sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py" + class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py" mltransform_scale_to_0_1 >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py" mltransform_scale_to_0_1 >}} +{{< /highlight >}} + + +### Example 2 + +This example creates a pipeline that use `MLTransform` to compute vocabulary on the entire dataset and assign indices to each unique vocabulary item. +It takes a list of strings, computes vocabulary over the entire dataset, and then applies a unique index to each vocabulary item. + + +{{< highlight language="py" file="sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py" + class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py" mltransform_compute_and_apply_vocabulary >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py" mltransform_compute_and_apply_vocab >}} +{{< /highlight >}} + + +### Example 3 + +This example creates a pipeline that uses `MLTransform` to compute vocabulary on the entire dataset and assign indices to each unique vocabulary item. This pipeline takes a single element as input instead of a list of elements. + + +{{< highlight language="py" file="sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py" + class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py" mltransform_compute_and_apply_vocabulary_with_scalar >}} +{{}} + +{{< paragraph class="notebook-skip" >}} +Output: +{{< /paragraph >}} +{{< highlight class="notebook-skip" >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py" mltransform_compute_and_apply_vocabulary_with_scalar >}} +{{< /highlight >}} + diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md index d30af75352b2..666b7c95f080 100644 --- a/website/www/site/content/en/documentation/transforms/python/overview.md +++ b/website/www/site/content/en/documentation/transforms/python/overview.md @@ -27,6 +27,7 @@ limitations under the License. KeysExtracts the key from each element in a collection of key-value pairs. KvSwapSwaps the key and value of each element in a collection of key-value pairs. MapApplies a function to every element in the input and outputs the result. + MLTransformApplies data processing transforms to the dataset. ParDoThe most-general mechanism for applying a user-defined DoFn to every element in the input collection. PartitionRoutes each input element to a specific output collection based on some partition @@ -39,6 +40,7 @@ limitations under the License. and updates the implicit timestamp associated with each input. Note that it is only safe to adjust timestamps forwards. ValuesExtracts the value from each element in a collection of key-value pairs. + ## Aggregation diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html index adde85126078..3f461b1de99f 100755 --- a/website/www/site/layouts/partials/section-menu/en/documentation.html +++ b/website/www/site/layouts/partials/section-menu/en/documentation.html @@ -291,6 +291,7 @@
  • Keys
  • KvSwap
  • Map
  • +
  • MLTransform
  • ParDo
  • Partition
  • Regex