Skip to content

Commit

Permalink
graceful shutdown of websocket (#547)
Browse files Browse the repository at this point in the history
* graceful shutdown of websocket

* context for unsubscribe logs

* added a logger to the log subscription

* lint

---------

Co-authored-by: mmsqe <[email protected]>
  • Loading branch information
valli0x and mmsqe authored Oct 30, 2024
1 parent e567c5a commit ce5773e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
30 changes: 18 additions & 12 deletions rpc/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ type websocketsServer struct {
logger log.Logger
}

func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer {
func NewWebsocketsServer(
ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config,
) WebsocketsServer {
logger = logger.With("api", "websocket-server")
_, port, _ := net.SplitHostPort(cfg.JSONRPC.Address)

Expand All @@ -95,7 +97,7 @@ func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *st
wsAddr: cfg.JSONRPC.WsAddress,
certFile: cfg.TLS.CertificatePath,
keyFile: cfg.TLS.KeyPath,
api: newPubSubAPI(clientCtx, logger, stream),
api: newPubSubAPI(ctx, clientCtx, logger, stream),
logger: logger,
}
}
Expand Down Expand Up @@ -347,18 +349,20 @@ func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) erro

// pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec
type pubSubAPI struct {
events *stream.RPCStream
logger log.Logger
clientCtx client.Context
events *stream.RPCStream
logger log.Logger
clientCtx client.Context
cancelContext context.Context
}

// newPubSubAPI creates an instance of the ethereum PubSub API.
func newPubSubAPI(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI {
func newPubSubAPI(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI {
logger = logger.With("module", "websocket-client")
return &pubSubAPI{
events: stream,
logger: logger,
clientCtx: clientCtx,
events: stream,
logger: logger,
clientCtx: clientCtx,
cancelContext: ctx,
}
}

Expand Down Expand Up @@ -411,7 +415,8 @@ type Header struct {
}

func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(api.cancelContext)

//nolint: errcheck
go api.events.HeaderStream().Subscribe(ctx, func(headers []stream.RPCHeader, _ int) error {
for _, header := range headers {
Expand Down Expand Up @@ -569,7 +574,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac
}
}

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(api.cancelContext)
//nolint: errcheck
go api.events.LogStream().Subscribe(ctx, func(txLogs []*ethtypes.Log, _ int) error {
logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
Expand All @@ -589,6 +594,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac

err := wsConn.WriteJSON(res)
if err != nil {
api.logger.Error("error writing header, will drop peer", "error", err.Error())
try(func() {
if err != websocket.ErrCloseSent {
_ = wsConn.Close()
Expand All @@ -605,7 +611,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac
}

func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(api.cancelContext)
//nolint: errcheck
go api.events.PendingTxStream().Subscribe(ctx, func(items []common.Hash, _ int) error {
for _, hash := range items {
Expand Down
2 changes: 1 addition & 1 deletion server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func StartJSONRPC(

srvCtx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress)

wsSrv := rpc.NewWebsocketsServer(clientCtx, srvCtx.Logger, rpcStream, config)
wsSrv := rpc.NewWebsocketsServer(ctx, clientCtx, srvCtx.Logger, rpcStream, config)
wsSrv.Start()
return httpSrv, nil
}

0 comments on commit ce5773e

Please sign in to comment.