[Bug]: OnTimer("loopingTimer") not triggered on 2.52.0 #29816

Closed
1 of 16 tasks
yardenbm opened this issue Dec 19, 2023 · 5 comments · Fixed by #30969

Comments

@yardenbm

What happened?

Since upgrading to Beam 2.52.0, the @OnTimer("loopingTimer") callback no longer appears to fire for me.
The same pipeline was tested and worked on 2.51.
On 2.52.0, no logs are printed from the @OnTimer("loopingTimer") method.
The logic in my code is pretty much identical to: https://beam.apache.org/blog/looping-timers/ , Option 3:

    @ProcessElement
    public void process(
        ProcessContext c,
        @StateId("key") ValueState<String> key,
        @StateId("loopingTimerTime") ValueState<Long> loopingTimerTime,
        @TimerId("loopingTimer") Timer loopingTimer) {

      Long currentTimerValue = loopingTimerTime.read();
      Instant nextTimerTimeBasedOnCurrentElement = c.timestamp().plus(Duration.standardMinutes(1));

      // Set the timer if none is recorded yet, or if this element would make it fire earlier.
      if (currentTimerValue == null
          || currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
        loopingTimer.set(nextTimerTimeBasedOnCurrentElement);
        loopingTimerTime.write(nextTimerTimeBasedOnCurrentElement.getMillis());
      }

      // Remember the key so that onTimer can emit a value for it.
      if (key.read() == null) {
        key.write(c.element().getKey());
      }

      c.output(c.element());
    }

    @OnTimer("loopingTimer")
    public void onTimer(
        OnTimerContext c,
        @StateId("key") ValueState<String> key,
        @TimerId("loopingTimer") Timer loopingTimer) {

      LOG.info("Timer @ {} fired", c.timestamp());
      c.output(KV.of(key.read(), 0));

      // Re-arm the timer one minute later, as long as that is before stopTimerTime.
      Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
      if (nextTimer.isBefore(stopTimerTime)) {
        loopingTimer.set(nextTimer);
      } else {
        LOG.info(
            "Timer not being set as exceeded Stop Timer value {} ",
            stopTimerTime);
      }
    }
  }

I am using the FlinkRunner.
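
For reference, here is a minimal, Beam-free sketch of the firing sequence the snippet above would be expected to produce for a single key. It is only a simulation under stated assumptions, not the runner's behaviour: the class `LoopingTimerSketch` and the method `expectedFirings` are hypothetical names, timestamps are plain epoch milliseconds, all elements are assumed to be processed before the watermark passes the first timer, and the watermark is assumed to eventually advance past the stop time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Beam-free simulation of the looping-timer logic in the report. */
public class LoopingTimerSketch {
  static final long MINUTE_MS = 60_000L;

  /**
   * Returns the event-time timestamps (in ms) at which onTimer would be expected
   * to fire for one key, given the element timestamps and the stop time.
   */
  public static List<Long> expectedFirings(long[] elementTimestampsMs, long stopMs) {
    // Mirrors the "loopingTimerTime" state: null until the first element arrives.
    Long storedTimer = null;
    for (long ts : elementTimestampsMs) {
      long candidate = ts + MINUTE_MS;
      // Same condition as in process(): set if unset, or if the candidate is earlier.
      if (storedTimer == null || storedTimer > candidate) {
        storedTimer = candidate;
      }
    }

    List<Long> firings = new ArrayList<>();
    if (storedTimer == null) {
      return firings; // no elements, so no timer was ever set
    }

    long fireAt = storedTimer;
    while (true) {
      firings.add(fireAt);
      long next = fireAt + MINUTE_MS;
      // Same condition as nextTimer.isBefore(stopTimerTime) in onTimer().
      if (next < stopMs) {
        fireAt = next;
      } else {
        break;
      }
    }
    return firings;
  }

  public static void main(String[] args) {
    long[] elements = {0L, 30_000L, 90_000L};
    // prints [60000, 120000, 180000, 240000]
    System.out.println(expectedFirings(elements, 5 * MINUTE_MS));
  }
}
```

On 2.52.0 none of these firings show up in the logs, which is the behaviour being reported.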

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@sigalite

Hi, are there any updates on this issue? We see the same behaviour in Beam 2.53, so we are not able to upgrade to the latest versions.

@je-ik
Contributor

je-ik commented Mar 27, 2024

Hi, what is the source?

@je-ik
Contributor

je-ik commented Apr 11, 2024

I see similar issues with 2.55.0 using Kafka and --experiments=use_deprecated_read. The timer seems to work if --experiments=beam_fn_api is used, but that works only for the classical (non-portable) runner. I will investigate this and report the results.

je-ik self-assigned this Apr 11, 2024
je-ik mentioned this issue Apr 12, 2024
@Abacn
Contributor

Abacn commented Apr 12, 2024

A major Flink runner change from Beam 2.51.0 to 2.52.0 was #28614. Could this be related to #29902?

@je-ik
Contributor

je-ik commented Apr 15, 2024

I don't think this is related. #28614 is related to batch execution. I'll investigate this and report results here.
