From eef06ba58d17af4b8fad5bb6e48a75f2c4588560 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 7 Feb 2024 17:46:57 -0600 Subject: [PATCH 1/4] Testing setup for relay with large backlog on connect --- relay/relay_stress_test.go | 171 +++++++++++++++++++++++++++++++++++++ 1 file changed, 171 insertions(+) create mode 100644 relay/relay_stress_test.go diff --git a/relay/relay_stress_test.go b/relay/relay_stress_test.go new file mode 100644 index 0000000000..8690bab21f --- /dev/null +++ b/relay/relay_stress_test.go @@ -0,0 +1,171 @@ +package relay + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/arbos/arbostypes" + "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/broadcastclient" + "github.com/offchainlabs/nitro/broadcaster" + "github.com/offchainlabs/nitro/broadcaster/message" + "github.com/offchainlabs/nitro/util/stopwaiter" + "github.com/offchainlabs/nitro/wsbroadcastserver" +) + +type RelayBroadcastClient struct { + stopwaiter.StopWaiter + broadcaster *broadcaster.Broadcaster +} + +func NewRelayBroadcastClient(config *Config, feedErrChan chan error) *RelayBroadcastClient { + dataSignerErr := func([]byte) ([]byte, error) { + return nil, errors.New("relay attempted to sign feed message") + } + return &RelayBroadcastClient{ + broadcaster: broadcaster.NewBroadcaster(func() *wsbroadcastserver.BroadcasterConfig { return &config.Node.Feed.Output }, config.Chain.ID, feedErrChan, dataSignerErr), + } +} + +func (r *RelayBroadcastClient) Start(ctx context.Context) error { + r.StopWaiter.Start(ctx, r) + err := r.broadcaster.Initialize() + if err != nil { + return errors.New("broadcast unable to initialize") + } + err = r.broadcaster.Start(ctx) + if err != nil { + return errors.New("broadcast unable to start") + } + return nil +} + +func (r *RelayBroadcastClient) PopulateFeedBacklogByNumber(ctx context.Context, backlogSize, l2MsgSize int) { + was := r.broadcaster.GetCachedMessageCount() + var seqNums []arbutil.MessageIndex + for i := was; i < was+backlogSize; i++ { + seqNums = append(seqNums, arbutil.MessageIndex(i)) + } + + messages := make([]*message.BroadcastFeedMessage, 0, len(seqNums)) + for _, seqNum := range seqNums { + broadcastMessage := &message.BroadcastFeedMessage{ + SequenceNumber: seqNum, + Message: arbostypes.MessageWithMetadata{ + Message: &arbostypes.L1IncomingMessage{ + L2msg: make([]byte, l2MsgSize), + }, + }, + } + messages = append(messages, broadcastMessage) + } + r.broadcaster.BroadcastFeedMessages(messages) + waitForBacklog(r.broadcaster, was, was+backlogSize) +} + +func waitForBacklog(b *broadcaster.Broadcaster, was, target int) { + time.Sleep(time.Second) + prevCount := was + for count := b.GetCachedMessageCount(); count != target; count = b.GetCachedMessageCount() { + if prevCount == count { + log.Warn("unable to populate feed backlog. Cached message count did not increment") + break + } else { + prevCount = count + } + log.Info("populating feed backlog to stress test relay", "current", count, "target", target) + time.Sleep(5 * time.Second) + } +} + +type dummyTxStreamer struct { + id int + logConnection bool +} + +func (ts *dummyTxStreamer) AddBroadcastMessages(feedMessages []*message.BroadcastFeedMessage) error { + // to mimic latency of txstreamer + time.Sleep(50 * time.Millisecond) + if !ts.logConnection { + ts.logConnection = true + log.Info("test client is succesfully receiving messages", "client_Id", ts.id, "msg_size", feedMessages[0].Size()) + } + return nil +} + +func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize int, clientsRegDeadline time.Duration, relayBCPort, relayPort string) { + // total size of the backlog = backlogSize * (l2MsgSize + 160) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + relayBroadcastClientConfig := &ConfigDefault + relayBroadcastClientConfig.Node.Feed.Output.Addr = "127.0.0.1" + relayBroadcastClientConfig.Node.Feed.Output.Port = relayBCPort + relayBroadcastClientConfig.Node.Feed.Output.ClientTimeout = 5 * time.Minute + relayBroadcastClient := NewRelayBroadcastClient(relayBroadcastClientConfig, nil) + err := relayBroadcastClient.Start(ctx) + if err != nil { + t.Fatalf("error starting relay's broadcast client %v", err) + } + relayBroadcastClient.PopulateFeedBacklogByNumber(ctx, backlogSize, l2MsgSize) + + relayConfig := &ConfigDefault + relayConfig.Node.Feed.Input.URL = []string{"ws://127.0.0.1:" + relayBCPort} + relayConfig.Node.Feed.Output.Addr = "127.0.0.1" + relayConfig.Node.Feed.Output.Port = relayPort + relayConfig.Node.Feed.Output.ClientTimeout = 5 * time.Minute + relay, err := NewRelay(relayConfig, nil) + if err != nil { + t.Fatalf("error initializing relay %v", err) + } + err = relay.Start(ctx) + if err != nil { + t.Fatalf("error starting relay %v", err) + } + waitForBacklog(relay.broadcaster, 0, backlogSize) + + relayURL := "ws://" + relay.GetListenerAddr().String() + clientConfig := broadcastclient.DefaultTestConfig + clientConfig.Timeout = 5 * time.Minute + var streamers []*dummyTxStreamer + for i := 0; i < numClients; i++ { + ts := &dummyTxStreamer{id: i} + streamers = append(streamers, ts) + client, err := broadcastclient.NewBroadcastClient(func() *broadcastclient.Config { return &clientConfig }, relayURL, relayConfig.Chain.ID, 0, ts, nil, nil, nil, func(_ int32) {}) + if err != nil { + t.FailNow() + } + client.Start(ctx) + } + + // wait for all clients to atleast connect once + time.Sleep(clientsRegDeadline) + connected := 0 + for _, ts := range streamers { + if ts.logConnection { + connected++ + } + } + if int32(connected) != int32(numClients) { + t.Fail() + } + log.Info("number of clients connected", "expected", numClients, "got", connected) +} + +func TestRelayLargeBacklog16MB(t *testing.T) { + // t.Skip("This test is for manual inspection and would be unreliable in CI even if automated") + largeBacklogRelayTestImpl(t, 150, 100000, 0, 40*time.Second, "9642", "9643") +} + +func TestRelayLargeBacklog50MB(t *testing.T) { + // t.Skip("This test is for manual inspection and would be unreliable in CI even if automated") + largeBacklogRelayTestImpl(t, 150, 100000, 340, 40*time.Second, "9644", "9645") +} + +func TestRelayLargeBacklog100MB(t *testing.T) { + // t.Skip("This test is for manual inspection and would be unreliable in CI even if automated") + largeBacklogRelayTestImpl(t, 150, 100000, 840, 40*time.Second, "9646", "9647") +} From 4c6e961e1c8a6c20b84ea7c14108d3137b301ad5 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 7 Feb 2024 17:47:57 -0600 Subject: [PATCH 2/4] minor fix --- relay/relay_stress_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/relay/relay_stress_test.go b/relay/relay_stress_test.go index 8690bab21f..4a374da32a 100644 --- a/relay/relay_stress_test.go +++ b/relay/relay_stress_test.go @@ -156,16 +156,16 @@ func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize } func TestRelayLargeBacklog16MB(t *testing.T) { - // t.Skip("This test is for manual inspection and would be unreliable in CI even if automated") + t.Skip("This test is for manual inspection and would be unreliable in CI even if automated") largeBacklogRelayTestImpl(t, 150, 100000, 0, 40*time.Second, "9642", "9643") } func TestRelayLargeBacklog50MB(t *testing.T) { - // t.Skip("This test is for manual inspection and would be unreliable in CI even if automated") + t.Skip("This test is for manual inspection and would be unreliable in CI even if automated") largeBacklogRelayTestImpl(t, 150, 100000, 340, 40*time.Second, "9644", "9645") } func TestRelayLargeBacklog100MB(t *testing.T) { - // t.Skip("This test is for manual inspection and would be unreliable in CI even if automated") + t.Skip("This test is for manual inspection and would be unreliable in CI even if automated") largeBacklogRelayTestImpl(t, 150, 100000, 840, 40*time.Second, "9646", "9647") } From dba26a3878bbc9b187b934fde82ab44e01e94e99 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 8 Feb 2024 10:08:38 -0600 Subject: [PATCH 3/4] refactor --- relay/relay_stress_test.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/relay/relay_stress_test.go b/relay/relay_stress_test.go index 4a374da32a..210d365629 100644 --- a/relay/relay_stress_test.go +++ b/relay/relay_stress_test.go @@ -16,21 +16,21 @@ import ( "github.com/offchainlabs/nitro/wsbroadcastserver" ) -type RelayBroadcastClient struct { +type DummyUpStream struct { stopwaiter.StopWaiter broadcaster *broadcaster.Broadcaster } -func NewRelayBroadcastClient(config *Config, feedErrChan chan error) *RelayBroadcastClient { +func NewDummyUpStream(config *Config, feedErrChan chan error) *DummyUpStream { dataSignerErr := func([]byte) ([]byte, error) { return nil, errors.New("relay attempted to sign feed message") } - return &RelayBroadcastClient{ + return &DummyUpStream{ broadcaster: broadcaster.NewBroadcaster(func() *wsbroadcastserver.BroadcasterConfig { return &config.Node.Feed.Output }, config.Chain.ID, feedErrChan, dataSignerErr), } } -func (r *RelayBroadcastClient) Start(ctx context.Context) error { +func (r *DummyUpStream) Start(ctx context.Context) error { r.StopWaiter.Start(ctx, r) err := r.broadcaster.Initialize() if err != nil { @@ -43,7 +43,7 @@ func (r *RelayBroadcastClient) Start(ctx context.Context) error { return nil } -func (r *RelayBroadcastClient) PopulateFeedBacklogByNumber(ctx context.Context, backlogSize, l2MsgSize int) { +func (r *DummyUpStream) PopulateFeedBacklogByNumber(ctx context.Context, backlogSize, l2MsgSize int) { was := r.broadcaster.GetCachedMessageCount() var seqNums []arbutil.MessageIndex for i := was; i < was+backlogSize; i++ { @@ -96,24 +96,24 @@ func (ts *dummyTxStreamer) AddBroadcastMessages(feedMessages []*message.Broadcas return nil } -func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize int, clientsRegDeadline time.Duration, relayBCPort, relayPort string) { +func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize int, clientsRegDeadline time.Duration, upStreamPort, relayPort string) { // total size of the backlog = backlogSize * (l2MsgSize + 160) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - relayBroadcastClientConfig := &ConfigDefault - relayBroadcastClientConfig.Node.Feed.Output.Addr = "127.0.0.1" - relayBroadcastClientConfig.Node.Feed.Output.Port = relayBCPort - relayBroadcastClientConfig.Node.Feed.Output.ClientTimeout = 5 * time.Minute - relayBroadcastClient := NewRelayBroadcastClient(relayBroadcastClientConfig, nil) - err := relayBroadcastClient.Start(ctx) + upStreamConfig := &ConfigDefault + upStreamConfig.Node.Feed.Output.Addr = "127.0.0.1" + upStreamConfig.Node.Feed.Output.Port = upStreamPort + upStreamConfig.Node.Feed.Output.ClientTimeout = 5 * time.Minute + upStream := NewDummyUpStream(upStreamConfig, nil) + err := upStream.Start(ctx) if err != nil { t.Fatalf("error starting relay's broadcast client %v", err) } - relayBroadcastClient.PopulateFeedBacklogByNumber(ctx, backlogSize, l2MsgSize) + upStream.PopulateFeedBacklogByNumber(ctx, backlogSize, l2MsgSize) relayConfig := &ConfigDefault - relayConfig.Node.Feed.Input.URL = []string{"ws://127.0.0.1:" + relayBCPort} + relayConfig.Node.Feed.Input.URL = []string{"ws://127.0.0.1:" + upStreamPort} relayConfig.Node.Feed.Output.Addr = "127.0.0.1" relayConfig.Node.Feed.Output.Port = relayPort relayConfig.Node.Feed.Output.ClientTimeout = 5 * time.Minute From 6da0dd589b8278de1eed5bf789e565b9c9dfcf0e Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 8 Feb 2024 11:15:41 -0600 Subject: [PATCH 4/4] minor fix --- relay/relay_stress_test.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/relay/relay_stress_test.go b/relay/relay_stress_test.go index 210d365629..9a8875a429 100644 --- a/relay/relay_stress_test.go +++ b/relay/relay_stress_test.go @@ -96,7 +96,7 @@ func (ts *dummyTxStreamer) AddBroadcastMessages(feedMessages []*message.Broadcas return nil } -func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize int, clientsRegDeadline time.Duration, upStreamPort, relayPort string) { +func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize int, connectDeadline time.Duration, upStreamPort, relayPort string) { // total size of the backlog = backlogSize * (l2MsgSize + 160) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -110,6 +110,7 @@ func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize if err != nil { t.Fatalf("error starting relay's broadcast client %v", err) } + defer upStream.StopOnly() upStream.PopulateFeedBacklogByNumber(ctx, backlogSize, l2MsgSize) relayConfig := &ConfigDefault @@ -125,24 +126,34 @@ func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize if err != nil { t.Fatalf("error starting relay %v", err) } + defer relay.StopOnly() waitForBacklog(relay.broadcaster, 0, backlogSize) relayURL := "ws://" + relay.GetListenerAddr().String() clientConfig := broadcastclient.DefaultTestConfig clientConfig.Timeout = 5 * time.Minute + fatalErrChan := make(chan error, 10) var streamers []*dummyTxStreamer for i := 0; i < numClients; i++ { ts := &dummyTxStreamer{id: i} streamers = append(streamers, ts) - client, err := broadcastclient.NewBroadcastClient(func() *broadcastclient.Config { return &clientConfig }, relayURL, relayConfig.Chain.ID, 0, ts, nil, nil, nil, func(_ int32) {}) + client, err := broadcastclient.NewBroadcastClient(func() *broadcastclient.Config { return &clientConfig }, relayURL, relayConfig.Chain.ID, 0, ts, nil, fatalErrChan, nil, func(_ int32) {}) if err != nil { t.FailNow() } client.Start(ctx) + defer client.StopOnly() } // wait for all clients to atleast connect once - time.Sleep(clientsRegDeadline) + connectDeadlineTimer := time.NewTicker(connectDeadline) + defer connectDeadlineTimer.Stop() + select { + case err := <-fatalErrChan: + t.Fatalf("a client received a fatal error %v", err) + case <-connectDeadlineTimer.C: + } + connected := 0 for _, ts := range streamers { if ts.logConnection {