diff --git a/tests/e2e/e2e.go b/tests/e2e/e2e.go
index f1d8e4e0d6..fb14426c05 100644
--- a/tests/e2e/e2e.go
+++ b/tests/e2e/e2e.go
@@ -5,14 +5,11 @@ package e2e
 import (
 	"fmt"
-	"sync"
-	"time"
 
 	"github.com/ava-labs/avalanchego/api/info"
 	"github.com/ava-labs/avalanchego/ids"
 	"github.com/ava-labs/avalanchego/tests"
 	"github.com/ava-labs/avalanchego/tests/fixture/e2e"
-	"github.com/ava-labs/avalanchego/tests/fixture/tmpnet"
 	"github.com/stretchr/testify/require"
 
 	"github.com/ava-labs/hypersdk/abi"
@@ -22,7 +19,6 @@ import (
 	"github.com/ava-labs/hypersdk/chain"
 	"github.com/ava-labs/hypersdk/tests/workload"
 	"github.com/ava-labs/hypersdk/throughput"
-	"github.com/ava-labs/hypersdk/utils"
 
 	ginkgo "github.com/onsi/ginkgo/v2"
 )
@@ -148,122 +144,122 @@ var _ = ginkgo.Describe("[HyperSDK Spam Workloads]", func() {
 	})
 })
 
-var _ = ginkgo.Describe("[HyperSDK Syncing]", func() {
-	ginkgo.It("[Sync]", func() {
-		tc := e2e.NewTestContext()
-		require := require.New(tc)
-		blockchainID := e2e.GetEnv(tc).GetNetwork().GetSubnet(vmName).Chains[0].ChainID
-
-		uris := getE2EURIs(tc, blockchainID)
-		ginkgo.By("Generate 128 blocks", func() {
-			workload.GenerateNBlocks(tc.ContextWithTimeout(5*time.Minute), require, uris, txWorkloadFactory, 128)
-		})
-
-		var (
-			bootstrapNode    *tmpnet.Node
-			bootstrapNodeURI string
-		)
-		ginkgo.By("Start a new node to bootstrap", func() {
-			bootstrapNode = e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
-			bootstrapNodeURI = formatURI(bootstrapNode.URI, blockchainID)
-			uris = append(uris, bootstrapNodeURI)
-		})
-		ginkgo.By("Accept a transaction after state sync", func() {
-			txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(bootstrapNodeURI, 1)
-			require.NoError(err)
-			workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
-		})
-		ginkgo.By("Restart the node", func() {
-			require.NoError(e2e.GetEnv(tc).GetNetwork().RestartNode(tc.DefaultContext(), ginkgo.GinkgoWriter, bootstrapNode))
-		})
-		ginkgo.By("Generate > StateSyncMinBlocks=512", func() {
-			workload.GenerateNBlocks(tc.ContextWithTimeout(20*time.Minute), require, uris, txWorkloadFactory, 512)
-		})
-		var (
-			syncNode    *tmpnet.Node
-			syncNodeURI string
-		)
-		ginkgo.By("Start a new node to state sync", func() {
-			syncNode = e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
-			syncNodeURI = formatURI(syncNode.URI, blockchainID)
-			uris = append(uris, syncNodeURI)
-			utils.Outf("{{blue}}sync node uri: %s{{/}}\n", syncNodeURI)
-			c := jsonrpc.NewJSONRPCClient(syncNodeURI)
-			_, _, _, err := c.Network(tc.DefaultContext())
-			require.NoError(err)
-		})
-		ginkgo.By("Accept a transaction after state sync", func() {
-			txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(syncNodeURI, 1)
-			require.NoError(err)
-			workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
-		})
-		ginkgo.By("Pause the node", func() {
-			// TODO: remove the need to call SaveAPIPort from the test
-			require.NoError(syncNode.SaveAPIPort())
-			require.NoError(syncNode.Stop(tc.DefaultContext()))
-
-			// TODO: remove extra Ping check and rely on tmpnet to stop the node correctly
-			c := jsonrpc.NewJSONRPCClient(syncNodeURI)
-			ok, err := c.Ping(tc.DefaultContext())
-			require.Error(err) //nolint:forbidigo
-			require.False(ok)
-		})
-		ginkgo.By("Generate 256 blocks", func() {
-			// Generate blocks on all nodes except the paused node
-			runningURIs := uris[:len(uris)-1]
-			workload.GenerateNBlocks(tc.ContextWithTimeout(5*time.Minute), require, runningURIs, txWorkloadFactory, 256)
-		})
-		ginkgo.By("Resume the node", func() {
-			require.NoError(e2e.GetEnv(tc).GetNetwork().StartNode(tc.DefaultContext(), ginkgo.GinkgoWriter, syncNode))
-			utils.Outf("Waiting for sync node to restart")
-			require.NoError(tmpnet.WaitForHealthy(tc.DefaultContext(), syncNode))
-
-			utils.Outf("{{blue}}sync node reporting healthy: %s{{/}}\n", syncNodeURI)
-
-			c := jsonrpc.NewJSONRPCClient(syncNodeURI)
-			_, _, _, err := c.Network(tc.DefaultContext())
-			require.NoError(err)
-		})
-
-		ginkgo.By("Accept a transaction after resuming", func() {
-			txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(syncNodeURI, 1)
-			require.NoError(err)
-			workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
-		})
-		ginkgo.By("State sync while broadcasting txs", func() {
-			stopChannel := make(chan struct{})
-			wg := &sync.WaitGroup{}
-			defer wg.Wait()
-			defer close(stopChannel)
-
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				// Recover failure if exits
-				defer ginkgo.GinkgoRecover()
-
-				txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(uris[0], 128)
-				require.NoError(err)
-				workload.GenerateUntilStop(tc.DefaultContext(), require, uris, txWorkload, stopChannel)
-			}()
-
-			// Give time for transactions to start processing
-			time.Sleep(5 * time.Second)
-
-			syncConcurrentNode := e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
-			syncConcurrentNodeURI := formatURI(syncConcurrentNode.URI, blockchainID)
-			uris = append(uris, syncConcurrentNodeURI)
-			c := jsonrpc.NewJSONRPCClient(syncConcurrentNodeURI)
-			_, _, _, err := c.Network(tc.DefaultContext())
-			require.NoError(err)
-		})
-		ginkgo.By("Accept a transaction after syncing", func() {
-			txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(uris[0], 1)
-			require.NoError(err)
-			workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
-		})
-	})
-})
+// var _ = ginkgo.Describe("[HyperSDK Syncing]", func() {
+// 	ginkgo.It("[Sync]", func() {
+// 		tc := e2e.NewTestContext()
+// 		require := require.New(tc)
+// 		blockchainID := e2e.GetEnv(tc).GetNetwork().GetSubnet(vmName).Chains[0].ChainID
+
+// 		uris := getE2EURIs(tc, blockchainID)
+// 		ginkgo.By("Generate 128 blocks", func() {
+// 			workload.GenerateNBlocks(tc.ContextWithTimeout(5*time.Minute), require, uris, txWorkloadFactory, 128)
+// 		})
+
+// 		var (
+// 			bootstrapNode    *tmpnet.Node
+// 			bootstrapNodeURI string
+// 		)
+// 		ginkgo.By("Start a new node to bootstrap", func() {
+// 			bootstrapNode = e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
+// 			bootstrapNodeURI = formatURI(bootstrapNode.URI, blockchainID)
+// 			uris = append(uris, bootstrapNodeURI)
+// 		})
+// 		ginkgo.By("Accept a transaction after state sync", func() {
+// 			txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(bootstrapNodeURI, 1)
+// 			require.NoError(err)
+// 			workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
+// 		})
+// 		ginkgo.By("Restart the node", func() {
+// 			require.NoError(e2e.GetEnv(tc).GetNetwork().RestartNode(tc.DefaultContext(), ginkgo.GinkgoWriter, bootstrapNode))
+// 		})
+// 		ginkgo.By("Generate > StateSyncMinBlocks=512", func() {
+// 			workload.GenerateNBlocks(tc.ContextWithTimeout(20*time.Minute), require, uris, txWorkloadFactory, 512)
+// 		})
+// 		var (
+// 			syncNode    *tmpnet.Node
+// 			syncNodeURI string
+// 		)
+// 		ginkgo.By("Start a new node to state sync", func() {
+// 			syncNode = e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
+// 			syncNodeURI = formatURI(syncNode.URI, blockchainID)
+// 			uris = append(uris, syncNodeURI)
+// 			utils.Outf("{{blue}}sync node uri: %s{{/}}\n", syncNodeURI)
+// 			c := jsonrpc.NewJSONRPCClient(syncNodeURI)
+// 			_, _, _, err := c.Network(tc.DefaultContext())
+// 			require.NoError(err)
+// 		})
+// 		ginkgo.By("Accept a transaction after state sync", func() {
+// 			txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(syncNodeURI, 1)
+// 			require.NoError(err)
+// 			workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
+// 		})
+// 		ginkgo.By("Pause the node", func() {
+// 			// TODO: remove the need to call SaveAPIPort from the test
+// 			require.NoError(syncNode.SaveAPIPort())
+// 			require.NoError(syncNode.Stop(tc.DefaultContext()))
+
+// 			// TODO: remove extra Ping check and rely on tmpnet to stop the node correctly
+// 			c := jsonrpc.NewJSONRPCClient(syncNodeURI)
+// 			ok, err := c.Ping(tc.DefaultContext())
+// 			require.Error(err) //nolint:forbidigo
+// 			require.False(ok)
+// 		})
+// 		ginkgo.By("Generate 256 blocks", func() {
+// 			// Generate blocks on all nodes except the paused node
+// 			runningURIs := uris[:len(uris)-1]
+// 			workload.GenerateNBlocks(tc.ContextWithTimeout(5*time.Minute), require, runningURIs, txWorkloadFactory, 256)
+// 		})
+// 		ginkgo.By("Resume the node", func() {
+// 			require.NoError(e2e.GetEnv(tc).GetNetwork().StartNode(tc.DefaultContext(), ginkgo.GinkgoWriter, syncNode))
+// 			utils.Outf("Waiting for sync node to restart")
+// 			require.NoError(tmpnet.WaitForHealthy(tc.DefaultContext(), syncNode))
+
+// 			utils.Outf("{{blue}}sync node reporting healthy: %s{{/}}\n", syncNodeURI)
+
+// 			c := jsonrpc.NewJSONRPCClient(syncNodeURI)
+// 			_, _, _, err := c.Network(tc.DefaultContext())
+// 			require.NoError(err)
+// 		})
+
+// 		ginkgo.By("Accept a transaction after resuming", func() {
+// 			txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(syncNodeURI, 1)
+// 			require.NoError(err)
+// 			workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
+// 		})
+// 		ginkgo.By("State sync while broadcasting txs", func() {
+// 			stopChannel := make(chan struct{})
+// 			wg := &sync.WaitGroup{}
+// 			defer wg.Wait()
+// 			defer close(stopChannel)
+
+// 			wg.Add(1)
+// 			go func() {
+// 				defer wg.Done()
+// 				// Recover failure if exits
+// 				defer ginkgo.GinkgoRecover()
+
+// 				txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(uris[0], 128)
+// 				require.NoError(err)
+// 				workload.GenerateUntilStop(tc.DefaultContext(), require, uris, txWorkload, stopChannel)
+// 			}()
+
+// 			// Give time for transactions to start processing
+// 			time.Sleep(5 * time.Second)
+
+// 			syncConcurrentNode := e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
+// 			syncConcurrentNodeURI := formatURI(syncConcurrentNode.URI, blockchainID)
+// 			uris = append(uris, syncConcurrentNodeURI)
+// 			c := jsonrpc.NewJSONRPCClient(syncConcurrentNodeURI)
+// 			_, _, _, err := c.Network(tc.DefaultContext())
+// 			require.NoError(err)
+// 		})
+// 		ginkgo.By("Accept a transaction after syncing", func() {
+// 			txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(uris[0], 1)
+// 			require.NoError(err)
+// 			workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
+// 		})
+// 	})
+// })
 
 func getE2EURIs(tc tests.TestContext, blockchainID ids.ID) []string {
 	nodeURIs := e2e.GetEnv(tc).GetNetwork().GetNodeURIs()
diff --git a/throughput/spam.go b/throughput/spam.go
index df4045abbd..bd292e82a6 100644
--- a/throughput/spam.go
+++ b/throughput/spam.go
@@ -149,6 +149,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
 
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
+
 	for _, issuer := range issuers {
 		issuer.Start(cctx)
 	}
@@ -157,7 +158,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
 	issuers[0].logStats(cctx)
 
 	// broadcast transactions
-	s.broadcast(cctx, sh, accounts, funds, factories, issuers, feePerTx, terminate)
+	s.broadcast(cctx, cancel, sh, accounts, funds, factories, issuers, feePerTx, terminate)
 
 	maxUnits, err = chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory)
 	if err != nil {
@@ -167,7 +168,8 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
 }
 
 func (s Spammer) broadcast(
-	cctx context.Context,
+	ctx context.Context,
+	cancel context.CancelFunc,
 	sh SpamHelper,
 	accounts []*auth.PrivateKey,
@@ -247,7 +249,7 @@ func (s Spammer) broadcast(
 
 			// Send transaction
 			actions := sh.GetTransfer(recipient, amountToTransfer, uniqueBytes())
-			return issuer.Send(cctx, actions, factory, feePerTx)
+			return issuer.Send(ctx, actions, factory, feePerTx)
 		})
 	}
@@ -271,19 +273,19 @@ func (s Spammer) broadcast(
 
 			if terminate && currentTarget == s.txsPerSecond && consecutiveUnderBacklog >= successfulRunsToIncreaseTarget {
 				utils.Outf("{{green}}reached target tps:{{/}} %d\n", currentTarget)
 				// Cancel the context to stop the issuers
-				cctx.Done()
+				cancel()
 			} else if consecutiveUnderBacklog >= successfulRunsToIncreaseTarget && currentTarget < s.txsPerSecond {
 				currentTarget = min(currentTarget+s.txsPerSecondStep, s.txsPerSecond)
 				utils.Outf("{{cyan}}increasing target tps:{{/}} %d\n", currentTarget)
 				consecutiveUnderBacklog = 0
 			}
-		case <-cctx.Done():
+		case <-ctx.Done():
 			stop = true
 			utils.Outf("{{yellow}}context canceled{{/}}\n")
 		case <-signals:
 			stop = true
 			utils.Outf("{{yellow}}exiting broadcast loop{{/}}\n")
-			cctx.Done()
+			cancel()
 		}
 	}
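
Why the cancel function is now threaded through broadcast: in Go, context.Context.Done() only returns the context's cancellation channel; calling it has no side effect. The old code invoked cctx.Done() in the two places it meant to stop the issuers, so cancellation never happened. Only the context.CancelFunc returned by context.WithCancel actually cancels. A minimal standalone sketch of the distinction (names here are illustrative, not taken from the repo):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel() // calling a CancelFunc more than once is a safe no-op

		go func() {
			<-ctx.Done() // blocks until the context is canceled
			fmt.Println("worker stopped")
		}()

		ctx.Done() // no-op: only returns the channel, cancels nothing

		cancel() // actually cancels: closes the Done channel, unblocking the worker
		time.Sleep(100 * time.Millisecond)
	}

Because Spam still defers cancel(), the explicit cancel() calls inside broadcast are harmless: after the first call, subsequent calls to a CancelFunc do nothing.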
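
A related detail in the syncing test that this patch comments out: the "State sync while broadcasting txs" step shuts its generator goroutine down through deferred calls that run in LIFO order. Since defer wg.Wait() is registered before defer close(stopChannel), the stop channel closes first on exit (signaling GenerateUntilStop to return) and only then does the test block in Wait. A sketch of that shutdown pattern (standalone, illustrative names):

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	func main() {
		stop := make(chan struct{})
		wg := &sync.WaitGroup{}
		defer wg.Wait()   // runs second: blocks until the worker returns
		defer close(stop) // runs first (LIFO): tells the worker to stop

		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-stop:
					fmt.Println("worker: stop signal received")
					return
				default:
					time.Sleep(10 * time.Millisecond) // stand-in for broadcasting txs
				}
			}
		}()

		time.Sleep(50 * time.Millisecond) // stand-in for the rest of the test body
	}

Reversing the two defers would deadlock: Wait would run first and block forever on a worker that never receives the stop signal.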