Skip to content

Commit

Permalink
Merge pull request #5522 from multiversx/fixes-import-db-2023.08.24
Browse files Browse the repository at this point in the history
Fixes in import-db for rc/v1.6.0
  • Loading branch information
iulianpascalau authored Aug 25, 2023
2 parents fce29e5 + 6122d86 commit 1a3f0f2
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func (r *requester) SetDebugHandler(_ dataRetriever.DebugHandler) error {
return nil
}

// RequestDataFromHashArray returns nil as it is disabled
func (r *requester) RequestDataFromHashArray(_ [][]byte, _ uint32) error {
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (r *requester) IsInterfaceNil() bool {
return r == nil
Expand Down
8 changes: 7 additions & 1 deletion state/export_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package state

import (
"time"

"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/testscommon/storageManager"
vmcommon "github.com/multiversx/mx-chain-vm-common-go"
"time"
)

// LastSnapshotStarted -
Expand Down Expand Up @@ -105,3 +106,8 @@ func GetStorageEpochChangeWaitArgs() storageEpochChangeWaitArgs {
func (sm *snapshotsManager) WaitForStorageEpochChange(args storageEpochChangeWaitArgs) error {
return sm.waitForStorageEpochChange(args)
}

// NewNilSnapshotsManager -
func NewNilSnapshotsManager() *snapshotsManager {
return nil
}
12 changes: 9 additions & 3 deletions state/snapshotsManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,14 @@ func (sm *snapshotsManager) snapshotState(
})
if err != nil {
log.Error("error waiting for storage epoch change", "err", err)
sm.earlySnapshotCompletion(stats)
sm.earlySnapshotCompletion(stats, trieStorageManager)
return
}

if !trieStorageManager.ShouldTakeSnapshot() {
log.Debug("skipping snapshot", "rootHash", rootHash, "epoch", epoch)

sm.earlySnapshotCompletion(stats)
sm.earlySnapshotCompletion(stats, trieStorageManager)
return
}

Expand All @@ -277,15 +277,21 @@ func (sm *snapshotsManager) snapshotState(
go sm.processSnapshotCompletion(stats, trieStorageManager, missingNodesChannel, iteratorChannels.ErrChan, rootHash, epoch)
}

func (sm *snapshotsManager) earlySnapshotCompletion(stats *snapshotStatistics) {
func (sm *snapshotsManager) earlySnapshotCompletion(stats *snapshotStatistics, trieStorageManager common.StorageManager) {
sm.mutex.Lock()
defer sm.mutex.Unlock()

stats.SnapshotFinished()
sm.isSnapshotInProgress.Reset()
trieStorageManager.ExitPruningBufferingMode()
}

func (sm *snapshotsManager) waitForStorageEpochChange(args storageEpochChangeWaitArgs) error {
if sm.processingMode == common.ImportDb {
log.Debug("no need to wait for storage epoch change as the node is running in import-db mode")
return nil
}

if args.SnapshotWaitTimeout < args.WaitTimeForSnapshotEpochCheck {
return fmt.Errorf("timeout (%s) must be greater than wait time between snapshot epoch check (%s)", args.SnapshotWaitTimeout, args.WaitTimeForSnapshotEpochCheck)
}
Expand Down
49 changes: 49 additions & 0 deletions state/snapshotsManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/atomic"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/process/mock"
"github.com/multiversx/mx-chain-go/state"
Expand Down Expand Up @@ -104,6 +105,16 @@ func TestNewSnapshotsManager(t *testing.T) {
})
}

func TestSnapshotsManager_IsInterfaceNil(t *testing.T) {
t.Parallel()

instance := state.NewNilSnapshotsManager()
assert.True(t, instance.IsInterfaceNil())

instance, _ = state.NewSnapshotsManager(getDefaultSnapshotManagerArgs())
assert.False(t, instance.IsInterfaceNil())
}

func TestSnapshotsManager_SetSyncer(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -321,6 +332,8 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
getLatestStorageEpochCalled := false

sm, _ := state.NewSnapshotsManager(getDefaultSnapshotManagerArgs())
enterPruningBufferingModeCalled := atomic.Flag{}
exitPruningBufferingModeCalled := atomic.Flag{}
tsm := &storageManager.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
getLatestStorageEpochCalled = true
Expand All @@ -331,6 +344,12 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
assert.Fail(t, "the func should have returned before this is called")
return false
},
EnterPruningBufferingModeCalled: func() {
enterPruningBufferingModeCalled.SetValue(true)
},
ExitPruningBufferingModeCalled: func() {
exitPruningBufferingModeCalled.SetValue(true)
},
}

sm.SnapshotState(rootHash, epoch, tsm)
Expand All @@ -339,6 +358,8 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
}

assert.True(t, getLatestStorageEpochCalled)
assert.True(t, enterPruningBufferingModeCalled.IsSet())
assert.True(t, exitPruningBufferingModeCalled.IsSet())
})
t.Run("tsm signals that a snapshot should not be taken", func(t *testing.T) {
t.Parallel()
Expand All @@ -352,6 +373,8 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
},
}
sm, _ := state.NewSnapshotsManager(args)
enterPruningBufferingModeCalled := atomic.Flag{}
exitPruningBufferingModeCalled := atomic.Flag{}
tsm := &storageManager.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
return 5, nil
Expand All @@ -361,6 +384,12 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
assert.True(t, sm.IsSnapshotInProgress())
return false
},
EnterPruningBufferingModeCalled: func() {
enterPruningBufferingModeCalled.SetValue(true)
},
ExitPruningBufferingModeCalled: func() {
exitPruningBufferingModeCalled.SetValue(true)
},
}

sm.SnapshotState(rootHash, epoch, tsm)
Expand All @@ -369,6 +398,8 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
}

assert.True(t, shouldTakeSnapshotCalled)
assert.True(t, enterPruningBufferingModeCalled.IsSet())
assert.True(t, exitPruningBufferingModeCalled.IsSet())
})
t.Run("snapshot with errors does not mark active and does not remove lastSnapshot", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -503,6 +534,24 @@ func TestSnapshotsManager_WaitForStorageEpochChange(t *testing.T) {
err := sm.WaitForStorageEpochChange(args)
assert.Error(t, err)
})
t.Run("is in import-db mode should not return error on timeout condition", func(t *testing.T) {
t.Parallel()

args := state.GetStorageEpochChangeWaitArgs()
args.WaitTimeForSnapshotEpochCheck = time.Millisecond
args.SnapshotWaitTimeout = time.Millisecond * 5
args.TrieStorageManager = &storageManager.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
return 0, nil
},
}
argsSnapshotManager := getDefaultSnapshotManagerArgs()
argsSnapshotManager.ProcessingMode = common.ImportDb
sm, _ := state.NewSnapshotsManager(argsSnapshotManager)

err := sm.WaitForStorageEpochChange(args)
assert.Nil(t, err)
})
t.Run("returns when latestStorageEpoch == snapshotEpoch", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 1a3f0f2

Please sign in to comment.