Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eth-rpc): Conversion types and functions between Ethereum txs and blocks and Tendermint ones. #1856

Merged
merged 16 commits into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#1838](https://github.com/NibiruChain/nibiru/pull/1838) - feat(eth): Go-ethereum, crypto, encoding, and unit tests for evm/types
- [#1841](https://github.com/NibiruChain/nibiru/pull/1841) - feat(eth): Collections encoders for bytes, Ethereum addresses, and Ethereum hashes
- [#1847](https://github.com/NibiruChain/nibiru/pull/1847) - fix(docker-chaosnet): release snapshot docker build failed CI.
- [#1855](https://github.com/NibiruChain/nibiru/pull/1855) - feat(eth-pubsub): Implement in-memory EventBus for real-time topic management and event distribution
- [#1856](https://github.com/NibiruChain/nibiru/pull/1856) - feat(eth-rpc): Conversion types and functions between Ethereum txs and blocks and Tendermint ones.

#### Dapp modules: perp, spot, etc

Expand Down
174 changes: 174 additions & 0 deletions eth/rpc/pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright (c) 2023-2024 Nibi, Inc.
package pubsub

import (
"sync"
"sync/atomic"

"github.com/pkg/errors"

coretypes "github.com/cometbft/cometbft/rpc/core/types"
)

type UnsubscribeFunc func()

// EventBus manages topics and subscriptions. A "topic" is a named channel of
// communication. A "subscription" is the action taken by a subscriber to express
// interest in receiving messages broadcasted from a specific topic.
type EventBus interface {
// AddTopic: Adds a new topic with the specified name and message source
AddTopic(name string, src <-chan coretypes.ResultEvent) error
// RemoveTopic: Removes the specified topic and all its related data,
// ensuring clean up of resources.
RemoveTopic(name string)
Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error)
Topics() []string
}

// memEventBus is an implemention of the `EventBus` interface.
type memEventBus struct {
topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex
subscribers map[string]map[uint64]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex
currentUniqueID uint64
}

// NewEventBus returns a fresh imlpemention of `memEventBus`, which implements
// the `EventBus` interface for managing Ethereum topics and subscriptions.
func NewEventBus() EventBus {
return &memEventBus{
topics: make(map[string]<-chan coretypes.ResultEvent),
topicsMux: new(sync.RWMutex),
subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent),
subscribersMux: new(sync.RWMutex),
}
}

// GenUniqueID atomically increments and returns a unique identifier for a new subscriber.
// This ID is used internally to manage subscriber-specific channels.
func (m *memEventBus) GenUniqueID() uint64 {
return atomic.AddUint64(&m.currentUniqueID, 1)
}

// Topics returns a list of all topics currently managed by the EventBus. The
// list is safe for concurrent access and is a snapshot of current topic names.
func (m *memEventBus) Topics() (topics []string) {
m.topicsMux.RLock()
defer m.topicsMux.RUnlock()

topics = make([]string, 0, len(m.topics))
for topicName := range m.topics {
topics = append(topics, topicName)
}

return topics
}

// AddTopic adds a new topic with the specified name and message source
func (m *memEventBus) AddTopic(name string, src <-chan coretypes.ResultEvent) error {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()

if ok {
return errors.New("topic already registered")
}

m.topicsMux.Lock()
m.topics[name] = src
m.topicsMux.Unlock()

go m.publishTopic(name, src)

return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AddTopic method correctly handles the addition of new topics, including thread-safe checks and the initialization of the publishing goroutine. However, consider handling the potential race condition between checking if a topic exists and adding it. This could be mitigated by using a single write lock for the entire operation.

- m.topicsMux.RLock()
- _, ok := m.topics[name]
- m.topicsMux.RUnlock()
- if ok {
+ m.topicsMux.Lock()
+ if _, ok := m.topics[name]; ok {
+   m.topicsMux.Unlock()
    return errors.New("topic already registered")
  }
- m.topicsMux.Lock()
  m.topics[name] = src
  m.topicsMux.Unlock()

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// AddTopic adds a new topic with the specified name and message source
func (m *memEventBus) AddTopic(name string, src <-chan coretypes.ResultEvent) error {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()
if ok {
return errors.New("topic already registered")
}
m.topicsMux.Lock()
m.topics[name] = src
m.topicsMux.Unlock()
go m.publishTopic(name, src)
return nil
}
// AddTopic adds a new topic with the specified name and message source
func (m *memEventBus) AddTopic(name string, src <-chan coretypes.ResultEvent) error {
m.topicsMux.Lock()
if _, ok := m.topics[name]; ok {
m.topicsMux.Unlock()
return errors.New("topic already registered")
}
m.topics[name] = src
m.topicsMux.Unlock()
go m.publishTopic(name, src)
return nil
}


// RemoveTopic: Removes the specified topic and all its related data, ensuring
// clean up of resources.
func (m *memEventBus) RemoveTopic(name string) {
m.topicsMux.Lock()
delete(m.topics, name)
m.topicsMux.Unlock()
}

// Subscribe attempts to create a subscription to the specified topic. It returns
// a channel to receive messages, a function to unsubscribe, and an error if the
// topic does not exist.
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()

if !ok {
return nil, nil, errors.Errorf("topic not found: %s", name)
}

ch := make(chan coretypes.ResultEvent)
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()

id := m.GenUniqueID()
if _, ok := m.subscribers[name]; !ok {
m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent)
}
m.subscribers[name][id] = ch

unsubscribe := func() {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
delete(m.subscribers[name], id)
}

return ch, unsubscribe, nil
}

func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) {
for {
msg, ok := <-src
if !ok {
m.closeAllSubscribers(name)
m.topicsMux.Lock()
delete(m.topics, name)
m.topicsMux.Unlock()
return
}
m.publishAllSubscribers(name, msg)
}
}

// closeAllSubscribers closes all subscriber channels associated with the
// specified topic and removes the topic from the subscribers map. This function
// is typically called when a topic is deleted or no longer available to ensure
// all resources are released properly and to prevent goroutine leaks. It ensures
// thread-safe execution by locking around the operation.
func (m *memEventBus) closeAllSubscribers(name string) {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()

subscribers := m.subscribers[name]
delete(m.subscribers, name)
// #nosec G705
for _, sub := range subscribers {
close(sub)
}
}

// publishAllSubscribers sends a message to all subscribers of the specified
// topic. It uses a non-blocking send operation to deliver the message to
// subscriber channels. If a subscriber's channel is not ready to receive the
// message (i.e., the channel is full), the message is skipped for that
// subscriber to avoid blocking the publisher. This function ensures thread-safe
// access to subscribers by using a read lock.
func (m *memEventBus) publishAllSubscribers(name string, msg coretypes.ResultEvent) {
m.subscribersMux.RLock()
defer m.subscribersMux.RUnlock()
subscribers := m.subscribers[name]
// #nosec G705
for _, sub := range subscribers {
select {
case sub <- msg:
default:
}
}
}
155 changes: 155 additions & 0 deletions eth/rpc/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package pubsub

import (
"log"
"sort"
"sync"
"testing"
"time"

rpccore "github.com/cometbft/cometbft/rpc/core/types"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

// subscribeAndPublish: Helper function used to perform concurrent subscription
// and publishing actions. It concurrently subscribes multiple clients to the
// specified topic and simultanesouly sends an empty message to the topic channel
// for each subscription.
func subscribeAndPublish(t *testing.T, eb EventBus, topic string, topicChan chan rpccore.ResultEvent) {
var (
wg sync.WaitGroup
subscribersCount = 50
emptyMsg = rpccore.ResultEvent{}
)
for i := 0; i < subscribersCount; i++ {
wg.Add(1)
// concurrently subscribe to the topic
go func() {
defer wg.Done()
_, _, err := eb.Subscribe(topic)
require.NoError(t, err)
}()

// send events to the topic
wg.Add(1)
go func() {
defer wg.Done()
topicChan <- emptyMsg
}()
}
wg.Wait()
}

type SuitePubsub struct {
suite.Suite
}

func TestSuitePubsub(t *testing.T) {
suite.Run(t, new(SuitePubsub))
}

func (s *SuitePubsub) TestAddTopic() {
q := NewEventBus()
// dummy vars
topicA := "guard"
topicB := "cream"

s.NoError(q.AddTopic(topicA, make(<-chan rpccore.ResultEvent)))
s.NoError(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent)))
s.Error(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent)))

topics := q.Topics()
sort.Strings(topics) // cream should be first
s.Require().EqualValues([]string{topicB, topicA}, topics)
}

func (s *SuitePubsub) TestSubscribe() {
q := NewEventBus()

// dummy vars
topicA := "0xfoo"
topicB := "blockchain"

srcA := make(chan rpccore.ResultEvent)
err := q.AddTopic(topicA, srcA)
s.NoError(err)

srcB := make(chan rpccore.ResultEvent)
err = q.AddTopic(topicB, srcB)
s.NoError(err)

// subscriber channels
subChanA, _, err := q.Subscribe(topicA)
s.NoError(err)
subChanB1, _, err := q.Subscribe(topicB)
s.NoError(err)
subChanB2, _, err := q.Subscribe(topicB)
s.NoError(err)

wg := new(sync.WaitGroup)
wg.Add(4)

emptyMsg := rpccore.ResultEvent{}
go func() {
defer wg.Done()
msg := <-subChanA
log.Println(topicA+":", msg)
s.EqualValues(emptyMsg, msg)
}()

go func() {
defer wg.Done()
msg := <-subChanB1
log.Println(topicB+":", msg)
s.EqualValues(emptyMsg, msg)
}()

go func() {
defer wg.Done()
msg := <-subChanB2
log.Println(topicB+"2:", msg)
s.EqualValues(emptyMsg, msg)
}()

go func() {
defer wg.Done()

time.Sleep(time.Second)

close(srcA)
close(srcB)
}()

wg.Wait()
time.Sleep(time.Second)
}

// TestConcurrentSubscribeAndPublish: Stress tests the module to make sure that
// operations are handled properly under concurrent access.
func (s *SuitePubsub) TestConcurrentSubscribeAndPublish() {
var (
wg sync.WaitGroup
eb = NewEventBus()
topicName = "topic-name"
topicCh = make(chan rpccore.ResultEvent)
runsCount = 5
)

err := eb.AddTopic(topicName, topicCh)
s.Require().NoError(err)

for i := 0; i < runsCount; i++ {
subscribeAndPublish(s.T(), eb, topicName, topicCh)
}

// close channel to make test end
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
close(topicCh)
}()

wg.Wait()
}
39 changes: 39 additions & 0 deletions eth/rpc/types/addrlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2023-2024 Nibi, Inc.
package types

import (
"sync"

"github.com/ethereum/go-ethereum/common"
)

// AddrLocker is a mutex structure used to avoid querying outdated account data
type AddrLocker struct {
mu sync.Mutex
locks map[common.Address]*sync.Mutex
}

// lock returns the lock of the given address.
func (l *AddrLocker) lock(address common.Address) *sync.Mutex {
l.mu.Lock()
defer l.mu.Unlock()
if l.locks == nil {
l.locks = make(map[common.Address]*sync.Mutex)
}
if _, ok := l.locks[address]; !ok {
l.locks[address] = new(sync.Mutex)
}
return l.locks[address]
}

// LockAddr locks an account's mutex. This is used to prevent another tx getting the
// same nonce until the lock is released. The mutex prevents the (an identical nonce) from
// being read again during the time that the first transaction is being signed.
func (l *AddrLocker) LockAddr(address common.Address) {
l.lock(address).Lock()
}

// UnlockAddr unlocks the mutex of the given account.
func (l *AddrLocker) UnlockAddr(address common.Address) {
l.lock(address).Unlock()
}
Loading
Loading