Skip to content

Commit

Permalink
Minor nits on pubsub implementation (#1593)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronbuchwald authored Sep 30, 2024
1 parent 592cd7a commit 8ab9d83
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
14 changes: 9 additions & 5 deletions api/ws/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,12 @@ func (w *WebSocketServer) AddTxListener(tx *chain.Transaction, c *pubsub.Connect

// TODO: limit max number of tx listeners a single connection can create
txID := tx.ID()
if _, ok := w.txListeners[txID]; !ok {
w.txListeners[txID] = pubsub.NewConnections()
connections, ok := w.txListeners[txID]
if !ok {
connections = pubsub.NewConnections()
w.txListeners[txID] = connections
}
w.txListeners[txID].Add(c)
connections.Add(c)
w.expiringTxs.Add([]*chain.Transaction{tx})
}

Expand Down Expand Up @@ -219,7 +221,9 @@ func (w *WebSocketServer) AcceptBlock(b *chain.StatefulBlock) error {
if err != nil {
return err
}
w.s.Publish(append([]byte{TxMode}, bytes...), listeners)
// Skip clearing inactive connections because they'll be deleted
// regardless.
_ = w.s.Publish(append([]byte{TxMode}, bytes...), listeners)
delete(w.txListeners, txID)
// [expiringTxs] will be cleared eventually (does not support removal)
}
Expand Down Expand Up @@ -274,7 +278,7 @@ func (w *WebSocketServer) MessageCallback() pubsub.Callback {
}
w.AddTxListener(tx, c)

// Submit will remove from [txWaiters] if it is not added
// Submit will remove from [txListeners] if it is not added
txID := tx.ID()
if err := w.vm.Submit(ctx, false, []*chain.Transaction{tx})[0]; err != nil {
w.logger.Error("failed to submit tx",
Expand Down
2 changes: 1 addition & 1 deletion pubsub/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Server struct {
conns *Connections
}

// New returns a new Server instance. The callback function [f] is called
// New returns a new Server instance. [callback] is called
// by the server in response to messages if not nil.
func New(
log logging.Logger,
Expand Down
4 changes: 0 additions & 4 deletions pubsub/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ func (x *counter) dummyProcessTXCallback(b []byte, _ *Connection) {
// a msg to be sent to all connections. Checks the message was delivered properly
// and the connection is properly handled when closed.
func TestServerPublish(t *testing.T) {
t.Skip("FLAKY")

require := require.New(t)
// Create a new logger for the test
logger := logging.NoLog{}
Expand Down Expand Up @@ -194,8 +192,6 @@ func TestServerRead(t *testing.T) {
// a msg to be sent to only one of the connections. Checks the message was
// delivered properly and the connection is properly handled when closed.
func TestServerPublishSpecific(t *testing.T) {
t.Skip("FLAKY")

require := require.New(t)
// Create a new logger for the test
logger := logging.NoLog{}
Expand Down

0 comments on commit 8ab9d83

Please sign in to comment.