Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Avoid global logger in events #6258

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,13 @@
return nil, fmt.Errorf("cannot store atx %s: %w", atx.ShortString(), err)
}

events.ReportNewActivation(atx)
if err := events.ReportNewActivation(atx); err != nil {
h.logger.Error("failed to emit activation",
log.ZShortStringer("atx_id", atx.ID()),
zap.Uint32("epoch", atx.PublishEpoch.Uint32()),
zap.Error(err),
)

Check warning on line 588 in activation/handler_v1.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v1.go#L584-L588

Added lines #L584 - L588 were not covered by tests
}
h.logger.Debug("new atx",
log.ZContext(ctx),
zap.Inline(atx),
Expand Down
8 changes: 7 additions & 1 deletion activation/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,13 @@
return fmt.Errorf("cannot store atx %s: %w", atx.ShortString(), err)
}

events.ReportNewActivation(atx)
if err := events.ReportNewActivation(atx); err != nil {
h.logger.Error("failed to emit activation",
log.ZShortStringer("atx_id", atx.ID()),
zap.Uint32("epoch", atx.PublishEpoch.Uint32()),
zap.Error(err),
)

Check warning on line 153 in activation/handler_v2.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v2.go#L149-L153

Added lines #L149 - L153 were not covered by tests
}
h.logger.Debug("new atx", log.ZContext(ctx), zap.Inline(atx))
return err
}
Expand Down
32 changes: 26 additions & 6 deletions api/grpcserver/globalstate_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
}

ctxzap.Debug(ctx, "GRPC GlobalStateService.Account",
addr.Field().Zap(),
zap.Stringer("address", addr),
zap.Uint64("balance", acct.StateCurrent.Balance.Value),
zap.Uint64("counter", acct.StateCurrent.Counter),
zap.Uint64("balance projected", acct.StateProjected.Balance.Value),
Expand Down Expand Up @@ -240,12 +240,20 @@
rewardsBufFull <-chan struct{}
)
if filterAccount {
if accountSubscription := events.SubscribeAccount(); accountSubscription != nil {
accountSubscription, err := events.SubscribeAccount()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to account events: %v", err)

Check warning on line 245 in api/grpcserver/globalstate_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/globalstate_service.go#L245

Added line #L245 was not covered by tests
}
if accountSubscription != nil {
accountCh, accountBufFull = consumeEvents[events.Account](stream.Context(), accountSubscription)
}
}
if filterReward {
if rewardsSubscription := events.SubscribeRewards(); rewardsSubscription != nil {
rewardsSubscription, err := events.SubscribeRewards()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to rewards events: %v", err)

Check warning on line 254 in api/grpcserver/globalstate_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/globalstate_service.go#L254

Added line #L254 was not covered by tests
}
if rewardsSubscription != nil {
rewardsCh, rewardsBufFull = consumeEvents[types.Reward](stream.Context(), rewardsSubscription)
}
}
Expand Down Expand Up @@ -369,20 +377,32 @@
layersBufFull <-chan struct{}
)
if filterAccount {
if accountSubscription := events.SubscribeAccount(); accountSubscription != nil {
accountSubscription, err := events.SubscribeAccount()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to account events: %v", err)

Check warning on line 382 in api/grpcserver/globalstate_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/globalstate_service.go#L382

Added line #L382 was not covered by tests
}
if accountSubscription != nil {
accountCh, accountBufFull = consumeEvents[events.Account](stream.Context(), accountSubscription)
}
}
if filterReward {
if rewardsSubscription := events.SubscribeRewards(); rewardsSubscription != nil {
rewardsSubscription, err := events.SubscribeRewards()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to rewards events: %v", err)

Check warning on line 391 in api/grpcserver/globalstate_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/globalstate_service.go#L391

Added line #L391 was not covered by tests
}
if rewardsSubscription != nil {
rewardsCh, rewardsBufFull = consumeEvents[types.Reward](stream.Context(), rewardsSubscription)
}
}

if filterState {
// Whenever new state is applied to the mesh, a new layer is reported.
// There is no separate reporting specifically for new state.
if layersSubscription := events.SubscribeLayers(); layersSubscription != nil {
layersSubscription, err := events.SubscribeLayers()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to layer updates: %v", err)

Check warning on line 403 in api/grpcserver/globalstate_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/globalstate_service.go#L403

Added line #L403 was not covered by tests
}
if layersSubscription != nil {
layersCh, layersBufFull = consumeEvents[events.LayerUpdate](stream.Context(), layersSubscription)
}
}
Expand Down
38 changes: 19 additions & 19 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ func TestTransactionService(t *testing.T) {
// Give the server-side time to subscribe to events
time.Sleep(time.Millisecond * 50)

events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))
res, err := stream.Recv()
require.NoError(t, err)
require.Nil(t, res.Transaction)
Expand All @@ -1470,7 +1470,7 @@ func TestTransactionService(t *testing.T) {
// Give the server-side time to subscribe to events
time.Sleep(time.Millisecond * 50)

events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))

// Verify
res, err := stream.Recv()
Expand Down Expand Up @@ -1563,7 +1563,7 @@ func TestTransactionService(t *testing.T) {

// TODO send header after stream has subscribed

events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))

for _, stream := range streams {
res, err := stream.Recv()
Expand Down Expand Up @@ -1593,7 +1593,7 @@ func TestTransactionService(t *testing.T) {
time.Sleep(time.Millisecond * 50)

for range subscriptionChanBufSize * 2 {
events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))
}

for range subscriptionChanBufSize {
Expand Down Expand Up @@ -1691,15 +1691,15 @@ func TestAccountMeshDataStream_comprehensive(t *testing.T) {
time.Sleep(time.Millisecond * 50)

// publish a tx
events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))
res, err := stream.Recv()
require.NoError(t, err, "got error from stream")
checkAccountMeshDataItemTx(t, res.Datum.Datum)

// test streaming a tx and an atx that are filtered out
// these should not be received
events.ReportNewTx(0, globalTx2)
events.ReportNewActivation(globalAtx2)
require.NoError(t, events.ReportNewTx(0, globalTx2))
require.NoError(t, events.ReportNewActivation(globalAtx2))

_, err = stream.Recv()
require.Error(t, err)
Expand Down Expand Up @@ -1739,29 +1739,29 @@ func TestAccountDataStream_comprehensive(t *testing.T) {
// Give the server-side time to subscribe to events
time.Sleep(time.Millisecond * 50)

events.ReportRewardReceived(types.Reward{
require.NoError(t, events.ReportRewardReceived(types.Reward{
Layer: layerFirst,
TotalReward: rewardAmount,
LayerReward: rewardAmount * 2,
Coinbase: addr1,
SmesherID: rewardSmesherID,
})
}))

res, err := stream.Recv()
require.NoError(t, err)
checkAccountDataItemReward(t, res.Datum.Datum)

// publish an account data update
events.ReportAccountUpdate(addr1)
require.NoError(t, events.ReportAccountUpdate(addr1))

res, err = stream.Recv()
require.NoError(t, err)
checkAccountDataItemAccount(t, res.Datum.Datum)

// test streaming a reward and account update that should be filtered out
// these should not be received
events.ReportAccountUpdate(addr2)
events.ReportRewardReceived(types.Reward{Coinbase: addr2})
require.NoError(t, events.ReportAccountUpdate(addr2))
require.NoError(t, events.ReportRewardReceived(types.Reward{Coinbase: addr2}))

_, err = stream.Recv()
require.Error(t, err)
Expand Down Expand Up @@ -1796,19 +1796,19 @@ func TestGlobalStateStream_comprehensive(t *testing.T) {
time.Sleep(time.Millisecond * 50)

// publish a reward
events.ReportRewardReceived(types.Reward{
require.NoError(t, events.ReportRewardReceived(types.Reward{
Layer: layerFirst,
TotalReward: rewardAmount,
LayerReward: rewardAmount * 2,
Coinbase: addr1,
SmesherID: rewardSmesherID,
})
}))
res, err := stream.Recv()
require.NoError(t, err, "got error from stream")
checkGlobalStateDataReward(t, res.Datum.Datum)

// publish an account data update
events.ReportAccountUpdate(addr1)
require.NoError(t, events.ReportAccountUpdate(addr1))
res, err = stream.Recv()
require.NoError(t, err, "got error from stream")
checkGlobalStateDataAccountWrapper(t, res.Datum.Datum)
Expand All @@ -1817,10 +1817,10 @@ func TestGlobalStateStream_comprehensive(t *testing.T) {
layer, err := meshAPIMock.GetLayer(layerFirst)
require.NoError(t, err)

events.ReportLayerUpdate(events.LayerUpdate{
require.NoError(t, events.ReportLayerUpdate(events.LayerUpdate{
LayerID: layer.Index(),
Status: events.LayerStatusTypeApplied,
})
}))
res, err = stream.Recv()
require.NoError(t, err, "got error from stream")
checkGlobalStateDataGlobalState(t, res.Datum.Datum)
Expand Down Expand Up @@ -1868,10 +1868,10 @@ func TestLayerStream_comprehensive(t *testing.T) {
require.NoError(t, err)

// Act
events.ReportLayerUpdate(events.LayerUpdate{
require.NoError(t, events.ReportLayerUpdate(events.LayerUpdate{
LayerID: layer.Index(),
Status: events.LayerStatusTypeConfirmed,
})
}))

// Verify
res, err := stream.Recv()
Expand Down
34 changes: 27 additions & 7 deletions api/grpcserver/mesh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@
// internal or an input error? For now, all missing layers produce
// internal errors.
if err != nil {
ctxzap.Error(ctx, "could not read layer from database", layerID.Field().Zap(), zap.Error(err))
ctxzap.Error(ctx, "could not read layer from database", zap.Uint32("lid", layerID.Uint32()), zap.Error(err))
return pbLayer, status.Errorf(codes.Internal, "error reading layer data: %v", err)
} else if block == nil {
return pbLayer, nil
Expand All @@ -305,7 +305,9 @@
// E.g., if this node has not synced/received them yet.
if len(missing) != 0 {
ctxzap.Error(ctx, "could not find transactions from layer",
zap.String("missing", fmt.Sprint(missing)), layerID.Field().Zap())
zap.String("missing", fmt.Sprint(missing)),
zap.Uint32("lid", layerID.Uint32()),
)
return pbLayer, status.Errorf(codes.Internal, "error retrieving tx data")
}

Expand All @@ -325,14 +327,20 @@
// This is expected. We can only retrieve state root for a layer that was applied to state,
// which only happens after it's approved/confirmed.
ctxzap.Debug(ctx, "no state root for layer",
layerID.Field().Zap(), zap.Stringer("status", layerStatus), zap.Error(err))
zap.Uint32("lid", layerID.Uint32()),
zap.Stringer("status", layerStatus),
zap.Error(err),
)
}
hash, err := s.mesh.MeshHash(layerID)
if err != nil {
// This is expected. We can only retrieve state root for a layer that was applied to state,
// which only happens after it's approved/confirmed.
ctxzap.Debug(ctx, "no mesh hash at layer",
layerID.Field().Zap(), zap.Stringer("status", layerStatus), zap.Error(err))
zap.Uint32("lid", layerID.Uint32()),
zap.Stringer("status", layerStatus),
zap.Error(err),
)

Check warning on line 343 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L340-L343

Added lines #L340 - L343 were not covered by tests
}
pbLayer.Blocks = []*pb.Block{pbBlock}
pbLayer.Hash = hash.Bytes()
Expand Down Expand Up @@ -424,12 +432,20 @@
)

if filterTx {
if txsSubscription := events.SubscribeTxs(); txsSubscription != nil {
txsSubscription, err := events.SubscribeTxs()
if err != nil {
return status.Errorf(codes.Internal, "subscribing to txs failed: %v", err)

Check warning on line 437 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L437

Added line #L437 was not covered by tests
}
if txsSubscription != nil {
txCh, txBufFull = consumeEvents[events.Transaction](stream.Context(), txsSubscription)
}
}
if filterActivations {
if activationsSubscription := events.SubscribeActivations(); activationsSubscription != nil {
activationsSubscription, err := events.SubscribeActivations()
if err != nil {
return status.Errorf(codes.Internal, "subscribing to activations failed: %v", err)

Check warning on line 446 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L446

Added line #L446 was not covered by tests
}
if activationsSubscription != nil {
activationsCh, activationsBufFull = consumeEvents[events.ActivationTx](
stream.Context(),
activationsSubscription,
Expand Down Expand Up @@ -497,7 +513,11 @@
layersBufFull <-chan struct{}
)

if layersSubscription := events.SubscribeLayers(); layersSubscription != nil {
layersSubscription, err := events.SubscribeLayers()
if err != nil {
return status.Errorf(codes.Internal, "subscribing to layers failed: %v", err)

Check warning on line 518 in api/grpcserver/mesh_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/mesh_service.go#L518

Added line #L518 was not covered by tests
}
if layersSubscription != nil {
layerCh, layersBufFull = consumeEvents[events.LayerUpdate](stream.Context(), layersSubscription)
}

Expand Down
12 changes: 10 additions & 2 deletions api/grpcserver/node_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@
statusBufFull <-chan struct{}
)

if statusSubscription := events.SubscribeStatus(); statusSubscription != nil {
statusSubscription, err := events.SubscribeStatus()
if err != nil {
return status.Errorf(codes.Internal, "failed to subscribe to status events: %v", err)

Check warning on line 138 in api/grpcserver/node_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/node_service.go#L138

Added line #L138 was not covered by tests
}
if statusSubscription != nil {
statusCh, statusBufFull = consumeEvents[events.Status](stream.Context(), statusSubscription)
}

Expand Down Expand Up @@ -180,7 +184,11 @@
errorsBufFull <-chan struct{}
)

if errorsSubscription := events.SubscribeErrors(); errorsSubscription != nil {
errorsSubscription, err := events.SubscribeErrors()
if err != nil {
return status.Errorf(codes.Internal, "failed to subscribe to error events: %v", err)

Check warning on line 189 in api/grpcserver/node_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/node_service.go#L189

Added line #L189 was not covered by tests
}
if errorsSubscription != nil {
errorsCh, errorsBufFull = consumeEvents[events.NodeError](stream.Context(), errorsSubscription)
}
if err := stream.SendHeader(metadata.MD{}); err != nil {
Expand Down
18 changes: 13 additions & 5 deletions api/grpcserver/transaction_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,19 @@
txBufFull, layerBufFull <-chan struct{}
)

if txsSubscription := events.SubscribeTxs(); txsSubscription != nil {
txsSubscription, err := events.SubscribeTxs()
if err != nil {
return status.Errorf(codes.Internal, "failed to subscribe to tx events: %v", err)

Check warning on line 207 in api/grpcserver/transaction_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/transaction_service.go#L207

Added line #L207 was not covered by tests
}
if txsSubscription != nil {
txCh, txBufFull = consumeEvents[events.Transaction](stream.Context(), txsSubscription)
}

if layersSubscription := events.SubscribeLayers(); layersSubscription != nil {
layersSubscription, err := events.SubscribeLayers()
if err != nil {
return status.Errorf(codes.Internal, "failed to subscribe to layer events: %v", err)

Check warning on line 215 in api/grpcserver/transaction_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/transaction_service.go#L215

Added line #L215 was not covered by tests
}
if layersSubscription != nil {
layerCh, layerBufFull = consumeEvents[events.LayerUpdate](stream.Context(), layersSubscription)
}

Expand Down Expand Up @@ -265,7 +273,7 @@
ctxzap.Error(
stream.Context(),
"error reading layer data for updated layer",
layer.LayerID.Field().Zap(),
zap.Uint32("lid", layer.LayerID.Uint32()),

Check warning on line 276 in api/grpcserver/transaction_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/transaction_service.go#L276

Added line #L276 was not covered by tests
zap.Error(err),
)
return status.Error(codes.Internal, "error reading layer data")
Expand Down Expand Up @@ -313,8 +321,8 @@
ctxzap.Error(
stream.Context(),
"could not find transaction from layer",
txid.Field().Zap(),
layer.Field().Zap(),
zap.Stringer("tx_id", txid),
zap.Inline(layer),

Check warning on line 325 in api/grpcserver/transaction_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/transaction_service.go#L324-L325

Added lines #L324 - L325 were not covered by tests
zap.Error(err),
)
return status.Error(codes.Internal, "error retrieving tx data")
Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/transaction_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestTransactionService_StreamResults(t *testing.T) {

var expect []*types.TransactionWithResult
for _, rst := range streamed {
events.ReportResult(*rst)
require.NoError(t, events.ReportResult(*rst))
if tc.matcher.match(rst) {
expect = append(expect, rst)
}
Expand Down
Loading
Loading