[Bug]: AfterCount trigger doesn't work with Sessions window in tests on DirectRunner #27636

Closed
1 of 15 tasks
olokshyn opened this issue Jul 24, 2023 · 4 comments
Labels
bug done & done Issue has been reviewed after it was closed for verification, followups, etc. P2 python streaming Issues pertaining to streaming functionality

Comments

@olokshyn

What happened?

Python SDK version: 2.48.0

It is not possible to fire the Sessions window early based on the number of elements in it using the AfterCount trigger. The window only fires when the watermark advances and closes it.

Related issue: #20813. It is supposed to be fixed, but reproduces for me.

The code snippet provided below is a test that:

  1. Defines a one-day-long Sessions window.
  2. Defines the trigger.AfterCount(1) trigger that is supposed to fire the window early on every element.
  3. Combines the results of the window using a custom combiner that just puts all results in a single list.
  4. Feeds in three elements: two of them fall within a single window, and the third one sits in its own window.

Note that when the commented-out line trigger=trigger.AfterEach(trigger.AfterCount(1), trigger.AfterWatermark()) is used instead, the result is the same.

Expected: All three elements will be processed separately because the window fires after every single element:

[
    ("user1", [1]),
    ("user1", [2]),
    ("user1", [5]),
]

Actual: The first two elements are processed together because they fall in the same window based on the watermark:

[
    ("user1", [1, 2]),
    ("user1", [5]),
]
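
As a sanity check on the session boundaries, here is a minimal plain-Python sketch (not Beam code, and not how Beam implements Sessions internally) that groups the three timestamps from the test using a one-day gap. The `merge_sessions` helper is hypothetical and only mimics the idea that each element opens a `[t, t + gap)` interval and overlapping intervals merge:

```python
from datetime import datetime, timedelta, timezone


def merge_sessions(timestamps, gap):
    """Group event timestamps into sessions separated by at least `gap`.

    Hypothetical helper for illustration only: each element opens an
    interval [t, t + gap), and overlapping intervals are merged.
    """
    sessions = []
    for ts in sorted(timestamps):
        # An element joins the current session if it starts before the
        # session's end, i.e. before (latest timestamp + gap).
        if sessions and ts < sessions[-1][-1] + gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


base = datetime(2023, 7, 7, 10, 30, tzinfo=timezone.utc)
timestamps = [
    base,                                  # ("user1", 1)
    base + timedelta(minutes=10),          # ("user1", 2)
    base + timedelta(days=1, minutes=20),  # ("user1", 5)
]
sessions = merge_sessions(timestamps, gap=timedelta(days=1))
print([len(s) for s in sessions])  # prints [2, 1]
```

The first two elements are 10 minutes apart and share a session, while the third arrives more than one day after the second and starts a new one.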

The test that reproduces the issue:

from datetime import datetime, timedelta, timezone
from typing import Iterable

import pytest
import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import ElementEvent, TestStream, TimestampedValue, WatermarkEvent
from apache_beam.testing.util import assert_that, equal_to


base_datetime = datetime(2023, 7, 7, 10, 30, 0, 0, timezone.utc)


@pytest.mark.parametrize(
    "events, expected",
    [
        (
            [
                WatermarkEvent(0),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 1),
                            base_datetime.timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent(base_datetime.timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 2),
                            (base_datetime + timedelta(minutes=10)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(minutes=10)).timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 5),
                            (base_datetime + timedelta(days=1, minutes=20)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(days=1, minutes=20)).timestamp() + 5),
            ],
            [
                ("user1", [1]),
                ("user1", [2]),
                ("user1", [5]),
            ],
        ),
    ],
)
def test_after_count_trigger(events, expected):
    with TestPipeline(options=PipelineOptions(allow_unsafe_triggers=True)) as p:
        test_stream = p | TestStream(events=events, output_tags={None})

        pcoll = (
            test_stream
            | "Window"
            >> beam.WindowInto(
                Sessions(int(timedelta(days=1).total_seconds())),
                # trigger=trigger.AfterEach(trigger.AfterCount(1), trigger.AfterWatermark()),
                trigger=trigger.AfterCount(1),
                accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
        )
        pcoll | beam.Map(print)

        assert_that(pcoll, equal_to(expected))


class CustomCombine(beam.CombineFn):
    def create_accumulator(self) -> list:
        return []

    def add_input(self, accumulator: list, input) -> list:
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        return accumulator

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@tvalentyn tvalentyn added streaming Issues pertaining to streaming functionality and removed awaiting triage labels Jul 24, 2023
@tvalentyn
Contributor

Thanks for reaching out and for the detailed report. It does look like on DirectRunner the trigger for the first window fired only once instead of twice. There are known limitations in DirectRunner support for streaming use cases: #21987. The last comment in #27636, however, referred to the Dataflow runner. Have you observed this behavior on Dataflow?

Off the top of my head, I don't know how to easily simulate a controlled watermark lag on Dataflow; it looks like TestStream is not supported on Dataflow. The closest instrument I am aware of is https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/periodicsequence.py, but that might require additional customization.

@olokshyn
Author

olokshyn commented Jul 25, 2023

Summary:

  1. The Dataflow runner works correctly.
  2. The DirectRunner works correctly.
  3. The same trigger doesn't work with TestStream and TestPipeline on DirectRunner.

The code used to test both runners is linked below.
The intention is to fire a Sessions window early on each single new element, AND to fire this window when it is closed.
I used the following window definition, which, I believe, is more correct than the one I used for the tests before:

>> beam.WindowInto(
    Sessions(WINDOW_LENGTH_SEC),
    trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
)

Both the Dataflow runner and the DirectRunner behave as expected.
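
To make the intended firing sequence concrete, here is a minimal plain-Python sketch of the panes I would expect from an early firing on every element plus one firing when the session closes, in accumulating mode. It is not Beam code and does not reflect how Beam implements triggers. The `simulate_panes` helper is hypothetical, and it assumes elements arrive in event-time order and that a session closes only when a later element for the same key falls outside the gap or the input ends:

```python
def simulate_panes(elements, gap_seconds):
    """Return the panes expected from firing early on every element and
    once more when each session closes, accumulating fired values.

    `elements` is an iterable of (key, value, event_time_seconds) tuples,
    assumed to be in event-time order. Illustration only.
    """
    panes = []
    open_sessions = {}  # key -> {"values": [...], "last_ts": float}
    for key, value, ts in elements:
        session = open_sessions.get(key)
        if session is not None and ts >= session["last_ts"] + gap_seconds:
            # The element falls outside the open session: treat the old
            # session as closed and emit its final pane.
            panes.append((key, list(session["values"])))
            session = None
        if session is None:
            session = {"values": [], "last_ts": ts}
            open_sessions[key] = session
        session["values"].append(value)
        session["last_ts"] = ts
        # Early firing after every element, with accumulated contents.
        panes.append((key, list(session["values"])))
    # End of input: every session that is still open closes.
    for key, session in open_sessions.items():
        panes.append((key, list(session["values"])))
    return panes


ONE_DAY = 24 * 60 * 60
panes = simulate_panes(
    [
        ("user1", 1, 0),
        ("user1", 2, 10 * 60),
        ("user1", 5, ONE_DAY + 20 * 60),
    ],
    gap_seconds=ONE_DAY,
)
for pane in panes:
    print(pane)
```

With a one-day gap this yields five panes: `[1]`, `[1, 2]`, `[1, 2]` again when the first session closes, then `[5]` twice. On a real runner the order of the panes can differ, because the final pane of a session is only emitted once the watermark passes the end of the window.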

The code snippet:

"""
A Python Beam pipeline that applies a sessions window to the Pub/Sub messages
with an early trigger that fires after every single element, accumulating the results.
"""
from datetime import timedelta
from typing import Iterable
import json
from multiprocessing import Process
import time
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from google.cloud import pubsub_v1


logging.basicConfig(
    format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)


def get_logger(name: str):
    log = logging.getLogger(name)
    log.setLevel(logging.INFO)
    return log


m_log = get_logger("main")


PROJECT = "MYPROJECT"
PRODUCER_TOPIC = pubsub_v1.PublisherClient.topic_path(PROJECT, "beam-test-producer-topic")
CONSUMER_TOPIC = pubsub_v1.PublisherClient.topic_path(PROJECT, "beam-test-consumer-topic")
PRODUCER_SUBSCRIPTION = pubsub_v1.SubscriberClient.subscription_path(PROJECT, "beam-test-topic-producer-subscription")
CONSUMER_SUBSCRIPTION = pubsub_v1.SubscriberClient.subscription_path(PROJECT, "beam-test-topic-consumer-subscription")
WINDOW_LENGTH_SEC = timedelta(minutes=3).total_seconds()
WARMUP_TIME_SEC = timedelta(seconds=5).total_seconds()
COOLDOWN_TIME_SEC = timedelta(seconds=5).total_seconds()


def log_element(element, prefix):
    m_log.info(f"{prefix}: {element}")
    return element


def run_pipeline():
    options = PipelineOptions(
        # runner="DataflowRunner",
        runner="DirectRunner",
        streaming=True,
        project=PROJECT,
        region="europe-west1",
        temp_location=f"gs://{PROJECT}-dataflow-temp",
        allow_unsafe_triggers=True,
        save_main_session=True,
    )
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=PRODUCER_SUBSCRIPTION).with_output_types(bytes)
            | "Decode" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
            | "Extract" >> beam.Map(lambda x: (x["user_id"], x["value"]))
            | "Log elements Before Window" >> beam.Map(log_element, prefix="Before Window")
            | "Window into"
            >> beam.WindowInto(
                Sessions(WINDOW_LENGTH_SEC),
                trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
            | "Log elements After Combine" >> beam.Map(log_element, prefix="After Combine")
            | "Encode" >> beam.Map(lambda x: json.dumps({"user_id": x[0], "values": x[1]}).encode("utf-8"))
            | "Write to PubSub" >> beam.io.WriteToPubSub(CONSUMER_TOPIC)
        )


class CustomCombine(beam.CombineFn):
    def create_accumulator(self) -> list:
        return []

    def add_input(self, accumulator: list, input) -> list:
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        return accumulator


def producer_worker():
    publisher = pubsub_v1.PublisherClient()
    log = get_logger("producer")

    log.info(f"Producer sleeps for {WARMUP_TIME_SEC}")
    time.sleep(WARMUP_TIME_SEC)

    log.info("Producing first 5 messages")
    for i in range(5):
        data = {"user_id": "user1", "value": f"first-{i}"}
        data = json.dumps(data).encode("utf-8")
        future = publisher.publish(PRODUCER_TOPIC, data=data)
        log.info(future.result())
        time.sleep(COOLDOWN_TIME_SEC)

    sleep_time = WINDOW_LENGTH_SEC + 10
    log.info(f"Sleeping for {sleep_time} seconds")
    time.sleep(sleep_time)

    log.info("Producing next 5 messages")
    for i in range(5):
        data = {"user_id": "user1", "value": f"second-{i}"}
        data = json.dumps(data).encode("utf-8")
        future = publisher.publish(PRODUCER_TOPIC, data=data)
        log.info(future.result())
        time.sleep(COOLDOWN_TIME_SEC)


def consumer_callback(message):
    log = get_logger("consumer")
    data = json.loads(message.data.decode("utf-8"))
    log.info(f"!!! MESSAGE !!!: {data}")
    message.ack()


def consumer_worker():
    log = get_logger("consumer")
    subscriber = pubsub_v1.SubscriberClient()
    log.info(f"Consumer is listening to {CONSUMER_SUBSCRIPTION}")
    subscriber.subscribe(CONSUMER_SUBSCRIPTION, callback=consumer_callback)
    try:
        while True:
            time.sleep(1)
    finally:
        log.warning("Consumer is shutting down")


def create_pubsub_resources():
    publisher = pubsub_v1.PublisherClient()

    try:
        publisher.get_topic(topic=PRODUCER_TOPIC)
        m_log.info(f"Topic {PRODUCER_TOPIC} already exists")
    except Exception:
        publisher.create_topic(name=PRODUCER_TOPIC)
        m_log.info(f"Topic {PRODUCER_TOPIC} created")
        time.sleep(COOLDOWN_TIME_SEC)

    try:
        publisher.get_topic(topic=CONSUMER_TOPIC)
        m_log.info(f"Topic {CONSUMER_TOPIC} already exists")
    except Exception:
        publisher.create_topic(name=CONSUMER_TOPIC)
        m_log.info(f"Topic {CONSUMER_TOPIC} created")
        time.sleep(COOLDOWN_TIME_SEC)

    subscriber = pubsub_v1.SubscriberClient()
    try:
        subscriber.get_subscription(subscription=PRODUCER_SUBSCRIPTION)
        m_log.info(f"Subscription {PRODUCER_SUBSCRIPTION} already exists")
    except Exception:
        subscriber.create_subscription(name=PRODUCER_SUBSCRIPTION, topic=PRODUCER_TOPIC)
        m_log.info(f"Subscription {PRODUCER_SUBSCRIPTION} created")
        time.sleep(COOLDOWN_TIME_SEC)

    try:
        subscriber.get_subscription(subscription=CONSUMER_SUBSCRIPTION)
        m_log.info(f"Subscription {CONSUMER_SUBSCRIPTION} already exists")
    except Exception:
        subscriber.create_subscription(name=CONSUMER_SUBSCRIPTION, topic=CONSUMER_TOPIC)
        m_log.info(f"Subscription {CONSUMER_SUBSCRIPTION} created")
        time.sleep(COOLDOWN_TIME_SEC)


def main():
    create_pubsub_resources()
    consumer = Process(target=consumer_worker)
    consumer.start()
    producer = Process(target=producer_worker)
    producer.start()
    run_pipeline()
    producer.join()
    consumer.join()


if __name__ == "__main__":
    main()

This is the output when running with the DirectRunner:

2023-07-25 13:49:28.401 INFO     Topic projects/MYPROJECT/topics/beam-test-producer-topic already exists
2023-07-25 13:49:28.590 INFO     Topic projects/MYPROJECT/topics/beam-test-consumer-topic already exists
2023-07-25 13:49:33.943 INFO     Subscription projects/MYPROJECT/subscriptions/beam-test-topic-producer-subscription created
2023-07-25 13:49:44.958 INFO     Subscription projects/MYPROJECT/subscriptions/beam-test-topic-consumer-subscription created
0.00s - Debugger warning: It seems that frozen modules are being used, which may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
0.00s - Debugger warning: It seems that frozen modules are being used, which may
0.00s - Debugger warning: It seems that frozen modules are being used, which may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
2023-07-25 13:49:51.583 INFO     Producer sleeps for 5.0
2023-07-25 13:49:51.583 INFO     Consumer is listening to projects/MYPROJECT/subscriptions/beam-test-topic-consumer-subscription
2023-07-25 13:49:56.585 INFO     Producing first 5 messages
2023-07-25 13:49:56.842 INFO     7901136752441307
2023-07-25 13:49:57.035 INFO     Before Window: ('user1', 'first-0')
2023-07-25 13:49:57.084 INFO     After Combine: ('user1', ['first-0'])
2023-07-25 13:49:58.425 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0']}
2023-07-25 13:50:01.915 INFO     7901152274226985
2023-07-25 13:50:02.102 INFO     Before Window: ('user1', 'first-1')
2023-07-25 13:50:02.148 INFO     After Combine: ('user1', ['first-0', 'first-1'])
2023-07-25 13:50:03.544 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-1']}
2023-07-25 13:50:06.986 INFO     7901136932871865
2023-07-25 13:50:07.172 INFO     Before Window: ('user1', 'first-2')
2023-07-25 13:50:07.219 INFO     After Combine: ('user1', ['first-0', 'first-1', 'first-2'])
2023-07-25 13:50:08.299 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-1', 'first-2']}
2023-07-25 13:50:12.069 INFO     8701650771078307
2023-07-25 13:50:12.246 INFO     Before Window: ('user1', 'first-3')
2023-07-25 13:50:12.293 INFO     After Combine: ('user1', ['first-0', 'first-3', 'first-1', 'first-2'])
2023-07-25 13:50:12.565 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-3', 'first-1', 'first-2']}
2023-07-25 13:50:17.143 INFO     7901136738393628
2023-07-25 13:50:17.338 INFO     Before Window: ('user1', 'first-4')
2023-07-25 13:50:17.384 INFO     After Combine: ('user1', ['first-0', 'first-3', 'first-1', 'first-4', 'first-2'])
2023-07-25 13:50:18.635 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-3', 'first-1', 'first-4', 'first-2']}
2023-07-25 13:50:22.149 INFO     Sleeping for 190.0 seconds
2023-07-25 13:53:29.395 INFO     After Combine: ('user1', ['first-0', 'first-3', 'first-1', 'first-4', 'first-2'])
2023-07-25 13:53:29.682 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-3', 'first-1', 'first-4', 'first-2']}
2023-07-25 13:53:32.154 INFO     Producing next 5 messages
2023-07-25 13:53:32.233 INFO     7901152440676901
2023-07-25 13:53:32.424 INFO     Before Window: ('user1', 'second-0')
2023-07-25 13:53:32.470 INFO     After Combine: ('user1', ['second-0'])
2023-07-25 13:53:32.732 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-0']}
2023-07-25 13:53:37.312 INFO     8701672411108061
2023-07-25 13:53:37.501 INFO     Before Window: ('user1', 'second-1')
2023-07-25 13:53:37.549 INFO     After Combine: ('user1', ['second-0', 'second-1'])
2023-07-25 13:53:37.819 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-0', 'second-1']}
2023-07-25 13:53:42.392 INFO     7901136872402657
2023-07-25 13:53:42.591 INFO     Before Window: ('user1', 'second-2')
2023-07-25 13:53:42.637 INFO     After Combine: ('user1', ['second-0', 'second-1', 'second-2'])
2023-07-25 13:53:42.916 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-0', 'second-1', 'second-2']}
2023-07-25 13:53:47.468 INFO     7901136997195278
2023-07-25 13:53:47.658 INFO     Before Window: ('user1', 'second-3')
2023-07-25 13:53:47.705 INFO     After Combine: ('user1', ['second-0', 'second-3', 'second-1', 'second-2'])
2023-07-25 13:53:47.978 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-0', 'second-3', 'second-1', 'second-2']}
2023-07-25 13:53:52.549 INFO     7901152531086753
2023-07-25 13:53:52.755 INFO     Before Window: ('user1', 'second-4')
2023-07-25 13:53:52.800 INFO     After Combine: ('user1', ['second-0', 'second-3', 'second-1', 'second-4', 'second-2'])
2023-07-25 13:53:53.066 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-0', 'second-3', 'second-1', 'second-4', 'second-2']}
2023-07-25 13:57:07.557 INFO     After Combine: ('user1', ['second-0', 'second-3', 'second-1', 'second-4', 'second-2'])
2023-07-25 13:57:07.852 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-0', 'second-3', 'second-1', 'second-4', 'second-2']}

This is the output when running with the Dataflow runner:

2023-07-25 13:37:30.021 INFO     Topic projects/MYPROJECT/topics/beam-test-producer-topic already exists
2023-07-25 13:37:30.211 INFO     Topic projects/MYPROJECT/topics/beam-test-consumer-topic already exists
2023-07-25 13:37:36.752 INFO     Subscription projects/MYPROJECT/subscriptions/beam-test-topic-producer-subscription created
2023-07-25 13:37:48.169 INFO     Subscription projects/MYPROJECT/subscriptions/beam-test-topic-consumer-subscription created
0.00s - Debugger warning: It seems that frozen modules are being used, which may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
0.00s - Debugger warning: It seems that frozen modules are being used, which may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
0.00s - Debugger warning: It seems that frozen modules are being used, which may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
0.00s - Debugger warning: It seems that frozen modules are being used, which may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
2023-07-25 13:37:54.684 INFO     Producing first 5 messages
2023-07-25 13:37:54.684 INFO     Consumer is listening to projects/MYPROJECT/subscriptions/beam-test-topic-consumer-subscription
2023-07-25 13:37:54.954 INFO     8701580110692426
0.00s - Debugger warning: It seems that frozen modules are being used, which may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
2023-07-25 13:38:00.019 INFO     8701563637160080
2023-07-25 13:38:05.089 INFO     7901135680165217
2023-07-25 13:38:10.191 INFO     7901136060418932
2023-07-25 13:38:15.277 INFO     7901135838881582
2023-07-25 13:38:20.284 INFO     Sleeping for 190.0 seconds
2023-07-25 13:41:30.287 INFO     Producing next 5 messages
2023-07-25 13:41:30.419 INFO     7901136188227831
2023-07-25 13:41:35.494 INFO     7901135937988428
2023-07-25 13:41:40.588 INFO     7901136623003572
2023-07-25 13:41:45.681 INFO     7901136240116298
2023-07-25 13:41:50.128 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-2']}
2023-07-25 13:41:50.188 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-2', 'second-3']}
2023-07-25 13:41:50.189 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0']}
2023-07-25 13:41:50.192 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-2', 'second-3', 'second-1']}
2023-07-25 13:41:50.242 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-2']}
2023-07-25 13:41:50.295 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-2', 'second-3', 'second-1', 'second-0']}
2023-07-25 13:41:50.343 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-2', 'first-1']}
2023-07-25 13:41:50.383 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-2', 'first-1', 'first-4']}
2023-07-25 13:41:50.385 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-2', 'first-1', 'first-4', 'first-3']}
2023-07-25 13:41:50.742 INFO     7901135828414233
2023-07-25 13:41:52.975 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-2', 'second-3', 'second-1', 'second-0', 'second-4']}
2023-07-25 13:42:00.946 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['first-0', 'first-2', 'first-1', 'first-4', 'first-3']}
2023-07-25 13:45:27.291 INFO     !!! MESSAGE !!!: {'user_id': 'user1', 'values': ['second-2', 'second-3', 'second-1', 'second-0', 'second-4']}

As you can see, both runners fire the Sessions window early on each new element, accumulating the results, and fire once again when the window is closed.

I tried the same trigger with the test case I mentioned in the bug report, and it didn't work:

from datetime import datetime, timedelta, timezone
from typing import Iterable

import pytest
import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import ElementEvent, TestStream, TimestampedValue, WatermarkEvent
from apache_beam.testing.util import assert_that, equal_to


base_datetime = datetime(2023, 7, 7, 10, 30, 0, 0, timezone.utc)


@pytest.mark.parametrize(
    "events, expected",
    [
        (
            [
                WatermarkEvent(0),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 1),
                            base_datetime.timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent(base_datetime.timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 2),
                            (base_datetime + timedelta(minutes=10)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(minutes=10)).timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 5),
                            (base_datetime + timedelta(days=1, minutes=20)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(days=1, minutes=20)).timestamp() + 5),
            ],
            [
                ("user1", [1]),
                ("user1", [1, 2]),
                ("user1", [1, 2]),
                ("user1", [5]),
                ("user1", [5]),
            ],
        ),
    ],
)
def test_after_count_trigger(events, expected):
    with TestPipeline(options=PipelineOptions(allow_unsafe_triggers=True)) as p:
        test_stream = p | TestStream(events=events, output_tags={None})

        pcoll = (
            test_stream
            | "Window"
            >> beam.WindowInto(
                Sessions(int(timedelta(days=1).total_seconds())),
                trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
                accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
        )
        pcoll | beam.Map(print)

        assert_that(pcoll, equal_to(expected))


class CustomCombine(beam.CombineFn):
    def create_accumulator(self) -> list:
        return []

    def add_input(self, accumulator: list, input) -> list:
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        return accumulator

The expected result is:

[
    ("user1", [1]),  # when 1 arrives
    ("user1", [1, 2]),  # when 2 arrives
    ("user1", [1, 2]),  # when the window is closed
    ("user1", [5]),  # when 5 arrives
    ("user1", [5]),  # when the window is closed
],

Here the window should fire on each new element (preserving the previous elements) and fire again when the window is closed.

Instead, the test produces this result:

[
    ("user1", [1, 2]),
    ("user1", [1, 2]),
    ("user1", [5]),
    ("user1", [5]),
],

As you can see, it never fires on the very first element alone.

Could you please point me to the part of the code that might be causing this bug?

@tvalentyn
Contributor

tvalentyn commented Jul 25, 2023

Are you running the test case in streaming mode (with the --streaming pipeline option set)?

@olokshyn
Author

@tvalentyn that solved it, thank you!

So that was the change:

options = PipelineOptions(
    allow_unsafe_triggers=True,
    streaming=True,
)

Pasting the working test for future reference:

from datetime import datetime, timedelta, timezone
from typing import Iterable

import pytest
import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import ElementEvent, TestStream, TimestampedValue, WatermarkEvent
from apache_beam.testing.util import assert_that, equal_to


base_datetime = datetime(2023, 7, 7, 10, 30, 0, 0, timezone.utc)


@pytest.mark.parametrize(
    "events, expected",
    [
        (
            [
                WatermarkEvent(0),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 1),
                            base_datetime.timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent(base_datetime.timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 2),
                            (base_datetime + timedelta(minutes=10)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(minutes=10)).timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 5),
                            (base_datetime + timedelta(days=1, minutes=20)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(days=1, minutes=20)).timestamp() + 5),
            ],
            [
                ("user1", [1]),
                ("user1", [1, 2]),
                ("user1", [1, 2]),
                ("user1", [5]),
                ("user1", [5]),
            ],
        ),
    ],
)
def test_after_count_trigger(events, expected):
    options = PipelineOptions(
        allow_unsafe_triggers=True,
        streaming=True,
    )
    with TestPipeline(options=options) as p:
        test_stream = p | TestStream(events=events, output_tags={None})

        pcoll = (
            test_stream
            | "Window"
            >> beam.WindowInto(
                Sessions(int(timedelta(days=1).total_seconds())),
                trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
                accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
        )
        pcoll | beam.Map(print)

        assert_that(pcoll, equal_to(expected))


class CustomCombine(beam.CombineFn):
    def create_accumulator(self) -> list:
        return []

    def add_input(self, accumulator: list, input) -> list:
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        return accumulator

@github-actions github-actions bot added this to the 2.50.0 Release milestone Jul 25, 2023
@damccorm damccorm added the done & done Issue has been reviewed after it was closed for verification, followups, etc. label Jul 25, 2023