Skip to content

Commit

Permalink
Merge branch 'main' into CNS-audit-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
oren-lava committed Sep 22, 2024
2 parents bf3a296 + 6278d1a commit 21bd4ff
Show file tree
Hide file tree
Showing 39 changed files with 1,770 additions and 460 deletions.
7 changes: 6 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ go.work.sum

Dockerfile
docker/docker-compose.*
cmd/**/Dockerfile
cmd/**/Dockerfile

.dockerignore
.gcloudignore

Makefile
4 changes: 3 additions & 1 deletion .github/workflows/lava.yml
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ jobs:
contents: write
packages: write
id-token: write
needs: [test-consensus, test-protocol]
runs-on: ubuntu-latest
strategy:
matrix:
Expand Down Expand Up @@ -423,6 +422,9 @@ jobs:
uses: docker/build-push-action@v5
continue-on-error: true
with:
provenance: false
sbom: false
context: .
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
file: cmd/${{ matrix.binary }}/Dockerfile
Expand Down
7 changes: 4 additions & 3 deletions cmd/lavad/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ RUN apk add --no-cache \

WORKDIR /lava

COPY go.mod go.sum ./

ENV GOCACHE=/root/.cache/go-build
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/root/go/pkg/mod \
go mod download
--mount=type=bind,source=go.sum,target=go.sum \
--mount=type=bind,source=go.mod,target=go.mod \
go mod download -x

COPY . .

Expand Down
5 changes: 4 additions & 1 deletion cmd/lavad/Dockerfile.Cosmovisor
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ WORKDIR /lava

COPY go.mod go.sum ./

ENV GOCACHE=/root/.cache/go-build
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/root/go/pkg/mod \
go mod download
--mount=type=bind,source=go.sum,target=go.sum \
--mount=type=bind,source=go.mod,target=go.mod \
go mod download -x

COPY . .

Expand Down
9 changes: 5 additions & 4 deletions cmd/lavap/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ RUN apk add --no-cache \

WORKDIR /lava

COPY go.mod go.sum ./

ENV GOCACHE=/root/.cache/go-build
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/root/go/pkg/mod \
go mod download

--mount=type=bind,source=go.sum,target=go.sum \
--mount=type=bind,source=go.mod,target=go.mod \
go mod download -x

COPY . .

ARG GIT_VERSION
Expand Down
5 changes: 3 additions & 2 deletions protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/lavanet/lava/v3/protocol/common"
"github.com/lavanet/lava/v3/protocol/metrics"
"github.com/lavanet/lava/v3/utils"
"github.com/lavanet/lava/v3/utils/rand"
spectypes "github.com/lavanet/lava/v3/x/spec/types"
)

Expand Down Expand Up @@ -85,6 +86,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {

webSocketCtx, cancelWebSocketCtx := context.WithCancel(context.Background())
guid := utils.GenerateUniqueIdentifier()
guidString := strconv.FormatUint(guid, 10)
webSocketCtx = utils.WithUniqueIdentifier(webSocketCtx, guid)
utils.LavaFormatDebug("consumer websocket manager started", utils.LogAttr("GUID", webSocketCtx))
defer func() {
Expand All @@ -110,7 +112,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {

for {
startTime := time.Now()
msgSeed := logger.GetMessageSeed()
msgSeed := guidString + "_" + strconv.Itoa(rand.Intn(10000000000)) // use message seed with original guid and new int

utils.LavaFormatTrace("listening for new message from the websocket")

Expand All @@ -132,7 +134,6 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}
}

msgSeed = strconv.FormatUint(guid, 10)
userIp := websocketConn.RemoteAddr().String()

logFormattedMsg := string(msg)
Expand Down
1 change: 0 additions & 1 deletion protocol/chainlib/consumer_ws_subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
utils.LogAttr("dappKey", dappKey),
utils.LogAttr("connectedDapps", cwsm.connectedDapps),
)

websocketRepliesChan := make(chan *pairingtypes.RelayReply)
Expand Down
35 changes: 9 additions & 26 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/goccy/go-json"
Expand Down Expand Up @@ -50,25 +49,6 @@ type GrpcNodeErrorResponse struct {
ErrorCode uint32 `json:"error_code"`
}

type grpcDescriptorCache struct {
cachedDescriptors sync.Map // method name is the key, method descriptor is the value
}

func (gdc *grpcDescriptorCache) getDescriptor(methodName string) *desc.MethodDescriptor {
if descriptor, ok := gdc.cachedDescriptors.Load(methodName); ok {
converted, success := descriptor.(*desc.MethodDescriptor) // convert to a descriptor
if success {
return converted
}
utils.LavaFormatError("Failed Converting method descriptor", nil, utils.Attribute{Key: "Method", Value: methodName})
}
return nil
}

func (gdc *grpcDescriptorCache) setDescriptor(methodName string, descriptor *desc.MethodDescriptor) {
gdc.cachedDescriptors.Store(methodName, descriptor)
}

type GrpcChainParser struct {
BaseChainParser

Expand Down Expand Up @@ -388,7 +368,7 @@ func (apil *GrpcChainListener) GetListeningAddress() string {
type GrpcChainProxy struct {
BaseChainProxy
conn grpcConnectorInterface
descriptorsCache *grpcDescriptorCache
descriptorsCache *common.SafeSyncMap[string, *desc.MethodDescriptor]
}
type grpcConnectorInterface interface {
Close()
Expand All @@ -413,7 +393,7 @@ func NewGrpcChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lav
func newGrpcChainProxy(ctx context.Context, averageBlockTime time.Duration, parser ChainParser, conn grpcConnectorInterface, rpcProviderEndpoint lavasession.RPCProviderEndpoint) (ChainProxy, error) {
cp := &GrpcChainProxy{
BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, ErrorHandler: &GRPCErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID, HashedNodeUrl: chainproxy.HashURL(rpcProviderEndpoint.NodeUrls[0].Url)},
descriptorsCache: &grpcDescriptorCache{},
descriptorsCache: &common.SafeSyncMap[string, *desc.MethodDescriptor]{},
}
cp.conn = conn
if cp.conn == nil {
Expand Down Expand Up @@ -471,9 +451,12 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
descriptorSource := rpcInterfaceMessages.DescriptorSourceFromServer(cl)
svc, methodName := rpcInterfaceMessages.ParseSymbol(nodeMessage.Path)

// check if we have method descriptor already cached.
methodDescriptor := cp.descriptorsCache.getDescriptor(methodName)
if methodDescriptor == nil { // method descriptor not cached yet, need to fetch it and add to cache
// Check if we have method descriptor already cached.
// The reason we do Load and then Store here, instead of LoadOrStore:
// On the worst case scenario, where 2 threads are accessing the map at the same time, the same descriptor will be stored twice.
// It is better than the alternative, which is always creating the descriptor, since the outcome is the same.
methodDescriptor, found, _ := cp.descriptorsCache.Load(methodName)
if !found { // method descriptor not cached yet, need to fetch it and add to cache
var descriptor desc.Descriptor
if descriptor, err = descriptorSource.FindSymbol(svc); err != nil {
return nil, "", nil, utils.LavaFormatError("descriptorSource.FindSymbol", err, utils.Attribute{Key: "GUID", Value: ctx})
Expand All @@ -488,7 +471,7 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
}

// add the descriptor to the chainProxy cache
cp.descriptorsCache.setDescriptor(methodName, methodDescriptor)
cp.descriptorsCache.Store(methodName, methodDescriptor)
}

msgFactory := dynamic.NewMessageFactoryWithDefaults()
Expand Down
3 changes: 3 additions & 0 deletions protocol/chainlib/grpcproxy/grpcproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
Expand All @@ -28,6 +30,7 @@ type HealthReporter interface {
func NewGRPCProxy(cb ProxyCallBack, healthCheckPath string, cmdFlags common.ConsumerCmdFlags, healthReporter HealthReporter) (*grpc.Server, *http.Server, error) {
serverReceiveMaxMessageSize := grpc.MaxRecvMsgSize(MaxCallRecvMsgSize) // setting receive size to 32mb instead of 4mb default
s := grpc.NewServer(grpc.UnknownServiceHandler(makeProxyFunc(cb)), grpc.ForceServerCodec(RawBytesCodec{}), serverReceiveMaxMessageSize)
grpc_health_v1.RegisterHealthServer(s, health.NewServer())
wrappedServer := grpcweb.WrapServer(s)
handler := func(resp http.ResponseWriter, req *http.Request) {
// Set CORS headers
Expand Down
76 changes: 58 additions & 18 deletions protocol/chainlib/node_error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"fmt"
"io"
"net"
"os"
"net/url"
"regexp"
"strings"
"syscall"

"github.com/goccy/go-json"

Expand All @@ -23,24 +23,64 @@ import (
type genericErrorHandler struct{}

func (geh *genericErrorHandler) handleConnectionError(err error) error {
if err == net.ErrWriteToConnected {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Write to connected connection", nil)
} else if err == net.ErrClosed {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Operation on closed connection", nil)
} else if err == io.EOF {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: End of input stream reached", nil)
} else if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Network operation timed out", nil)
} else if _, ok := err.(*net.DNSError); ok {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: DNS resolution failed", nil)
} else if opErr, ok := err.(*net.OpError); ok {
if sysErr, ok := opErr.Err.(*os.SyscallError); ok && sysErr.Err == syscall.ECONNREFUSED {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Connection refused", nil)
// Generic error message
genericMsg := "Provider Side Failed Sending Message"

switch {
case err == net.ErrWriteToConnected:
return utils.LavaFormatProduction(genericMsg+", Reason: Write to connected connection", nil)
case err == net.ErrClosed:
return utils.LavaFormatProduction(genericMsg+", Reason: Operation on closed connection", nil)
case err == io.EOF:
return utils.LavaFormatProduction(genericMsg+", Reason: End of input stream reached", nil)
case strings.Contains(err.Error(), "http: server gave HTTP response to HTTPS client"):
return utils.LavaFormatProduction(genericMsg+", Reason: misconfigured http endpoint as https", nil)
}

if opErr, ok := err.(*net.OpError); ok {
switch {
case opErr.Timeout():
return utils.LavaFormatProduction(genericMsg+", Reason: Network operation timed out", nil)
case strings.Contains(opErr.Error(), "connection refused"):
return utils.LavaFormatProduction(genericMsg+", Reason: Connection refused", nil)
default:
// Handle other OpError cases without exposing specific details
return utils.LavaFormatProduction(genericMsg+", Reason: Network operation error", nil)
}
}
if urlErr, ok := err.(*url.Error); ok {
switch {
case urlErr.Timeout():
return utils.LavaFormatProduction(genericMsg+", Reason: url.Error issue", nil)
case strings.Contains(urlErr.Error(), "connection refused"):
return utils.LavaFormatProduction(genericMsg+", Reason: Connection refused", nil)
}
} else if strings.Contains(err.Error(), "http: server gave HTTP response to HTTPS client") {
return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: misconfigured http endpoint as https", nil)
}
return nil // do not return here so the caller will return the error inside the data so it reaches the user when it doesn't match any specific cases

if _, ok := err.(*net.DNSError); ok {
return utils.LavaFormatProduction(genericMsg+", Reason: DNS resolution failed", nil)
}

// Mask IP addresses and potential secrets in the error message, and check if any secret was found
maskedError, foundSecret := maskSensitiveInfo(err.Error())
if foundSecret {
// Log or handle the case when a secret was found, if necessary
utils.LavaFormatProduction(genericMsg+maskedError, nil)
}
return nil
}

func maskSensitiveInfo(errMsg string) (string, bool) {
foundSecret := false

// Mask IP addresses
ipRegex := regexp.MustCompile(`\b(?:\d{1,3}\.){3}\d{1,3}\b`)
if ipRegex.MatchString(errMsg) {
foundSecret = true
errMsg = ipRegex.ReplaceAllString(errMsg, "[IP_ADDRESS]")
}

return errMsg, foundSecret
}

func (geh *genericErrorHandler) handleGenericErrors(ctx context.Context, nodeError error) error {
Expand Down
22 changes: 22 additions & 0 deletions protocol/chainlib/node_error_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package chainlib

import (
"bytes"
"context"
"errors"
"io"
"net"
"net/http"
"os"
"syscall"
"testing"
"time"

"github.com/lavanet/lava/v3/utils"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -78,3 +81,22 @@ func TestNodeErrorHandlerGenericErrors(t *testing.T) {
err = neh.handleGenericErrors(ctx, errors.New("dummy error"))
require.Equal(t, err, nil)
}

func TestNodeErrorHandlerTimeout(t *testing.T) {
httpClient := &http.Client{
Timeout: 5 * time.Minute, // we are doing a timeout by request
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
msgBuffer := bytes.NewBuffer([]byte{1, 2, 3})
req, err := http.NewRequestWithContext(ctx, "test", "http://0.0.0.0:6789", msgBuffer)
require.NoError(t, err)
_, err = httpClient.Do(req)
require.Error(t, err)
utils.LavaFormatDebug(err.Error())
genericHandler := genericErrorHandler{}
bctx := context.Background()
ret := genericHandler.handleGenericErrors(bctx, err)
utils.LavaFormatDebug(ret.Error())
require.NotContains(t, ret.Error(), "http://0.0.0.0:6789")
}
16 changes: 12 additions & 4 deletions protocol/chainlib/provider_node_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func TestSubscriptionManager_MultipleParallelSubscriptionsWithTheSameParamsAndNo
t.Run(play.name, func(t *testing.T) {
ts := SetupForTests(t, 1, play.specId, "../../")

wg := sync.WaitGroup{}
sentMessageToNodeChannel := make(chan bool, 1)
// msgCount := 0
upgrader := websocket.Upgrader{}
first := true
Expand Down Expand Up @@ -373,7 +373,12 @@ func TestSubscriptionManager_MultipleParallelSubscriptionsWithTheSameParamsAndNo
return
}
utils.LavaFormatDebug("write message")
wg.Done()
select {
case sentMessageToNodeChannel <- true:
utils.LavaFormatDebug("sent message to node")
default:
utils.LavaFormatDebug("unable to communicate with the test")
}

// Write the first reply
err = conn.WriteMessage(messageType, play.subscriptionFirstReply)
Expand Down Expand Up @@ -405,7 +410,6 @@ func TestSubscriptionManager_MultipleParallelSubscriptionsWithTheSameParamsAndNo
mockRpcProvider := &RelayFinalizationBlocksHandlerMock{}
pnsm := NewProviderNodeSubscriptionManager(chainRouter, chainParser, mockRpcProvider, ts.Providers[0].SK)

wg.Add(1)
wgAllIds := sync.WaitGroup{}
wgAllIds.Add(9)
errors := []error{}
Expand All @@ -429,7 +433,11 @@ func TestSubscriptionManager_MultipleParallelSubscriptionsWithTheSameParamsAndNo

utils.LavaFormatDebug("Waiting wait group")
wgAllIds.Wait()
wg.Wait() // Make sure the subscription manager sent a message to the node
select {
case <-sentMessageToNodeChannel: // Make sure the subscription manager sent a message to the node
case <-time.After(time.Second * 10):
require.Fail(t, "timeout waiting for message to node")
}
// make sure we had only one error, on the first subscription attempt
require.Len(t, errors, 1)

Expand Down
Loading

0 comments on commit 21bd4ff

Please sign in to comment.