-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Map counters holder component #394
Merged
bogdan-rosianu
merged 3 commits into
feat/snapshotless-observer-support
from
circular-queue-map-counters-holder
Sep 29, 2023
Merged
Changes from 1 commit
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package mapCounters | ||
|
||
import "sync" | ||
|
||
type mapCounter struct { | ||
positions map[uint32]uint32 | ||
allNodesCount uint32 | ||
allNodesPosition uint32 | ||
mut sync.RWMutex | ||
} | ||
|
||
// newMapCounter returns a new instance of a mapCounter | ||
func newMapCounter() *mapCounter { | ||
return &mapCounter{ | ||
positions: make(map[uint32]uint32), | ||
allNodesPosition: 0, | ||
} | ||
} | ||
|
||
func (mc *mapCounter) computePositionForShard(shardID uint32, numNodes uint32) uint32 { | ||
mc.mut.Lock() | ||
defer mc.mut.Unlock() | ||
|
||
mc.initShardPositionIfNeededUnprotected(shardID) | ||
|
||
mc.positions[shardID]++ | ||
mc.positions[shardID] %= numNodes | ||
|
||
return mc.positions[shardID] | ||
} | ||
|
||
func (mc *mapCounter) computePositionForAllNodes(numNodes uint32) uint32 { | ||
mc.mut.Lock() | ||
defer mc.mut.Unlock() | ||
|
||
mc.initAllNodesPositionIfNeededUnprotected(numNodes) | ||
|
||
mc.allNodesPosition++ | ||
mc.allNodesPosition %= numNodes | ||
|
||
return mc.allNodesPosition | ||
} | ||
|
||
func (mc *mapCounter) initShardPositionIfNeededUnprotected(shardID uint32) { | ||
_, shardExists := mc.positions[shardID] | ||
if !shardExists { | ||
mc.positions[shardID] = 0 | ||
} | ||
} | ||
|
||
func (mc *mapCounter) initAllNodesPositionIfNeededUnprotected(numNodes uint32) { | ||
if numNodes != mc.allNodesCount { | ||
mc.allNodesCount = numNodes | ||
mc.allNodesPosition = 0 | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package mapCounters | ||
|
||
import ( | ||
"sync" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestNewMapCounter(t *testing.T) { | ||
t.Parallel() | ||
|
||
mc := newMapCounter() | ||
require.NotNil(t, mc) | ||
require.NotNil(t, mc.positions) | ||
} | ||
|
||
func TestMapCounter_ComputeShardPositionShouldWorkWithDifferentNumOfNodes(t *testing.T) { | ||
t.Parallel() | ||
|
||
mc := newMapCounter() | ||
computeShardPosAndAssert(t, mc, 3, 1) | ||
computeShardPosAndAssert(t, mc, 3, 2) | ||
computeShardPosAndAssert(t, mc, 3, 0) | ||
computeShardPosAndAssert(t, mc, 3, 1) | ||
// change num nodes | ||
computeShardPosAndAssert(t, mc, 2, 0) | ||
computeShardPosAndAssert(t, mc, 2, 1) | ||
computeShardPosAndAssert(t, mc, 2, 0) | ||
// change num nodes again | ||
computeShardPosAndAssert(t, mc, 5, 1) | ||
computeShardPosAndAssert(t, mc, 5, 2) | ||
computeShardPosAndAssert(t, mc, 5, 3) | ||
} | ||
|
||
func TestMapCounter_ComputeShardPositionShouldWorkMultiShard(t *testing.T) { | ||
t.Parallel() | ||
|
||
mc := newMapCounter() | ||
computeShardPosAndAssertForShard(t, mc, 0, 3, 1) | ||
computeShardPosAndAssertForShard(t, mc, 1, 4, 1) | ||
|
||
computeShardPosAndAssertForShard(t, mc, 0, 3, 2) | ||
computeShardPosAndAssertForShard(t, mc, 1, 4, 2) | ||
|
||
computeShardPosAndAssertForShard(t, mc, 0, 3, 0) | ||
computeShardPosAndAssertForShard(t, mc, 1, 4, 3) | ||
|
||
computeShardPosAndAssertForShard(t, mc, 0, 3, 1) | ||
computeShardPosAndAssertForShard(t, mc, 1, 4, 0) | ||
|
||
} | ||
|
||
func computeShardPosAndAssertForShard(t *testing.T, mc *mapCounter, shardID uint32, numNodes uint32, expectedPos uint32) { | ||
actualPos := mc.computePositionForShard(shardID, numNodes) | ||
require.Equal(t, expectedPos, actualPos) | ||
} | ||
|
||
func computeShardPosAndAssert(t *testing.T, mc *mapCounter, numNodes uint32, expectedPos uint32) { | ||
computeShardPosAndAssertForShard(t, mc, 0, numNodes, expectedPos) | ||
} | ||
|
||
func TestMapCounter_ComputeAllNodesPosition(t *testing.T) { | ||
t.Parallel() | ||
|
||
mc := newMapCounter() | ||
computeAllNodesPosAndAssert(t, mc, 3, 1) | ||
computeAllNodesPosAndAssert(t, mc, 3, 2) | ||
computeAllNodesPosAndAssert(t, mc, 3, 0) | ||
computeAllNodesPosAndAssert(t, mc, 3, 1) | ||
// change num nodes - should reset | ||
computeAllNodesPosAndAssert(t, mc, 5, 1) | ||
computeAllNodesPosAndAssert(t, mc, 5, 2) | ||
computeAllNodesPosAndAssert(t, mc, 5, 3) | ||
// change num nodes again - should reset | ||
computeAllNodesPosAndAssert(t, mc, 2, 1) | ||
computeAllNodesPosAndAssert(t, mc, 2, 0) | ||
computeAllNodesPosAndAssert(t, mc, 2, 1) | ||
} | ||
|
||
func computeAllNodesPosAndAssert(t *testing.T, mc *mapCounter, numNodes uint32, expectedPos uint32) { | ||
actualPos := mc.computePositionForAllNodes(numNodes) | ||
require.Equal(t, expectedPos, actualPos) | ||
} | ||
|
||
func TestMapCounter_ConcurrentOperations(t *testing.T) { | ||
t.Parallel() | ||
|
||
mc := newMapCounter() | ||
|
||
numOperations := 10_000 | ||
wg := sync.WaitGroup{} | ||
wg.Add(numOperations) | ||
for i := 0; i < numOperations; i++ { | ||
go func(idx int) { | ||
switch idx { | ||
case 0: | ||
mc.computePositionForShard(uint32(idx), uint32(10+idx)) | ||
case 1: | ||
mc.computePositionForAllNodes(uint32(10 + idx)) | ||
} | ||
wg.Done() | ||
}(i % 2) | ||
} | ||
|
||
wg.Wait() | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interface instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done. but didn't pass it to the constructor since the functionality of the
circularQueueNodesProvider
depends a lot on the actual implementation, so stubs in tests wouldn't help.I just wanted to extract some code in new components so the code is easier to read :D