Skip to content

Commit

Permalink
Optimize bulk (#130)
Browse files Browse the repository at this point in the history
* use bulk caller get chainInfo and estimate in bulk sender

* fix typo

* use pending nonce when apply unsigend transaction empty fields

Co-authored-by: dayong <[email protected]>
  • Loading branch information
wangdayong228 and dayong authored Nov 23, 2021
1 parent eb93ef1 commit 366de23
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 60 deletions.
23 changes: 17 additions & 6 deletions cfxclient/bulk/bulk_caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ type BulkCaller struct {

outHandlers map[int]*OutputHandler
*BulkCfxCaller
debug *BulkDebugCaller
trace *BulkTraceCaller
pos *BulkPosCaller
customer *BulkCustomCaller
}

// NewBulkerCaller creates new bulk caller instance
func NewBulkerCaller(rpcCaller sdk.ClientOperator) *BulkCaller {
debug *BulkDebugCaller
trace *BulkTraceCaller
pos *BulkPosCaller
txpool *BulkTxpoolCaller
}

// NewBulkCaller creates new bulk caller instance
func NewBulkCaller(rpcCaller sdk.ClientOperator) *BulkCaller {
core := NewBulkCallerCore(rpcCaller)
cfx := NewBulkCfxCaller(core)

Expand All @@ -57,6 +58,11 @@ func NewBulkerCaller(rpcCaller sdk.ClientOperator) *BulkCaller {
outHandlers: outHandlers,
BulkCfxCaller: cfx,
customer: customer,

debug: NewBulkDebugCaller(core),
trace: NewBulkTraceCaller(core),
pos: NewBulkPosCaller(core),
txpool: NewBulkTxpoolCaller(core),
}
}

Expand Down Expand Up @@ -85,6 +91,11 @@ func (b *BulkCaller) Customer() *BulkCustomCaller {
return b.customer
}

// TxPool returns BulkTxpoolCaller for genereating "txpool" namespace relating rpc request
func (b *BulkCaller) Txpool() *BulkTxpoolCaller {
return b.txpool
}

// Execute sends all rpc requests in queue by rpc call "batch" on one request
func (b *BulkCaller) Execute() error {
_errors, _err := batchCall(b.BulkCallerCore.caller, b.BulkCallerCore.batchElems, b.outHandlers)
Expand Down
9 changes: 8 additions & 1 deletion cfxclient/bulk/bulk_caller_cfx.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@ func (client *BulkCfxCaller) GetNextNonce(address types.Address, epoch ...*types
}

// GetStatus returns status of connecting conflux node
//ignore
func (client *BulkCfxCaller) GetStatus() (*types.Status, *error) {
result := new(types.Status)
err := new(error)

elem := newBatchElem(result, "cfx_getStatus")
(*BulkCallerCore)(client).appendElemsAndError(elem, err)
return result, err
}

func (client *BulkCfxCaller) GetEpochNumber(epoch ...*types.Epoch) (*hexutil.Big, *error) {
result := new(hexutil.Big)
Expand Down
145 changes: 94 additions & 51 deletions cfxclient/bulk/bulk_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ func (b *BulkSender) PopulateTransactions() error {

userUsedNoncesMap := b.gatherUsedNonces()
// fill nonce
userNextNonceCache := make(map[string]*big.Int, len(b.unsignedTxs))
userNextNonceCache, err := b.gatherInitNextNonces()
if err != nil {
return errors.WithStack(err)
}

for _, utx := range b.unsignedTxs {
utx.From.CompleteByNetworkID(networkId)
utx.To.CompleteByNetworkID(networkId)
Expand All @@ -72,18 +76,6 @@ func (b *BulkSender) PopulateTransactions() error {

if utx.Nonce == nil {
from := utx.From.String()
if userNextNonceCache[from] == nil {
hexNonce, err := b.signalbeCaller.TxPool().NextNonce(*utx.From)
if err != nil {
hexNonce, err = b.signalbeCaller.GetNextNonce(*utx.From)
if err != nil {
return errors.WithStack(err)
}
}

userNextNonceCache[from] = hexNonce.ToInt()
}

utx.Nonce = (*hexutil.Big)(userNextNonceCache[from])
// avoid to reuse user used nonce, increase it if transactions used the nonce in cache
for {
Expand All @@ -95,25 +87,42 @@ func (b *BulkSender) PopulateTransactions() error {

}
}
return b.populateGasAndStorage()
}

func (b *BulkSender) populateGasAndStorage() error {
estimatPtrs, errPtrs := make([]*types.Estimate, len(b.unsignedTxs)), make([]*error, len(b.unsignedTxs))
bulkCaller := NewBulkCaller(b.signalbeCaller)
for i, utx := range b.unsignedTxs {
// The gas and storage limit may be influnced by all fileds of transaction ,so set them at last step.
if utx.StorageLimit == nil || utx.Gas == nil {
callReq := new(types.CallRequest)
callReq.FillByUnsignedTx(utx)

estimat, err := b.signalbeCaller.EstimateGasAndCollateral(*callReq)
if err != nil {
return errors.Wrapf(err, "failed to estimate gas and collateral of %vth transaction, request = %+v", i, *callReq)
}
if utx.StorageLimit != nil && utx.Gas != nil {
continue
}
callReq := new(types.CallRequest)
callReq.FillByUnsignedTx(utx)

if utx.Gas == nil {
utx.Gas = estimat.GasLimit
}
estimatPtrs[i], errPtrs[i] = bulkCaller.EstimateGasAndCollateral(*callReq)
}

if utx.StorageLimit == nil {
utx.StorageLimit = types.NewUint64(estimat.StorageCollateralized.ToInt().Uint64())
}
err := bulkCaller.Execute()
if err != nil {
return errors.WithStack(err)
}

for i, utx := range b.unsignedTxs {
if utx.StorageLimit != nil && utx.Gas != nil {
continue
}

if *errPtrs[i] != nil {
return errors.WithMessagef(*errPtrs[i], "failed to estimate %vth transaction %+v", i, utx)
}

if utx.Gas == nil {
utx.Gas = estimatPtrs[i].GasLimit
}

if utx.StorageLimit == nil {
utx.StorageLimit = types.NewUint64(estimatPtrs[i].StorageCollateralized.ToInt().Uint64())
}
}
return nil
Expand All @@ -133,6 +142,48 @@ func (b *BulkSender) gatherUsedNonces() map[string]map[string]bool {
return result
}

func (b *BulkSender) gatherInitNextNonces() (map[string]*big.Int, error) {
result := make(map[string]*big.Int)

bulkCaller := NewBulkCaller(b.signalbeCaller)
isUserCached := make(map[string]bool)
poolNextNonces, poolNextNonceErrs := make(map[string]*hexutil.Big), make(map[string]*error)
nextNonces, nextNonceErrs := make(map[string]*hexutil.Big), make(map[string]*error)

for _, utx := range b.unsignedTxs {
if isUserCached[utx.From.String()] {
continue
}
poolNextNonces[utx.From.String()], poolNextNonceErrs[utx.From.String()] = bulkCaller.Txpool().NextNonce(*utx.From)
nextNonces[utx.From.String()], nextNonceErrs[utx.From.String()] = bulkCaller.GetNextNonce(*utx.From)
}

err := bulkCaller.Execute()
if err != nil {
return nil, errors.WithStack(err)
}

for _, utx := range b.unsignedTxs {
user := utx.From.String()
if utx.Nonce != nil || result[user] != nil {
continue
}

if *poolNextNonceErrs[user] == nil {
result[utx.From.String()] = poolNextNonces[user].ToInt()
continue
}

if *nextNonceErrs[user] == nil {
result[utx.From.String()] = nextNonces[user].ToInt()
continue
}

return nil, errors.WithStack(*nextNonceErrs[user])
}
return result, nil
}

func (b *BulkSender) checkIsNonceUsed(usedCaches map[string]map[string]bool, user *cfxaddress.Address, nonce *hexutil.Big) bool {
hasCache, ok := usedCaches[user.String()]
if ok {
Expand All @@ -150,39 +201,31 @@ func (b *BulkSender) getChainInfos() (
err error,
) {
_client := b.signalbeCaller
defaultAccount, err = _client.GetAccountManager().GetDefault()

_defaultAccount, err := _client.GetAccountManager().GetDefault()
if err != nil {
return nil, nil, 0, nil, nil, errors.Wrap(err, "failed to get default account")
}

status, err := _client.GetStatus()
if err != nil {
return nil, nil, 0, nil, nil, errors.WithStack(err)
}
chainID = &status.ChainID
bulkCaller := NewBulkCaller(_client)
_status, statusErr := bulkCaller.GetStatus()
_gasPrice, gasPriceErr := bulkCaller.GetGasPrice()
_epoch, epochErr := bulkCaller.GetEpochNumber(types.EpochLatestState)
err = bulkCaller.Execute()

networkId, err = _client.GetNetworkID()
if err != nil {
return nil, nil, 0, nil, nil, errors.WithStack(err)
if err != nil || *statusErr != nil || *gasPriceErr != nil || *epochErr != nil {
return nil, nil, 0, nil, nil, errors.Wrap(err, "failed to bulk fetch chain infos")
}

gasPrice, err = _client.GetGasPrice()
if err != nil {
return nil, nil, 0, nil, nil, errors.Wrap(err, "failed to get gas price")
}
_chainID, _networkId := &_status.ChainID, uint32(_status.NetworkID)
_epochHeight := types.NewUint64(_epoch.ToInt().Uint64())

// conflux responsed gasprice offen be 0, but the min gasprice is 1 when sending transaction, so do this
if gasPrice.ToInt().Cmp(big.NewInt(constants.MinGasprice)) < 1 {
gasPrice = types.NewBigInt(constants.MinGasprice)
}

epoch, err := _client.GetEpochNumber(types.EpochLatestState)
if err != nil {
return nil, nil, 0, nil, nil, errors.Wrap(err, "failed to get the latest state epoch number")
if _gasPrice.ToInt().Cmp(big.NewInt(constants.MinGasprice)) < 1 {
_gasPrice = types.NewBigInt(constants.MinGasprice)
}
epochHeight = types.NewUint64(epoch.ToInt().Uint64())

return defaultAccount, chainID, networkId, gasPrice, epochHeight, nil
return _defaultAccount, _chainID, _networkId, _gasPrice, _epochHeight, nil
}

// Clear clear batch elems and errors in queue for new bulk call action
Expand All @@ -206,7 +249,7 @@ func (b *BulkSender) SignAndSend() (txHashes []*types.Hash, txErrors []error, ba
}

// send
bulkCaller := NewBulkerCaller(b.signalbeCaller)
bulkCaller := NewBulkCaller(b.signalbeCaller)
hashes := make([]*types.Hash, len(rawTxs))
errs := make([]*error, len(rawTxs))
for i, rawTx := range rawTxs {
Expand Down
2 changes: 1 addition & 1 deletion cfxclient/bulk/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestBulkCall(t *testing.T) {
if err != nil {
panic(err)
}
bulkCaller := NewBulkerCaller(_client)
bulkCaller := NewBulkCaller(_client)

gasPrice, gasPriceError := bulkCaller.Cfx().GetGasPrice()
err = bulkCaller.Execute()
Expand Down
15 changes: 14 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func (client *Client) ApplyUnsignedTransactionDefault(tx *types.UnsignedTransact
tx.To.CompleteByNetworkID(client.networkID)

if tx.Nonce == nil {
nonce, err := client.GetNextNonce(*tx.From, nil)
nonce, err := client.GetNextUsableNonce(*tx.From)
if err != nil {
return errors.Wrap(err, "failed to get nonce")
}
Expand Down Expand Up @@ -1139,6 +1139,19 @@ func (client *Client) WaitForTransationReceipt(txhash types.Hash, duration time.
return txReceipt, nil
}

func (client *Client) GetNextUsableNonce(user types.Address) (nonce *hexutil.Big, err error) {
hexNonce, err := client.TxPool().NextNonce(user)
if err != nil {
hexNonce, err = client.GetNextNonce(user)
if err != nil {
return nil, errors.WithStack(err)
}
}
return hexNonce, nil
}

// ======== private methods=============

func (client *Client) wrappedCallRPC(result interface{}, method string, args ...interface{}) error {
fmtedArgs := client.genRPCParams(args...)
return client.CallRPC(result, method, fmtedArgs...)
Expand Down
1 change: 1 addition & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type ClientOperator interface {

WaitForTransationBePacked(txhash types.Hash, duration time.Duration) (*types.Transaction, error)
WaitForTransationReceipt(txhash types.Hash, duration time.Duration) (*types.TransactionReceipt, error)
GetNextUsableNonce(user types.Address) (nonce *hexutil.Big, err error)
}

type RpcPos interface {
Expand Down

0 comments on commit 366de23

Please sign in to comment.