From c416f4f6f28eeab7942d37394dd02e2450bef65a Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 28 Aug 2023 22:10:28 -0400 Subject: [PATCH] Add per key inference integration test (#28026) * Add per key inference integration test * Lint --- .../apache_beam/examples/inference/README.md | 56 ++++ ...ytorch_model_per_key_image_segmentation.py | 311 ++++++++++++++++++ .../ml/inference/pytorch_inference_it_test.py | 48 +++ 3 files changed, 415 insertions(+) create mode 100644 sdks/python/apache_beam/examples/inference/pytorch_model_per_key_image_segmentation.py diff --git a/sdks/python/apache_beam/examples/inference/README.md b/sdks/python/apache_beam/examples/inference/README.md index 1653f3a9699f..e66e1ce471d4 100644 --- a/sdks/python/apache_beam/examples/inference/README.md +++ b/sdks/python/apache_beam/examples/inference/README.md @@ -225,6 +225,62 @@ This writes the output to the `predictions.csv` with contents like: ``` Each line has data separated by a semicolon ";". The first item is the file name. The second item is a list of predicted instances. +--- +## Per Key Image segmentation + +[`pytorch_model_per_key_image_segmentation.py`](./pytorch_model_per_key_image_segmentation.py) contains an implementation for a RunInference pipeline that performs image segementation using multiple different trained models based on the `maskrcnn_resnet50_fpn` architecture. + +The pipeline reads images, performs basic preprocessing, passes the images to the PyTorch implementation of RunInference, and then writes predictions to a text file. + +### Dataset and model for image segmentation + +To use this transform, you need a dataset and model for image segmentation. If you've already done the previous example (Image segmentation with pytorch_image_segmentation.py you can reuse the results from some of those setup steps). + +1. Create a directory named `IMAGES_DIR`. Create or download images and put them in this directory. The directory is not required if image names in the input file `IMAGE_FILE_NAMES.txt` you create in step 2 have absolute paths. +A popular dataset is from [Coco](https://cocodataset.org/#home). Follow their instructions to download the images. +2. Create a file named `IMAGE_FILE_NAMES.txt` that contains the absolute paths of each of the images in `IMAGES_DIR` that you want to use to run image segmentation. The path to the file can be different types of URIs such as your local file system, an AWS S3 bucket, or a GCP Cloud Storage bucket. For example: +``` +/absolute/path/to/image1.jpg +/absolute/path/to/image2.jpg +``` +3. Download the [maskrcnn_resnet50_fpn](https://pytorch.org/vision/0.12/models.html#id70) and [maskrcnn_resnet50_fpn_v2](https://pytorch.org/vision/main/models/generated/torchvision.models.detection.maskrcnn_resnet50_fpn_v2.html) models from Pytorch's repository of pretrained models. These models require the torchvision library. To download this model, run the following commands from a Python shell: +``` +import torch +from torchvision.models.detection import maskrcnn_resnet50_fpn +from torchvision.models.detection import maskrcnn_resnet50_fpn_v2 +model = maskrcnn_resnet50_fpn(pretrained=True) +torch.save(model.state_dict(), 'maskrcnn_resnet50_fpn.pth') # You can replace maskrcnn_resnet50_fpn.pth with your preferred file name for your model state dictionary. +model = maskrcnn_resnet50_fpn_v2(pretrained=True) +torch.save(model.state_dict(), 'maskrcnn_resnet50_fpn_v2.pth') # You can replace maskrcnn_resnet50_fpn_v2.pth with your preferred file name for your model state dictionary. +``` +4. Note a path to an `OUTPUT` file that can be used by the pipeline to write the predictions. + +### Running `pytorch_model_per_key_image_segmentation.py` + +To run the image segmentation pipeline locally, use the following command: +```sh +python -m apache_beam.examples.inference.pytorch_model_per_key_image_segmentation \ + --input IMAGE_FILE_NAMES \ + --images_dir IMAGES_DIR \ + --output OUTPUT \ + --model_state_dict_paths MODEL_STATE_DICT1,MODEL_STATE_DICT2 +``` +`images_dir` is only needed if your `IMAGE_FILE_NAMES.txt` file contains relative paths (they will be relative from `IMAGES_DIR`). + +For example, if you've followed the naming conventions recommended above: +```sh +python -m apache_beam.examples.inference.pytorch_model_per_key_image_segmentation \ + --input IMAGE_FILE_NAMES.txt \ + --output predictions.csv \ + --model_state_dict_path 'maskrcnn_resnet50_fpn.pth,maskrcnn_resnet50_fpn_v2.pth' +``` +This writes the output to the `predictions.csv` with contents like: +``` +/Users/dannymccormick/Downloads/images/datasets_coco_raw-data_val2017_000000000139.jpg --- v1 predictions: ['chair', 'tv','potted plant'] --- v2 predictions: ['motorcycle', 'frisbee', 'couch'] +... +``` +Each image has 2 pieces of associated data - `v1 predictions` and `v2 predictions` corresponding to the version of the model that was used for segmentation. + --- ## Object Detection diff --git a/sdks/python/apache_beam/examples/inference/pytorch_model_per_key_image_segmentation.py b/sdks/python/apache_beam/examples/inference/pytorch_model_per_key_image_segmentation.py new file mode 100644 index 000000000000..e09a348511b2 --- /dev/null +++ b/sdks/python/apache_beam/examples/inference/pytorch_model_per_key_image_segmentation.py @@ -0,0 +1,311 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A pipeline that uses RunInference API to perform image segmentation using +multiple different models. +""" + +import argparse +import io +import logging +import os +from typing import Iterable +from typing import Iterator +from typing import Optional +from typing import Tuple + +import apache_beam as beam +import torch +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import KeyMhMapping +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.runners.runner import PipelineResult +from PIL import Image +from torchvision import transforms +from torchvision.models.detection import maskrcnn_resnet50_fpn +from torchvision.models.detection import maskrcnn_resnet50_fpn_v2 + +COCO_INSTANCE_CLASSES = [ + '__background__', + 'person', + 'bicycle', + 'car', + 'motorcycle', + 'airplane', + 'bus', + 'train', + 'truck', + 'boat', + 'traffic light', + 'fire hydrant', + 'N/A', + 'stop sign', + 'parking meter', + 'bench', + 'bird', + 'cat', + 'dog', + 'horse', + 'sheep', + 'cow', + 'elephant', + 'bear', + 'zebra', + 'giraffe', + 'N/A', + 'backpack', + 'umbrella', + 'N/A', + 'N/A', + 'handbag', + 'tie', + 'suitcase', + 'frisbee', + 'skis', + 'snowboard', + 'sports ball', + 'kite', + 'baseball bat', + 'baseball glove', + 'skateboard', + 'surfboard', + 'tennis racket', + 'bottle', + 'N/A', + 'wine glass', + 'cup', + 'fork', + 'knife', + 'spoon', + 'bowl', + 'banana', + 'apple', + 'sandwich', + 'orange', + 'broccoli', + 'carrot', + 'hot dog', + 'pizza', + 'donut', + 'cake', + 'chair', + 'couch', + 'potted plant', + 'bed', + 'N/A', + 'dining table', + 'N/A', + 'N/A', + 'toilet', + 'N/A', + 'tv', + 'laptop', + 'mouse', + 'remote', + 'keyboard', + 'cell phone', + 'microwave', + 'oven', + 'toaster', + 'sink', + 'refrigerator', + 'N/A', + 'book', + 'clock', + 'vase', + 'scissors', + 'teddy bear', + 'hair drier', + 'toothbrush' +] + +CLASS_ID_TO_NAME = dict(enumerate(COCO_INSTANCE_CLASSES)) + + +def read_image(image_file_name: str, + path_to_dir: Optional[str] = None) -> Tuple[str, Image.Image]: + if path_to_dir is not None: + image_file_name = os.path.join(path_to_dir, image_file_name) + with FileSystems().open(image_file_name, 'r') as file: + data = Image.open(io.BytesIO(file.read())).convert('RGB') + return image_file_name, data + + +def preprocess_image(data: Image.Image) -> torch.Tensor: + image_size = (224, 224) + transform = transforms.Compose([ + transforms.Resize(image_size), + transforms.ToTensor(), + ]) + return transform(data) + + +def filter_empty_lines(text: str) -> Iterator[str]: + if len(text.strip()) > 0: + yield text + + +class KeyExamplesForEachModelType(beam.DoFn): + """Duplicate data to run against each model type""" + def process( + self, element: Tuple[torch.Tensor, + str]) -> Iterable[Tuple[str, torch.Tensor]]: + yield 'v1', element[0] + yield 'v2', element[0] + + +class PostProcessor(beam.DoFn): + def process( + self, element: Tuple[str, PredictionResult]) -> Tuple[torch.Tensor, str]: + model, prediction_result = element + prediction_labels = prediction_result.inference['labels'] + classes = [CLASS_ID_TO_NAME[label.item()] for label in prediction_labels] + yield prediction_result.example, f'{model} predictions: {str(classes)}' + + +class FormatResults(beam.DoFn): + def process(self, element): + _, filename_prediction = element + predictions = filename_prediction['predictions'] + v1_predictions = next(p for p in predictions if 'v1 predictions' in p) + v2_predictions = next(p for p in predictions if 'v2 predictions' in p) + yield ( + f"{filename_prediction['image_names'][0]} --- " + f"{v1_predictions} --- " + f"{v2_predictions}") + + +def parse_known_args(argv): + """Parses args for the workflow.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', + dest='input', + required=True, + help='Path to the text file containing image names.') + parser.add_argument( + '--output', + dest='output', + required=True, + help='Path where to save output predictions.' + ' text file.') + parser.add_argument( + '--model_state_dict_paths', + dest='model_state_dict_paths', + required=True, + help="Comma separated paths to the models' state_dicts. " + "For this example, should include exactly 2 state_dicts corresponding " + "to maskrcnn_resnet50_fpn and maskrcnn_resnet50_fpn_v2 classes.") + parser.add_argument( + '--images_dir', + help='Path to the directory where images are stored.' + 'Not required if image names in the input file have absolute path.') + return parser.parse_known_args(argv) + + +def run( + argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: + """ + Args: + argv: Command line arguments defined for this example. + model_params: Parameters passed to the constructor of the model_class. + These will be used to instantiate the model object in the + RunInference API. + save_main_session: Used for internal testing. + test_pipeline: Used for internal testing. + """ + known_args, pipeline_args = parse_known_args(argv) + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + state_dicts = known_args.model_state_dict_paths.split(',') + if len(state_dicts) != 2: + raise AssertionError( + f'Expected exactly 2 state_dicts to be supplied, got {len(state_dicts)}' + ) + + mh1 = PytorchModelHandlerTensor( + state_dict_path=state_dicts[0], + model_class=maskrcnn_resnet50_fpn, + model_params={'num_classes': 91}) + mh2 = PytorchModelHandlerTensor( + state_dict_path=state_dicts[1], + model_class=maskrcnn_resnet50_fpn_v2, + model_params={'num_classes': 91}) + + # We'll use v1 and v2 as our keys to point to our model handlers. + # Note that multiple keys can also point to a single model handler, + # unlike this example. + model_handler = KeyedModelHandler( + [KeyMhMapping(['v1'], mh1), KeyMhMapping(['v2'], mh2)]) + + pipeline = test_pipeline + if not test_pipeline: + pipeline = beam.Pipeline(options=pipeline_options) + + value_filename_pair = ( + pipeline + | 'ReadImageNames' >> beam.io.ReadFromText(known_args.input) + | 'FilterEmptyLines' >> beam.ParDo(filter_empty_lines) + | 'ReadImageData' >> beam.Map( + lambda image_name: read_image( + image_file_name=image_name, path_to_dir=known_args.images_dir)) + | 'PreprocessImages' >> beam.MapTuple( + lambda file_name, data: (preprocess_image(data), file_name))) + + predictions = ( + value_filename_pair + | 'DuplicateData' >> beam.ParDo(KeyExamplesForEachModelType()) + | 'PyTorchRunInference' >> RunInference(model_handler) + | 'ProcessOutput' >> beam.ParDo(PostProcessor())) + + # We now have our set of (example, prediction) and (example, original + # filename) tuples. We can use CoGroupByKey to join them by original example, + # converting the Tensors to lists first so that Beam can compare them without + # a custom coder. If all you care about is the model used for inference, you + # can also get that from the PredictionResult returned from RunInference. + results = ({ + 'image_names': ( + value_filename_pair | beam.MapTuple( + lambda example, filename: (example.tolist(), filename))), + 'predictions': ( + predictions | beam.MapTuple( + lambda example, prediction: (example.tolist(), prediction))) + } | beam.CoGroupByKey()) + + _ = ( + results + | 'FormatResults' >> beam.ParDo(FormatResults()) + | "WriteOutput" >> beam.io.WriteToText( + known_args.output, + shard_name_template='', + append_trailing_newlines=True)) + + result = pipeline.run() + result.wait_until_finish() + return result + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py index e00660bcbd98..2cc49be54599 100644 --- a/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py +++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py @@ -32,6 +32,7 @@ import torch from apache_beam.examples.inference import pytorch_image_classification from apache_beam.examples.inference import pytorch_image_segmentation + from apache_beam.examples.inference import pytorch_model_per_key_image_segmentation from apache_beam.examples.inference import pytorch_language_modeling except ImportError as e: torch = None @@ -127,6 +128,53 @@ def test_torch_run_inference_coco_maskrcnn_resnet50_fpn(self): prediction_labels = predictions_dict[filename] self.assertEqual(actual_labels, prediction_labels) + @pytest.mark.uses_pytorch + @pytest.mark.it_postcommit + @pytest.mark.timeout(1800) + def test_torch_run_inference_coco_maskrcnn_resnet50_fpn_v1_and_v2(self): + test_pipeline = TestPipeline(is_integration_test=True) + # text files containing absolute path to the coco validation data on GCS + file_of_image_names = 'gs://apache-beam-ml/testing/inputs/it_coco_validation_inputs.txt' # pylint: disable=line-too-long + output_file_dir = 'gs://apache-beam-ml/testing/predictions' + output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt']) + + model_state_dict_paths = [ + 'gs://apache-beam-ml/models/torchvision.models.detection.maskrcnn_resnet50_fpn.pth', # pylint: disable=line-too-long + 'gs://apache-beam-ml/models/torchvision.models.detection.maskrcnn_resnet50_fpn_v2.pth' # pylint: disable=line-too-long + ] + images_dir = 'gs://apache-beam-ml/datasets/coco/raw-data/val2017' + extra_opts = { + 'input': file_of_image_names, + 'output': output_file, + 'model_state_dict_paths': ','.join(model_state_dict_paths), + 'images_dir': images_dir, + } + pytorch_model_per_key_image_segmentation.run( + test_pipeline.get_full_options_as_args(**extra_opts), + save_main_session=False) + + self.assertEqual(FileSystems().exists(output_file), True) + predictions = process_outputs(filepath=output_file) + actuals_file = 'gs://apache-beam-ml/testing/expected_outputs/test_torch_run_inference_coco_maskrcnn_resnet50_fpn_v1_and_v2_actuals.txt' # pylint: disable=line-too-long + actuals = process_outputs(filepath=actuals_file) + + predictions_dict = {} + for prediction in predictions: + p = prediction.split('---') + filename = p[0] + v1predictions = p[1] + v2predictions = p[2] + predictions_dict[filename] = (v1predictions, v2predictions) + + for actual in actuals: + a = actual.split('---') + filename = a[0] + v1actuals = a[1] + v2actuals = a[2] + v1prediction_labels, v2prediction_labels = predictions_dict[filename] + self.assertEqual(v1actuals, v1prediction_labels) + self.assertEqual(v2actuals, v2prediction_labels) + @pytest.mark.uses_pytorch @pytest.mark.it_postcommit def test_torch_run_inference_bert_for_masked_lm(self):