diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 98519cd508ba..01b12cfa717a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -812,7 +812,9 @@ public final void processElement2(StreamRecord streamRecord) thro WindowedValue element = it.next(); // we need to set the correct key in case the operator is // a (keyed) window operator - setKeyContextElement1(new StreamRecord<>(element)); + if (keySelector != null) { + setCurrentKey(keySelector.getKey(element)); + } Iterable> justPushedBack = pushbackDoFnRunner.processElementInReadyWindows(element);