Skip to content

Commit

Permalink
include cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
samliok committed Oct 10, 2024
1 parent b913e83 commit 4038440
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 126 deletions.
236 changes: 116 additions & 120 deletions tests/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ package e2e

import (
"fmt"
"sync"
"time"

"github.com/ava-labs/avalanchego/api/info"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/tests"
"github.com/ava-labs/avalanchego/tests/fixture/e2e"
"github.com/ava-labs/avalanchego/tests/fixture/tmpnet"
"github.com/stretchr/testify/require"

"github.com/ava-labs/hypersdk/abi"
Expand All @@ -22,7 +19,6 @@ import (
"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/tests/workload"
"github.com/ava-labs/hypersdk/throughput"
"github.com/ava-labs/hypersdk/utils"

ginkgo "github.com/onsi/ginkgo/v2"
)
Expand Down Expand Up @@ -148,122 +144,122 @@ var _ = ginkgo.Describe("[HyperSDK Spam Workloads]", func() {
})
})

var _ = ginkgo.Describe("[HyperSDK Syncing]", func() {
ginkgo.It("[Sync]", func() {
tc := e2e.NewTestContext()
require := require.New(tc)
blockchainID := e2e.GetEnv(tc).GetNetwork().GetSubnet(vmName).Chains[0].ChainID

uris := getE2EURIs(tc, blockchainID)
ginkgo.By("Generate 128 blocks", func() {
workload.GenerateNBlocks(tc.ContextWithTimeout(5*time.Minute), require, uris, txWorkloadFactory, 128)
})

var (
bootstrapNode *tmpnet.Node
bootstrapNodeURI string
)
ginkgo.By("Start a new node to bootstrap", func() {
bootstrapNode = e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
bootstrapNodeURI = formatURI(bootstrapNode.URI, blockchainID)
uris = append(uris, bootstrapNodeURI)
})
ginkgo.By("Accept a transaction after state sync", func() {
txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(bootstrapNodeURI, 1)
require.NoError(err)
workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
})
ginkgo.By("Restart the node", func() {
require.NoError(e2e.GetEnv(tc).GetNetwork().RestartNode(tc.DefaultContext(), ginkgo.GinkgoWriter, bootstrapNode))
})
ginkgo.By("Generate > StateSyncMinBlocks=512", func() {
workload.GenerateNBlocks(tc.ContextWithTimeout(20*time.Minute), require, uris, txWorkloadFactory, 512)
})
var (
syncNode *tmpnet.Node
syncNodeURI string
)
ginkgo.By("Start a new node to state sync", func() {
syncNode = e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
syncNodeURI = formatURI(syncNode.URI, blockchainID)
uris = append(uris, syncNodeURI)
utils.Outf("{{blue}}sync node uri: %s{{/}}\n", syncNodeURI)
c := jsonrpc.NewJSONRPCClient(syncNodeURI)
_, _, _, err := c.Network(tc.DefaultContext())
require.NoError(err)
})
ginkgo.By("Accept a transaction after state sync", func() {
txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(syncNodeURI, 1)
require.NoError(err)
workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
})
ginkgo.By("Pause the node", func() {
// TODO: remove the need to call SaveAPIPort from the test
require.NoError(syncNode.SaveAPIPort())
require.NoError(syncNode.Stop(tc.DefaultContext()))

// TODO: remove extra Ping check and rely on tmpnet to stop the node correctly
c := jsonrpc.NewJSONRPCClient(syncNodeURI)
ok, err := c.Ping(tc.DefaultContext())
require.Error(err) //nolint:forbidigo
require.False(ok)
})
ginkgo.By("Generate 256 blocks", func() {
// Generate blocks on all nodes except the paused node
runningURIs := uris[:len(uris)-1]
workload.GenerateNBlocks(tc.ContextWithTimeout(5*time.Minute), require, runningURIs, txWorkloadFactory, 256)
})
ginkgo.By("Resume the node", func() {
require.NoError(e2e.GetEnv(tc).GetNetwork().StartNode(tc.DefaultContext(), ginkgo.GinkgoWriter, syncNode))
utils.Outf("Waiting for sync node to restart")
require.NoError(tmpnet.WaitForHealthy(tc.DefaultContext(), syncNode))

utils.Outf("{{blue}}sync node reporting healthy: %s{{/}}\n", syncNodeURI)

c := jsonrpc.NewJSONRPCClient(syncNodeURI)
_, _, _, err := c.Network(tc.DefaultContext())
require.NoError(err)
})

ginkgo.By("Accept a transaction after resuming", func() {
txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(syncNodeURI, 1)
require.NoError(err)
workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
})
ginkgo.By("State sync while broadcasting txs", func() {
stopChannel := make(chan struct{})
wg := &sync.WaitGroup{}
defer wg.Wait()
defer close(stopChannel)

wg.Add(1)
go func() {
defer wg.Done()
// Recover failure if exits
defer ginkgo.GinkgoRecover()

txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(uris[0], 128)
require.NoError(err)
workload.GenerateUntilStop(tc.DefaultContext(), require, uris, txWorkload, stopChannel)
}()

// Give time for transactions to start processing
time.Sleep(5 * time.Second)

syncConcurrentNode := e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
syncConcurrentNodeURI := formatURI(syncConcurrentNode.URI, blockchainID)
uris = append(uris, syncConcurrentNodeURI)
c := jsonrpc.NewJSONRPCClient(syncConcurrentNodeURI)
_, _, _, err := c.Network(tc.DefaultContext())
require.NoError(err)
})
ginkgo.By("Accept a transaction after syncing", func() {
txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(uris[0], 1)
require.NoError(err)
workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
})
})
})
// var _ = ginkgo.Describe("[HyperSDK Syncing]", func() {
// ginkgo.It("[Sync]", func() {
// tc := e2e.NewTestContext()
// require := require.New(tc)
// blockchainID := e2e.GetEnv(tc).GetNetwork().GetSubnet(vmName).Chains[0].ChainID

// uris := getE2EURIs(tc, blockchainID)
// ginkgo.By("Generate 128 blocks", func() {
// workload.GenerateNBlocks(tc.ContextWithTimeout(5*time.Minute), require, uris, txWorkloadFactory, 128)
// })

// var (
// bootstrapNode *tmpnet.Node
// bootstrapNodeURI string
// )
// ginkgo.By("Start a new node to bootstrap", func() {
// bootstrapNode = e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
// bootstrapNodeURI = formatURI(bootstrapNode.URI, blockchainID)
// uris = append(uris, bootstrapNodeURI)
// })
// ginkgo.By("Accept a transaction after state sync", func() {
// txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(bootstrapNodeURI, 1)
// require.NoError(err)
// workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
// })
// ginkgo.By("Restart the node", func() {
// require.NoError(e2e.GetEnv(tc).GetNetwork().RestartNode(tc.DefaultContext(), ginkgo.GinkgoWriter, bootstrapNode))
// })
// ginkgo.By("Generate > StateSyncMinBlocks=512", func() {
// workload.GenerateNBlocks(tc.ContextWithTimeout(20*time.Minute), require, uris, txWorkloadFactory, 512)
// })
// var (
// syncNode *tmpnet.Node
// syncNodeURI string
// )
// ginkgo.By("Start a new node to state sync", func() {
// syncNode = e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
// syncNodeURI = formatURI(syncNode.URI, blockchainID)
// uris = append(uris, syncNodeURI)
// utils.Outf("{{blue}}sync node uri: %s{{/}}\n", syncNodeURI)
// c := jsonrpc.NewJSONRPCClient(syncNodeURI)
// _, _, _, err := c.Network(tc.DefaultContext())
// require.NoError(err)
// })
// ginkgo.By("Accept a transaction after state sync", func() {
// txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(syncNodeURI, 1)
// require.NoError(err)
// workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
// })
// ginkgo.By("Pause the node", func() {
// // TODO: remove the need to call SaveAPIPort from the test
// require.NoError(syncNode.SaveAPIPort())
// require.NoError(syncNode.Stop(tc.DefaultContext()))

// // TODO: remove extra Ping check and rely on tmpnet to stop the node correctly
// c := jsonrpc.NewJSONRPCClient(syncNodeURI)
// ok, err := c.Ping(tc.DefaultContext())
// require.Error(err) //nolint:forbidigo
// require.False(ok)
// })
// ginkgo.By("Generate 256 blocks", func() {
// // Generate blocks on all nodes except the paused node
// runningURIs := uris[:len(uris)-1]
// workload.GenerateNBlocks(tc.ContextWithTimeout(5*time.Minute), require, runningURIs, txWorkloadFactory, 256)
// })
// ginkgo.By("Resume the node", func() {
// require.NoError(e2e.GetEnv(tc).GetNetwork().StartNode(tc.DefaultContext(), ginkgo.GinkgoWriter, syncNode))
// utils.Outf("Waiting for sync node to restart")
// require.NoError(tmpnet.WaitForHealthy(tc.DefaultContext(), syncNode))

// utils.Outf("{{blue}}sync node reporting healthy: %s{{/}}\n", syncNodeURI)

// c := jsonrpc.NewJSONRPCClient(syncNodeURI)
// _, _, _, err := c.Network(tc.DefaultContext())
// require.NoError(err)
// })

// ginkgo.By("Accept a transaction after resuming", func() {
// txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(syncNodeURI, 1)
// require.NoError(err)
// workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
// })
// ginkgo.By("State sync while broadcasting txs", func() {
// stopChannel := make(chan struct{})
// wg := &sync.WaitGroup{}
// defer wg.Wait()
// defer close(stopChannel)

// wg.Add(1)
// go func() {
// defer wg.Done()
// // Recover failure if exits
// defer ginkgo.GinkgoRecover()

// txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(uris[0], 128)
// require.NoError(err)
// workload.GenerateUntilStop(tc.DefaultContext(), require, uris, txWorkload, stopChannel)
// }()

// // Give time for transactions to start processing
// time.Sleep(5 * time.Second)

// syncConcurrentNode := e2e.CheckBootstrapIsPossible(tc, e2e.GetEnv(tc).GetNetwork())
// syncConcurrentNodeURI := formatURI(syncConcurrentNode.URI, blockchainID)
// uris = append(uris, syncConcurrentNodeURI)
// c := jsonrpc.NewJSONRPCClient(syncConcurrentNodeURI)
// _, _, _, err := c.Network(tc.DefaultContext())
// require.NoError(err)
// })
// ginkgo.By("Accept a transaction after syncing", func() {
// txWorkload, err := txWorkloadFactory.NewSizedTxWorkload(uris[0], 1)
// require.NoError(err)
// workload.ExecuteWorkload(tc.DefaultContext(), require, uris, txWorkload)
// })
// })
// })

func getE2EURIs(tc tests.TestContext, blockchainID ids.ID) []string {
nodeURIs := e2e.GetEnv(tc).GetNetwork().GetNodeURIs()
Expand Down
14 changes: 8 additions & 6 deletions throughput/spam.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo

cctx, cancel := context.WithCancel(ctx)
defer cancel()

for _, issuer := range issuers {
issuer.Start(cctx)
}
Expand All @@ -157,7 +158,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
issuers[0].logStats(cctx)

// broadcast transactions
s.broadcast(cctx, sh, accounts, funds, factories, issuers, feePerTx, terminate)
s.broadcast(cctx, cancel, sh, accounts, funds, factories, issuers, feePerTx, terminate)

maxUnits, err = chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory)
if err != nil {
Expand All @@ -167,7 +168,8 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
}

func (s Spammer) broadcast(
cctx context.Context,
ctx context.Context,
cancel context.CancelFunc,
sh SpamHelper,
accounts []*auth.PrivateKey,

Expand Down Expand Up @@ -247,7 +249,7 @@ func (s Spammer) broadcast(

// Send transaction
actions := sh.GetTransfer(recipient, amountToTransfer, uniqueBytes())
return issuer.Send(cctx, actions, factory, feePerTx)
return issuer.Send(ctx, actions, factory, feePerTx)
})
}

Expand All @@ -271,19 +273,19 @@ func (s Spammer) broadcast(
if terminate && currentTarget == s.txsPerSecond && consecutiveUnderBacklog >= successfulRunsToIncreaseTarget {
utils.Outf("{{green}}reached target tps:{{/}} %d\n", currentTarget)
// Cancel the context to stop the issuers
cctx.Done()
cancel()
} else if consecutiveUnderBacklog >= successfulRunsToIncreaseTarget && currentTarget < s.txsPerSecond {
currentTarget = min(currentTarget+s.txsPerSecondStep, s.txsPerSecond)
utils.Outf("{{cyan}}increasing target tps:{{/}} %d\n", currentTarget)
consecutiveUnderBacklog = 0
}
case <-cctx.Done():
case <-ctx.Done():
stop = true
utils.Outf("{{yellow}}context canceled{{/}}\n")
case <-signals:
stop = true
utils.Outf("{{yellow}}exiting broadcast loop{{/}}\n")
cctx.Done()
cancel()
}
}

Expand Down

0 comments on commit 4038440

Please sign in to comment.