From 366de23d899adce7d6837e96a188fb62dc874365 Mon Sep 17 00:00:00 2001 From: dyoung Date: Tue, 23 Nov 2021 19:03:50 +0800 Subject: [PATCH] Optimize bulk (#130) * use bulk caller get chainInfo and estimate in bulk sender * fix typo * use pending nonce when apply unsigend transaction empty fields Co-authored-by: dayong --- cfxclient/bulk/bulk_caller.go | 23 +++-- cfxclient/bulk/bulk_caller_cfx.go | 9 +- cfxclient/bulk/bulk_sender.go | 145 +++++++++++++++++++----------- cfxclient/bulk/bulk_test.go | 2 +- client.go | 15 +++- interface.go | 1 + 6 files changed, 135 insertions(+), 60 deletions(-) diff --git a/cfxclient/bulk/bulk_caller.go b/cfxclient/bulk/bulk_caller.go index 8c155d6..4c8a3b5 100644 --- a/cfxclient/bulk/bulk_caller.go +++ b/cfxclient/bulk/bulk_caller.go @@ -37,15 +37,16 @@ type BulkCaller struct { outHandlers map[int]*OutputHandler *BulkCfxCaller - debug *BulkDebugCaller - trace *BulkTraceCaller - pos *BulkPosCaller customer *BulkCustomCaller -} -// NewBulkerCaller creates new bulk caller instance -func NewBulkerCaller(rpcCaller sdk.ClientOperator) *BulkCaller { + debug *BulkDebugCaller + trace *BulkTraceCaller + pos *BulkPosCaller + txpool *BulkTxpoolCaller +} +// NewBulkCaller creates new bulk caller instance +func NewBulkCaller(rpcCaller sdk.ClientOperator) *BulkCaller { core := NewBulkCallerCore(rpcCaller) cfx := NewBulkCfxCaller(core) @@ -57,6 +58,11 @@ func NewBulkerCaller(rpcCaller sdk.ClientOperator) *BulkCaller { outHandlers: outHandlers, BulkCfxCaller: cfx, customer: customer, + + debug: NewBulkDebugCaller(core), + trace: NewBulkTraceCaller(core), + pos: NewBulkPosCaller(core), + txpool: NewBulkTxpoolCaller(core), } } @@ -85,6 +91,11 @@ func (b *BulkCaller) Customer() *BulkCustomCaller { return b.customer } +// TxPool returns BulkTxpoolCaller for genereating "txpool" namespace relating rpc request +func (b *BulkCaller) Txpool() *BulkTxpoolCaller { + return b.txpool +} + // Execute sends all rpc requests in queue by rpc call "batch" on one request func (b *BulkCaller) Execute() error { _errors, _err := batchCall(b.BulkCallerCore.caller, b.BulkCallerCore.batchElems, b.outHandlers) diff --git a/cfxclient/bulk/bulk_caller_cfx.go b/cfxclient/bulk/bulk_caller_cfx.go index a6d1772..83883fd 100644 --- a/cfxclient/bulk/bulk_caller_cfx.go +++ b/cfxclient/bulk/bulk_caller_cfx.go @@ -44,7 +44,14 @@ func (client *BulkCfxCaller) GetNextNonce(address types.Address, epoch ...*types } // GetStatus returns status of connecting conflux node -//ignore +func (client *BulkCfxCaller) GetStatus() (*types.Status, *error) { + result := new(types.Status) + err := new(error) + + elem := newBatchElem(result, "cfx_getStatus") + (*BulkCallerCore)(client).appendElemsAndError(elem, err) + return result, err +} func (client *BulkCfxCaller) GetEpochNumber(epoch ...*types.Epoch) (*hexutil.Big, *error) { result := new(hexutil.Big) diff --git a/cfxclient/bulk/bulk_sender.go b/cfxclient/bulk/bulk_sender.go index 44de776..0db0a2d 100644 --- a/cfxclient/bulk/bulk_sender.go +++ b/cfxclient/bulk/bulk_sender.go @@ -49,7 +49,11 @@ func (b *BulkSender) PopulateTransactions() error { userUsedNoncesMap := b.gatherUsedNonces() // fill nonce - userNextNonceCache := make(map[string]*big.Int, len(b.unsignedTxs)) + userNextNonceCache, err := b.gatherInitNextNonces() + if err != nil { + return errors.WithStack(err) + } + for _, utx := range b.unsignedTxs { utx.From.CompleteByNetworkID(networkId) utx.To.CompleteByNetworkID(networkId) @@ -72,18 +76,6 @@ func (b *BulkSender) PopulateTransactions() error { if utx.Nonce == nil { from := utx.From.String() - if userNextNonceCache[from] == nil { - hexNonce, err := b.signalbeCaller.TxPool().NextNonce(*utx.From) - if err != nil { - hexNonce, err = b.signalbeCaller.GetNextNonce(*utx.From) - if err != nil { - return errors.WithStack(err) - } - } - - userNextNonceCache[from] = hexNonce.ToInt() - } - utx.Nonce = (*hexutil.Big)(userNextNonceCache[from]) // avoid to reuse user used nonce, increase it if transactions used the nonce in cache for { @@ -95,25 +87,42 @@ func (b *BulkSender) PopulateTransactions() error { } } + return b.populateGasAndStorage() +} +func (b *BulkSender) populateGasAndStorage() error { + estimatPtrs, errPtrs := make([]*types.Estimate, len(b.unsignedTxs)), make([]*error, len(b.unsignedTxs)) + bulkCaller := NewBulkCaller(b.signalbeCaller) for i, utx := range b.unsignedTxs { - // The gas and storage limit may be influnced by all fileds of transaction ,so set them at last step. - if utx.StorageLimit == nil || utx.Gas == nil { - callReq := new(types.CallRequest) - callReq.FillByUnsignedTx(utx) - - estimat, err := b.signalbeCaller.EstimateGasAndCollateral(*callReq) - if err != nil { - return errors.Wrapf(err, "failed to estimate gas and collateral of %vth transaction, request = %+v", i, *callReq) - } + if utx.StorageLimit != nil && utx.Gas != nil { + continue + } + callReq := new(types.CallRequest) + callReq.FillByUnsignedTx(utx) - if utx.Gas == nil { - utx.Gas = estimat.GasLimit - } + estimatPtrs[i], errPtrs[i] = bulkCaller.EstimateGasAndCollateral(*callReq) + } - if utx.StorageLimit == nil { - utx.StorageLimit = types.NewUint64(estimat.StorageCollateralized.ToInt().Uint64()) - } + err := bulkCaller.Execute() + if err != nil { + return errors.WithStack(err) + } + + for i, utx := range b.unsignedTxs { + if utx.StorageLimit != nil && utx.Gas != nil { + continue + } + + if *errPtrs[i] != nil { + return errors.WithMessagef(*errPtrs[i], "failed to estimate %vth transaction %+v", i, utx) + } + + if utx.Gas == nil { + utx.Gas = estimatPtrs[i].GasLimit + } + + if utx.StorageLimit == nil { + utx.StorageLimit = types.NewUint64(estimatPtrs[i].StorageCollateralized.ToInt().Uint64()) } } return nil @@ -133,6 +142,48 @@ func (b *BulkSender) gatherUsedNonces() map[string]map[string]bool { return result } +func (b *BulkSender) gatherInitNextNonces() (map[string]*big.Int, error) { + result := make(map[string]*big.Int) + + bulkCaller := NewBulkCaller(b.signalbeCaller) + isUserCached := make(map[string]bool) + poolNextNonces, poolNextNonceErrs := make(map[string]*hexutil.Big), make(map[string]*error) + nextNonces, nextNonceErrs := make(map[string]*hexutil.Big), make(map[string]*error) + + for _, utx := range b.unsignedTxs { + if isUserCached[utx.From.String()] { + continue + } + poolNextNonces[utx.From.String()], poolNextNonceErrs[utx.From.String()] = bulkCaller.Txpool().NextNonce(*utx.From) + nextNonces[utx.From.String()], nextNonceErrs[utx.From.String()] = bulkCaller.GetNextNonce(*utx.From) + } + + err := bulkCaller.Execute() + if err != nil { + return nil, errors.WithStack(err) + } + + for _, utx := range b.unsignedTxs { + user := utx.From.String() + if utx.Nonce != nil || result[user] != nil { + continue + } + + if *poolNextNonceErrs[user] == nil { + result[utx.From.String()] = poolNextNonces[user].ToInt() + continue + } + + if *nextNonceErrs[user] == nil { + result[utx.From.String()] = nextNonces[user].ToInt() + continue + } + + return nil, errors.WithStack(*nextNonceErrs[user]) + } + return result, nil +} + func (b *BulkSender) checkIsNonceUsed(usedCaches map[string]map[string]bool, user *cfxaddress.Address, nonce *hexutil.Big) bool { hasCache, ok := usedCaches[user.String()] if ok { @@ -150,39 +201,31 @@ func (b *BulkSender) getChainInfos() ( err error, ) { _client := b.signalbeCaller - defaultAccount, err = _client.GetAccountManager().GetDefault() + + _defaultAccount, err := _client.GetAccountManager().GetDefault() if err != nil { return nil, nil, 0, nil, nil, errors.Wrap(err, "failed to get default account") } - status, err := _client.GetStatus() - if err != nil { - return nil, nil, 0, nil, nil, errors.WithStack(err) - } - chainID = &status.ChainID + bulkCaller := NewBulkCaller(_client) + _status, statusErr := bulkCaller.GetStatus() + _gasPrice, gasPriceErr := bulkCaller.GetGasPrice() + _epoch, epochErr := bulkCaller.GetEpochNumber(types.EpochLatestState) + err = bulkCaller.Execute() - networkId, err = _client.GetNetworkID() - if err != nil { - return nil, nil, 0, nil, nil, errors.WithStack(err) + if err != nil || *statusErr != nil || *gasPriceErr != nil || *epochErr != nil { + return nil, nil, 0, nil, nil, errors.Wrap(err, "failed to bulk fetch chain infos") } - gasPrice, err = _client.GetGasPrice() - if err != nil { - return nil, nil, 0, nil, nil, errors.Wrap(err, "failed to get gas price") - } + _chainID, _networkId := &_status.ChainID, uint32(_status.NetworkID) + _epochHeight := types.NewUint64(_epoch.ToInt().Uint64()) // conflux responsed gasprice offen be 0, but the min gasprice is 1 when sending transaction, so do this - if gasPrice.ToInt().Cmp(big.NewInt(constants.MinGasprice)) < 1 { - gasPrice = types.NewBigInt(constants.MinGasprice) - } - - epoch, err := _client.GetEpochNumber(types.EpochLatestState) - if err != nil { - return nil, nil, 0, nil, nil, errors.Wrap(err, "failed to get the latest state epoch number") + if _gasPrice.ToInt().Cmp(big.NewInt(constants.MinGasprice)) < 1 { + _gasPrice = types.NewBigInt(constants.MinGasprice) } - epochHeight = types.NewUint64(epoch.ToInt().Uint64()) - return defaultAccount, chainID, networkId, gasPrice, epochHeight, nil + return _defaultAccount, _chainID, _networkId, _gasPrice, _epochHeight, nil } // Clear clear batch elems and errors in queue for new bulk call action @@ -206,7 +249,7 @@ func (b *BulkSender) SignAndSend() (txHashes []*types.Hash, txErrors []error, ba } // send - bulkCaller := NewBulkerCaller(b.signalbeCaller) + bulkCaller := NewBulkCaller(b.signalbeCaller) hashes := make([]*types.Hash, len(rawTxs)) errs := make([]*error, len(rawTxs)) for i, rawTx := range rawTxs { diff --git a/cfxclient/bulk/bulk_test.go b/cfxclient/bulk/bulk_test.go index 010ca1f..8d489be 100644 --- a/cfxclient/bulk/bulk_test.go +++ b/cfxclient/bulk/bulk_test.go @@ -17,7 +17,7 @@ func TestBulkCall(t *testing.T) { if err != nil { panic(err) } - bulkCaller := NewBulkerCaller(_client) + bulkCaller := NewBulkCaller(_client) gasPrice, gasPriceError := bulkCaller.Cfx().GetGasPrice() err = bulkCaller.Execute() diff --git a/client.go b/client.go index 7a99978..294b09f 100644 --- a/client.go +++ b/client.go @@ -659,7 +659,7 @@ func (client *Client) ApplyUnsignedTransactionDefault(tx *types.UnsignedTransact tx.To.CompleteByNetworkID(client.networkID) if tx.Nonce == nil { - nonce, err := client.GetNextNonce(*tx.From, nil) + nonce, err := client.GetNextUsableNonce(*tx.From) if err != nil { return errors.Wrap(err, "failed to get nonce") } @@ -1139,6 +1139,19 @@ func (client *Client) WaitForTransationReceipt(txhash types.Hash, duration time. return txReceipt, nil } +func (client *Client) GetNextUsableNonce(user types.Address) (nonce *hexutil.Big, err error) { + hexNonce, err := client.TxPool().NextNonce(user) + if err != nil { + hexNonce, err = client.GetNextNonce(user) + if err != nil { + return nil, errors.WithStack(err) + } + } + return hexNonce, nil +} + +// ======== private methods============= + func (client *Client) wrappedCallRPC(result interface{}, method string, args ...interface{}) error { fmtedArgs := client.genRPCParams(args...) return client.CallRPC(result, method, fmtedArgs...) diff --git a/interface.go b/interface.go index ac84f9a..bb86542 100644 --- a/interface.go +++ b/interface.go @@ -129,6 +129,7 @@ type ClientOperator interface { WaitForTransationBePacked(txhash types.Hash, duration time.Duration) (*types.Transaction, error) WaitForTransationReceipt(txhash types.Hash, duration time.Duration) (*types.TransactionReceipt, error) + GetNextUsableNonce(user types.Address) (nonce *hexutil.Big, err error) } type RpcPos interface {