go/runtime: cancel task term if we fail to RestoreCheckpoint
Most initialization or authorization errors happen during term
initialization. If an error occurs, fail fast by cancelling all
term-related work, such as journal listings and reads.
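
As a rough illustration of the fail-fast pattern (a simplified sketch, not
the runtime's actual types: the named-return/deferred-cancel wiring mirrors
the diff, everything else is a stand-in), each RestoreCheckpoint gains a
named error return and a deferred check that cancels the term's context on
any failure:

package main

import (
	"context"
	"errors"
	"fmt"
)

// taskTerm is a stand-in for the runtime's per-term state; cancel aborts
// all work (journal listings, reads) started for the current term.
type taskTerm struct {
	ctx    context.Context
	cancel context.CancelFunc
}

type task struct{ term taskTerm }

// RestoreCheckpoint uses a named error return so the deferred check can
// observe any failure and cancel the term before the error propagates.
func (t *task) RestoreCheckpoint() (_ string, _err error) {
	defer func() {
		if _err != nil {
			t.term.cancel()
		}
	}()
	// Simulate a term-initialization failure, e.g. a failed RBAC check.
	return "", errors.New("authorization failed")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var t = task{term: taskTerm{ctx: ctx, cancel: cancel}}

	_, err := t.RestoreCheckpoint()
	fmt.Println("restore error:", err)
	fmt.Println("term context cancelled:", t.term.ctx.Err() != nil)
}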

Also add a cancellation context to the *OpsPublisher, cancelled as the very
last step of task Destroy() routines, so that once the causal shard itself
has been deleted we don't indefinitely retry failed appends to ops journals
which don't exist or to which we aren't authorized.

This dramatically squelches log noise from failed RBAC checks.
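
And a sketch of the ops-publisher change under the same caveat (the retry
loop below is a simplified stand-in for the real append service, not its
API): the publisher's context is a cancellable child of the host's, and
Destroy() cancels it as its final step so pending append retries stop
instead of looping forever:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// opsAppender stands in for the append service backing the OpsPublisher:
// it retries an append until it succeeds or its context is cancelled.
type opsAppender struct{ ctx context.Context }

func (a *opsAppender) Append(msg string) error {
	for {
		// Pretend every attempt fails, e.g. the ops journal was deleted or
		// an RBAC check now rejects us because the shard was removed.
		var attemptErr = errors.New("append rejected")

		select {
		case <-a.ctx.Done():
			return fmt.Errorf("giving up on %q after %v: %w", msg, attemptErr, a.ctx.Err())
		case <-time.After(10 * time.Millisecond):
			// Retry on the next loop iteration.
		}
	}
}

type taskBase struct {
	opsCancel context.CancelFunc
	ops       *opsAppender
}

// Destroy tears down the task; cancelling the ops context is the last step,
// mirroring the ordering added in the diff below.
func (t *taskBase) Destroy() {
	// ... drop other task resources first ...
	t.opsCancel()
}

func main() {
	opsCtx, opsCancel := context.WithCancel(context.Background())
	var t = taskBase{opsCancel: opsCancel, ops: &opsAppender{ctx: opsCtx}}

	t.Destroy() // the shard is going away; stop retrying ops appends
	fmt.Println(t.ops.Append("stats document"))
}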
jgraettinger committed Sep 2, 2024
1 parent b2ebe81 commit 5fa7e6b
Showing 4 changed files with 45 additions and 22 deletions.
go/runtime/capture.go (14 changes: 10 additions & 4 deletions)
@@ -61,7 +61,12 @@ func NewCaptureApp(host *FlowConsumer, shard consumer.Shard, recorder *recoveryl
 
 // RestoreCheckpoint initializes a catalog task term and restores the last
 // persisted checkpoint, if any, by delegating to its JsonStore.
-func (c *Capture) RestoreCheckpoint(shard consumer.Shard) (pf.Checkpoint, error) {
+func (c *Capture) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err error) {
+	defer func() {
+		if _err != nil {
+			c.term.cancel()
+		}
+	}()
 	if err := c.initTerm(shard); err != nil {
 		return pf.Checkpoint{}, err
 	}
@@ -335,9 +340,9 @@ func (c *Capture) ConsumeMessage(shard consumer.Shard, env message.Envelope, pub
 	if len(stats.Capture) == 0 {
 		// The connector may have only emitted an empty checkpoint.
 		// Don't publish stats in this case.
-		ops.PublishLog(c.publisher, ops.Log_debug,
+		ops.PublishLog(c.opsPublisher, ops.Log_debug,
 			"capture transaction committing updating driver checkpoint only")
-	} else if err := c.publisher.PublishStats(*stats, pub.PublishUncommitted); err != nil {
+	} else if err := c.opsPublisher.PublishStats(*stats, pub.PublishUncommitted); err != nil {
 		return fmt.Errorf("publishing stats: %w", err)
 	}
 
@@ -350,7 +355,7 @@ func (c *Capture) StartCommit(shard consumer.Shard, cp pf.Checkpoint, waitFor co
 		return pf.FinishedOperation(nil)
 	}
 
-	ops.PublishLog(c.publisher, ops.Log_debug,
+	ops.PublishLog(c.opsPublisher, ops.Log_debug,
 		"StartCommit",
 		"capture", c.term.labels.TaskName,
 		"shard", c.term.shardSpec.Id,
@@ -389,6 +394,7 @@ func (c *Capture) Destroy() {
 		_ = c.client.CloseSend()
 	}
 	c.taskBase.drop()
+	c.taskBase.opsCancel()
 }
 
 func (c *Capture) BeginTxn(consumer.Shard) error { return nil } // No-op.
go/runtime/derive.go (13 changes: 10 additions & 3 deletions)
@@ -72,7 +72,13 @@ func NewDeriveApp(host *FlowConsumer, shard consumer.Shard, recorder *recoverylo
 	}, nil
 }
 
-func (d *Derive) RestoreCheckpoint(shard consumer.Shard) (pf.Checkpoint, error) {
+func (d *Derive) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err error) {
+	defer func() {
+		if _err != nil {
+			d.term.cancel()
+		}
+	}()
+
 	if err := d.initTerm(shard); err != nil {
 		return pf.Checkpoint{}, err
 	}
@@ -169,7 +175,7 @@ func (d *Derive) FinalizeTxn(shard consumer.Shard, pub *message.Publisher) error
 			}
 
 		} else if response.Flushed != nil {
-			if err := d.publisher.PublishStats(*responseExt.Flushed.Stats, pub.PublishUncommitted); err != nil {
+			if err := d.opsPublisher.PublishStats(*responseExt.Flushed.Stats, pub.PublishUncommitted); err != nil {
 				return fmt.Errorf("publishing stats: %w", err)
 			}
 			return nil
@@ -178,7 +184,7 @@ func (d *Derive) FinalizeTxn(shard consumer.Shard, pub *message.Publisher) error
 }
 
 func (d *Derive) StartCommit(_ consumer.Shard, cp pf.Checkpoint, waitFor client.OpFutures) client.OpFuture {
-	ops.PublishLog(d.publisher, ops.Log_debug,
+	ops.PublishLog(d.opsPublisher, ops.Log_debug,
 		"StartCommit",
 		"derivation", d.term.labels.TaskName,
 		"shard", d.term.shardSpec.Id,
@@ -217,6 +223,7 @@ func (d *Derive) Destroy() {
 	if d.sqlite != nil {
 		d.sqlite.Destroy()
 	}
+	d.taskBase.opsCancel()
 }
 
 func (d *Derive) ClearRegistersForTest() error {
go/runtime/materialize.go (13 changes: 10 additions & 3 deletions)
@@ -45,7 +45,13 @@ func NewMaterializeApp(host *FlowConsumer, shard consumer.Shard, recorder *recov
 	}, nil
 }
 
-func (m *Materialize) RestoreCheckpoint(shard consumer.Shard) (pf.Checkpoint, error) {
+func (m *Materialize) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err error) {
+	defer func() {
+		if _err != nil {
+			m.term.cancel()
+		}
+	}()
+
 	if err := m.initTerm(shard); err != nil {
 		return pf.Checkpoint{}, err
 	}
@@ -154,15 +160,15 @@ func (m *Materialize) FinalizeTxn(shard consumer.Shard, pub *message.Publisher)
 	}
 
 	var flushedExt = pr.FromInternal[pr.MaterializeResponseExt](resp.Internal)
-	if err := m.publisher.PublishStats(*flushedExt.Flushed.Stats, pub.PublishUncommitted); err != nil {
+	if err := m.opsPublisher.PublishStats(*flushedExt.Flushed.Stats, pub.PublishUncommitted); err != nil {
 		return fmt.Errorf("publishing stats: %w", err)
 	}
 
 	return nil
 }
 
 func (m *Materialize) StartCommit(shard consumer.Shard, cp pf.Checkpoint, waitFor consumer.OpFutures) consumer.OpFuture {
-	ops.PublishLog(m.publisher, ops.Log_debug,
+	ops.PublishLog(m.opsPublisher, ops.Log_debug,
 		"StartCommit",
 		"capture", m.term.labels.TaskName,
 		"shard", m.term.shardSpec.Id,
@@ -215,6 +221,7 @@ func (m *Materialize) Destroy() {
 		_ = m.client.CloseSend()
 	}
 	m.taskReader.drop()
+	m.taskBase.opsCancel()
 }
 
 func (m *Materialize) BeginTxn(shard consumer.Shard) error { return nil } // No-op.
go/runtime/task.go (27 changes: 15 additions & 12 deletions)
@@ -41,7 +41,8 @@ type taskBase[TaskSpec pf.Task] struct {
 	host *FlowConsumer // Host Consumer application of the shard.
 	legacyCheckpoint pc.Checkpoint // Legacy state.json runtime checkpoint.
 	legacyState json.RawMessage // Legacy state.json connector state.
-	publisher *OpsPublisher // ops.Publisher of task ops.Logs and ops.Stats.
+	opsCancel context.CancelFunc // Cancels ops.Publisher context.
+	opsPublisher *OpsPublisher // ops.Publisher of task ops.Logs and ops.Stats.
 	recorder *recoverylog.Recorder // Recorder of the shard's recovery log.
 	svc *bindings.TaskService // Associated Rust runtime service.
 	term taskTerm[TaskSpec] // Current task term.
@@ -70,18 +71,19 @@ func newTaskBase[TaskSpec pf.Task](
 	extractFn func(*sql.DB, string) (TaskSpec, error),
 ) (*taskBase[TaskSpec], error) {
 
-	var opsCtx = pprof.WithLabels(host.OpsContext, pprof.Labels(
+	var opsCtx, opsCancel = context.WithCancel(host.OpsContext)
+	opsCtx = pprof.WithLabels(opsCtx, pprof.Labels(
 		"shard", shard.Spec().Id.String(), // Same label set by consumer framework.
 	))
-	var publisher = NewOpsPublisher(message.NewPublisher(
+	var opsPublisher = NewOpsPublisher(message.NewPublisher(
 		client.NewAppendService(opsCtx, host.Service.Journals), nil))
 
 	var legacyCheckpoint, legacyState, err = parseLegacyState(recorder)
 	if err != nil {
 		return nil, err
 	}
 
-	term, err := newTaskTerm[TaskSpec](nil, extractFn, host, publisher, shard)
+	term, err := newTaskTerm[TaskSpec](nil, extractFn, host, opsPublisher, shard)
 	if err != nil {
 		return nil, err
 	}
@@ -93,7 +95,7 @@ func newTaskBase[TaskSpec pf.Task](
 			TaskName: term.labels.TaskName,
 			UdsPath: path.Join(recorder.Dir(), "socket"),
 		},
-		publisher,
+		opsPublisher,
 	)
 	if err != nil {
 		return nil, fmt.Errorf("creating task service: %w", err)
@@ -105,7 +107,8 @@
 		host: host,
 		legacyCheckpoint: legacyCheckpoint,
 		legacyState: legacyState,
-		publisher: publisher,
+		opsCancel: opsCancel,
+		opsPublisher: opsPublisher,
 		recorder: recorder,
 		svc: svc,
 		term: *term,
@@ -122,16 +125,16 @@ func (t *taskBase[TaskSpec]) StartTaskHeartbeatLoop(shard consumer.Shard, contai
 	} else {
 		logrus.WithField("shard", shard.Spec().Id).Info("usageRate will be 0 because there is no container")
 	}
-	go taskHeartbeatLoop(shard, t.publisher, usageRate)
+	go taskHeartbeatLoop(shard, t.opsPublisher, usageRate)
 }
 
 func (t *taskBase[TaskSpec]) initTerm(shard consumer.Shard) error {
-	var next, err = newTaskTerm[TaskSpec](&t.term, t.extractFn, t.host, t.publisher, shard)
+	var next, err = newTaskTerm[TaskSpec](&t.term, t.extractFn, t.host, t.opsPublisher, shard)
 	if err != nil {
 		return err
 	}
 
-	ops.PublishLog(t.publisher, ops.Log_info,
+	ops.PublishLog(t.opsPublisher, ops.Log_info,
 		"initialized catalog task term",
 		"nextLabels", next.labels,
 		"prevLabels", t.term.labels,
@@ -146,7 +149,7 @@ }
 }
 
 func (t *taskBase[TaskSpec]) proxyHook() (*pr.Container, ops.Publisher) {
-	return t.container.Load(), t.publisher
+	return t.container.Load(), t.opsPublisher
 }
 
 func (t *taskBase[TaskSpec]) drop() {
@@ -213,7 +216,7 @@ func newTaskReader[TaskSpec pf.Task](
 ) *taskReader[TaskSpec] {
 	var coordinator = shuffle.NewCoordinator(
 		shard.Context(),
-		base.publisher,
+		base.opsPublisher,
 		shard.JournalClient(),
 	)
 	return &taskReader[TaskSpec]{
@@ -231,7 +234,7 @@ func (t *taskReader[TaskSpec]) initTerm(shard consumer.Shard) error {
 		t.term.ctx,
 		shard.JournalClient(),
 		t.term.labels.Build,
-		t.publisher,
+		t.opsPublisher,
 		t.host.Service,
 		t.term.shardSpec.Id,
 		t.term.taskSpec,
