[Bug]: Windowing Causes Panics In Timer and State #32559
Comments
I'm also hitting slightly different behavior when I deploy a pipeline to the GCP Dataflow runner. There, after an indeterminate number of retries, reading state in OnTimer shows that the state has been wiped (the snippet below returns tcp = false).
tc, tcp, err := fn.TimerCount.Read(sp)
if err != nil || !tcp {
	panic("couldn't read TimerCount state")
}
Am I incorrect in assuming that if my ProcessElement function always sets state values, then OnTimer will always have that value populated? I was using timers.WithOutputTimestamp(ts) to ensure the watermark never advances and thus the process bundle doesn't complete. For context, I am trying to add this timer for every element in a PCollection, so I am adding a UUID key. Regardless, I wouldn't expect this to result in wiped state. I did add a log in FinishBundle for the above run, and can confirm that the bundle is being closed before OnTimer is called. In other words, the behavior is
func (fn *doNothingTransformFn) FinishBundle(ctx context.Context, _ func(string)) error {
	log.Infof(ctx, "Closing bundle!")
	return nil
}
Obligatory question about whether the bug still exists in 2.59.0? Prism is under active development, so it's not impossible it was fixed between versions.
#32559 (comment) is probably a separate issue. I would not yet vouch for the behavior of ProcessingTime timers without using TestStream, as we have yet to add the ability for Prism to actually execute in "real" time, so it's not going to execute in a comparable fashion to a production runner like Dataflow. It's important to note that setting the OutputTimestamp only arrests the downstream watermark, not the upstream watermark, which determines EventTime firing. Also, outside of literally blocking execution within a DoFn, the process bundle will eventually complete when all user code has returned. The nil/empty bytes issue might have been #32245, which either causes corruption or zeroing when built with Go 1.23.0+.
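The point about OutputTimestamp can be illustrated with a toy model. This is a sketch under stated assumptions, not how Prism or Dataflow is implemented: the `timer` and `stage` types and their methods are hypothetical, timestamps are plain integers, and a timer is assumed to fire once the input watermark reaches its firing time. It shows only that a hold affects the watermark reported downstream while firing is driven by the input watermark.

```go
package main

import "fmt"

// timer is a toy event-time timer (hypothetical type, not a Beam API).
type timer struct {
	fireAt int64 // event-time timestamp at which the timer should fire
	hold   int64 // output timestamp, which holds the downstream watermark
}

// stage is a toy stateful stage that tracks its pending timers.
type stage struct {
	pending []timer
}

// advance moves the input (upstream) watermark to wm and fires every pending
// timer whose firing time has been reached. It returns how many fired.
// Holds are deliberately ignored here: they do not delay firing.
func (s *stage) advance(wm int64) int {
	fired := 0
	kept := s.pending[:0]
	for _, t := range s.pending {
		if t.fireAt <= wm {
			fired++
			continue
		}
		kept = append(kept, t)
	}
	s.pending = kept
	return fired
}

// outputWatermark is the minimum of the input watermark and the holds of
// all timers that are still pending.
func (s *stage) outputWatermark(inputWM int64) int64 {
	out := inputWM
	for _, t := range s.pending {
		if t.hold < out {
			out = t.hold
		}
	}
	return out
}

func main() {
	s := &stage{pending: []timer{{fireAt: 100, hold: 10}}}

	fired := s.advance(50)
	fmt.Println("input=50 fired:", fired, "output watermark:", s.outputWatermark(50))

	fired = s.advance(100)
	fmt.Println("input=100 fired:", fired, "output watermark:", s.outputWatermark(100))
}
```

In this model the hold keeps the output watermark at 10 while the timer is pending, yet the timer still fires as soon as the input watermark reaches 100, after which the hold is released.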
Finally, please include more of the panic trace. Based on the provided information, it happens on line 45 of the given file, which is the line I think I saw errors like this when running the Java Validates Runner tests, but had not chased them down specifically (there were other features to resolve there first). So having a Go repro shows it's not simply the Java SDK doing something Prism can't yet tolerate.
Great news! I upgraded to
Glad to hear it! I don't think the stack trace is necessary now, if the panic is gone. I recall that I had to fix timer handling, where bytes were being incorrectly interpreted, in order to pass additional tests, so that's probably what did it. Closing this issue for now.
Sounds good. As a follow-up, I'm trying to get a reproducible example of a Dataflow-deployed DoFn causing the issue before filing a new issue. Thanks again for the help.
What happened?
I've been trying to use the new beam state and timers to implement a retry policy. This retry policy:
The input is a keyed PCollection with a single entry. It seems like windowing causes my ParDo to panic, which surprised me after reading the stateful and timely processing documentation and the Tour of Beam (particularly this example).
Using a debugger to move through my code, the panic occurs after ProcessElement and before OnTimer is called.
I'm using this package in go.mod:
github.com/apache/beam/sdks/v2 v2.58.0
donothing.go
donothing_test.go
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components