[Bug]: AfterCount trigger doesn't work with Sessions window in tests on DirectRunner #27636
Comments
Thanks for reaching out and for the detailed report. It does look like on DirectRunner the trigger for the first window fired only once instead of twice. There are known limitations in DirectRunner support for streaming use cases: #21987; the last comment in #27636, however, referred to the Dataflow runner. Have you observed this behavior on Dataflow? Off the top of my head I don't know how to simulate a controlled watermark lag on Dataflow easily; it looks like TestStream is not supported on Dataflow. The closest instrument I am aware of is https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/periodicsequence.py, but that might require additional customization.
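If it helps, here is a rough sketch (my assumption of how it could be wired, not something from the thread) of using `PeriodicImpulse` from that module to keep the watermark advancing on a runner where TestStream is unavailable; the payload mapping and intervals are illustrative:

```python
# Sketch: PeriodicImpulse emits one timestamped element every `fire_interval`
# seconds between start and stop, which keeps the watermark advancing.
import time

import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

now = time.time()
with beam.Pipeline() as p:
    _ = (
        p
        | "Impulse" >> PeriodicImpulse(
            start_timestamp=Timestamp.of(now),
            stop_timestamp=Timestamp.of(now + 60),  # run for one minute
            fire_interval=5.0,  # one element every 5 seconds
        )
        | "ToKV" >> beam.Map(lambda ts: ("user1", str(ts)))  # illustrative payload
        | "Print" >> beam.Map(print)
    )
```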
Summary:
The code used to test both runners is listed below.

```python
>> beam.WindowInto(
    Sessions(WINDOW_LENGTH_SEC),
    trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
)
```

Both the Dataflow and DirectRunner runners behave as expected. The full code snippet:

```python
"""
A Python Beam pipeline that applies a sessions window to the Pub/Sub messages
with an early trigger that fires after every single element, accumulating the results.
"""
from datetime import timedelta
from typing import Iterable
import json
from multiprocessing import Process
import time
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from google.cloud import pubsub_v1

logging.basicConfig(
    format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)


def get_logger(name: str):
    log = logging.getLogger(name)
    log.setLevel(logging.INFO)
    return log


m_log = get_logger("main")

PROJECT = "MYPROJECT"
PRODUCER_TOPIC = pubsub_v1.PublisherClient.topic_path(PROJECT, "beam-test-producer-topic")
CONSUMER_TOPIC = pubsub_v1.PublisherClient.topic_path(PROJECT, "beam-test-consumer-topic")
PRODUCER_SUBSCRIPTION = pubsub_v1.SubscriberClient.subscription_path(PROJECT, "beam-test-topic-producer-subscription")
CONSUMER_SUBSCRIPTION = pubsub_v1.SubscriberClient.subscription_path(PROJECT, "beam-test-topic-consumer-subscription")
WINDOW_LENGTH_SEC = timedelta(minutes=3).total_seconds()
WARMUP_TIME_SEC = timedelta(seconds=5).total_seconds()
COOLDOWN_TIME_SEC = timedelta(seconds=5).total_seconds()


def log_element(element, prefix):
    m_log.info(f"{prefix}: {element}")
    return element


def run_pipeline():
    options = PipelineOptions(
        # runner="DataflowRunner",
        runner="DirectRunner",
        streaming=True,
        project=PROJECT,
        region="europe-west1",
        temp_location=f"gs://{PROJECT}-dataflow-temp",
        allow_unsafe_triggers=True,
        save_main_session=True,
    )
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=PRODUCER_SUBSCRIPTION).with_output_types(bytes)
            | "Decode" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
            | "Extract" >> beam.Map(lambda x: (x["user_id"], x["value"]))
            | "Log elements Before Window" >> beam.Map(log_element, prefix="Before Window")
            | "Window into"
            >> beam.WindowInto(
                Sessions(WINDOW_LENGTH_SEC),
                trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
            | "Log elements After Combine" >> beam.Map(log_element, prefix="After Combine")
            | "Encode" >> beam.Map(lambda x: json.dumps({"user_id": x[0], "values": x[1]}).encode("utf-8"))
            | "Write to PubSub" >> beam.io.WriteToPubSub(CONSUMER_TOPIC)
        )


class CustomCombine(beam.CombineFn):
    def create_accumulator(self) -> list:
        return []

    def add_input(self, accumulator: list, input) -> list:
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        return accumulator


def producer_worker():
    publisher = pubsub_v1.PublisherClient()
    log = get_logger("producer")
    log.info(f"Producer sleeps for {WARMUP_TIME_SEC}")
    time.sleep(WARMUP_TIME_SEC)
    log.info("Producing first 5 messages")
    for i in range(5):
        data = {"user_id": "user1", "value": f"first-{i}"}
        data = json.dumps(data).encode("utf-8")
        future = publisher.publish(PRODUCER_TOPIC, data=data)
        log.info(future.result())
        time.sleep(COOLDOWN_TIME_SEC)
    sleep_time = WINDOW_LENGTH_SEC + 10
    log.info(f"Sleeping for {sleep_time} seconds")
    time.sleep(sleep_time)
    log.info("Producing next 5 messages")
    for i in range(5):
        data = {"user_id": "user1", "value": f"second-{i}"}
        data = json.dumps(data).encode("utf-8")
        future = publisher.publish(PRODUCER_TOPIC, data=data)
        log.info(future.result())
        time.sleep(COOLDOWN_TIME_SEC)


def consumer_callback(message):
    log = get_logger("consumer")
    data = json.loads(message.data.decode("utf-8"))
    log.info(f"!!! MESSAGE !!!: {data}")
    message.ack()


def consumer_worker():
    log = get_logger("consumer")
    subscriber = pubsub_v1.SubscriberClient()
    log.info(f"Consumer is listening to {CONSUMER_SUBSCRIPTION}")
    subscriber.subscribe(CONSUMER_SUBSCRIPTION, callback=consumer_callback)
    try:
        while True:
            time.sleep(1)
    finally:
        log.warning("Consumer is shutting down")


def create_pubsub_resources():
    publisher = pubsub_v1.PublisherClient()
    try:
        publisher.get_topic(topic=PRODUCER_TOPIC)
        m_log.info(f"Topic {PRODUCER_TOPIC} already exists")
    except Exception:
        publisher.create_topic(name=PRODUCER_TOPIC)
        m_log.info(f"Topic {PRODUCER_TOPIC} created")
        time.sleep(COOLDOWN_TIME_SEC)
    try:
        publisher.get_topic(topic=CONSUMER_TOPIC)
        m_log.info(f"Topic {CONSUMER_TOPIC} already exists")
    except Exception:
        publisher.create_topic(name=CONSUMER_TOPIC)
        m_log.info(f"Topic {CONSUMER_TOPIC} created")
        time.sleep(COOLDOWN_TIME_SEC)
    subscriber = pubsub_v1.SubscriberClient()
    try:
        subscriber.get_subscription(subscription=PRODUCER_SUBSCRIPTION)
        m_log.info(f"Subscription {PRODUCER_SUBSCRIPTION} already exists")
    except Exception:
        subscriber.create_subscription(name=PRODUCER_SUBSCRIPTION, topic=PRODUCER_TOPIC)
        m_log.info(f"Subscription {PRODUCER_SUBSCRIPTION} created")
        time.sleep(COOLDOWN_TIME_SEC)
    try:
        subscriber.get_subscription(subscription=CONSUMER_SUBSCRIPTION)
        m_log.info(f"Subscription {CONSUMER_SUBSCRIPTION} already exists")
    except Exception:
        subscriber.create_subscription(name=CONSUMER_SUBSCRIPTION, topic=CONSUMER_TOPIC)
        m_log.info(f"Subscription {CONSUMER_SUBSCRIPTION} created")
        time.sleep(COOLDOWN_TIME_SEC)


def main():
    create_pubsub_resources()
    consumer = Process(target=consumer_worker)
    consumer.start()
    producer = Process(target=producer_worker)
    producer.start()
    run_pipeline()
    producer.join()
    consumer.join()


if __name__ == "__main__":
    main()
```

This is the output when running with the DirectRunner:
This is the output when running with the Dataflow runner:
As you can see, both runners fire the early trigger as expected.

I tried the same trigger with the test case I mentioned in the bug report, and it didn't work:

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable

import pytest
import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import ElementEvent, TestStream, TimestampedValue, WatermarkEvent
from apache_beam.testing.util import assert_that, equal_to

base_datetime = datetime(2023, 7, 7, 10, 30, 0, 0, timezone.utc)


@pytest.mark.parametrize(
    "events, expected",
    [
        (
            [
                WatermarkEvent(0),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 1),
                            base_datetime.timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent(base_datetime.timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 2),
                            (base_datetime + timedelta(minutes=10)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(minutes=10)).timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 5),
                            (base_datetime + timedelta(days=1, minutes=20)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(days=1, minutes=20)).timestamp() + 5),
            ],
            [
                ("user1", [1]),
                ("user1", [1, 2]),
                ("user1", [1, 2]),
                ("user1", [5]),
                ("user1", [5]),
            ],
        ),
    ],
)
def test_after_count_trigger(events, expected):
    with TestPipeline(options=PipelineOptions(allow_unsafe_triggers=True)) as p:
        test_stream = p | TestStream(events=events, output_tags={None})
        pcoll = (
            test_stream
            | "Window"
            >> beam.WindowInto(
                Sessions(int(timedelta(days=1).total_seconds())),
                trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
        )
        pcoll | beam.Map(print)
        assert_that(pcoll, equal_to(expected))


class CustomCombine(beam.CombineFn):
    def create_accumulator(self) -> list:
        return []

    def add_input(self, accumulator: list, input) -> list:
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        return accumulator
```

Instead of the expected result:

```python
[
    ("user1", [1]),     # when 1 arrives
    ("user1", [1, 2]),  # when 2 arrives
    ("user1", [1, 2]),  # when the window is closed
    ("user1", [5]),     # when 5 arrives
    ("user1", [5]),     # when the window is closed
]
```

where the window fires on each new element (preserving the previous elements) plus fires when the window is closed, it produces this result:

```python
[
    ("user1", [1, 2]),
    ("user1", [1, 2]),
    ("user1", [5]),
    ("user1", [5]),
]
```

As you can see, it never fires on the very first element alone. Could you please point me to the part of the code that might be causing this bug?
Are you running the test case in streaming mode (with the `streaming=True` pipeline option)? Without it, the pipeline runs as a bounded batch pipeline, where early firings may not behave the same way.
@tvalentyn that solved it, thank you! So this was the change:

```python
options = PipelineOptions(
    allow_unsafe_triggers=True,
    streaming=True,
)
```

Pasting the working test for future reference:

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable

import pytest
import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import ElementEvent, TestStream, TimestampedValue, WatermarkEvent
from apache_beam.testing.util import assert_that, equal_to

base_datetime = datetime(2023, 7, 7, 10, 30, 0, 0, timezone.utc)


@pytest.mark.parametrize(
    "events, expected",
    [
        (
            [
                WatermarkEvent(0),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 1),
                            base_datetime.timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent(base_datetime.timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 2),
                            (base_datetime + timedelta(minutes=10)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(minutes=10)).timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 5),
                            (base_datetime + timedelta(days=1, minutes=20)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(days=1, minutes=20)).timestamp() + 5),
            ],
            [
                ("user1", [1]),
                ("user1", [1, 2]),
                ("user1", [1, 2]),
                ("user1", [5]),
                ("user1", [5]),
            ],
        ),
    ],
)
def test_after_count_trigger(events, expected):
    options = PipelineOptions(
        allow_unsafe_triggers=True,
        streaming=True,
    )
    with TestPipeline(options=options) as p:
        test_stream = p | TestStream(events=events, output_tags={None})
        pcoll = (
            test_stream
            | "Window"
            >> beam.WindowInto(
                Sessions(int(timedelta(days=1).total_seconds())),
                trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
        )
        pcoll | beam.Map(print)
        assert_that(pcoll, equal_to(expected))


class CustomCombine(beam.CombineFn):
    def create_accumulator(self) -> list:
        return []

    def add_input(self, accumulator: list, input) -> list:
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        return accumulator
```
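And in case it is useful, a tiny optional entry point for running the test file directly (my addition, not part of the original test):

```python
# Optional convenience: run this file's tests through pytest directly;
# -s keeps the `beam.Map(print)` pane output visible.
if __name__ == "__main__":
    import sys
    import pytest
    sys.exit(pytest.main([__file__, "-s"]))
```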
What happened?
Python SDK version: 2.48.0
It is not possible to fire the `Sessions` window early based on the number of elements in it using the `AfterCount` trigger. The window only fires when the watermark advances and closes it.

Related issue: #20813. It is supposed to be fixed, but it reproduces for me.

The code snippet provided below is a test that:

- applies a `Sessions` window;
- uses a `trigger.AfterCount(1)` trigger that is supposed to fire the window early on every element (see the snippet after this list).
trigger that is supposed to fire the window early on every element.Note that when the commented
trigger=trigger.AfterEach(trigger.AfterCount(1), trigger.AfterWatermark()),
line is used instead, the result is the same.Expected: All three elements will be processed separately because the window fires after every single element:
Actual: The first two elements are processed together because they fall in the same window based on the watermark:
The test that reproduces the issue:
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)