Skip to content

Commit

Permalink
Merge branch 'main' into CNS-single-provider
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaroms committed Sep 29, 2024
2 parents dae47e1 + 130d0f1 commit 5811cdf
Show file tree
Hide file tree
Showing 29 changed files with 937 additions and 202 deletions.
10 changes: 5 additions & 5 deletions ecosystem/cache/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,34 +107,34 @@ func (cs *CacheServer) Serve(ctx context.Context,
if strings.HasPrefix(listenAddr, unixPrefix) { // Unix socket
host, port, err := net.SplitHostPort(listenAddr)
if err != nil {
utils.LavaFormatFatal("Failed to parse unix socket, provide address in this format unix:/tmp/example.sock: %v\n", err)
utils.LavaFormatFatal("Failed to parse unix socket, provide address in this format unix:/tmp/example.sock", err)
return
}

syscall.Unlink(port)

addr, err := net.ResolveUnixAddr(host, port)
if err != nil {
utils.LavaFormatFatal("Failed to resolve unix socket address: %v\n", err)
utils.LavaFormatFatal("Failed to resolve unix socket address", err)
return
}

lis, err = net.ListenUnix(host, addr)
if err != nil {
utils.LavaFormatFatal("Faild to listen to unix socket listener: %v\n", err)
utils.LavaFormatFatal("Failed to listen to unix socket listener", err)
return
}

// Set permissions for the Unix socket
err = os.Chmod(port, 0o600)
if err != nil {
utils.LavaFormatFatal("Failed to set permissions for Unix socket: %v\n", err)
utils.LavaFormatFatal("Failed to set permissions for Unix socket", err)
return
}
} else {
lis, err = net.Listen("tcp", listenAddr)
if err != nil {
utils.LavaFormatFatal("Cache server failure setting up TCP listener: %v\n", err)
utils.LavaFormatFatal("Cache server failure setting up TCP listener", err)
return
}
}
Expand Down
80 changes: 72 additions & 8 deletions protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@ package chainlib
import (
"context"
"strconv"
"sync/atomic"
"time"

gojson "github.com/goccy/go-json"
"github.com/goccy/go-json"
"github.com/gofiber/websocket/v2"
formatter "github.com/lavanet/lava/v3/ecosystem/cache/format"
"github.com/lavanet/lava/v3/protocol/common"
"github.com/lavanet/lava/v3/protocol/metrics"
"github.com/lavanet/lava/v3/utils"
"github.com/lavanet/lava/v3/utils/rand"
spectypes "github.com/lavanet/lava/v3/x/spec/types"
"github.com/tidwall/gjson"
)

var (
WebSocketRateLimit = -1 // rate limit requests per second on websocket connection
WebSocketBanDuration = time.Duration(0) // once rate limit is reached, will not allow new incoming message for a duration
)

type ConsumerWebsocketManager struct {
Expand Down Expand Up @@ -67,6 +74,27 @@ func (cwm *ConsumerWebsocketManager) GetWebSocketConnectionUniqueId(dappId, user
return dappId + "__" + userIp + "__" + cwm.WebsocketConnectionUID
}

func (cwm *ConsumerWebsocketManager) handleRateLimitReached(inpData []byte) ([]byte, error) {
rateLimitError := common.JsonRpcRateLimitError
id := 0
result := gjson.GetBytes(inpData, "id")
switch result.Type {
case gjson.Number:
id = int(result.Int())
case gjson.String:
idParsed, err := strconv.Atoi(result.Raw)
if err == nil {
id = idParsed
}
}
rateLimitError.Id = id
bytesRateLimitError, err := json.Marshal(rateLimitError)
if err != nil {
return []byte{}, utils.LavaFormatError("failed marshalling jsonrpc rate limit error", err)
}
return bytesRateLimitError, nil
}

func (cwm *ConsumerWebsocketManager) ListenToMessages() {
var (
messageType int
Expand Down Expand Up @@ -110,6 +138,33 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}
}()

// rate limit routine
requestsPerSecond := &atomic.Uint64{}
go func() {
if WebSocketRateLimit <= 0 {
return
}
ticker := time.NewTicker(time.Second) // rate limit per second.
defer ticker.Stop()
for {
select {
case <-webSocketCtx.Done():
return
case <-ticker.C:
// check if rate limit reached, and ban is required
if WebSocketBanDuration > 0 && requestsPerSecond.Load() > uint64(WebSocketRateLimit) {
// wait the ban duration before resetting the store.
select {
case <-webSocketCtx.Done():
return
case <-time.After(WebSocketBanDuration): // just continue
}
}
requestsPerSecond.Store(0)
}
}
}()

for {
startTime := time.Now()
msgSeed := guidString + "_" + strconv.Itoa(rand.Intn(10000000000)) // use message seed with original guid and new int
Expand All @@ -125,6 +180,15 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
break
}

// Check rate limit is met
if WebSocketRateLimit > 0 && requestsPerSecond.Add(1) > uint64(WebSocketRateLimit) {
rateLimitResponse, err := cwm.handleRateLimitReached(msg)
if err == nil {
websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: rateLimitResponse}
}
continue
}

dappID, ok := websocketConn.Locals("dapp-id").(string)
if !ok {
// Log and remove the analyze
Expand Down Expand Up @@ -160,18 +224,17 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
continue
}

// check whether its a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
// check whether it's a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
if !IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) {
err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if err != nil {
utils.LavaFormatWarning("error unsubscribing from subscription", err, utils.LogAttr("GUID", webSocketCtx))
if err == common.SubscriptionNotFoundError {
msgData, err := gojson.Marshal(common.JsonRpcSubscriptionNotFoundError)
msgData, err := json.Marshal(common.JsonRpcSubscriptionNotFoundError)
if err != nil {
continue
}

websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: msgData}
}
}
Expand All @@ -189,17 +252,18 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not send parsed relay", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
if formatterMsg != nil {
websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: formatterMsg}
continue
}
continue
}

relayResultReply := relayResult.GetReply()
if relayResultReply != nil {
// No need to verify signature since this is already happening inside the SendParsedRelay flow
websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: relayResult.GetReply().Data}
continue
} else {
utils.LavaFormatError("Relay result is nil over websocket normal request flow, should not happen", err, utils.LogAttr("messageType", messageType))
}
utils.LavaFormatError("Relay result is nil over websocket normal request flow, should not happen", err, utils.LogAttr("messageType", messageType))
continue
}
}

Expand All @@ -224,7 +288,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {

// Handle the case when the error is a method not found error
if common.APINotSupportedError.Is(err) {
msgData, err := gojson.Marshal(common.JsonRpcMethodNotFoundError)
msgData, err := json.Marshal(common.JsonRpcMethodNotFoundError)
if err != nil {
continue
}
Expand Down
11 changes: 10 additions & 1 deletion protocol/chainlib/consumer_ws_subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ConsumerWSSubscriptionManager struct {
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage
currentlyPendingSubscriptions map[string]*pendingSubscriptionsBroadcastManager
lock sync.RWMutex
consumerMetricsManager *metrics.ConsumerMetricsManager
}

func NewConsumerWSSubscriptionManager(
Expand All @@ -65,6 +66,7 @@ func NewConsumerWSSubscriptionManager(
connectionType string,
chainParser ChainParser,
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage,
consumerMetricsManager *metrics.ConsumerMetricsManager,
) *ConsumerWSSubscriptionManager {
return &ConsumerWSSubscriptionManager{
connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]),
Expand All @@ -76,6 +78,7 @@ func NewConsumerWSSubscriptionManager(
relaySender: relaySender,
connectionType: connectionType,
activeSubscriptionProvidersStorage: activeSubscriptionProvidersStorage,
consumerMetricsManager: consumerMetricsManager,
}
}

Expand Down Expand Up @@ -216,6 +219,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(

// called after send relay failure or parsing failure afterwards
onSubscriptionFailure := func() {
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
cwsm.failedPendingSubscription(hashedParams)
closeWebsocketRepliesChannel()
}
Expand Down Expand Up @@ -255,6 +259,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
// Validated there are no active subscriptions that we can use.
firstSubscriptionReply, returnWebsocketRepliesChan := cwsm.checkForActiveSubscriptionWithLock(webSocketCtx, hashedParams, protocolMessage, dappKey, websocketRepliesSafeChannelSender, closeWebsocketRepliesChannel)
if firstSubscriptionReply != nil {
go cwsm.consumerMetricsManager.SetDuplicatedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
if returnWebsocketRepliesChan {
return firstSubscriptionReply, websocketRepliesChan, nil
}
Expand Down Expand Up @@ -412,7 +417,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
cwsm.successfulPendingSubscription(hashedParams)
// Need to be run once for subscription
go cwsm.listenForSubscriptionMessages(webSocketCtx, dappID, consumerIp, replyServer, hashedParams, providerAddr, metricsData, closeSubscriptionChan)

go cwsm.consumerMetricsManager.SetWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return &reply, websocketRepliesChan, nil
}

Expand Down Expand Up @@ -524,19 +529,22 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonUser)
return
case <-replyServer.Context().Done():
utils.LavaFormatTrace("reply server context canceled",
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonConsumer)
return
default:
var reply pairingtypes.RelayReply
err := replyServer.RecvMsg(&reply)
if err != nil {
// The connection was closed by the provider
utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error()))
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonProvider)
return
}
err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr)
Expand All @@ -545,6 +553,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("hashedParams", hashedParams),
utils.LogAttr("reply", reply),
)
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return
}
}
Expand Down
Loading

0 comments on commit 5811cdf

Please sign in to comment.