diff --git a/observer/baseNodeProvider.go b/observer/baseNodeProvider.go index 121bcebd..5fbe5b22 100644 --- a/observer/baseNodeProvider.go +++ b/observer/baseNodeProvider.go @@ -3,23 +3,20 @@ package observer import ( "fmt" "sort" - "strings" "sync" "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-proxy-go/config" "github.com/multiversx/mx-chain-proxy-go/data" + "github.com/multiversx/mx-chain-proxy-go/observer/holder" ) type baseNodeProvider struct { - mutNodes sync.RWMutex - shardIds []uint32 - configurationFilePath string - syncedNodes []*data.NodeData - outOfSyncNodes []*data.NodeData - syncedFallbackNodes []*data.NodeData - outOfSyncFallbackNodes []*data.NodeData - lastSyncedNodes map[uint32]*data.NodeData + mutNodes sync.RWMutex + shardIds []uint32 + configurationFilePath string + regularNodes NodesHolder + snapshotlessNodes NodesHolder } func (bnp *baseNodeProvider) initNodes(nodes []*data.NodeData) error { @@ -29,9 +26,6 @@ func (bnp *baseNodeProvider) initNodes(nodes []*data.NodeData) error { newNodes := make(map[uint32][]*data.NodeData) for _, observer := range nodes { - if observer.IsFallback && observer.IsSnapshotless { - return ErrObserverCannotBeBothFallbackAndSnapshotless - } shardId := observer.ShardId newNodes[shardId] = append(newNodes[shardId], observer) } @@ -42,12 +36,18 @@ func (bnp *baseNodeProvider) initNodes(nodes []*data.NodeData) error { } bnp.mutNodes.Lock() + defer bnp.mutNodes.Unlock() + bnp.shardIds = getSortedShardIDsSlice(newNodes) - bnp.syncedNodes, bnp.syncedFallbackNodes = initAllNodesSlice(newNodes) - bnp.outOfSyncNodes = make([]*data.NodeData, 0) - bnp.outOfSyncFallbackNodes = make([]*data.NodeData, 0) - bnp.lastSyncedNodes = make(map[uint32]*data.NodeData) - bnp.mutNodes.Unlock() + syncedNodes, syncedFallbackNodes, syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes := initAllNodesSlice(newNodes) + bnp.regularNodes, err = holder.NewNodesHolder(syncedNodes, syncedFallbackNodes, bnp.shardIds, data.AvailabilityAll) + if err != nil { + return err + } + bnp.snapshotlessNodes, err = holder.NewNodesHolder(syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes, bnp.shardIds, data.AvailabilityRecent) + if err != nil { + return err + } return nil } @@ -75,19 +75,15 @@ func (bnp *baseNodeProvider) GetAllNodesWithSyncState() []*data.NodeData { defer bnp.mutNodes.RUnlock() nodesSlice := make([]*data.NodeData, 0) - for _, node := range bnp.syncedNodes { - nodesSlice = append(nodesSlice, node) - } - for _, node := range bnp.outOfSyncNodes { - nodesSlice = append(nodesSlice, node) - } - for _, node := range bnp.syncedFallbackNodes { - nodesSlice = append(nodesSlice, node) - } - for _, node := range bnp.outOfSyncFallbackNodes { - nodesSlice = append(nodesSlice, node) - } - + nodesSlice = append(nodesSlice, bnp.regularNodes.GetSyncedNodes()...) + nodesSlice = append(nodesSlice, bnp.regularNodes.GetOutOfSyncNodes()...) + nodesSlice = append(nodesSlice, bnp.regularNodes.GetSyncedFallbackNodes()...) + nodesSlice = append(nodesSlice, bnp.regularNodes.GetOutOfSyncFallbackNodes()...) + + nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetSyncedNodes()...) + nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetOutOfSyncNodes()...) + nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetSyncedFallbackNodes()...) + nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetOutOfSyncFallbackNodes()...) return nodesSlice } @@ -100,300 +96,23 @@ func (bnp *baseNodeProvider) UpdateNodesBasedOnSyncState(nodesWithSyncStatus []* bnp.mutNodes.Lock() defer bnp.mutNodes.Unlock() - syncedNodes, syncedFallbackNodes, outOfSyncNodes, err := computeSyncedAndOutOfSyncNodes(nodesWithSyncStatus, bnp.shardIds) - if err != nil { - log.Error("cannot update nodes based on sync state", "error", err) - return - } - - sameNumOfSynced := len(bnp.syncedNodes) == len(syncedNodes) - sameNumOfSyncedFallback := len(bnp.syncedFallbackNodes) == len(syncedFallbackNodes) - if sameNumOfSynced && sameNumOfSyncedFallback && len(outOfSyncNodes) == 0 { - bnp.printSyncedNodesInShardsUnprotected() - // early exit as all the nodes are in sync - return - } - - syncedNodesMap := nodesSliceToShardedMap(syncedNodes) - syncedFallbackNodesMap := nodesSliceToShardedMap(syncedFallbackNodes) - - bnp.removeOutOfSyncNodesUnprotected(outOfSyncNodes, syncedNodesMap, syncedFallbackNodesMap) - bnp.addSyncedNodesUnprotected(syncedNodes, syncedFallbackNodes) - bnp.printSyncedNodesInShardsUnprotected() -} - -func (bnp *baseNodeProvider) printSyncedNodesInShardsUnprotected() { - inSyncAddresses := make(map[uint32][]string, 0) - for _, syncedNode := range bnp.syncedNodes { - inSyncAddresses[syncedNode.ShardId] = append(inSyncAddresses[syncedNode.ShardId], syncedNode.Address) - } - - inSyncFallbackAddresses := make(map[uint32][]string, 0) - for _, syncedFallbackNode := range bnp.syncedFallbackNodes { - inSyncFallbackAddresses[syncedFallbackNode.ShardId] = append(inSyncFallbackAddresses[syncedFallbackNode.ShardId], syncedFallbackNode.Address) - } - - for _, shardID := range bnp.shardIds { - totalNumOfActiveNodes := len(inSyncAddresses[shardID]) + len(inSyncFallbackAddresses[shardID]) - // if none of them is active, use the backup if exists - hasBackup := bnp.lastSyncedNodes[shardID] != nil - if totalNumOfActiveNodes == 0 && hasBackup { - totalNumOfActiveNodes++ - inSyncAddresses[shardID] = append(inSyncAddresses[shardID], bnp.lastSyncedNodes[shardID].Address) - } - log.Info(fmt.Sprintf("shard %d active nodes", shardID), - "observers count", totalNumOfActiveNodes, - "addresses", strings.Join(inSyncAddresses[shardID], ", "), - "fallback addresses", strings.Join(inSyncFallbackAddresses[shardID], ", ")) - } + regularNodes, snapshotlessNodes := splitNodesByDataAvailability(nodesWithSyncStatus) + bnp.regularNodes.UpdateNodes(regularNodes) + bnp.snapshotlessNodes.UpdateNodes(snapshotlessNodes) } -func computeSyncedAndOutOfSyncNodes(nodes []*data.NodeData, shardIDs []uint32) ([]*data.NodeData, []*data.NodeData, []*data.NodeData, error) { - tempSyncedNodesMap := make(map[uint32][]*data.NodeData) - tempSyncedFallbackNodesMap := make(map[uint32][]*data.NodeData) - tempNotSyncedNodesMap := make(map[uint32][]*data.NodeData) - +func splitNodesByDataAvailability(nodes []*data.NodeData) ([]*data.NodeData, []*data.NodeData) { + regularNodes := make([]*data.NodeData, 0) + snapshotlessNodes := make([]*data.NodeData, 0) for _, node := range nodes { - if node.IsSynced { - if node.IsFallback { - tempSyncedFallbackNodesMap[node.ShardId] = append(tempSyncedFallbackNodesMap[node.ShardId], node) - } else { - tempSyncedNodesMap[node.ShardId] = append(tempSyncedNodesMap[node.ShardId], node) - } - continue - } - - tempNotSyncedNodesMap[node.ShardId] = append(tempNotSyncedNodesMap[node.ShardId], node) - } - - syncedNodes := make([]*data.NodeData, 0) - syncedFallbackNodes := make([]*data.NodeData, 0) - notSyncedNodes := make([]*data.NodeData, 0) - for _, shardID := range shardIDs { - syncedNodes = append(syncedNodes, tempSyncedNodesMap[shardID]...) - syncedFallbackNodes = append(syncedFallbackNodes, tempSyncedFallbackNodesMap[shardID]...) - notSyncedNodes = append(notSyncedNodes, tempNotSyncedNodesMap[shardID]...) - - totalLen := len(tempSyncedNodesMap[shardID]) + len(tempSyncedFallbackNodesMap[shardID]) + len(tempNotSyncedNodesMap[shardID]) - if totalLen == 0 { - return nil, nil, nil, fmt.Errorf("%w for shard %d - no synced or not synced node", ErrWrongObserversConfiguration, shardID) - } - } - - return syncedNodes, syncedFallbackNodes, notSyncedNodes, nil -} - -func (bnp *baseNodeProvider) addSyncedNodesUnprotected(receivedSyncedNodes []*data.NodeData, receivedSyncedFallbackNodes []*data.NodeData) { - syncedNodesPerShard := make(map[uint32][]string) - for _, node := range receivedSyncedNodes { - bnp.removeFromOutOfSyncIfNeededUnprotected(node) - syncedNodesPerShard[node.ShardId] = append(syncedNodesPerShard[node.ShardId], node.Address) - if bnp.isReceivedSyncedNodeExistent(node) { - continue - } - - bnp.syncedNodes = append(bnp.syncedNodes, node) - } - - for _, node := range receivedSyncedFallbackNodes { - bnp.removeFromOutOfSyncIfNeededUnprotected(node) - if bnp.isReceivedSyncedNodeExistentAsFallback(node) { - continue - } - - bnp.syncedFallbackNodes = append(bnp.syncedFallbackNodes, node) - } - - // if there is at least one synced node regular received, clean the backup list - for _, shardId := range bnp.shardIds { - if len(syncedNodesPerShard[shardId]) != 0 { - delete(bnp.lastSyncedNodes, shardId) - } - } -} - -func (bnp *baseNodeProvider) removeFromOutOfSyncIfNeededUnprotected(node *data.NodeData) { - if node.IsFallback { - bnp.removeFallbackFromOutOfSyncListUnprotected(node) - return - } - - bnp.removeRegularFromOutOfSyncListUnprotected(node) -} - -func (bnp *baseNodeProvider) isReceivedSyncedNodeExistent(receivedNode *data.NodeData) bool { - for _, node := range bnp.syncedNodes { - if node.Address == receivedNode.Address && node.ShardId == receivedNode.ShardId { - return true - } - } - - return false -} - -func (bnp *baseNodeProvider) isReceivedSyncedNodeExistentAsFallback(receivedNode *data.NodeData) bool { - for _, node := range bnp.syncedFallbackNodes { - if node.Address == receivedNode.Address && node.ShardId == receivedNode.ShardId { - return true - } - } - - return false -} - -func (bnp *baseNodeProvider) addToOutOfSyncUnprotected(node *data.NodeData) { - if node.IsFallback { - bnp.addFallbackToOutOfSyncUnprotected(node) - return - } - - bnp.addRegularToOutOfSyncUnprotected(node) -} - -func (bnp *baseNodeProvider) addRegularToOutOfSyncUnprotected(node *data.NodeData) { - for _, oosNode := range bnp.outOfSyncNodes { - if oosNode.Address == node.Address && oosNode.ShardId == node.ShardId { - return - } - } - - bnp.outOfSyncNodes = append(bnp.outOfSyncNodes, node) -} - -func (bnp *baseNodeProvider) addFallbackToOutOfSyncUnprotected(node *data.NodeData) { - for _, oosNode := range bnp.outOfSyncFallbackNodes { - if oosNode.Address == node.Address && oosNode.ShardId == node.ShardId { - return + if node.IsSnapshotless { + snapshotlessNodes = append(snapshotlessNodes, node) + } else { + regularNodes = append(regularNodes, node) } } - bnp.outOfSyncFallbackNodes = append(bnp.outOfSyncFallbackNodes, node) -} - -func (bnp *baseNodeProvider) removeOutOfSyncNodesUnprotected( - outOfSyncNodes []*data.NodeData, - syncedNodesMap map[uint32][]*data.NodeData, - syncedFallbackNodesMap map[uint32][]*data.NodeData, -) { - if len(outOfSyncNodes) == 0 { - bnp.outOfSyncNodes = make([]*data.NodeData, 0) - bnp.outOfSyncFallbackNodes = make([]*data.NodeData, 0) - return - } - - for _, outOfSyncNode := range outOfSyncNodes { - hasOneSyncedNode := len(syncedNodesMap[outOfSyncNode.ShardId]) >= 1 - hasEnoughSyncedFallbackNodes := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) > 1 - canDeleteFallbackNode := hasOneSyncedNode || hasEnoughSyncedFallbackNodes - if outOfSyncNode.IsFallback && canDeleteFallbackNode { - bnp.removeNodeUnprotected(outOfSyncNode) - continue - } - - // if trying to delete last fallback, use last known synced node - // if backup node does not exist, keep fallback - hasBackup := bnp.lastSyncedNodes[outOfSyncNode.ShardId] != nil - if outOfSyncNode.IsFallback && hasBackup { - bnp.removeNodeUnprotected(outOfSyncNode) - continue - } - - hasEnoughSyncedNodes := len(syncedNodesMap[outOfSyncNode.ShardId]) >= 1 - if hasEnoughSyncedNodes { - bnp.removeNodeUnprotected(outOfSyncNode) - continue - } - - // trying to remove last synced node - // if fallbacks are available, save this one as backup and use fallbacks - // else, keep using this one - // save this last regular observer as backup in case fallbacks go offline - // also, if this is the old fallback observer which didn't get synced, keep it in list - wasSyncedAtPreviousStep := bnp.isReceivedSyncedNodeExistent(outOfSyncNode) - isBackupObserver := bnp.lastSyncedNodes[outOfSyncNode.ShardId] == outOfSyncNode - isRegularSyncedBefore := !outOfSyncNode.IsFallback && wasSyncedAtPreviousStep - if isRegularSyncedBefore || isBackupObserver { - log.Info("backup observer updated", - "address", outOfSyncNode.Address, - "is fallback", outOfSyncNode.IsFallback, - "shard", outOfSyncNode.ShardId) - bnp.lastSyncedNodes[outOfSyncNode.ShardId] = outOfSyncNode - } - hasOneSyncedFallbackNode := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) >= 1 - if hasOneSyncedFallbackNode { - bnp.removeNodeUnprotected(outOfSyncNode) - continue - } - - // safe to delete regular observer, as it is already in lastSyncedNodes map - if !outOfSyncNode.IsFallback { - bnp.removeNodeUnprotected(outOfSyncNode) - continue - } - - // this is a fallback node, with no synced nodes. - // save it as backup and delete it from its list - bnp.lastSyncedNodes[outOfSyncNode.ShardId] = outOfSyncNode - bnp.removeNodeUnprotected(outOfSyncNode) - } -} - -func (bnp *baseNodeProvider) removeNodeUnprotected(node *data.NodeData) { - bnp.removeNodeFromSyncedNodesUnprotected(node) - bnp.addToOutOfSyncUnprotected(node) -} - -func (bnp *baseNodeProvider) removeNodeFromSyncedNodesUnprotected(nodeToRemove *data.NodeData) { - if nodeToRemove.IsFallback { - bnp.removeFallbackFromSyncedListUnprotected(nodeToRemove) - return - } - - bnp.removeRegularFromSyncedListUnprotected(nodeToRemove) -} - -func (bnp *baseNodeProvider) removeRegularFromSyncedListUnprotected(nodeToRemove *data.NodeData) { - nodeIndex := getIndexFromList(nodeToRemove, bnp.syncedNodes) - if nodeIndex == -1 { - return - } - - copy(bnp.syncedNodes[nodeIndex:], bnp.syncedNodes[nodeIndex+1:]) - bnp.syncedNodes[len(bnp.syncedNodes)-1] = nil - bnp.syncedNodes = bnp.syncedNodes[:len(bnp.syncedNodes)-1] -} - -func (bnp *baseNodeProvider) removeFallbackFromSyncedListUnprotected(nodeToRemove *data.NodeData) { - nodeIndex := getIndexFromList(nodeToRemove, bnp.syncedFallbackNodes) - if nodeIndex == -1 { - return - } - - copy(bnp.syncedFallbackNodes[nodeIndex:], bnp.syncedFallbackNodes[nodeIndex+1:]) - bnp.syncedFallbackNodes[len(bnp.syncedFallbackNodes)-1] = nil - bnp.syncedFallbackNodes = bnp.syncedFallbackNodes[:len(bnp.syncedFallbackNodes)-1] -} - -func (bnp *baseNodeProvider) removeRegularFromOutOfSyncListUnprotected(nodeToRemove *data.NodeData) { - nodeIndex := getIndexFromList(nodeToRemove, bnp.outOfSyncNodes) - if nodeIndex == -1 { - return - } - - copy(bnp.outOfSyncNodes[nodeIndex:], bnp.outOfSyncNodes[nodeIndex+1:]) - bnp.outOfSyncNodes[len(bnp.outOfSyncNodes)-1] = nil - bnp.outOfSyncNodes = bnp.outOfSyncNodes[:len(bnp.outOfSyncNodes)-1] -} - -func (bnp *baseNodeProvider) removeFallbackFromOutOfSyncListUnprotected(nodeToRemove *data.NodeData) { - nodeIndex := getIndexFromList(nodeToRemove, bnp.outOfSyncFallbackNodes) - if nodeIndex == -1 { - return - } - - copy(bnp.outOfSyncFallbackNodes[nodeIndex:], bnp.outOfSyncFallbackNodes[nodeIndex+1:]) - bnp.outOfSyncFallbackNodes[len(bnp.outOfSyncFallbackNodes)-1] = nil - bnp.outOfSyncFallbackNodes = bnp.outOfSyncFallbackNodes[:len(bnp.outOfSyncFallbackNodes)-1] + return regularNodes, snapshotlessNodes } // ReloadNodes will reload the observers or the full history observers @@ -428,10 +147,27 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo } bnp.mutNodes.Lock() + defer bnp.mutNodes.Unlock() bnp.shardIds = getSortedShardIDsSlice(newNodes) - bnp.syncedNodes, bnp.syncedFallbackNodes = initAllNodesSlice(newNodes) - bnp.lastSyncedNodes = make(map[uint32]*data.NodeData) - bnp.mutNodes.Unlock() + syncedNodes, syncedFallbackNodes, syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes := initAllNodesSlice(newNodes) + bnp.regularNodes, err = holder.NewNodesHolder(syncedNodes, syncedFallbackNodes, bnp.shardIds, data.AvailabilityAll) + if err != nil { + log.Error("cannot reload regular nodes: NewNodesHolder", "error", err) + return data.NodesReloadResponse{ + OkRequest: true, + Description: "not reloaded", + Error: "cannot create the regular nodes holder: " + err.Error(), + } + } + bnp.snapshotlessNodes, err = holder.NewNodesHolder(syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes, bnp.shardIds, data.AvailabilityRecent) + if err != nil { + log.Error("cannot reload snapshotless nodes: NewNodesHolder", "error", err) + return data.NodesReloadResponse{ + OkRequest: true, + Description: "not reloaded", + Error: "cannot create the snapshotless nodes holder: " + err.Error(), + } + } return data.NodesReloadResponse{ OkRequest: true, @@ -441,23 +177,31 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo } func (bnp *baseNodeProvider) getSyncedNodesForShardUnprotected(shardId uint32, dataAvailability data.ObserverDataAvailabilityType) ([]*data.NodeData, error) { + var syncedNodesSource []*data.NodeData + if dataAvailability == data.AvailabilityRecent && len(bnp.snapshotlessNodes.GetSyncedNodes()) > 0 { + syncedNodesSource = bnp.snapshotlessNodes.GetSyncedNodes() + } else { + syncedNodesSource = bnp.regularNodes.GetSyncedNodes() + } syncedNodes := make([]*data.NodeData, 0) - for _, node := range bnp.syncedNodes { + for _, node := range syncedNodesSource { if node.ShardId != shardId { continue } - // TODO: analyze if only snapshotless observers should be returned for recent availability endpoints - // currently, we only restrict snapshotless observers not to be used for historical requests - if dataAvailability == data.AvailabilityAll && node.IsSnapshotless { - continue - } + syncedNodes = append(syncedNodes, node) } if len(syncedNodes) != 0 { return syncedNodes, nil } - for _, node := range bnp.syncedFallbackNodes { + var fallbackNodesSource []*data.NodeData + if dataAvailability == data.AvailabilityRecent { + fallbackNodesSource = bnp.snapshotlessNodes.GetSyncedNodes() + } else { + fallbackNodesSource = bnp.regularNodes.GetSyncedNodes() + } + for _, node := range fallbackNodesSource { if node.ShardId == shardId { syncedNodes = append(syncedNodes, node) } @@ -466,7 +210,13 @@ func (bnp *baseNodeProvider) getSyncedNodesForShardUnprotected(shardId uint32, d return syncedNodes, nil } - backupNode, hasBackup := bnp.lastSyncedNodes[shardId] + var lastSyncedNodesMap map[uint32]*data.NodeData + if dataAvailability == data.AvailabilityAll { + lastSyncedNodesMap = bnp.regularNodes.GetLastSyncedNodes() + } else { + lastSyncedNodesMap = bnp.snapshotlessNodes.GetLastSyncedNodes() + } + backupNode, hasBackup := lastSyncedNodesMap[shardId] if hasBackup { return []*data.NodeData{backupNode}, nil } @@ -524,9 +274,11 @@ func prepareReloadResponseMessage(newNodes map[uint32][]*data.NodeData) string { return retString } -func initAllNodesSlice(nodesOnShards map[uint32][]*data.NodeData) ([]*data.NodeData, []*data.NodeData) { - sliceToReturn := make([]*data.NodeData, 0) +func initAllNodesSlice(nodesOnShards map[uint32][]*data.NodeData) ([]*data.NodeData, []*data.NodeData, []*data.NodeData, []*data.NodeData) { + eligibleNodes := make([]*data.NodeData, 0) fallbackNodes := make([]*data.NodeData, 0) + eligibleSnapshotlessNodes := make([]*data.NodeData, 0) + fallbackSnapshotlessNodes := make([]*data.NodeData, 0) shardIDs := getSortedShardIDsSlice(nodesOnShards) finishedShards := make(map[uint32]struct{}) @@ -538,10 +290,18 @@ func initAllNodesSlice(nodesOnShards map[uint32][]*data.NodeData) ([]*data.NodeD } node := nodesOnShards[shardID][i] - if node.IsFallback { - fallbackNodes = append(fallbackNodes, node) + if node.IsSnapshotless { + if node.IsFallback { + fallbackSnapshotlessNodes = append(fallbackSnapshotlessNodes, node) + } else { + eligibleSnapshotlessNodes = append(eligibleSnapshotlessNodes, node) + } } else { - sliceToReturn = append(sliceToReturn, node) + if node.IsFallback { + fallbackNodes = append(fallbackNodes, node) + } else { + eligibleNodes = append(eligibleNodes, node) + } } } @@ -550,7 +310,7 @@ func initAllNodesSlice(nodesOnShards map[uint32][]*data.NodeData) ([]*data.NodeD } } - return sliceToReturn, fallbackNodes + return eligibleNodes, fallbackNodes, eligibleSnapshotlessNodes, fallbackSnapshotlessNodes } func getSortedShardIDsSlice(nodesOnShards map[uint32][]*data.NodeData) []uint32 { @@ -564,15 +324,3 @@ func getSortedShardIDsSlice(nodesOnShards map[uint32][]*data.NodeData) []uint32 return shardIDs } - -func getIndexFromList(providedNode *data.NodeData, list []*data.NodeData) int { - nodeIndex := -1 - for idx, node := range list { - if node.Address == providedNode.Address && node.ShardId == providedNode.ShardId { - nodeIndex = idx - break - } - } - - return nodeIndex -} diff --git a/observer/baseNodeProvider_test.go b/observer/baseNodeProvider_test.go index fe743709..ad898604 100644 --- a/observer/baseNodeProvider_test.go +++ b/observer/baseNodeProvider_test.go @@ -1,13 +1,13 @@ package observer import ( - "errors" "fmt" "sort" "testing" "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-proxy-go/data" + "github.com/multiversx/mx-chain-proxy-go/observer/holder" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -172,7 +172,7 @@ func TestInitAllNodesSlice_BalancesNumObserversDistribution(t *testing.T) { "shard meta - id 3", } - resultSynced, resultFallback := initAllNodesSlice(nodesMap) + resultSynced, resultFallback, _, _ := initAllNodesSlice(nodesMap) for i, r := range resultSynced { assert.Equal(t, expectedSyncedOrder[i], r.Address) } @@ -233,7 +233,7 @@ func TestInitAllNodesSlice_UnbalancedNumObserversDistribution(t *testing.T) { "shard meta - id 4", } - resultSynced, resultFallback := initAllNodesSlice(nodesMap) + resultSynced, resultFallback, _, _ := initAllNodesSlice(nodesMap) for i, r := range resultSynced { assert.Equal(t, expectedSyncedOrder[i], r.Address) } @@ -270,7 +270,7 @@ func TestInitAllNodesSlice_EmptyObserversSliceForAShardShouldStillWork(t *testin "shard meta - id 1", } - result, _ := initAllNodesSlice(nodesMap) + result, _, _, _ := initAllNodesSlice(nodesMap) for i, r := range result { assert.Equal(t, expectedOrder[i], r.Address) } @@ -289,7 +289,7 @@ func TestInitAllNodesSlice_SingleShardShouldWork(t *testing.T) { "shard 0 - id 0", } - result, _ := initAllNodesSlice(nodesMap) + result, _, _, _ := initAllNodesSlice(nodesMap) for i, r := range result { assert.Equal(t, expectedOrder[i], r.Address) } @@ -314,50 +314,60 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncState(t *testing.T) { require.Equal(t, []data.NodeData{ {Address: "addr3", ShardId: 0, IsSynced: true}, {Address: "addr7", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr4", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr2", ShardId: 0, IsSynced: false}, {Address: "addr6", ShardId: 1, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) require.Equal(t, []data.NodeData{ {Address: "addr1", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr5", ShardId: 1, IsSynced: false, IsFallback: true}, - }, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) } func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldNotRemoveNodeIfNotEnoughLeft(t *testing.T) { t.Parallel() allNodes := prepareNodes(4) + snapshotlessNodes := prepareSnapshotlessNodes(4) nodesMap := nodesSliceToShardedMap(allNodes) bnp := &baseNodeProvider{ configurationFilePath: configurationPath, shardIds: getSortedShardIDsSlice(nodesMap), - syncedNodes: allNodes, - lastSyncedNodes: map[uint32]*data.NodeData{}, + regularNodes: createNodesHolder(allNodes, []uint32{0, 1}), + snapshotlessNodes: createNodesHolder(snapshotlessNodes, []uint32{0, 1}), } - nodesCopy := copyNodes(allNodes) - setSyncedStateToNodes(nodesCopy, false, 0, 2) + nodesCopy := append(copyNodes(allNodes), copyNodes(snapshotlessNodes)...) + setSyncedStateToNodes(nodesCopy, false, 0, 2, 4, 6) bnp.UpdateNodesBasedOnSyncState(nodesCopy) require.Equal(t, []data.NodeData{ {Address: "addr1", ShardId: 0, IsSynced: true}, {Address: "addr3", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false}, {Address: "addr2", ShardId: 1, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) + + require.Equal(t, []data.NodeData{ + {Address: "addr1", ShardId: 0, IsSynced: true, IsSnapshotless: true}, + {Address: "addr3", ShardId: 1, IsSynced: true, IsSnapshotless: true}, + }, convertAndSortSlice(bnp.snapshotlessNodes.GetSyncedNodes())) + require.Equal(t, []data.NodeData{ + {Address: "addr0", ShardId: 0, IsSynced: false, IsSnapshotless: true}, + {Address: "addr2", ShardId: 1, IsSynced: false, IsSnapshotless: true}, + }, convertAndSortSlice(bnp.snapshotlessNodes.GetOutOfSyncNodes())) syncedNodes, err := bnp.getSyncedNodesUnprotected(data.AvailabilityAll) require.Nil(t, err) @@ -370,16 +380,15 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldNotRemoveNodeIfNotEno setSyncedStateToNodes(nodesCopy, false, 1, 3) bnp.UpdateNodesBasedOnSyncState(nodesCopy) - - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.syncedNodes)) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false}, {Address: "addr1", ShardId: 0, IsSynced: false}, {Address: "addr2", ShardId: 1, IsSynced: false}, {Address: "addr3", ShardId: 1, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) - require.Equal(t, data.NodeData{Address: "addr1", ShardId: 0, IsSynced: false}, *bnp.lastSyncedNodes[0]) - require.Equal(t, data.NodeData{Address: "addr3", ShardId: 1, IsSynced: false}, *bnp.lastSyncedNodes[1]) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) + require.Equal(t, data.NodeData{Address: "addr1", ShardId: 0, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[0]) + require.Equal(t, data.NodeData{Address: "addr3", ShardId: 1, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[1]) syncedNodes, err = bnp.getSyncedNodesUnprotected(data.AvailabilityAll) require.Nil(t, err) @@ -398,8 +407,8 @@ func TestBaseNodeProvider_getSyncedNodesUnprotectedShouldWork(t *testing.T) { bnp := &baseNodeProvider{ configurationFilePath: configurationPath, shardIds: getSortedShardIDsSlice(nodesMap), - syncedNodes: allNodes, - lastSyncedNodes: map[uint32]*data.NodeData{}, + regularNodes: createNodesHolder(allNodes, []uint32{0, 1}), + snapshotlessNodes: createEmptyNodesHolder(), } nodesCopy := copyNodes(allNodes) @@ -411,10 +420,10 @@ func TestBaseNodeProvider_getSyncedNodesUnprotectedShouldWork(t *testing.T) { {Address: "addr1", ShardId: 0, IsSynced: true}, {Address: "addr2", ShardId: 1, IsSynced: true}, {Address: "addr3", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) syncedNodes, err := bnp.getSyncedNodesUnprotected(data.AvailabilityAll) require.Nil(t, err) @@ -432,12 +441,12 @@ func TestBaseNodeProvider_getSyncedNodesUnprotectedShouldWork(t *testing.T) { require.Equal(t, []data.NodeData{ {Address: "addr2", ShardId: 1, IsSynced: true}, {Address: "addr3", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false}, {Address: "addr1", ShardId: 0, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) - require.Equal(t, data.NodeData{Address: "addr1", ShardId: 0, IsSynced: false}, *bnp.lastSyncedNodes[0]) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) + require.Equal(t, data.NodeData{Address: "addr1", ShardId: 0, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[0]) syncedNodes, err = bnp.getSyncedNodesUnprotected(data.AvailabilityAll) require.Nil(t, err) @@ -475,21 +484,21 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter {Address: "addr2", ShardId: 0, IsSynced: true}, {Address: "addr4", ShardId: 0, IsSynced: true}, {Address: "addr8", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr3", ShardId: 0, IsSynced: false}, {Address: "addr7", ShardId: 1, IsSynced: false}, {Address: "addr9", ShardId: 1, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) require.Equal(t, []data.NodeData{ {Address: "addr1", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr5", ShardId: 1, IsSynced: false, IsFallback: true}, - }, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) - require.Equal(t, 0, len(bnp.lastSyncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) + require.Equal(t, 0, len(bnp.regularNodes.GetLastSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, @@ -518,13 +527,13 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter setSyncedStateToNodes(nodesCopy, false, 2, 3, 4, 7, 8, 9) for i := 0; i < 3; i++ { bnp.UpdateNodesBasedOnSyncState(nodesCopy) - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.syncedNodes)) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr5", ShardId: 1, IsSynced: true, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr2", ShardId: 0, IsSynced: false}, {Address: "addr3", ShardId: 0, IsSynced: false}, @@ -532,9 +541,9 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter {Address: "addr7", ShardId: 1, IsSynced: false}, {Address: "addr8", ShardId: 1, IsSynced: false}, {Address: "addr9", ShardId: 1, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) - require.Equal(t, data.NodeData{Address: "addr4", ShardId: 0, IsSynced: false}, *bnp.lastSyncedNodes[0]) - require.Equal(t, data.NodeData{Address: "addr9", ShardId: 1, IsSynced: false}, *bnp.lastSyncedNodes[1]) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) + require.Equal(t, data.NodeData{Address: "addr4", ShardId: 0, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[0]) + require.Equal(t, data.NodeData{Address: "addr9", ShardId: 1, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[1]) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, @@ -555,8 +564,8 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter setSyncedStateToNodes(nodesCopy, false, 0, 1, 5, 6) for i := 0; i < 3; i++ { bnp.UpdateNodesBasedOnSyncState(nodesCopy) - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.syncedNodes)) - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.syncedFallbackNodes)) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr2", ShardId: 0, IsSynced: false}, {Address: "addr3", ShardId: 0, IsSynced: false}, @@ -564,15 +573,15 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter {Address: "addr7", ShardId: 1, IsSynced: false}, {Address: "addr8", ShardId: 1, IsSynced: false}, {Address: "addr9", ShardId: 1, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr5", ShardId: 1, IsSynced: false, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: false, IsFallback: true}, - }, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) - require.Equal(t, data.NodeData{Address: "addr4", ShardId: 0, IsSynced: false}, *bnp.lastSyncedNodes[0]) - require.Equal(t, data.NodeData{Address: "addr9", ShardId: 1, IsSynced: false}, *bnp.lastSyncedNodes[1]) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) + require.Equal(t, data.NodeData{Address: "addr4", ShardId: 0, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[0]) + require.Equal(t, data.NodeData{Address: "addr9", ShardId: 1, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[1]) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, @@ -596,21 +605,21 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter require.Equal(t, []data.NodeData{ {Address: "addr2", ShardId: 0, IsSynced: true}, {Address: "addr7", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr3", ShardId: 0, IsSynced: false}, {Address: "addr4", ShardId: 0, IsSynced: false}, {Address: "addr8", ShardId: 1, IsSynced: false}, {Address: "addr9", ShardId: 1, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr5", ShardId: 1, IsSynced: false, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: false, IsFallback: true}, - }, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) - require.Equal(t, 0, len(bnp.lastSyncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) + require.Equal(t, 0, len(bnp.regularNodes.GetLastSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, @@ -642,17 +651,17 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter {Address: "addr7", ShardId: 1, IsSynced: true}, {Address: "addr8", ShardId: 1, IsSynced: true}, {Address: "addr9", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) require.Equal(t, []data.NodeData{ {Address: "addr5", ShardId: 1, IsSynced: true, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: false, IsFallback: true}, - }, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) - require.Equal(t, 0, len(bnp.lastSyncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) + require.Equal(t, 0, len(bnp.regularNodes.GetLastSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, @@ -671,11 +680,11 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter nodesCopy = copyNodes(nodesCopy) setSyncedStateToNodes(nodesCopy, false, 2, 3, 4, 7, 8, 9) bnp.UpdateNodesBasedOnSyncState(nodesCopy) - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.syncedNodes)) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr5", ShardId: 1, IsSynced: true, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr2", ShardId: 0, IsSynced: false}, {Address: "addr3", ShardId: 0, IsSynced: false}, @@ -683,13 +692,13 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter {Address: "addr7", ShardId: 1, IsSynced: false}, {Address: "addr8", ShardId: 1, IsSynced: false}, {Address: "addr9", ShardId: 1, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: false, IsFallback: true}, - }, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) - require.Equal(t, data.NodeData{Address: "addr4", ShardId: 0, IsSynced: false}, *bnp.lastSyncedNodes[0]) - require.Equal(t, data.NodeData{Address: "addr9", ShardId: 1, IsSynced: false}, *bnp.lastSyncedNodes[1]) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) + require.Equal(t, data.NodeData{Address: "addr4", ShardId: 0, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[0]) + require.Equal(t, data.NodeData{Address: "addr9", ShardId: 1, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[1]) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, @@ -717,22 +726,22 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter {Address: "addr7", ShardId: 1, IsSynced: true}, {Address: "addr8", ShardId: 1, IsSynced: true}, {Address: "addr9", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr5", ShardId: 1, IsSynced: true, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr2", ShardId: 0, IsSynced: false}, {Address: "addr3", ShardId: 0, IsSynced: false}, {Address: "addr4", ShardId: 0, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: false, IsFallback: true}, - }, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) - require.Equal(t, data.NodeData{Address: "addr4", ShardId: 0, IsSynced: false}, *bnp.lastSyncedNodes[0]) - require.Equal(t, 1, len(bnp.lastSyncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) + require.Equal(t, data.NodeData{Address: "addr4", ShardId: 0, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[0]) + require.Equal(t, 1, len(bnp.regularNodes.GetLastSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, @@ -763,18 +772,18 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter {Address: "addr7", ShardId: 1, IsSynced: true}, {Address: "addr8", ShardId: 1, IsSynced: true}, {Address: "addr9", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr5", ShardId: 1, IsSynced: true, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr4", ShardId: 0, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) - require.Equal(t, 0, len(bnp.lastSyncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) + require.Equal(t, 0, len(bnp.regularNodes.GetLastSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, @@ -799,21 +808,21 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter {Address: "addr7", ShardId: 1, IsSynced: true}, {Address: "addr8", ShardId: 1, IsSynced: true}, {Address: "addr9", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr5", ShardId: 1, IsSynced: true, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr2", ShardId: 0, IsSynced: false}, {Address: "addr3", ShardId: 0, IsSynced: false}, {Address: "addr4", ShardId: 0, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) - require.Equal(t, data.NodeData{Address: "addr3", ShardId: 0, IsSynced: false}, *bnp.lastSyncedNodes[0]) // last synced - require.Equal(t, 1, len(bnp.lastSyncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) + require.Equal(t, data.NodeData{Address: "addr3", ShardId: 0, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[0]) // last synced + require.Equal(t, 1, len(bnp.regularNodes.GetLastSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, @@ -837,22 +846,22 @@ func TestBaseNodeProvider_UpdateNodesBasedOnSyncStateShouldWorkAfterMultipleIter {Address: "addr7", ShardId: 1, IsSynced: true}, {Address: "addr8", ShardId: 1, IsSynced: true}, {Address: "addr9", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr5", ShardId: 1, IsSynced: true, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) require.Equal(t, []data.NodeData{ {Address: "addr2", ShardId: 0, IsSynced: false}, {Address: "addr3", ShardId: 0, IsSynced: false}, {Address: "addr4", ShardId: 0, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: false, IsFallback: true}, - }, convertAndSortSlice(bnp.outOfSyncFallbackNodes)) - require.Equal(t, data.NodeData{Address: "addr3", ShardId: 0, IsSynced: false}, *bnp.lastSyncedNodes[0]) - require.Equal(t, 1, len(bnp.lastSyncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncFallbackNodes())) + require.Equal(t, data.NodeData{Address: "addr3", ShardId: 0, IsSynced: false}, *bnp.regularNodes.GetLastSyncedNodes()[0]) + require.Equal(t, 1, len(bnp.regularNodes.GetLastSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: false, IsFallback: true}, @@ -877,15 +886,15 @@ func syncAllNodesAndCheck(t *testing.T, nodes []*data.NodeData, bnp *baseNodePro {Address: "addr7", ShardId: 1, IsSynced: true}, {Address: "addr8", ShardId: 1, IsSynced: true}, {Address: "addr9", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) - require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.outOfSyncNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedNodes())) + require.Equal(t, []data.NodeData{}, convertAndSortSlice(bnp.regularNodes.GetOutOfSyncNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr1", ShardId: 0, IsSynced: true, IsFallback: true}, {Address: "addr5", ShardId: 1, IsSynced: true, IsFallback: true}, {Address: "addr6", ShardId: 1, IsSynced: true, IsFallback: true}, - }, convertAndSortSlice(bnp.syncedFallbackNodes)) - require.Equal(t, 0, len(bnp.lastSyncedNodes)) + }, convertAndSortSlice(bnp.regularNodes.GetSyncedFallbackNodes())) + require.Equal(t, 0, len(bnp.regularNodes.GetLastSyncedNodes())) require.Equal(t, []data.NodeData{ {Address: "addr0", ShardId: 0, IsSynced: true, IsFallback: true}, @@ -918,14 +927,34 @@ func prepareNodes(count int) []*data.NodeData { return nodes } +func prepareSnapshotlessNodes(count int) []*data.NodeData { + nodes := prepareNodes(count) + for _, node := range nodes { + node.IsSnapshotless = true + } + + return nodes +} + +func createNodesHolder(nodes []*data.NodeData, shardIDs []uint32) NodesHolder { + holderInstance, _ := holder.NewNodesHolder(nodes, []*data.NodeData{}, shardIDs, "") + return holderInstance +} + +func createEmptyNodesHolder() NodesHolder { + holderInstance, _ := holder.NewNodesHolder([]*data.NodeData{}, []*data.NodeData{}, []uint32{0}, "") + return holderInstance +} + func copyNodes(nodes []*data.NodeData) []*data.NodeData { nodesCopy := make([]*data.NodeData, len(nodes)) for i, node := range nodes { nodesCopy[i] = &data.NodeData{ - ShardId: node.ShardId, - Address: node.Address, - IsSynced: node.IsSynced, - IsFallback: node.IsFallback, + ShardId: node.ShardId, + Address: node.Address, + IsSynced: node.IsSynced, + IsFallback: node.IsFallback, + IsSnapshotless: node.IsSnapshotless, } } @@ -972,207 +1001,17 @@ func TestBaseNodeProvider_GetNodesShouldWorkAccordingToTheAvailability(t *testin IsSnapshotless: false, }, } + syncedNodes, _, syncedSnapshotless, _ := initAllNodesSlice(map[uint32][]*data.NodeData{1: nodes}) bnp := &baseNodeProvider{ - syncedNodes: nodes, + regularNodes: createNodesHolder(syncedNodes, []uint32{0}), + snapshotlessNodes: createNodesHolder(syncedSnapshotless, []uint32{0}), } returnedNodes, err := bnp.getSyncedNodesForShardUnprotected(1, data.AvailabilityRecent) require.NoError(t, err) require.Equal(t, "addr0", returnedNodes[0].Address) - require.Equal(t, "addr1", returnedNodes[1].Address) returnedNodes, err = bnp.getSyncedNodesForShardUnprotected(1, data.AvailabilityAll) require.NoError(t, err) require.Equal(t, "addr1", returnedNodes[0].Address) } - -func TestComputeSyncAndOutOfSyncNodes(t *testing.T) { - t.Parallel() - - t.Run("all nodes synced", testComputeSyncedAndOutOfSyncNodesAllNodesSynced) - t.Run("enough synced nodes", testComputeSyncedAndOutOfSyncNodesEnoughSyncedObservers) - t.Run("all nodes are out of sync", testComputeSyncedAndOutOfSyncNodesAllNodesNotSynced) - t.Run("invalid config - no node", testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll) - t.Run("invalid config - no node in a shard", testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeInAShard) - t.Run("edge case - address should not exist in both sync and not-synced lists", testEdgeCaseAddressShouldNotExistInBothLists) -} - -func testComputeSyncedAndOutOfSyncNodesAllNodesSynced(t *testing.T) { - t.Parallel() - - shardIDs := []uint32{0, 1} - input := []*data.NodeData{ - {Address: "0", ShardId: 0, IsSynced: true}, - {Address: "1", ShardId: 0, IsSynced: true, IsFallback: true}, - {Address: "2", ShardId: 1, IsSynced: true}, - {Address: "3", ShardId: 1, IsSynced: true, IsFallback: true}, - } - - synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs) - require.Equal(t, []*data.NodeData{ - {Address: "0", ShardId: 0, IsSynced: true}, - {Address: "2", ShardId: 1, IsSynced: true}, - }, synced) - require.Equal(t, []*data.NodeData{ - {Address: "1", ShardId: 0, IsSynced: true, IsFallback: true}, - {Address: "3", ShardId: 1, IsSynced: true, IsFallback: true}, - }, syncedFb) - require.Empty(t, notSynced) -} - -func testComputeSyncedAndOutOfSyncNodesEnoughSyncedObservers(t *testing.T) { - t.Parallel() - - shardIDs := []uint32{0, 1} - input := []*data.NodeData{ - {Address: "0", ShardId: 0, IsSynced: true}, - {Address: "1", ShardId: 0, IsSynced: false}, - {Address: "2", ShardId: 0, IsSynced: true, IsFallback: true}, - {Address: "3", ShardId: 1, IsSynced: true}, - {Address: "4", ShardId: 1, IsSynced: false}, - {Address: "5", ShardId: 1, IsSynced: true, IsFallback: true}, - } - - synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs) - require.Equal(t, []*data.NodeData{ - {Address: "0", ShardId: 0, IsSynced: true}, - {Address: "3", ShardId: 1, IsSynced: true}, - }, synced) - require.Equal(t, []*data.NodeData{ - {Address: "2", ShardId: 0, IsSynced: true, IsFallback: true}, - {Address: "5", ShardId: 1, IsSynced: true, IsFallback: true}, - }, syncedFb) - require.Equal(t, []*data.NodeData{ - {Address: "1", ShardId: 0, IsSynced: false}, - {Address: "4", ShardId: 1, IsSynced: false}, - }, notSynced) -} - -func testComputeSyncedAndOutOfSyncNodesAllNodesNotSynced(t *testing.T) { - t.Parallel() - - shardIDs := []uint32{0, 1} - input := []*data.NodeData{ - {Address: "0", ShardId: 0, IsSynced: false}, - {Address: "1", ShardId: 0, IsSynced: false, IsFallback: true}, - {Address: "2", ShardId: 1, IsSynced: false}, - {Address: "3", ShardId: 1, IsSynced: false, IsFallback: true}, - } - - synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs) - require.Equal(t, []*data.NodeData{}, synced) - require.Equal(t, []*data.NodeData{}, syncedFb) - require.Equal(t, input, notSynced) -} - -func testEdgeCaseAddressShouldNotExistInBothLists(t *testing.T) { - t.Parallel() - - allNodes := prepareNodes(10) - - nodesMap := nodesSliceToShardedMap(allNodes) - bnp := &baseNodeProvider{ - configurationFilePath: configurationPath, - shardIds: getSortedShardIDsSlice(nodesMap), - syncedNodes: allNodes, - } - - setSyncedStateToNodes(allNodes, false, 1, 3, 5, 7, 9) - - bnp.UpdateNodesBasedOnSyncState(allNodes) - require.Equal(t, []data.NodeData{ - {Address: "addr0", ShardId: 0, IsSynced: true}, - {Address: "addr2", ShardId: 0, IsSynced: true}, - {Address: "addr4", ShardId: 0, IsSynced: true}, - {Address: "addr6", ShardId: 1, IsSynced: true}, - {Address: "addr8", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) - require.Equal(t, []data.NodeData{ - {Address: "addr1", ShardId: 0, IsSynced: false}, - {Address: "addr3", ShardId: 0, IsSynced: false}, - {Address: "addr5", ShardId: 1, IsSynced: false}, - {Address: "addr7", ShardId: 1, IsSynced: false}, - {Address: "addr9", ShardId: 1, IsSynced: false}, - }, convertAndSortSlice(bnp.outOfSyncNodes)) - require.False(t, slicesHaveCommonObjects(bnp.syncedNodes, bnp.outOfSyncNodes)) - - allNodes = prepareNodes(10) - - bnp.UpdateNodesBasedOnSyncState(allNodes) - - require.Equal(t, []data.NodeData{ - {Address: "addr0", ShardId: 0, IsSynced: true}, - {Address: "addr1", ShardId: 0, IsSynced: true}, - {Address: "addr2", ShardId: 0, IsSynced: true}, - {Address: "addr3", ShardId: 0, IsSynced: true}, - {Address: "addr4", ShardId: 0, IsSynced: true}, - {Address: "addr5", ShardId: 1, IsSynced: true}, - {Address: "addr6", ShardId: 1, IsSynced: true}, - {Address: "addr7", ShardId: 1, IsSynced: true}, - {Address: "addr8", ShardId: 1, IsSynced: true}, - {Address: "addr9", ShardId: 1, IsSynced: true}, - }, convertAndSortSlice(bnp.syncedNodes)) - require.False(t, slicesHaveCommonObjects(bnp.syncedNodes, bnp.outOfSyncNodes)) -} - -func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll(t *testing.T) { - t.Parallel() - - shardIDs := []uint32{0, 1} - var input []*data.NodeData - synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs) - require.Error(t, err) - require.Nil(t, synced) - require.Nil(t, syncedFb) - require.Nil(t, notSynced) - - // no node in one shard - shardIDs = []uint32{0, 1} - input = []*data.NodeData{ - { - Address: "0", ShardId: 0, IsSynced: true, - }, - } - synced, syncedFb, notSynced, err = computeSyncedAndOutOfSyncNodes(input, shardIDs) - require.True(t, errors.Is(err, ErrWrongObserversConfiguration)) - require.Nil(t, synced) - require.Nil(t, syncedFb) - require.Nil(t, notSynced) -} - -func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeInAShard(t *testing.T) { - t.Parallel() - - // no node in one shard - shardIDs := []uint32{0, 1} - input := []*data.NodeData{ - { - Address: "0", ShardId: 0, IsSynced: true, - }, - } - synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs) - require.True(t, errors.Is(err, ErrWrongObserversConfiguration)) - require.Nil(t, synced) - require.Nil(t, syncedFb) - require.Nil(t, notSynced) -} - -func slicesHaveCommonObjects(firstSlice []*data.NodeData, secondSlice []*data.NodeData) bool { - nodeDataToStr := func(nd *data.NodeData) string { - return fmt.Sprintf("%s%d", nd.Address, nd.ShardId) - } - firstSliceItems := make(map[string]struct{}) - for _, el := range firstSlice { - firstSliceItems[nodeDataToStr(el)] = struct{}{} - } - - for _, el := range secondSlice { - nodeDataStr := nodeDataToStr(el) - _, found := firstSliceItems[nodeDataStr] - if found { - return true - } - } - - return false -} diff --git a/observer/holder/nodesHolder.go b/observer/holder/nodesHolder.go new file mode 100644 index 00000000..b28f2625 --- /dev/null +++ b/observer/holder/nodesHolder.go @@ -0,0 +1,426 @@ +package holder + +import ( + "errors" + "fmt" + "strings" + "sync" + + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-proxy-go/data" +) + +var ( + errEmptyShardIDsList = errors.New("empty shard IDs list") + errWrongConfiguration = errors.New("wrong observers configuration") + log = logger.GetOrCreate("observer/holder") +) + +type nodesHolder struct { + mut sync.RWMutex + syncedNodes []*data.NodeData + outOfSyncNodes []*data.NodeData + syncedFallbackNodes []*data.NodeData + outOfSyncFallbackNodes []*data.NodeData + lastSyncedNodes map[uint32]*data.NodeData + shardIDs []uint32 + availability data.ObserverDataAvailabilityType +} + +// NewNodesHolder will return a new instance of a nodesHolder +func NewNodesHolder(syncedNodes []*data.NodeData, fallbackNodes []*data.NodeData, shardIDs []uint32, availability data.ObserverDataAvailabilityType) (*nodesHolder, error) { + if len(shardIDs) == 0 { + return nil, errEmptyShardIDsList + } + + return &nodesHolder{ + syncedNodes: syncedNodes, + outOfSyncNodes: make([]*data.NodeData, 0), + syncedFallbackNodes: fallbackNodes, + outOfSyncFallbackNodes: make([]*data.NodeData, 0), + lastSyncedNodes: make(map[uint32]*data.NodeData), + shardIDs: shardIDs, + availability: availability, + }, nil +} + +// UpdateNodes will update the internal maps based on the provided nodes +func (nh *nodesHolder) UpdateNodes(nodesWithSyncStatus []*data.NodeData) { + if len(nodesWithSyncStatus) == 0 { + return + } + syncedNodes, syncedFallbackNodes, outOfSyncNodes, err := computeSyncedAndOutOfSyncNodes(nodesWithSyncStatus, nh.shardIDs) + if err != nil { + log.Error("cannot update nodes based on sync state", "error", err) + return + } + + sameNumOfSynced := len(nh.syncedNodes) == len(syncedNodes) + sameNumOfSyncedFallback := len(nh.syncedFallbackNodes) == len(syncedFallbackNodes) + if sameNumOfSynced && sameNumOfSyncedFallback && len(outOfSyncNodes) == 0 { + nh.printSyncedNodesInShardsUnprotected() + // early exit as all the nodes are in sync + return + } + + syncedNodesMap := nodesSliceToShardedMap(syncedNodes) + syncedFallbackNodesMap := nodesSliceToShardedMap(syncedFallbackNodes) + + nh.removeOutOfSyncNodesUnprotected(outOfSyncNodes, syncedNodesMap, syncedFallbackNodesMap) + nh.addSyncedNodesUnprotected(syncedNodes, syncedFallbackNodes) + nh.printSyncedNodesInShardsUnprotected() +} + +// GetSyncedNodes returns all the synced nodes +func (nh *nodesHolder) GetSyncedNodes() []*data.NodeData { + nh.mut.RLock() + defer nh.mut.RUnlock() + + return copyNodes(nh.syncedNodes) +} + +// GetSyncedFallbackNodes returns all the synced fallback nodes +func (nh *nodesHolder) GetSyncedFallbackNodes() []*data.NodeData { + nh.mut.RLock() + defer nh.mut.RUnlock() + + return copyNodes(nh.syncedFallbackNodes) +} + +// GetOutOfSyncNodes returns all the out of sync nodes +func (nh *nodesHolder) GetOutOfSyncNodes() []*data.NodeData { + nh.mut.RLock() + defer nh.mut.RUnlock() + + return copyNodes(nh.outOfSyncNodes) +} + +// GetOutOfSyncFallbackNodes returns all the out of sync fallback nodes +func (nh *nodesHolder) GetOutOfSyncFallbackNodes() []*data.NodeData { + nh.mut.RLock() + defer nh.mut.RUnlock() + + return copyNodes(nh.outOfSyncFallbackNodes) +} + +// GetLastSyncedNodes returns the internal map of the last synced nodes +func (nh *nodesHolder) GetLastSyncedNodes() map[uint32]*data.NodeData { + mapCopy := make(map[uint32]*data.NodeData, 0) + nh.mut.RLock() + for key, value := range nh.lastSyncedNodes { + mapCopy[key] = value + } + nh.mut.RUnlock() + + return mapCopy +} + +// IsInterfaceNil returns true if there is no value under the interface +func (nh *nodesHolder) IsInterfaceNil() bool { + return nh == nil +} + +func copyNodes(nodes []*data.NodeData) []*data.NodeData { + sliceCopy := make([]*data.NodeData, 0, len(nodes)) + for _, node := range nodes { + sliceCopy = append(sliceCopy, node) + } + + return sliceCopy +} + +func (nh *nodesHolder) printSyncedNodesInShardsUnprotected() { + inSyncAddresses := make(map[uint32][]string, 0) + for _, syncedNode := range nh.syncedNodes { + inSyncAddresses[syncedNode.ShardId] = append(inSyncAddresses[syncedNode.ShardId], syncedNode.Address) + } + + inSyncFallbackAddresses := make(map[uint32][]string, 0) + for _, syncedFallbackNode := range nh.syncedFallbackNodes { + inSyncFallbackAddresses[syncedFallbackNode.ShardId] = append(inSyncFallbackAddresses[syncedFallbackNode.ShardId], syncedFallbackNode.Address) + } + + for _, shardID := range nh.shardIDs { + totalNumOfActiveNodes := len(inSyncAddresses[shardID]) + len(inSyncFallbackAddresses[shardID]) + // if none of them is active, use the backup if exists + hasBackup := nh.lastSyncedNodes[shardID] != nil + if totalNumOfActiveNodes == 0 && hasBackup { + totalNumOfActiveNodes++ + inSyncAddresses[shardID] = append(inSyncAddresses[shardID], nh.lastSyncedNodes[shardID].Address) + } + log.Info(fmt.Sprintf("shard %d active nodes", shardID), + "observers count", totalNumOfActiveNodes, + "addresses", strings.Join(inSyncAddresses[shardID], ", "), + "fallback addresses", strings.Join(inSyncFallbackAddresses[shardID], ", ")) + } +} + +func computeSyncedAndOutOfSyncNodes(nodes []*data.NodeData, shardIDs []uint32) ([]*data.NodeData, []*data.NodeData, []*data.NodeData, error) { + tempSyncedNodesMap := make(map[uint32][]*data.NodeData) + tempSyncedFallbackNodesMap := make(map[uint32][]*data.NodeData) + tempNotSyncedNodesMap := make(map[uint32][]*data.NodeData) + + for _, node := range nodes { + if node.IsSynced { + if node.IsFallback { + tempSyncedFallbackNodesMap[node.ShardId] = append(tempSyncedFallbackNodesMap[node.ShardId], node) + } else { + tempSyncedNodesMap[node.ShardId] = append(tempSyncedNodesMap[node.ShardId], node) + } + continue + } + + tempNotSyncedNodesMap[node.ShardId] = append(tempNotSyncedNodesMap[node.ShardId], node) + } + + syncedNodes := make([]*data.NodeData, 0) + syncedFallbackNodes := make([]*data.NodeData, 0) + notSyncedNodes := make([]*data.NodeData, 0) + for _, shardID := range shardIDs { + syncedNodes = append(syncedNodes, tempSyncedNodesMap[shardID]...) + syncedFallbackNodes = append(syncedFallbackNodes, tempSyncedFallbackNodesMap[shardID]...) + notSyncedNodes = append(notSyncedNodes, tempNotSyncedNodesMap[shardID]...) + + totalLen := len(tempSyncedNodesMap[shardID]) + len(tempSyncedFallbackNodesMap[shardID]) + len(tempNotSyncedNodesMap[shardID]) + if totalLen == 0 { + return nil, nil, nil, fmt.Errorf("%w for shard %d - no synced or not synced node", errWrongConfiguration, shardID) + } + } + + return syncedNodes, syncedFallbackNodes, notSyncedNodes, nil +} + +func (nh *nodesHolder) addSyncedNodesUnprotected(receivedSyncedNodes []*data.NodeData, receivedSyncedFallbackNodes []*data.NodeData) { + syncedNodesPerShard := make(map[uint32][]string) + for _, node := range receivedSyncedNodes { + nh.removeFromOutOfSyncIfNeededUnprotected(node) + syncedNodesPerShard[node.ShardId] = append(syncedNodesPerShard[node.ShardId], node.Address) + if nh.isReceivedSyncedNodeExistent(node) { + continue + } + + nh.syncedNodes = append(nh.syncedNodes, node) + } + + for _, node := range receivedSyncedFallbackNodes { + nh.removeFromOutOfSyncIfNeededUnprotected(node) + if nh.isReceivedSyncedNodeExistentAsFallback(node) { + continue + } + + nh.syncedFallbackNodes = append(nh.syncedFallbackNodes, node) + } + + // if there is at least one synced node regular received, clean the backup list + for _, shardId := range nh.shardIDs { + if len(syncedNodesPerShard[shardId]) != 0 { + delete(nh.lastSyncedNodes, shardId) + } + } +} + +func (nh *nodesHolder) removeFromOutOfSyncIfNeededUnprotected(node *data.NodeData) { + if node.IsFallback { + nh.removeFallbackFromOutOfSyncListUnprotected(node) + return + } + + nh.removeRegularFromOutOfSyncListUnprotected(node) +} + +func (nh *nodesHolder) isReceivedSyncedNodeExistent(receivedNode *data.NodeData) bool { + for _, node := range nh.syncedNodes { + if node.Address == receivedNode.Address && node.ShardId == receivedNode.ShardId { + return true + } + } + + return false +} + +func (nh *nodesHolder) isReceivedSyncedNodeExistentAsFallback(receivedNode *data.NodeData) bool { + for _, node := range nh.syncedFallbackNodes { + if node.Address == receivedNode.Address && node.ShardId == receivedNode.ShardId { + return true + } + } + + return false +} + +func (nh *nodesHolder) addToOutOfSyncUnprotected(node *data.NodeData) { + if node.IsFallback { + nh.addFallbackToOutOfSyncUnprotected(node) + return + } + + nh.addRegularToOutOfSyncUnprotected(node) +} + +func (nh *nodesHolder) addRegularToOutOfSyncUnprotected(node *data.NodeData) { + for _, oosNode := range nh.outOfSyncNodes { + if oosNode.Address == node.Address && oosNode.ShardId == node.ShardId { + return + } + } + + nh.outOfSyncNodes = append(nh.outOfSyncNodes, node) +} + +func (nh *nodesHolder) addFallbackToOutOfSyncUnprotected(node *data.NodeData) { + for _, oosNode := range nh.outOfSyncFallbackNodes { + if oosNode.Address == node.Address && oosNode.ShardId == node.ShardId { + return + } + } + + nh.outOfSyncFallbackNodes = append(nh.outOfSyncFallbackNodes, node) +} + +func (nh *nodesHolder) removeOutOfSyncNodesUnprotected( + outOfSyncNodes []*data.NodeData, + syncedNodesMap map[uint32][]*data.NodeData, + syncedFallbackNodesMap map[uint32][]*data.NodeData, +) { + if len(outOfSyncNodes) == 0 { + nh.outOfSyncNodes = make([]*data.NodeData, 0) + nh.outOfSyncFallbackNodes = make([]*data.NodeData, 0) + return + } + + for _, outOfSyncNode := range outOfSyncNodes { + hasOneSyncedNode := len(syncedNodesMap[outOfSyncNode.ShardId]) >= 1 + hasEnoughSyncedFallbackNodes := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) > 1 + canDeleteFallbackNode := hasOneSyncedNode || hasEnoughSyncedFallbackNodes + if outOfSyncNode.IsFallback && canDeleteFallbackNode { + nh.removeNodeUnprotected(outOfSyncNode) + continue + } + + // if trying to delete last fallback, use last known synced node + // if backup node does not exist, keep fallback + hasBackup := nh.lastSyncedNodes[outOfSyncNode.ShardId] != nil + if outOfSyncNode.IsFallback && hasBackup { + nh.removeNodeUnprotected(outOfSyncNode) + continue + } + + hasEnoughSyncedNodes := len(syncedNodesMap[outOfSyncNode.ShardId]) >= 1 + if hasEnoughSyncedNodes { + nh.removeNodeUnprotected(outOfSyncNode) + continue + } + + // trying to remove last synced node + // if fallbacks are available, save this one as backup and use fallbacks + // else, keep using this one + // save this last regular observer as backup in case fallbacks go offline + // also, if this is the old fallback observer which didn't get synced, keep it in list + wasSyncedAtPreviousStep := nh.isReceivedSyncedNodeExistent(outOfSyncNode) + isBackupObserver := nh.lastSyncedNodes[outOfSyncNode.ShardId] == outOfSyncNode + isRegularSyncedBefore := !outOfSyncNode.IsFallback && wasSyncedAtPreviousStep + if isRegularSyncedBefore || isBackupObserver { + log.Info("backup observer updated", + "address", outOfSyncNode.Address, + "is fallback", outOfSyncNode.IsFallback, + "shard", outOfSyncNode.ShardId) + nh.lastSyncedNodes[outOfSyncNode.ShardId] = outOfSyncNode + } + hasOneSyncedFallbackNode := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) >= 1 + if hasOneSyncedFallbackNode { + nh.removeNodeUnprotected(outOfSyncNode) + continue + } + + // safe to delete regular observer, as it is already in lastSyncedNodes map + if !outOfSyncNode.IsFallback { + nh.removeNodeUnprotected(outOfSyncNode) + continue + } + + // this is a fallback node, with no synced nodes. + // save it as backup and delete it from its list + nh.lastSyncedNodes[outOfSyncNode.ShardId] = outOfSyncNode + nh.removeNodeUnprotected(outOfSyncNode) + } +} + +func (nh *nodesHolder) removeNodeUnprotected(node *data.NodeData) { + nh.removeNodeFromSyncedNodesUnprotected(node) + nh.addToOutOfSyncUnprotected(node) +} + +func (nh *nodesHolder) removeNodeFromSyncedNodesUnprotected(nodeToRemove *data.NodeData) { + if nodeToRemove.IsFallback { + nh.removeFallbackFromSyncedListUnprotected(nodeToRemove) + return + } + + nh.removeRegularFromSyncedListUnprotected(nodeToRemove) +} + +func (nh *nodesHolder) removeRegularFromSyncedListUnprotected(nodeToRemove *data.NodeData) { + nodeIndex := getIndexFromList(nodeToRemove, nh.syncedNodes) + if nodeIndex == -1 { + return + } + + copy(nh.syncedNodes[nodeIndex:], nh.syncedNodes[nodeIndex+1:]) + nh.syncedNodes[len(nh.syncedNodes)-1] = nil + nh.syncedNodes = nh.syncedNodes[:len(nh.syncedNodes)-1] +} + +func (nh *nodesHolder) removeFallbackFromSyncedListUnprotected(nodeToRemove *data.NodeData) { + nodeIndex := getIndexFromList(nodeToRemove, nh.syncedFallbackNodes) + if nodeIndex == -1 { + return + } + + copy(nh.syncedFallbackNodes[nodeIndex:], nh.syncedFallbackNodes[nodeIndex+1:]) + nh.syncedFallbackNodes[len(nh.syncedFallbackNodes)-1] = nil + nh.syncedFallbackNodes = nh.syncedFallbackNodes[:len(nh.syncedFallbackNodes)-1] +} + +func (nh *nodesHolder) removeRegularFromOutOfSyncListUnprotected(nodeToRemove *data.NodeData) { + nodeIndex := getIndexFromList(nodeToRemove, nh.outOfSyncNodes) + if nodeIndex == -1 { + return + } + + copy(nh.outOfSyncNodes[nodeIndex:], nh.outOfSyncNodes[nodeIndex+1:]) + nh.outOfSyncNodes[len(nh.outOfSyncNodes)-1] = nil + nh.outOfSyncNodes = nh.outOfSyncNodes[:len(nh.outOfSyncNodes)-1] +} + +func (nh *nodesHolder) removeFallbackFromOutOfSyncListUnprotected(nodeToRemove *data.NodeData) { + nodeIndex := getIndexFromList(nodeToRemove, nh.outOfSyncFallbackNodes) + if nodeIndex == -1 { + return + } + + copy(nh.outOfSyncFallbackNodes[nodeIndex:], nh.outOfSyncFallbackNodes[nodeIndex+1:]) + nh.outOfSyncFallbackNodes[len(nh.outOfSyncFallbackNodes)-1] = nil + nh.outOfSyncFallbackNodes = nh.outOfSyncFallbackNodes[:len(nh.outOfSyncFallbackNodes)-1] +} + +func getIndexFromList(providedNode *data.NodeData, list []*data.NodeData) int { + nodeIndex := -1 + for idx, node := range list { + if node.Address == providedNode.Address && node.ShardId == providedNode.ShardId { + nodeIndex = idx + break + } + } + + return nodeIndex +} + +func nodesSliceToShardedMap(nodes []*data.NodeData) map[uint32][]*data.NodeData { + newNodes := make(map[uint32][]*data.NodeData) + for _, node := range nodes { + shardId := node.ShardId + newNodes[shardId] = append(newNodes[shardId], node) + } + + return newNodes +} diff --git a/observer/holder/nodesHolder_test.go b/observer/holder/nodesHolder_test.go new file mode 100644 index 00000000..741ae0c1 --- /dev/null +++ b/observer/holder/nodesHolder_test.go @@ -0,0 +1,249 @@ +package holder + +import ( + "errors" + "fmt" + "sort" + "testing" + + "github.com/multiversx/mx-chain-proxy-go/data" + "github.com/stretchr/testify/require" +) + +func TestComputeSyncAndOutOfSyncNodes(t *testing.T) { + t.Parallel() + + t.Run("all nodes synced", testComputeSyncedAndOutOfSyncNodesAllNodesSynced) + t.Run("enough synced nodes", testComputeSyncedAndOutOfSyncNodesEnoughSyncedObservers) + t.Run("all nodes are out of sync", testComputeSyncedAndOutOfSyncNodesAllNodesNotSynced) + t.Run("invalid config - no node", testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll) + t.Run("invalid config - no node in a shard", testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeInAShard) + t.Run("edge case - address should not exist in both sync and not-synced lists", testEdgeCaseAddressShouldNotExistInBothLists) +} + +func testComputeSyncedAndOutOfSyncNodesAllNodesSynced(t *testing.T) { + t.Parallel() + + shardIDs := []uint32{0, 1} + input := []*data.NodeData{ + {Address: "0", ShardId: 0, IsSynced: true}, + {Address: "1", ShardId: 0, IsSynced: true, IsFallback: true}, + {Address: "2", ShardId: 1, IsSynced: true}, + {Address: "3", ShardId: 1, IsSynced: true, IsFallback: true}, + } + + synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs) + require.Equal(t, []*data.NodeData{ + {Address: "0", ShardId: 0, IsSynced: true}, + {Address: "2", ShardId: 1, IsSynced: true}, + }, synced) + require.Equal(t, []*data.NodeData{ + {Address: "1", ShardId: 0, IsSynced: true, IsFallback: true}, + {Address: "3", ShardId: 1, IsSynced: true, IsFallback: true}, + }, syncedFb) + require.Empty(t, notSynced) +} + +func testComputeSyncedAndOutOfSyncNodesEnoughSyncedObservers(t *testing.T) { + t.Parallel() + + shardIDs := []uint32{0, 1} + input := []*data.NodeData{ + {Address: "0", ShardId: 0, IsSynced: true}, + {Address: "1", ShardId: 0, IsSynced: false}, + {Address: "2", ShardId: 0, IsSynced: true, IsFallback: true}, + {Address: "3", ShardId: 1, IsSynced: true}, + {Address: "4", ShardId: 1, IsSynced: false}, + {Address: "5", ShardId: 1, IsSynced: true, IsFallback: true}, + } + + synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs) + require.Equal(t, []*data.NodeData{ + {Address: "0", ShardId: 0, IsSynced: true}, + {Address: "3", ShardId: 1, IsSynced: true}, + }, synced) + require.Equal(t, []*data.NodeData{ + {Address: "2", ShardId: 0, IsSynced: true, IsFallback: true}, + {Address: "5", ShardId: 1, IsSynced: true, IsFallback: true}, + }, syncedFb) + require.Equal(t, []*data.NodeData{ + {Address: "1", ShardId: 0, IsSynced: false}, + {Address: "4", ShardId: 1, IsSynced: false}, + }, notSynced) +} + +func testComputeSyncedAndOutOfSyncNodesAllNodesNotSynced(t *testing.T) { + t.Parallel() + + shardIDs := []uint32{0, 1} + input := []*data.NodeData{ + {Address: "0", ShardId: 0, IsSynced: false}, + {Address: "1", ShardId: 0, IsSynced: false, IsFallback: true}, + {Address: "2", ShardId: 1, IsSynced: false}, + {Address: "3", ShardId: 1, IsSynced: false, IsFallback: true}, + } + + synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs) + require.Equal(t, []*data.NodeData{}, synced) + require.Equal(t, []*data.NodeData{}, syncedFb) + require.Equal(t, input, notSynced) +} + +func testEdgeCaseAddressShouldNotExistInBothLists(t *testing.T) { + t.Parallel() + + allNodes := prepareNodes(10) + + nodesMap := nodesSliceToShardedMap(allNodes) + nh := &nodesHolder{ + shardIDs: getSortedShardIDsSlice(nodesMap), + syncedNodes: allNodes, + } + + setSyncedStateToNodes(allNodes, false, 1, 3, 5, 7, 9) + + nh.UpdateNodes(allNodes) + require.Equal(t, []data.NodeData{ + {Address: "addr0", ShardId: 0, IsSynced: true}, + {Address: "addr2", ShardId: 0, IsSynced: true}, + {Address: "addr4", ShardId: 0, IsSynced: true}, + {Address: "addr6", ShardId: 1, IsSynced: true}, + {Address: "addr8", ShardId: 1, IsSynced: true}, + }, convertAndSortSlice(nh.syncedNodes)) + require.Equal(t, []data.NodeData{ + {Address: "addr1", ShardId: 0, IsSynced: false}, + {Address: "addr3", ShardId: 0, IsSynced: false}, + {Address: "addr5", ShardId: 1, IsSynced: false}, + {Address: "addr7", ShardId: 1, IsSynced: false}, + {Address: "addr9", ShardId: 1, IsSynced: false}, + }, convertAndSortSlice(nh.outOfSyncNodes)) + require.False(t, slicesHaveCommonObjects(nh.syncedNodes, nh.outOfSyncNodes)) + + allNodes = prepareNodes(10) + + nh.UpdateNodes(allNodes) + + require.Equal(t, []data.NodeData{ + {Address: "addr0", ShardId: 0, IsSynced: true}, + {Address: "addr1", ShardId: 0, IsSynced: true}, + {Address: "addr2", ShardId: 0, IsSynced: true}, + {Address: "addr3", ShardId: 0, IsSynced: true}, + {Address: "addr4", ShardId: 0, IsSynced: true}, + {Address: "addr5", ShardId: 1, IsSynced: true}, + {Address: "addr6", ShardId: 1, IsSynced: true}, + {Address: "addr7", ShardId: 1, IsSynced: true}, + {Address: "addr8", ShardId: 1, IsSynced: true}, + {Address: "addr9", ShardId: 1, IsSynced: true}, + }, convertAndSortSlice(nh.syncedNodes)) + require.False(t, slicesHaveCommonObjects(nh.syncedNodes, nh.outOfSyncNodes)) +} + +func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll(t *testing.T) { + t.Parallel() + + shardIDs := []uint32{0, 1} + var input []*data.NodeData + synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs) + require.Error(t, err) + require.Nil(t, synced) + require.Nil(t, syncedFb) + require.Nil(t, notSynced) + + // no node in one shard + shardIDs = []uint32{0, 1} + input = []*data.NodeData{ + { + Address: "0", ShardId: 0, IsSynced: true, + }, + } + synced, syncedFb, notSynced, err = computeSyncedAndOutOfSyncNodes(input, shardIDs) + require.True(t, errors.Is(err, errWrongConfiguration)) + require.Nil(t, synced) + require.Nil(t, syncedFb) + require.Nil(t, notSynced) +} + +func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeInAShard(t *testing.T) { + t.Parallel() + + // no node in one shard + shardIDs := []uint32{0, 1} + input := []*data.NodeData{ + { + Address: "0", ShardId: 0, IsSynced: true, + }, + } + synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs) + require.True(t, errors.Is(err, errWrongConfiguration)) + require.Nil(t, synced) + require.Nil(t, syncedFb) + require.Nil(t, notSynced) +} + +func slicesHaveCommonObjects(firstSlice []*data.NodeData, secondSlice []*data.NodeData) bool { + nodeDataToStr := func(nd *data.NodeData) string { + return fmt.Sprintf("%s%d", nd.Address, nd.ShardId) + } + firstSliceItems := make(map[string]struct{}) + for _, el := range firstSlice { + firstSliceItems[nodeDataToStr(el)] = struct{}{} + } + + for _, el := range secondSlice { + nodeDataStr := nodeDataToStr(el) + _, found := firstSliceItems[nodeDataStr] + if found { + return true + } + } + + return false +} + +func prepareNodes(count int) []*data.NodeData { + nodes := make([]*data.NodeData, 0, count) + for i := 0; i < count; i++ { + shardID := uint32(0) + if i >= count/2 { + shardID = 1 + } + nodes = append(nodes, &data.NodeData{ + ShardId: shardID, + Address: fmt.Sprintf("addr%d", i), + IsSynced: true, + }) + } + + return nodes +} + +func getSortedShardIDsSlice(nodesOnShards map[uint32][]*data.NodeData) []uint32 { + shardIDs := make([]uint32, 0) + for shardID := range nodesOnShards { + shardIDs = append(shardIDs, shardID) + } + sort.SliceStable(shardIDs, func(i, j int) bool { + return shardIDs[i] < shardIDs[j] + }) + + return shardIDs +} + +func setSyncedStateToNodes(nodes []*data.NodeData, state bool, indices ...int) { + for _, idx := range indices { + nodes[idx].IsSynced = state + } +} + +func convertAndSortSlice(nodes []*data.NodeData) []data.NodeData { + newSlice := make([]data.NodeData, 0, len(nodes)) + for _, node := range nodes { + newSlice = append(newSlice, *node) + } + + sort.Slice(newSlice, func(i, j int) bool { + return newSlice[i].Address < newSlice[j].Address + }) + + return newSlice +} diff --git a/observer/interface.go b/observer/interface.go index 0adae03a..0e545a59 100644 --- a/observer/interface.go +++ b/observer/interface.go @@ -11,3 +11,14 @@ type NodesProviderHandler interface { ReloadNodes(nodesType data.NodeType) data.NodesReloadResponse IsInterfaceNil() bool } + +// NodesHolder defines the actions of a component that is able to hold nodes +type NodesHolder interface { + UpdateNodes(nodesWithSyncStatus []*data.NodeData) + GetSyncedNodes() []*data.NodeData + GetSyncedFallbackNodes() []*data.NodeData + GetOutOfSyncNodes() []*data.NodeData + GetOutOfSyncFallbackNodes() []*data.NodeData + GetLastSyncedNodes() map[uint32]*data.NodeData + IsInterfaceNil() bool +}