[Bug]: Tupletag not found in this PCollectionTuple when running locally. #27753

Closed
1 of 15 tasks
SKHolmes opened this issue Jul 31, 2023 · 3 comments

SKHolmes commented Jul 31, 2023

What happened?

First, let me preface this: I am still new to Apache Beam, so apologies if I made a simple mistake or am misunderstanding something.

We are using Beam to build a generic data streaming pipeline that reads from a messaging service and processes the data according to the configuration carried in each message. I believe this means that at some point the transforms will need to return a PCollectionTuple, splitting the data into separate PCollections by data type. However, when I try to access the returned PCollections by their TupleTags in the app, only the default PCollection/TupleTag is present.

Exception in thread "main" java.lang.IllegalArgumentException: TupleTag Tag<com.example.transforms.MessageParser.lambda$create$0:37#bb20b45fd4d95138> not found in this PCollectionTuple tuple

Main App

var pipeline = Pipeline.create(options);

MessageParser messageParser = MessageParser.create();
Map<PipelineType, TupleTag<Message>> pipelineTypeMap = messageParser.getPipelineTypeMap();

PCollectionTuple pCollectionTuple = pipeline.apply("Reading Pub/Sub", PubsubIO.readMessages().fromSubscription(options.getPubSubSubscription()))
				.apply(Window.<PubsubMessage>into(new GlobalWindows())
						.triggering(AfterPane.elementCountAtLeast(1))
						.discardingFiredPanes()
				)
				.apply("Parsing Pub/Sub Message", messageParser);

// Fails on line below
PCollection<Message> curvePCollection = pCollectionTuple.get(pipelineTypeMap.get(PipelineType.CURVE_INGESTION));
PCollection<Message> testPCollection = pCollectionTuple.get(pipelineTypeMap.get(PipelineType.TEST));

pipeline.run();

MessageParser

public class MessageParser extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {

    private TupleTagList tags;
    private TupleTag<Message> defaultTupleTag;
    private Map<PipelineType, TupleTag<Message>> pipelineTypeMap;

    private MessageParser(TupleTag<Message> defaultTupleTag, TupleTagList tags, Map<PipelineType, TupleTag<Message>> pipelineTypeMap) {
        this.defaultTupleTag = defaultTupleTag;
        this.tags = tags;
        this.pipelineTypeMap = pipelineTypeMap;
    }

    public static MessageParser create() {
        Map<PipelineType, TupleTag<Message>> pipelineTypeMap = new HashMap<>();

        TupleTagList tags = TupleTagList.empty();

        EnumSet.allOf(PipelineType.class)
                .forEach(pipelineType -> {
                    TupleTag<Message> t = new TupleTag<>();
                    pipelineTypeMap.put(pipelineType, t);
                    tags.and(t);
                });
        TupleTag<Message> defaultTag = new TupleTag<>();

        // Added for visibility
        pipelineTypeMap.put(null, defaultTag);

        return new MessageParser(defaultTag, tags, pipelineTypeMap);
    }


    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> input) {
        return input.apply("Running read: ", ParDo.of(new MessageParserFn(pipelineTypeMap)).withOutputTags(defaultTupleTag, tags));
    }

    public Map<PipelineType, TupleTag<Message>> getPipelineTypeMap() {
        return pipelineTypeMap;
    }
}

PipelineType

public enum PipelineType {
    CURVE_INGESTION,
    TEST
}

MessageParserFn

public class MessageParserFn extends DoFn<PubsubMessage, Message> {

    private Map<PipelineType, TupleTag<Message>> pipelineTypeMap;

    public MessageParserFn(Map<PipelineType, TupleTag<Message>> pipelineTypeMap) {
        this.pipelineTypeMap = pipelineTypeMap;
    }


    @ProcessElement
    public void processElement(ProcessContext c) throws JsonProcessingException {

        String messageBody = new String(c.element().getPayload());
        ObjectMapper objectMapper = new ObjectMapper();
        Message message = objectMapper.readValue(messageBody, Message.class);

        // Depending on our parsed PipelineType output to a different PCollection
        TupleTag tag = pipelineTypeMap.get(message.getPipelineType());
        c.output(tag, message);
    }
}

I suspect it's failing because I am interacting with the PCollectionTuple outside the execution of the pipeline transforms. But according to the documentation here,

https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs

the provided example seems to do exactly what I want: it retrieves the PCollections after the tagged output. What am I misunderstanding about multi-outputs? Is there a bug? Is the documentation wrong or unclear?

I can provide more source if required, but I am hoping it will be something obvious I am doing wrong.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
SKHolmes changed the title from "[Bug]: Tupletag not found in this PCOllectionTuple when running locally." to "[Bug]: Tupletag not found in this PCollectionTuple when running locally." on Jul 31, 2023

SKHolmes commented Aug 1, 2023

After some refactoring I have run into some weird behavior.

  public MessageParser() {
       Map<MessageType, TupleTag<Message>> messageTypeMap = new HashMap<>();
       List<TupleTag<Message>> tempList = new ArrayList<TupleTag<Message>>();
       TupleTagList tempTags = TupleTagList.empty();

       EnumSet.allOf(MessageType.class)
               .forEach(messageType -> {
                   TupleTag<Message> t = new TupleTag<Message>(){};
                   messageTypeMap.put(messageType, t);
                   tempList.add(t);
                   tempTags.and(t);
               });

       System.out.print("tags: ");
       System.out.println(tempTags.getAll());
       System.out.println(tempList);

       this.tags = tempTags;
       this.messageTypeMap = messageTypeMap;
       this.defaultTupleTag = new TupleTag<Message>(){};
   }

The above prints,

tags: []
[Tag<com.example.transforms.MessageParser$1.:30#20ff67585e33a8f6>, Tag<com.example.transforms.MessageParser$1.:30#2587af97b4865538>]

I have looked into the TupleTagList source and I don't know why it would behave like this. By all accounts it should be adding to its inner List with .and() and returning that list with getAll(). It should behave like the ArrayList I created for testing purposes.


SKHolmes commented Aug 7, 2023

Okay, one last paste dump to clarify the issue and hopefully help other people googling for solutions. From what I have found, TupleTagList.and(TupleTag) behaves strangely when called in a loop (I have tested several kinds of loop). My band-aid solution is to create the TupleTags I need one at a time, which sucks because my code will have to change in multiple places if I need more TupleTags of my types in the future.

Paste of code,

	public static TupleTagList createPipelineTupleMap () {
		System.out.println("In createPipelineTupleMap");
		TupleTagList tags = TupleTagList.empty();
		HashMap<MessageType, TupleTag<Message>> map
				= new HashMap<MessageType, TupleTag<Message>>();
		ArrayList<MessageType> types = new ArrayList<MessageType>();
		for(MessageType type : MessageType.values()) {
			types.add(type);
			TupleTag<Message> tag = new TupleTag<Message>(){};
			tags.and(new TupleTag<Message>(){});
			map.put(type, tag);
		}
		System.out.print("Tags 2: ");
		System.out.println(tags.getAll());
		System.out.printf("Tags size: %d\n", tags.size());
		System.out.println(map.entrySet());
		System.out.print("Types: ");
		System.out.println(types);
		System.out.println(tags.and(new TupleTag<Message>(){}).getAll());
		return tags;
	}

Prints,

In createPipelineTupleMap
Tags 2: []
Tags size: 0
[CURVE_INGESTION=Tag<com.example.App$6.:126#8ce970b71df42503>, TEST=Tag<com.example.App$6.:126#554dcd2b40b5f04b>]
Types: [CURVE_INGESTION, TEST]
[Tag<com.example.App$8.:136#fc1e932efb9a0f58>]

Apologies for this being hard to read. The gist is that the TupleTagList has no tags (Tags 2: []) after looping over all the values of my MessageType enum. The loop did iterate properly, which is why the map is populated correctly ([CURVE_INGESTION=Tag<com.example.App$6.:126#8ce970b71df42503>, TEST=Tag<com.example.App$6.:126#554dcd2b40b5f04b>]). However, I can still add to the TupleTagList and print the contents, as you can see when the line
System.out.println(tags.and(new TupleTag<Message>(){}).getAll());
prints,

[Tag<com.example.App$8.:136#fc1e932efb9a0f58>]

SKHolmes commented

Never mind, I'm an idiot: .and() returns a new immutable list instead of modifying the one it is called on, so I needed to do,
tags = tags.and(t);
in my loop.
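
For anyone landing here later, here is a minimal sketch of why the loop printed an empty list. It deliberately does not use Beam: TagList below is a hypothetical stand-in that only mimics the behavior described above (and() returns a new list and leaves the receiver untouched), and the class and method names are made up for illustration. Note that in the original create() the call sat inside a forEach lambda, where reassigning a captured local variable does not compile because captured locals must be effectively final, so the sketch uses a plain for loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ImmutableAndDemo {

    // Hypothetical stand-in for an immutable tag list (NOT Beam's TupleTagList).
    public static final class TagList {
        private final List<String> tags;

        private TagList(List<String> tags) {
            this.tags = Collections.unmodifiableList(tags);
        }

        public static TagList empty() {
            return new TagList(new ArrayList<>());
        }

        // Returns a NEW TagList with the tag appended; "this" is left unchanged.
        public TagList and(String tag) {
            List<String> copy = new ArrayList<>(tags);
            copy.add(tag);
            return new TagList(copy);
        }

        public List<String> getAll() {
            return tags;
        }

        public int size() {
            return tags.size();
        }
    }

    public enum PipelineType { CURVE_INGESTION, TEST }

    // Mirrors the loop from the issue: the result of and() is thrown away.
    public static TagList buildDiscardingResult() {
        TagList tags = TagList.empty();
        for (PipelineType type : PipelineType.values()) {
            tags.and(type.name()); // new list is created, then discarded
        }
        return tags;
    }

    // The fix: keep the list returned by and().
    public static TagList buildReassigning() {
        TagList tags = TagList.empty();
        for (PipelineType type : PipelineType.values()) {
            tags = tags.and(type.name());
        }
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(buildDiscardingResult().getAll()); // prints []
        System.out.println(buildReassigning().getAll());      // prints [CURVE_INGESTION, TEST]
    }
}
```

With the real Beam classes the same shape should apply: build the TupleTagList in a for loop over the enum values, reassign with tags = tags.and(t) on each iteration, and put that same t into the map, so that the tags later passed to PCollectionTuple.get() are the ones that were passed to withOutputTags().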

github-actions bot added this to the 2.50.0 Release milestone on Aug 10, 2023