Skip to content

Commit

Permalink
go/runtime: journal watches used for mapping must use shard context
Browse files Browse the repository at this point in the history
They must not use the task term context. The task term context is
canceled when the term *begins* to change, but we allow the current
transaction to complete. Completing it means writing out documents, which
in turn may require creating a new physical partition, and we cannot do
that correctly if the journal watch is no longer updating.

Instead, wrap the shard context with a cancel function and cancel
prior watches at the start of the *next* task term.
  • Loading branch information
jgraettinger committed Oct 16, 2024
1 parent 69765a6 commit 1c73ea6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
12 changes: 9 additions & 3 deletions go/runtime/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Capture struct {
restarts message.Clock // Increments for each restart.
transactions message.Clock // Increments for each transaction.
watches []*client.WatchedList // Watches of binding journals.
watchCancel context.CancelFunc // Canceler of watches.
}

var _ Application = (*Capture)(nil)
Expand Down Expand Up @@ -71,12 +72,17 @@ func (c *Capture) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err
return pf.Checkpoint{}, err
}

// Note that prior watches are cancelled with the prior term context.
var watchCtx context.Context
c.watches = c.watches[:0] // Truncate.

if c.watchCancel != nil {
c.watchCancel() // Cancel watches of previous term.
}
watchCtx, c.watchCancel = context.WithCancel(shard.Context())

for _, binding := range c.term.taskSpec.Bindings {
c.watches = append(c.watches, client.NewWatchedList(
c.term.ctx,
watchCtx,
shard.JournalClient(),
flow.CollectionWatchRequest(&binding.Collection),
nil,
Expand Down Expand Up @@ -224,7 +230,7 @@ func pollLoop(
case op = <-pollCh:
}

// Wait for the prior commit's OpFuture to resolve succesfully.
// Wait for the prior commit's OpFuture to resolve successfully.
if err := op.Err(); err != nil {
return err
}
Expand Down
17 changes: 12 additions & 5 deletions go/runtime/derive.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runtime

import (
"context"
"database/sql"
"encoding/json"
"fmt"
Expand All @@ -23,9 +24,10 @@ import (
// Derive is a top-level Application which implements the derivation workflow.
type Derive struct {
*taskReader[*pf.CollectionSpec]
client pd.Connector_DeriveClient
sqlite *store_sqlite.Store
watch *client.WatchedList
client pd.Connector_DeriveClient
sqlite *store_sqlite.Store
watch *client.WatchedList
watchCancel context.CancelFunc
}

var _ Application = (*Derive)(nil)
Expand Down Expand Up @@ -83,9 +85,14 @@ func (d *Derive) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err
return pf.Checkpoint{}, err
}

// Note the prior watch is cancelled with the prior term context.
var watchCtx context.Context
if d.watchCancel != nil {
d.watchCancel() // Cancel watch of previous term.
}
watchCtx, d.watchCancel = context.WithCancel(shard.Context())

d.watch = client.NewWatchedList(
d.term.ctx,
watchCtx,
shard.JournalClient(),
flow.CollectionWatchRequest(d.term.taskSpec),
nil,
Expand Down

0 comments on commit 1c73ea6

Please sign in to comment.