From 1c73ea64ec477159105cb688c9ceadbdd83c4648 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Wed, 16 Oct 2024 16:22:03 -0500 Subject: [PATCH] go/runtime: journal watches used for mapping must use shard context And not the task term context. The issue is that task term context is canceled when the term *begins* to change, but we allow the current transaction to complete, which means writing out documents, which means we may need to create a new physical partition and are not able to do that correctly if the journal watch is no longer updating. Instead, wrap the shard context with a cancel function and cancel prior watches at the start of the *next* task term. --- go/runtime/capture.go | 12 +++++++++--- go/runtime/derive.go | 17 ++++++++++++----- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/go/runtime/capture.go b/go/runtime/capture.go index 5b23653b57..20f48a1f03 100644 --- a/go/runtime/capture.go +++ b/go/runtime/capture.go @@ -30,6 +30,7 @@ type Capture struct { restarts message.Clock // Increments for each restart. transactions message.Clock // Increments for each transaction. watches []*client.WatchedList // Watches of binding journals. + watchCancel context.CancelFunc // Canceler of watches. } var _ Application = (*Capture)(nil) @@ -71,12 +72,17 @@ func (c *Capture) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err return pf.Checkpoint{}, err } - // Note that prior watches are cancelled with the prior term context. + var watchCtx context.Context c.watches = c.watches[:0] // Truncate. + if c.watchCancel != nil { + c.watchCancel() // Cancel watches of previous term. + } + watchCtx, c.watchCancel = context.WithCancel(shard.Context()) + for _, binding := range c.term.taskSpec.Bindings { c.watches = append(c.watches, client.NewWatchedList( - c.term.ctx, + watchCtx, shard.JournalClient(), flow.CollectionWatchRequest(&binding.Collection), nil, @@ -224,7 +230,7 @@ func pollLoop( case op = <-pollCh: } - // Wait for the prior commit's OpFuture to resolve succesfully. + // Wait for the prior commit's OpFuture to resolve successfully. if err := op.Err(); err != nil { return err } diff --git a/go/runtime/derive.go b/go/runtime/derive.go index 04983841be..6b0cb01961 100644 --- a/go/runtime/derive.go +++ b/go/runtime/derive.go @@ -1,6 +1,7 @@ package runtime import ( + "context" "database/sql" "encoding/json" "fmt" @@ -23,9 +24,10 @@ import ( // Derive is a top-level Application which implements the derivation workflow. type Derive struct { *taskReader[*pf.CollectionSpec] - client pd.Connector_DeriveClient - sqlite *store_sqlite.Store - watch *client.WatchedList + client pd.Connector_DeriveClient + sqlite *store_sqlite.Store + watch *client.WatchedList + watchCancel context.CancelFunc } var _ Application = (*Derive)(nil) @@ -83,9 +85,14 @@ func (d *Derive) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err return pf.Checkpoint{}, err } - // Note the prior watch is cancelled with the prior term context. + var watchCtx context.Context + if d.watchCancel != nil { + d.watchCancel() // Cancel watch of previous term. + } + watchCtx, d.watchCancel = context.WithCancel(shard.Context()) + d.watch = client.NewWatchedList( - d.term.ctx, + watchCtx, shard.JournalClient(), flow.CollectionWatchRequest(d.term.taskSpec), nil,