From bc903363a9a705c67d203386fafb91938840a0c3 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Wed, 16 Nov 2022 14:18:12 -0500
Subject: [PATCH] Python TextIO Performance Test (#23951)

* Python TextIO Performance Test

* Add filebasedio_perf_test module, a unified test framework for Python file-based IOs

* Fix MetricsReader publishing duplicate metrics when more than one load test is declared. This happened because MetricsReader.publishers was a static class variable

* Fix pylint

* Distribute Python performance tests at random times of day instead of all at 3 PM

* Add information about length conversion
---
 ..._PerformanceTests_BigQueryIO_Python.groovy |   4 +-
 ...PerformanceTests_FileBasedIO_Python.groovy |  81 ++++++++
 ...ob_PerformanceTests_PubsubIO_Python.groovy |   2 +-
 ...b_PerformanceTests_SpannerIO_Python.groovy |   4 +-
 .../Python_IO_IT_Tests_Dataflow.json          | 122 ++++++++++++
 .../apache_beam/io/filebasedio_perf_test.py   | 188 ++++++++++++++++++
 .../testing/load_tests/load_test.py           |   4 +-
 .../load_tests/load_test_metrics_utils.py     |  16 +-
 .../apache_beam/testing/synthetic_pipeline.py |   2 +-
 9 files changed, 410 insertions(+), 13 deletions(-)
 create mode 100644 .test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy
 create mode 100644 sdks/python/apache_beam/io/filebasedio_perf_test.py

diff --git a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
index 1ccb8238ba87..853347f9ebfb 100644
--- a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
@@ -90,7 +90,7 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
   executeJob(delegate, bqio_read_test)
 }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Read_Python', 'H 15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Read_Python', 'H H * * *', this) {
   executeJob(delegate, bqio_read_test)
 }
 
@@ -103,6 +103,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
   executeJob(delegate, bqio_write_test)
 }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Write_Python_Batch', 'H 15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Write_Python_Batch', 'H H * * *', this) {
   executeJob(delegate, bqio_write_test)
 }
diff --git a/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy
new file mode 100644
index 000000000000..e45beadf321a
--- /dev/null
+++ b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import LoadTestsBuilder as loadTestsBuilder
+import InfluxDBCredentialsHelper
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def jobs = [
+  [
+    name               : 'beam_PerformanceTests_TextIOIT_Python',
+    description        : 'Runs performance tests for Python TextIOIT',
+    test               : 'apache_beam.io.filebasedio_perf_test',
+    githubTitle        : 'Python TextIO Performance Test',
+    githubTriggerPhrase: 'Run Python TextIO Performance Test',
+    pipelineOptions    : [
+      publish_to_big_query : true,
+      metrics_dataset      : 'beam_performance',
+      metrics_table        : 'python_textio_1GB_results',
+      influx_measurement   : 'python_textio_1GB_results',
+      test_class           : 'TextIOPerfTest',
+      input_options        : '\'{' +
+      '"num_records": 25000000,' +
+      '"key_size": 9,' +
+      '"value_size": 21}\'',
+      dataset_size         : '1050000000',
+      num_workers          : '5',
+      autoscaling_algorithm: 'NONE'
+    ]
+  ]
+]
+
+jobs.findAll {
+  it.name in [
+    'beam_PerformanceTests_TextIOIT_Python',
+  ]
+}.forEach { testJob -> createGCSFileBasedIOITTestJob(testJob) }
+
+private void createGCSFileBasedIOITTestJob(testJob) {
+  job(testJob.name) {
+    description(testJob.description)
+    common.setTopLevelMainJobProperties(delegate)
+    common.enablePhraseTriggeringFromPullRequest(delegate, testJob.githubTitle, testJob.githubTriggerPhrase)
+    common.setAutoJob(delegate, 'H H * * *')
+    InfluxDBCredentialsHelper.useCredentials(delegate)
+    additionalPipelineArgs = [
+      influxDatabase: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+      influxHost: InfluxDBCredentialsHelper.InfluxDBHostUrl,
+    ]
+    testJob.pipelineOptions.putAll(additionalPipelineArgs)
+
+    def dataflowSpecificOptions = [
+      runner          : 'DataflowRunner',
+      project         : 'apache-beam-testing',
+      region          : 'us-central1',
+      temp_location   : 'gs://temp-storage-for-perf-tests/',
+      filename_prefix : "gs://temp-storage-for-perf-tests/${testJob.name}/\${BUILD_ID}/",
+    ]
+
+    Map allPipelineOptions = dataflowSpecificOptions << testJob.pipelineOptions
+
+    loadTestsBuilder.loadTest(
+        delegate, testJob.name, CommonTestProperties.Runner.DATAFLOW,
+        CommonTestProperties.SDK.PYTHON, allPipelineOptions, testJob.test)
+  }
+}
diff --git a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
index 327e93f392ff..262eda3fd909 100644
--- a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
@@ -70,6 +70,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
   executeJob(delegate, psio_test)
 }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_PubsubIOIT_Python_Streaming', 'H 15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_PubsubIOIT_Python_Streaming', 'H H * * *', this) {
   executeJob(delegate, psio_test)
 }
diff --git a/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
index 489c72ebaa25..416186567075 100644
--- a/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
@@ -92,7 +92,7 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
   executeJob(delegate, spannerio_read_test_2gb)
 }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H 15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H H * * *', this) {
   executeJob(delegate, spannerio_read_test_2gb)
 }
 
@@ -105,6 +105,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
   executeJob(delegate, spannerio_write_test_2gb)
 }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch', 'H 15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch', 'H H * * *', this) {
   executeJob(delegate, spannerio_write_test_2gb)
 }
diff --git a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
index 5b1ff2b8103b..6db7a46edb5a 100644
--- a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
+++ b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
@@ -482,6 +482,128 @@
         "align": false,
         "alignLevel": null
       }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "cacheTimeout": null,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "BeamInfluxDB",
+      "fill": 1,
+      "fillGradient": 0,
+      "gridPos": {
+        "h": 9,
+        "w": 12,
+        "x": 12,
+        "y": 9
+      },
+      "hiddenSeries": false,
+      "id": 6,
+      "interval": "24h",
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 2,
+      "links": [],
+      "nullPointMode": "connected",
+      "options": {
+        "dataLinks": []
+      },
+      "percentage": false,
+      "pluginVersion": "6.7.2",
+      "pointradius": 2,
+      "points": true,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "alias": "$tag_metric",
+          "groupBy": [
+            {
+              "params": [
+                "$__interval"
+              ],
+              "type": "time"
+            }
+          ],
+          "measurement": "",
+          "orderByTime": "ASC",
+          "policy": "default",
+          "query": "SELECT mean(\"value\") FROM \"python_textio_1GB_results\" WHERE (\"metric\" = 'read_runtime' OR \"metric\" = 'write_runtime') AND $timeFilter GROUP BY time($__interval), \"metric\"",
+          "rawQuery": true,
+          "refId": "A",
+          "resultFormat": "time_series",
+          "select": [
+            [
+              {
+                "params": [
+                  "value"
+                ],
+                "type": "field"
+              },
+              {
+                "params": [],
+                "type": "mean"
+              }
+            ]
+          ],
+          "tags": []
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "TextIO | GCS | 1 GB",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": true,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "$$hashKey": "object:403",
+          "format": "s",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "$$hashKey": "object:404",
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
     }
   ],
   "schemaVersion": 22,
diff --git a/sdks/python/apache_beam/io/filebasedio_perf_test.py b/sdks/python/apache_beam/io/filebasedio_perf_test.py
new file mode 100644
index 000000000000..7d5b673098d5
--- /dev/null
+++ b/sdks/python/apache_beam/io/filebasedio_perf_test.py
@@ -0,0 +1,188 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Performance tests for file-based I/O connectors."""
+
+import logging
+import sys
+import uuid
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam import typehints
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.io.iobase import Read
+from apache_beam.io.textio import ReadFromText
+from apache_beam.io.textio import WriteToText
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test import LoadTestOptions
+from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+WRITE_NAMESPACE = 'write'
+READ_NAMESPACE = 'read'
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class FileBasedIOTestOptions(LoadTestOptions):
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--test_class', required=True, help='Test class to run.')
+    parser.add_argument(
+        '--filename_prefix',
+        required=True,
+        help='Destination prefix for files generated by the test.')
+    parser.add_argument(
+        '--compression_type',
+        default='auto',
+        help='File compression type for writing and reading test files.')
+    parser.add_argument(
+        '--number_of_shards',
+        type=int,
+        default=0,
+        help='Number of files this test will create during the write phase.')
+    parser.add_argument(
+        '--dataset_size',
+        type=int,
+        help='Size of data saved on the target filesystem (bytes).')
+
+
+@typehints.with_output_types(bytes)
+@typehints.with_input_types(Tuple[bytes, bytes])
+class SyntheticRecordToStrFn(beam.DoFn):
+  """
+  A DoFn that converts key-value bytes from the synthetic source to a string
+  record.
+
+  It base64-encodes the random bytes emitted by the synthetic source, so
+  every 3 input bytes become 4 ASCII characters.
+
+  Output length = 4 * (ceil(len(key)/3) + ceil(len(value)/3)) + 1, where
+  the +1 is the space separator between key and value.
+  """
+  def process(self, element):
+    import base64
+    yield base64.b64encode(element[0]) + b' ' + base64.b64encode(element[1])
+
+
+class CreateFolderFn(beam.DoFn):
+  """Creates a folder at pipeline runtime."""
+  def __init__(self, folder):
+    self.folder = folder
+
+  def process(self, element):
+    from apache_beam.io.filesystems import FileSystems  # pylint: disable=reimported
+    filesystem = FileSystems.get_filesystem(self.folder)
+    if filesystem.has_dirs() and not filesystem.exists(self.folder):
+      filesystem.mkdirs(self.folder)
+
+
+class TextIOPerfTest:
+  def run(self):
+    write_test = _TextIOWritePerfTest(need_cleanup=False)
+    read_test = _TextIOReadPerfTest(input_folder=write_test.output_folder)
+    write_test.run()
+    read_test.run()
+
+
+class _TextIOWritePerfTest(LoadTest):
+  def __init__(self, need_cleanup=True):
+    super().__init__(WRITE_NAMESPACE)
+    self.need_cleanup = need_cleanup
+    self.test_options = self.pipeline.get_pipeline_options().view_as(
+        FileBasedIOTestOptions)
+    self.output_folder = FileSystems.join(
+        self.test_options.filename_prefix, str(uuid.uuid4()))
+
+  def test(self):
+    # first create the output folder, if the filesystem requires one
+    _ = (
+        self.pipeline
+        | beam.Impulse()
+        | beam.ParDo(CreateFolderFn(self.output_folder)))
+
+    # write to text
+    _ = (
+        self.pipeline
+        | 'Produce rows' >> Read(
+            SyntheticSource(self.parse_synthetic_source_options()))
+        | 'Count records' >> beam.ParDo(CountMessages(self.metrics_namespace))
+        | 'Format' >> beam.ParDo(SyntheticRecordToStrFn())
+        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+        | 'Write Text' >> WriteToText(
+            file_path_prefix=FileSystems.join(self.output_folder, 'test'),
+            compression_type=self.test_options.compression_type,
+            num_shards=self.test_options.number_of_shards))
+
+  def cleanup(self):
+    if not self.need_cleanup:
+      return
+    try:
+      FileSystems.delete([self.output_folder])
+    except IOError:
+      # may not have delete permission; just log a warning
+      _LOGGER.warning(
+          'Unable to delete file %s during cleanup.', self.output_folder)
+
+
+class _TextIOReadPerfTest(LoadTest):
+  def __init__(self, input_folder):
+    super().__init__(READ_NAMESPACE)
+    self.test_options = self.pipeline.get_pipeline_options().view_as(
+        FileBasedIOTestOptions)
+    self.input_folder = input_folder
+
+  def test(self):
+    output = (
+        self.pipeline
+        | 'Read from text' >>
+        ReadFromText(file_pattern=FileSystems.join(self.input_folder, '*'))
+        | 'Count records' >> beam.ParDo(CountMessages(self.metrics_namespace))
+        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+        | 'Count' >> beam.combiners.Count.Globally())
+    assert_that(output, equal_to([self.input_options['num_records']]))
+
+  def cleanup(self):
+    try:
+      FileSystems.delete([self.input_folder])
+    except IOError:
+      # may not have delete permission; just log a warning
+      _LOGGER.warning(
+          'Unable to delete file %s during cleanup.', self.input_folder)
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+
+  test_options = TestPipeline().get_pipeline_options().view_as(
+      FileBasedIOTestOptions)
+  supported_test_classes = list(
+      filter(
+          lambda s: s.endswith('PerfTest') and not s.startswith('_'),
+          dir(sys.modules[__name__])))
+
+  if test_options.test_class not in supported_test_classes:
+    raise RuntimeError(
+        f'Test {test_options.test_class} not found. '
+        f'Supported tests are {supported_test_classes}')
+
+  getattr(sys.modules[__name__], test_options.test_class)().run()
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test.py b/sdks/python/apache_beam/testing/load_tests/load_test.py
index f5917fbfba27..3112c12ab86c 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test.py
@@ -25,6 +25,7 @@
 from apache_beam.metrics import MetricsFilter
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
 from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
 from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -148,7 +149,8 @@ def run(self):
       if not hasattr(self, 'result'):
         self.result = self.pipeline.run()
         # Defaults to waiting forever, unless timeout_ms has been set
-        self.result.wait_until_finish(duration=self.timeout_ms)
+        state = self.result.wait_until_finish(duration=self.timeout_ms)
+        assert state != PipelineState.FAILED
       self._metrics_monitor.publish_metrics(self.result, self.extra_metrics)
     finally:
       self.cleanup()
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index 9c6ef2a935ec..fbca1cb96e9d 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -33,7 +33,6 @@
 import logging
 import time
 import uuid
-from typing import Any
 from typing import List
 from typing import Mapping
 from typing import Optional
@@ -185,8 +184,6 @@ class MetricsReader(object):
   A :class:`MetricsReader` retrieves metrics from pipeline result, prepares it
   for publishers and setup publishers.
""" - publishers = [] # type: List[Any] - def __init__( self, project_name=None, @@ -206,6 +203,7 @@ def __init__( filters: MetricFilter to query only filtered metrics """ self._namespace = namespace + self.publishers: List[MetricsPublisher] = [] self.publishers.append(ConsoleMetricsPublisher()) check = project_name and bq_table and bq_dataset and publish_to_bq @@ -385,7 +383,13 @@ def _prepare_runtime_metrics(self, distributions): return runtime_in_s -class ConsoleMetricsPublisher(object): +class MetricsPublisher: + """Base class for metrics publishers.""" + def publish(self, results): + raise NotImplementedError + + +class ConsoleMetricsPublisher(MetricsPublisher): """A :class:`ConsoleMetricsPublisher` publishes collected metrics to console output.""" def publish(self, results): @@ -401,7 +405,7 @@ def publish(self, results): _LOGGER.info("No test results were collected.") -class BigQueryMetricsPublisher(object): +class BigQueryMetricsPublisher(MetricsPublisher): """A :class:`BigQueryMetricsPublisher` publishes collected metrics to BigQuery output.""" def __init__(self, project_name, table, dataset): @@ -484,7 +488,7 @@ def http_auth_enabled(self): return self.user is not None and self.password is not None -class InfluxDBMetricsPublisher(object): +class InfluxDBMetricsPublisher(MetricsPublisher): """Publishes collected metrics to InfluxDB database.""" def __init__( self, diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py b/sdks/python/apache_beam/testing/synthetic_pipeline.py index 305e42294867..a520b31cb9fb 100644 --- a/sdks/python/apache_beam/testing/synthetic_pipeline.py +++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py @@ -22,7 +22,7 @@ controlled through arguments. Please see function 'parse_args()' for more details about the arguments. -Shape of the pipeline is primariy controlled through two arguments. Argument +Shape of the pipeline is primarily controlled through two arguments. Argument 'steps' can be used to define a list of steps as a JSON string. Argument 'barrier' describes how these steps are separated from each other. Argument 'barrier' can be use to build a pipeline as a series of steps or a tree of