Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eth-pubsub): Implement in-memory EventBus for real-time topic management and event distribution #1855

Merged
merged 3 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#1838](https://github.com/NibiruChain/nibiru/pull/1838) - feat(eth): Go-ethereum, crypto, encoding, and unit tests for evm/types
- [#1841](https://github.com/NibiruChain/nibiru/pull/1841) - feat(eth): Collections encoders for bytes, Ethereum addresses, and Ethereum hashes
- [#1847](https://github.com/NibiruChain/nibiru/pull/1847) - fix(docker-chaosnet): release snapshot docker build failed CI.
- [#1855](https://github.com/NibiruChain/nibiru/pull/1855) - feat(eth-pubsub): Implement in-memory EventBus for real-time topic management and event distribution

#### Dapp modules: perp, spot, etc

Expand Down
174 changes: 174 additions & 0 deletions eth/rpc/pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright (c) 2023-2024 Nibi, Inc.
package pubsub

import (
"sync"
"sync/atomic"

"github.com/pkg/errors"

coretypes "github.com/cometbft/cometbft/rpc/core/types"
)

type UnsubscribeFunc func()

// EventBus manages topics and subscriptions. A "topic" is a named channel of
// communication. A "subscription" is the action taken by a subscriber to express
// interest in receiving messages broadcasted from a specific topic.
type EventBus interface {
// AddTopic: Adds a new topic with the specified name and message source
AddTopic(name string, src <-chan coretypes.ResultEvent) error
// RemoveTopic: Removes the specified topic and all its related data,
// ensuring clean up of resources.
RemoveTopic(name string)
Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error)
Topics() []string
}

// memEventBus is an implemention of the `EventBus` interface.
type memEventBus struct {
topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex
subscribers map[string]map[uint64]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex
currentUniqueID uint64
}

// NewEventBus returns a fresh imlpemention of `memEventBus`, which implements
// the `EventBus` interface for managing Ethereum topics and subscriptions.
func NewEventBus() EventBus {
return &memEventBus{
topics: make(map[string]<-chan coretypes.ResultEvent),
topicsMux: new(sync.RWMutex),
subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent),
subscribersMux: new(sync.RWMutex),
}
}

// GenUniqueID atomically increments and returns a unique identifier for a new subscriber.
// This ID is used internally to manage subscriber-specific channels.
func (m *memEventBus) GenUniqueID() uint64 {
return atomic.AddUint64(&m.currentUniqueID, 1)
}

// Topics returns a list of all topics currently managed by the EventBus. The
// list is safe for concurrent access and is a snapshot of current topic names.
func (m *memEventBus) Topics() (topics []string) {
m.topicsMux.RLock()
defer m.topicsMux.RUnlock()

topics = make([]string, 0, len(m.topics))
for topicName := range m.topics {
topics = append(topics, topicName)
}

return topics
}

// AddTopic adds a new topic with the specified name and message source
func (m *memEventBus) AddTopic(name string, src <-chan coretypes.ResultEvent) error {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()

if ok {
return errors.New("topic already registered")
}

m.topicsMux.Lock()
m.topics[name] = src
m.topicsMux.Unlock()

go m.publishTopic(name, src)

return nil
}

// RemoveTopic: Removes the specified topic and all its related data, ensuring
// clean up of resources.
func (m *memEventBus) RemoveTopic(name string) {
m.topicsMux.Lock()
delete(m.topics, name)
m.topicsMux.Unlock()

Check warning on line 92 in eth/rpc/pubsub/pubsub.go

View check run for this annotation

Codecov / codecov/patch

eth/rpc/pubsub/pubsub.go#L89-L92

Added lines #L89 - L92 were not covered by tests
}

// Subscribe attempts to create a subscription to the specified topic. It returns
// a channel to receive messages, a function to unsubscribe, and an error if the
// topic does not exist.
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()

if !ok {
return nil, nil, errors.Errorf("topic not found: %s", name)

Check warning on line 104 in eth/rpc/pubsub/pubsub.go

View check run for this annotation

Codecov / codecov/patch

eth/rpc/pubsub/pubsub.go#L104

Added line #L104 was not covered by tests
}

ch := make(chan coretypes.ResultEvent)
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()

id := m.GenUniqueID()
if _, ok := m.subscribers[name]; !ok {
m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent)
}
m.subscribers[name][id] = ch

unsubscribe := func() {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
delete(m.subscribers[name], id)

Check warning on line 120 in eth/rpc/pubsub/pubsub.go

View check run for this annotation

Codecov / codecov/patch

eth/rpc/pubsub/pubsub.go#L118-L120

Added lines #L118 - L120 were not covered by tests
}

return ch, unsubscribe, nil
}

func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) {
for {
msg, ok := <-src
if !ok {
m.closeAllSubscribers(name)
m.topicsMux.Lock()
delete(m.topics, name)
m.topicsMux.Unlock()
return
}
m.publishAllSubscribers(name, msg)
}
}

// closeAllSubscribers closes all subscriber channels associated with the
// specified topic and removes the topic from the subscribers map. This function
// is typically called when a topic is deleted or no longer available to ensure
// all resources are released properly and to prevent goroutine leaks. It ensures
// thread-safe execution by locking around the operation.
func (m *memEventBus) closeAllSubscribers(name string) {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()

subscribers := m.subscribers[name]
delete(m.subscribers, name)
// #nosec G705
for _, sub := range subscribers {
close(sub)
}
}

// publishAllSubscribers sends a message to all subscribers of the specified
// topic. It uses a non-blocking send operation to deliver the message to
// subscriber channels. If a subscriber's channel is not ready to receive the
// message (i.e., the channel is full), the message is skipped for that
// subscriber to avoid blocking the publisher. This function ensures thread-safe
// access to subscribers by using a read lock.
func (m *memEventBus) publishAllSubscribers(name string, msg coretypes.ResultEvent) {
m.subscribersMux.RLock()
defer m.subscribersMux.RUnlock()
subscribers := m.subscribers[name]
// #nosec G705
for _, sub := range subscribers {
select {
case sub <- msg:

Check warning on line 170 in eth/rpc/pubsub/pubsub.go

View check run for this annotation

Codecov / codecov/patch

eth/rpc/pubsub/pubsub.go#L170

Added line #L170 was not covered by tests
default:
}
}
}
155 changes: 155 additions & 0 deletions eth/rpc/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package pubsub

import (
"log"
"sort"
"sync"
"testing"
"time"

rpccore "github.com/cometbft/cometbft/rpc/core/types"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

// subscribeAndPublish: Helper function used to perform concurrent subscription
// and publishing actions. It concurrently subscribes multiple clients to the
// specified topic and simultanesouly sends an empty message to the topic channel
// for each subscription.
func subscribeAndPublish(t *testing.T, eb EventBus, topic string, topicChan chan rpccore.ResultEvent) {
var (
wg sync.WaitGroup
subscribersCount = 50
emptyMsg = rpccore.ResultEvent{}
)
for i := 0; i < subscribersCount; i++ {
wg.Add(1)
// concurrently subscribe to the topic
go func() {
defer wg.Done()
_, _, err := eb.Subscribe(topic)
require.NoError(t, err)
}()

// send events to the topic
wg.Add(1)
go func() {
defer wg.Done()
topicChan <- emptyMsg
}()
}
wg.Wait()
}

type SuitePubsub struct {
suite.Suite
}

func TestSuitePubsub(t *testing.T) {
suite.Run(t, new(SuitePubsub))
}

func (s *SuitePubsub) TestAddTopic() {
q := NewEventBus()
// dummy vars
topicA := "guard"
topicB := "cream"

s.NoError(q.AddTopic(topicA, make(<-chan rpccore.ResultEvent)))
s.NoError(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent)))
s.Error(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent)))

topics := q.Topics()
sort.Strings(topics) // cream should be first
s.Require().EqualValues([]string{topicB, topicA}, topics)
}

func (s *SuitePubsub) TestSubscribe() {
q := NewEventBus()

// dummy vars
topicA := "0xfoo"
topicB := "blockchain"

srcA := make(chan rpccore.ResultEvent)
err := q.AddTopic(topicA, srcA)
s.NoError(err)

srcB := make(chan rpccore.ResultEvent)
err = q.AddTopic(topicB, srcB)
s.NoError(err)

// subscriber channels
subChanA, _, err := q.Subscribe(topicA)
s.NoError(err)
subChanB1, _, err := q.Subscribe(topicB)
s.NoError(err)
subChanB2, _, err := q.Subscribe(topicB)
s.NoError(err)

wg := new(sync.WaitGroup)
wg.Add(4)

emptyMsg := rpccore.ResultEvent{}
go func() {
defer wg.Done()
msg := <-subChanA
log.Println(topicA+":", msg)
s.EqualValues(emptyMsg, msg)
}()

go func() {
defer wg.Done()
msg := <-subChanB1
log.Println(topicB+":", msg)
s.EqualValues(emptyMsg, msg)
}()

go func() {
defer wg.Done()
msg := <-subChanB2
log.Println(topicB+"2:", msg)
s.EqualValues(emptyMsg, msg)
}()

go func() {
defer wg.Done()

time.Sleep(time.Second)

close(srcA)
close(srcB)
}()

wg.Wait()
time.Sleep(time.Second)
}

// TestConcurrentSubscribeAndPublish: Stress tests the module to make sure that
// operations are handled properly under concurrent access.
func (s *SuitePubsub) TestConcurrentSubscribeAndPublish() {
var (
wg sync.WaitGroup
eb = NewEventBus()
topicName = "topic-name"
topicCh = make(chan rpccore.ResultEvent)
runsCount = 5
)

err := eb.AddTopic(topicName, topicCh)
s.Require().NoError(err)

for i := 0; i < runsCount; i++ {
subscribeAndPublish(s.T(), eb, topicName, topicCh)
}

// close channel to make test end
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
close(topicCh)
}()

wg.Wait()
}
Loading