Skip to content

Commit

Permalink
fix synchornization handling response (#1002)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing authored Jul 21, 2021
1 parent cfa08e7 commit 2644560
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 21 deletions.
26 changes: 9 additions & 17 deletions engine/common/synchronization/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (e *Engine) requestProcessingLoop() {
case <-notifier:
err := e.processAvailableRequests()
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing queued message")
e.log.Fatal().Err(err).Msg("internal error processing queued requests")
}
}
}
Expand Down Expand Up @@ -372,7 +372,7 @@ func (e *Engine) responseProcessingLoop() {
case <-notifier:
err := e.processAvailableResponses()
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing queued message")
e.log.Fatal().Err(err).Msg("internal error processing queued responses")
}
}
}
Expand All @@ -389,21 +389,15 @@ func (e *Engine) processAvailableResponses() error {

msg, ok := e.pendingSyncResponses.Get()
if ok {
err := e.onSyncResponse(msg.OriginID, msg.Payload.(*messages.SyncResponse))
e.onSyncResponse(msg.OriginID, msg.Payload.(*messages.SyncResponse))
e.metrics.MessageHandled(metrics.EngineSynchronization, metrics.MessageSyncResponse)
if err != nil {
return fmt.Errorf("could not process sync response")
}
continue
}

msg, ok = e.pendingBlockResponses.Get()
if ok {
err := e.onBlockResponse(msg.OriginID, msg.Payload.(*messages.BlockResponse))
e.onBlockResponse(msg.OriginID, msg.Payload.(*messages.BlockResponse))
e.metrics.MessageHandled(metrics.EngineSynchronization, metrics.MessageBlockResponse)
if err != nil {
return fmt.Errorf("could not process block response")
}
continue
}

Expand All @@ -427,7 +421,7 @@ func (e *Engine) processAvailableRequests() error {
if ok {
err := e.onSyncRequest(msg.OriginID, msg.Payload.(*messages.SyncRequest))
if err != nil {
return fmt.Errorf("could not process sync request: %w", err)
engine.LogError(e.log, err)
}
continue
}
Expand All @@ -436,7 +430,7 @@ func (e *Engine) processAvailableRequests() error {
if ok {
err := e.onRangeRequest(msg.OriginID, msg.Payload.(*messages.RangeRequest))
if err != nil {
return fmt.Errorf("could not process range request: %w", err)
engine.LogError(e.log, err)
}
continue
}
Expand All @@ -445,7 +439,7 @@ func (e *Engine) processAvailableRequests() error {
if ok {
err := e.onBatchRequest(msg.OriginID, msg.Payload.(*messages.BatchRequest))
if err != nil {
return fmt.Errorf("could not process batch request: %w", err)
engine.LogError(e.log, err)
}
continue
}
Expand Down Expand Up @@ -487,11 +481,10 @@ func (e *Engine) onSyncRequest(originID flow.Identifier, req *messages.SyncReque
}

// onSyncResponse processes a synchronization response.
func (e *Engine) onSyncResponse(originID flow.Identifier, res *messages.SyncResponse) error {
func (e *Engine) onSyncResponse(originID flow.Identifier, res *messages.SyncResponse) {

final := e.finalSnapshot().head
e.core.HandleHeight(final, res.Height)
return nil
}

// onRangeRequest processes a request for a range of blocks by height.
Expand Down Expand Up @@ -590,7 +583,7 @@ func (e *Engine) onBatchRequest(originID flow.Identifier, req *messages.BatchReq
}

// onBlockResponse processes a response containing a specifically requested block.
func (e *Engine) onBlockResponse(originID flow.Identifier, res *messages.BlockResponse) error {
func (e *Engine) onBlockResponse(originID flow.Identifier, res *messages.BlockResponse) {
// process the blocks one by one
for _, block := range res.Blocks {
if !e.core.HandleBlock(block.Header) {
Expand All @@ -602,7 +595,6 @@ func (e *Engine) onBlockResponse(originID flow.Identifier, res *messages.BlockRe
}
e.comp.SubmitLocal(synced)
}
return nil
}

// checkLoop will regularly scan for items that need requesting.
Expand Down
6 changes: 2 additions & 4 deletions engine/common/synchronization/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ func (ss *SyncSuite) TestOnSyncResponse() {

// the height should be handled
ss.core.On("HandleHeight", ss.head, res.Height)
err := ss.e.onSyncResponse(originID, res)
ss.Assert().Nil(err)
ss.e.onSyncResponse(originID, res)
ss.core.AssertExpectations(ss.T())
}

Expand Down Expand Up @@ -366,8 +365,7 @@ func (ss *SyncSuite) TestOnBlockResponse() {
},
)

err := ss.e.onBlockResponse(originID, res)
ss.Assert().Nil(err)
ss.e.onBlockResponse(originID, res)
ss.comp.AssertExpectations(ss.T())
ss.core.AssertExpectations(ss.T())
}
Expand Down

0 comments on commit 2644560

Please sign in to comment.