Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
(VDB-570) Handle duplicate storage diffs
Browse files Browse the repository at this point in the history
- If processing a new diff for a row that already exists in the DB,
  ignore the error without logging or queueing
- If processing a queued diff for a row that already exists, remove
  it from the queue
  • Loading branch information
rmulhol committed May 2, 2019
1 parent 782e3fd commit 36c4da3
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/composeAndExecute.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,5 @@ func composeAndExecute() {
func init() {
rootCmd.AddCommand(composeAndExecuteCmd)
composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs")
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs")
}
2 changes: 1 addition & 1 deletion cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func execute() {
func init() {
rootCmd.AddCommand(executeCmd)
executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs")
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs")
}

type Exporter interface {
Expand Down
24 changes: 12 additions & 12 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ import (
)

var (
cfgFile string
databaseConfig config.Database
genConfig config.Plugin
ipc string
levelDbPath string
cfgFile string
databaseConfig config.Database
genConfig config.Plugin
ipc string
levelDbPath string
queueRecheckInterval time.Duration
startingBlockNumber int64
storageDiffsPath string
syncAll bool
endingBlockNumber int64
recheckHeadersArg bool
startingBlockNumber int64
storageDiffsPath string
syncAll bool
endingBlockNumber int64
recheckHeadersArg bool
)

const (
pollingInterval = 7 * time.Second
validationWindow = 15
pollingInterval = 7 * time.Second
validationWindow = 15
)

var rootCmd = &cobra.Command{
Expand Down
3 changes: 3 additions & 0 deletions libraries/shared/storage/utils/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package utils

import (
"errors"
"fmt"
)

var ErrRowExists = errors.New("parsed row for storage diff already exists")

type ErrContractNotFound struct {
Contract string
}
Expand Down
4 changes: 2 additions & 2 deletions libraries/shared/watcher/storage_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) {
return
}
executeErr := storageTransformer.Execute(row)
if executeErr != nil {
if executeErr != nil && executeErr != utils.ErrRowExists {
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
queueErr := storageWatcher.Queue.Add(row)
if queueErr != nil {
Expand All @@ -100,7 +100,7 @@ func (storageWatcher StorageWatcher) processQueue() {
continue
}
executeErr := storageTransformer.Execute(row)
if executeErr == nil {
if executeErr == nil || executeErr == utils.ErrRowExists {
storageWatcher.deleteRow(row.Id)
}
}
Expand Down
23 changes: 23 additions & 0 deletions libraries/shared/watcher/storage_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ var _ = Describe("Storage Watcher", func() {
close(done)
})

It("does not queue row if transformer execution fails because row already exists", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrRowExists

go storageWatcher.Execute(rows, errs, time.Hour)

Expect(<-errs).To(BeNil())
Consistently(func() bool {
return mockQueue.AddCalled
}).Should(BeFalse())
close(done)
})

It("queues row for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError

Expand Down Expand Up @@ -187,6 +199,17 @@ var _ = Describe("Storage Watcher", func() {
close(done)
})

It("deletes row from queue if transformer execution errors because row already exists", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrRowExists

go storageWatcher.Execute(rows, errs, time.Nanosecond)

Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(row.Id))
close(done)
})

It("logs error if deleting persisted row fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expand Down

0 comments on commit 36c4da3

Please sign in to comment.