Skip to content

Commit

Permalink
seperate broadcast txs
Browse files Browse the repository at this point in the history
  • Loading branch information
samliok committed Oct 10, 2024
1 parent 02ba233 commit b913e83
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions throughput/spam.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
return err
}

var fundsL sync.Mutex
// distribute funds
accounts, funds, factories, err := s.distributeFunds(ctx, cli, parser, feePerTx, sh)
if err != nil {
Expand All @@ -148,10 +147,6 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
return err
}

// make sure we can exit gracefully & return funds
signals := make(chan os.Signal, 2)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

cctx, cancel := context.WithCancel(ctx)
defer cancel()
for _, issuer := range issuers {
Expand All @@ -161,10 +156,36 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
// set logging
issuers[0].logStats(cctx)

// Broadcast txs
// broadcast transactions
s.broadcast(cctx, sh, accounts, funds, factories, issuers, feePerTx, terminate)

maxUnits, err = chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory)
if err != nil {
return err
}
return s.returnFunds(ctx, cli, parser, maxUnits, sh, accounts, factories, funds, symbol)
}

func (s Spammer) broadcast(
cctx context.Context,
sh SpamHelper,
accounts []*auth.PrivateKey,

funds map[codec.Address]uint64,
factories []chain.AuthFactory,
issuers []*issuer,

feePerTx uint64,
terminate bool,
) {
// make sure we can exit gracefully & return funds
signals := make(chan os.Signal, 2)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

var (
// Do not call this function concurrently (math.Rand is not safe for concurrent use)
z = rand.NewZipf(s.zipfSeed, s.sZipf, s.vZipf, uint64(s.numAccounts)-1)
z = rand.NewZipf(s.zipfSeed, s.sZipf, s.vZipf, uint64(s.numAccounts)-1)
fundsL = sync.Mutex{}

it = time.NewTimer(0)
currentTarget = min(s.txsPerSecond, s.minTxsPerSecond)
Expand Down Expand Up @@ -250,7 +271,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
if terminate && currentTarget == s.txsPerSecond && consecutiveUnderBacklog >= successfulRunsToIncreaseTarget {
utils.Outf("{{green}}reached target tps:{{/}} %d\n", currentTarget)
// Cancel the context to stop the issuers
cancel()
cctx.Done()
} else if consecutiveUnderBacklog >= successfulRunsToIncreaseTarget && currentTarget < s.txsPerSecond {
currentTarget = min(currentTarget+s.txsPerSecondStep, s.txsPerSecond)
utils.Outf("{{cyan}}increasing target tps:{{/}} %d\n", currentTarget)
Expand All @@ -262,17 +283,13 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
case <-signals:
stop = true
utils.Outf("{{yellow}}exiting broadcast loop{{/}}\n")
cancel()
cctx.Done()
}
}

// Wait for all issuers to finish
utils.Outf("{{yellow}}waiting for issuers to return{{/}}\n")
issuerWg.Wait()
maxUnits, err = chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory)
if err != nil {
return err
}
return s.returnFunds(ctx, cli, parser, maxUnits, sh, accounts, factories, funds, symbol)
}

func (s *Spammer) logZipf(zipfSeed *rand.Rand) {
Expand Down

0 comments on commit b913e83

Please sign in to comment.