Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Probelab tracing #2

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions beacon-chain/node/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ go_library(
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
"@com_github_aws_aws_sdk_go_v2_config//:go_default_library",
"@com_github_aws_aws_sdk_go_v2_service_kinesis//:go_default_library",
"@com_github_dennis_tra_go_kinesis//:go_default_library",
],
)

Expand Down Expand Up @@ -106,5 +109,8 @@ go_test(
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
"@com_github_aws_aws_sdk_go_v2_config//:go_default_library",
"@com_github_aws_aws_sdk_go_v2_service_kinesis//:go_default_library",
"@com_github_dennis_tra_go_kinesis//:go_default_library",
],
)
58 changes: 57 additions & 1 deletion beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"bytes"
"context"
"fmt"
"github.com/urfave/cli/v2"
"log/slog"
"net"
"os"
"os/signal"
Expand All @@ -15,7 +17,11 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
gk "github.com/dennis-tra/go-kinesis"
"github.com/ethereum/go-ethereum/common"
"github.com/gorilla/mux"
"github.com/pkg/errors"
Expand Down Expand Up @@ -70,7 +76,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/runtime/prereqs"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

const testSkipPowFlag = "test-skip-pow"
Expand Down Expand Up @@ -124,6 +129,7 @@ type BeaconNode struct {
blobRetentionEpochs primitives.Epoch
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
kinesisProducer *gk.Producer
}

// New creates a new node instance, sets up configuration options, and registers
Expand Down Expand Up @@ -292,6 +298,11 @@ func startBaseServices(cliCtx *cli.Context, beacon *BeaconNode, depositAddress s
}
beacon.BlobStorage.WarmCache()

log.Debugln("Starting Kinesis Producer")
if err := beacon.initKinesisProducer(cliCtx); err != nil {
return nil, err
}

log.Debugln("Starting Slashing DB")
if err := beacon.startSlasherDB(cliCtx); err != nil {
return nil, errors.Wrap(err, "could not start slashing DB")
Expand Down Expand Up @@ -438,6 +449,17 @@ func (b *BeaconNode) Start() {
"version": version.Version(),
}).Info("Starting beacon node")

go func() {
if err := b.kinesisProducer.Start(b.ctx); err != nil {
log.WithError(err).Warn("Kinesis producer failed to start")
}
}()

if err := b.kinesisProducer.WaitIdle(b.ctx); err != nil {
log.WithError(err).Warn("Kinesis producer failed to become idle")
return
}

b.services.StartAll()

stop := b.stop
Expand All @@ -462,6 +484,12 @@ func (b *BeaconNode) Start() {

// Wait for stop channel to be closed.
<-stop

shutdownCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := b.kinesisProducer.WaitStopped(shutdownCtx); err != nil {
log.WithError(err).Warn("failed waiting for kinesis producer to stop")
}
}

// Close handles graceful shutdown of the system.
Expand Down Expand Up @@ -656,6 +684,32 @@ func (b *BeaconNode) startSlasherDB(cliCtx *cli.Context) error {
return nil
}

func (b *BeaconNode) initKinesisProducer(cliCtx *cli.Context) error {
region := cliCtx.String(cmd.KinesisRegion.Name)
if region == "" {
return fmt.Errorf("no kinesis data stream region given")
}

streamName := cliCtx.String(cmd.KinesisStream.Name)
if streamName == "" {
return fmt.Errorf("no kinesis data stream name given")
}

awsConfig, err := config.LoadDefaultConfig(cliCtx.Context, config.WithDefaultRegion(region))
if err != nil {
return fmt.Errorf("load aws config: %w", err)
}
client := kinesis.NewFromConfig(awsConfig)
kinesisCfg := gk.DefaultProducerConfig()
kinesisCfg.Log = slog.Default()
b.kinesisProducer, err = gk.NewProducer(client, streamName, kinesisCfg)
if err != nil {
return fmt.Errorf("new kinesis producer: %w", err)
}

return nil
}

func (b *BeaconNode) startStateGen(ctx context.Context, bfs coverage.AvailableBlocker, fc forkchoice.ForkChoicer) error {
opts := []stategen.Option{stategen.WithAvailableBlocker(bfs)}
sg := stategen.New(b.db, fc, opts...)
Expand Down Expand Up @@ -716,6 +770,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name),
StateNotifier: b,
DB: b.db,
KinesisProducer: b.kinesisProducer,
ClockWaiter: b.clockWaiter,
})
if err != nil {
Expand Down Expand Up @@ -868,6 +923,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithBlobStorage(b.BlobStorage),
regularsync.WithVerifierWaiter(b.verifyInitWaiter),
regularsync.WithAvailableBlocker(bFillStore),
regularsync.WithKinesisProducer(b.kinesisProducer),
)
return b.services.RegisterService(rs)
}
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@com_github_dennis_tra_go_kinesis//:go_default_library",
"@com_github_google_uuid//:go_default_library",
],
)

Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/p2p/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package p2p

import (
gk "github.com/dennis-tra/go-kinesis"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
Expand Down Expand Up @@ -32,6 +33,7 @@ type Config struct {
DenyListCIDR []string
StateNotifier statefeed.Notifier
DB db.ReadOnlyDatabase
KinesisProducer *gk.Producer
ClockWaiter startup.ClockWaiter
}

Expand Down
35 changes: 35 additions & 0 deletions beacon-chain/p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
Expand Down Expand Up @@ -56,8 +57,39 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
delete(peerMap, id)
}

eventHandler := func(n network.Network, c network.Conn, evtType string) {
evt := &traceEvent{
Type: evtType,
PeerID: s.host.ID(),
Timestamp: time.Now(),
Payload: struct {
RemotePeer string
RemoteMaddrs ma.Multiaddr
AgentVersion string
Direction string
Opened time.Time
Transient bool
}{
RemotePeer: c.RemotePeer().String(),
RemoteMaddrs: c.RemoteMultiaddr(),
AgentVersion: agentFromPid(c.RemotePeer(), s.host.Peerstore()),
Direction: c.Stat().Direction.String(),
Opened: c.Stat().Opened,
Transient: c.Stat().Transient,
},
}

go func() {
if err := s.cfg.KinesisProducer.PutRecord(s.ctx, evt); err != nil {
log.WithError(err).Warn("Failed to put event payload")
}
}()
}

s.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(net network.Network, conn network.Conn) {
eventHandler(net, conn, "CONNECTED")

remotePeer := conn.RemotePeer()
disconnectFromPeer := func() {
s.peers.SetConnectionState(remotePeer, peers.PeerDisconnecting)
Expand Down Expand Up @@ -146,6 +178,9 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
validPeerConnection()
}()
},
DisconnectedF: func(net network.Network, conn network.Conn) {
eventHandler(net, conn, "DISCONNECTED")
},
})
}

Expand Down
8 changes: 7 additions & 1 deletion beacon-chain/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ func (s *Service) peerInspector(peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) {

// Creates a list of pubsub options to configure out router with.
func (s *Service) pubsubOptions() []pubsub.Option {
tracer := gossipTracer{
host: s.host,
producer: s.cfg.KinesisProducer,
}

psOpts := []pubsub.Option{
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithNoAuthor(),
Expand All @@ -145,7 +150,8 @@ func (s *Service) pubsubOptions() []pubsub.Option {
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),
pubsub.WithRawTracer(gossipTracer{host: s.host}),
pubsub.WithRawTracer(tracer),
pubsub.WithEventTracer(tracer),
}
return psOpts
}
Expand Down
Loading