diff --git a/internal/evmreader/claim.go b/internal/evmreader/claim.go new file mode 100644 index 00000000..345c73c2 --- /dev/null +++ b/internal/evmreader/claim.go @@ -0,0 +1,220 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + "cmp" + "context" + "log/slog" + + . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" +) + +func (r *EvmReader) checkForClaimStatus( + ctx context.Context, + apps []application, + mostRecentBlockNumber uint64, +) { + + slog.Debug("Checking for new Claim Acceptance Events") + + // Classify them by lastClaimCheck block + appsIndexedByLastCheck := indexApps(keyByLastClaimCheck, apps) + + for lastClaimCheck, apps := range appsIndexedByLastCheck { + + appAddresses := appToAddresses(apps) + + // Safeguard: Only check blocks starting from the block where the InputBox + // contract was deployed as Inputs can be added to that same block + if lastClaimCheck < r.inputBoxDeploymentBlock { + lastClaimCheck = r.inputBoxDeploymentBlock - 1 + } + + if mostRecentBlockNumber > lastClaimCheck { + + slog.Info("Checking claim acceptance for applications", + "apps", appAddresses, + "last claim check block", lastClaimCheck, + "most recent block", mostRecentBlockNumber) + + r.readAndUpdateClaims(ctx, apps, lastClaimCheck, mostRecentBlockNumber) + + } else if mostRecentBlockNumber < lastClaimCheck { + slog.Warn( + "Not reading claim acceptance: most recent block is lower than the last processed one", //nolint:lll + "apps", appAddresses, + "last claim check block", lastClaimCheck, + "most recent block", mostRecentBlockNumber, + ) + } else { + slog.Info("Not reading claim acceptance: already checked the most recent blocks", + "apps", appAddresses, + "last claim check block", lastClaimCheck, + "most recent block", mostRecentBlockNumber, + ) + } + + } +} + +func (r *EvmReader) readAndUpdateClaims( + ctx context.Context, + apps []application, + lastClaimCheck, mostRecentBlockNumber uint64, +) { + + // DISCLAIMER: The current algorithm will only handle Authority. + // To handle Quorum, node needs to handle acceptance events + // that can happen before claim submission + + // Classify them by same IConsensusAddress + sameConsensusApps := indexApps(keyByIConsensus, apps) + for iConsensusAddress, apps := range sameConsensusApps { + + appAddresses := appToAddresses(apps) + + // All these apps shares the same IConsensus + if len(apps) == 0 { + continue + } + consensusContract := apps[0].consensusContract + + // Retrieve Claim Acceptance Events from blockchain + appClaimAcceptanceEventMap, err := r.readClaimAcceptanceFromBlockchain( + ctx, consensusContract, appAddresses, lastClaimCheck+1, mostRecentBlockNumber) + if err != nil { + slog.Error("Error reading claim acceptance status", + "apps", apps, + "IConsensus", iConsensusAddress, + "start", lastClaimCheck, + "end", mostRecentBlockNumber, + "error", err) + continue + } + + // Check events against Epochs + for app, claimAcceptances := range appClaimAcceptanceEventMap { + + epochs := []*Epoch{} + for _, claimAcceptance := range claimAcceptances { + + // Get Previous claims and update their statuses + // rejecting all + previousClaims, err := r.repository.GetPreviousSubmittedClaims( + ctx, app, claimAcceptance.LastProcessedBlockNumber.Uint64()) + if err != nil { + slog.Error("Error retrieving previous submitted claims", + "app", app, "error", err) + } + + for _, previousClaim := range previousClaims { + previousClaim.Status = EpochStatusClaimRejected + epochs = append(epochs, &previousClaim) + slog.Warn("Claim rejected", + "app", app, + "lastBlock", previousClaim.LastBlock, + "hash", previousClaim.ClaimHash) + } + + // Get Claim + claim, err := r.repository.GetEpoch( + ctx, calculateEpochIndex( + r.epochLengthCache[app], + claimAcceptance.LastProcessedBlockNumber.Uint64()), + app) + if err != nil { + slog.Error("Error retrieving claim", "app", app, "error", err) + } + + // Check Claim + if claim == nil { + slog.Error("Got unknown claim event", + "app", app, + "claim last block", claimAcceptance.LastProcessedBlockNumber, + "hash", claimAcceptance.Claim) + continue + } + + // Update claim status + if claimAcceptance.Claim != *claim.ClaimHash { + slog.Warn("Claim Rejected", + "app", app, + "lastBlock", claim.LastBlock, + "hash", claim.ClaimHash) + + claim.Status = EpochStatusClaimRejected + epochs = append(epochs, claim) + } else { + slog.Info("Claim Accepted", + "app", app, + "lastBlock", claim.LastBlock, + "hash", claim.ClaimHash) + + claim.Status = EpochStatusClaimAccepted + epochs = append(epochs, claim) + } + } + + // Store everything + err = r.repository.StoreClaimsTransaction( + ctx, app, epochs, mostRecentBlockNumber) + if err != nil { + slog.Error("Error storing claims", "app", app, "error", err) + continue + } + + } + + } + +} + +func (r *EvmReader) readClaimAcceptanceFromBlockchain( + ctx context.Context, + consensusContract ConsensusContract, + appAddresses []common.Address, + startBlock, endBlock uint64, +) (map[common.Address][]*iconsensus.IConsensusClaimAcceptance, error) { + appClaimAcceptanceMap := make(map[common.Address][]*iconsensus.IConsensusClaimAcceptance) + for _, address := range appAddresses { + appClaimAcceptanceMap[address] = []*iconsensus.IConsensusClaimAcceptance{} + } + opts := &bind.FilterOpts{ + Context: ctx, + Start: startBlock, + End: &endBlock, + } + claimAcceptanceEvents, err := consensusContract.RetrieveClaimAcceptanceEvents( + opts, appAddresses) + if err != nil { + return nil, err + } + for _, event := range claimAcceptanceEvents { + appClaimAcceptanceMap[event.AppContract] = insertSorted( + sortByLastBlockNumber, appClaimAcceptanceMap[event.AppContract], event) + } + return appClaimAcceptanceMap, nil +} + +// keyByLastClaimCheck is a LastClaimCheck key extractor function intended +// to be used with `indexApps` function, see indexApps() +func keyByLastClaimCheck(app application) uint64 { + return app.LastClaimCheckBlock +} + +// keyByIConsensus is a IConsensus address key extractor function intended +// to be used with `indexApps` function, see indexApps() +func keyByIConsensus(app application) Address { + return app.IConsensusAddress +} + +// sortByLastBlockNumber is a ClaimAcceptance's by last block number sorting function. +// Intended to be used with insertSorted function, see insertSorted() +func sortByLastBlockNumber(a, b *iconsensus.IConsensusClaimAcceptance) int { + return cmp.Compare(a.LastProcessedBlockNumber.Uint64(), b.LastProcessedBlockNumber.Uint64()) +} diff --git a/internal/evmreader/claim_test.go b/internal/evmreader/claim_test.go new file mode 100644 index 00000000..ab47f610 --- /dev/null +++ b/internal/evmreader/claim_test.go @@ -0,0 +1,313 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + "math/big" + "time" + + . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" + "github.com/cartesi/rollups-node/pkg/contracts/inputbox" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/mock" +) + +func (s *EvmReaderSuite) TestNoClaimsAcceptance() { + + wsClient := FakeWSEhtClient{} + + //New EVM Reader + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastClaimCheckBlock: 0x10, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastClaimCheckBlock: 0x11, + }}, nil).Once() + + s.repository.Unset("StoreClaimsTransaction") + s.repository.On("StoreClaimsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(1) + claims, ok := obj.([]*Epoch) + s.Require().True(ok) + s.Require().Equal(0, len(claims)) + + }).Return(nil) + + s.repository.Unset("StoreClaimsTransaction") + s.repository.On("StoreClaimsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(1) + claims, ok := obj.([]*Epoch) + s.Require().True(ok) + s.Require().Equal(0, len(claims)) + + obj = arguments.Get(2) + lastClaimCheck, ok := obj.(uint64) + s.Require().True(ok) + s.Require().Equal(uint64(17), lastClaimCheck) + + }).Return(nil) + s.repository.On("StoreClaimsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(1) + claims, ok := obj.([]*Epoch) + s.Require().True(ok) + s.Require().Equal(0, len(claims)) + + obj = arguments.Get(2) + lastClaimCheck, ok := obj.(uint64) + s.Require().True(ok) + s.Require().Equal(uint64(18), lastClaimCheck) + + }).Return(nil) + + //No Inputs + s.inputBox.Unset("RetrieveInputs") + s.inputBox.On("RetrieveInputs", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return([]inputbox.InputBoxInputAdded{}, nil) + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header1, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + wsClient.fireNewHead(&header1) + time.Sleep(1 * time.Second) + + s.repository.AssertNumberOfCalls( + s.T(), + "StoreClaimsTransaction", + 2, + ) + +} + +func (s *EvmReaderSuite) TestReadClaimAcceptance() { + + appAddress := common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E") + + // Contract Factory + + consensusContract := &MockIConsensusContract{} + + contractFactory := newEmvReaderContractFactory() + + contractFactory.Unset("NewIConsensus") + contractFactory.On("NewIConsensus", + mock.Anything, + ).Return(consensusContract, nil) + + //New EVM Reader + wsClient := FakeWSEhtClient{} + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x00, + DefaultBlockStatusLatest, + contractFactory, + ) + + // Prepare Claims Acceptance Events + + claimEvent0 := &iconsensus.IConsensusClaimAcceptance{ + AppContract: appAddress, + LastProcessedBlockNumber: big.NewInt(3), + Claim: common.HexToHash("0xdeadbeef"), + } + + claimEvents := []*iconsensus.IConsensusClaimAcceptance{claimEvent0} + consensusContract.On("RetrieveClaimAcceptanceEvents", + mock.Anything, + mock.Anything, + ).Return(claimEvents, nil).Once() + consensusContract.On("RetrieveClaimAcceptanceEvents", + mock.Anything, + mock.Anything, + ).Return([]*iconsensus.IConsensusClaimAcceptance{}, nil) + + // Epoch Length + consensusContract.On("GetEpochLength", + mock.Anything, + ).Return(big.NewInt(1), nil).Once() + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: appAddress, + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastClaimCheckBlock: 0x10, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: appAddress, + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastClaimCheckBlock: 0x11, + }}, nil).Once() + + claim0Hash := common.HexToHash("0xdeadbeef") + claim0 := &Epoch{ + Index: 1, + FirstBlock: 1, + LastBlock: 1, + AppAddress: appAddress, + Status: EpochStatusClaimSubmitted, + ClaimHash: &claim0Hash, + } + + claim1Hash := common.HexToHash("0xdeadbeef") + claim1 := &Epoch{ + Index: 3, + FirstBlock: 3, + LastBlock: 3, + AppAddress: appAddress, + Status: EpochStatusClaimSubmitted, + ClaimHash: &claim1Hash, + } + + s.repository.Unset("GetEpoch") + s.repository.On("GetEpoch", + mock.Anything, + mock.Anything, + mock.Anything).Return(claim1, nil) + + s.repository.Unset("GetPreviousSubmittedClaims") + s.repository.On("GetPreviousSubmittedClaims", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return([]Epoch{*claim0}, nil) + + s.repository.Unset("StoreClaimsTransaction") + s.repository.On("StoreClaimsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(1) + claims, ok := obj.([]*Epoch) + s.Require().True(ok) + s.Require().Equal(2, len(claims)) + claim0 := claims[0] + claim1 := claims[1] + s.Require().NotNil(claim0) + s.Require().Equal(uint64(1), claim0.LastBlock) + s.Require().Equal(EpochStatusClaimRejected, claim0.Status) + s.Require().Equal(uint64(3), claim1.LastBlock) + s.Require().Equal(EpochStatusClaimAccepted, claim1.Status) + + }).Return(nil) + + //No Inputs + s.inputBox.Unset("RetrieveInputs") + s.inputBox.On("RetrieveInputs", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return([]inputbox.InputBoxInputAdded{}, nil) + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + time.Sleep(1 * time.Second) + + s.repository.AssertNumberOfCalls( + s.T(), + "StoreClaimsTransaction", + 1, + ) + +} diff --git a/internal/evmreader/consensus_adapter.go b/internal/evmreader/consensus_adapter.go new file mode 100644 index 00000000..22e76708 --- /dev/null +++ b/internal/evmreader/consensus_adapter.go @@ -0,0 +1,58 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + "math/big" + + "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" +) + +// IConsensus Wrapper +type ConsensusContractAdapter struct { + consensus *iconsensus.IConsensus +} + +func NewConsensusContractAdapter( + iconsensusAddress common.Address, + client *ethclient.Client, +) (*ConsensusContractAdapter, error) { + consensus, err := iconsensus.NewIConsensus(iconsensusAddress, client) + if err != nil { + return nil, err + } + return &ConsensusContractAdapter{ + consensus: consensus, + }, nil +} + +func (c *ConsensusContractAdapter) GetEpochLength(opts *bind.CallOpts) (*big.Int, error) { + return c.consensus.GetEpochLength(opts) +} + +func (c *ConsensusContractAdapter) RetrieveClaimAcceptanceEvents( + opts *bind.FilterOpts, + appAddresses []common.Address, +) ([]*iconsensus.IConsensusClaimAcceptance, error) { + + itr, err := c.consensus.FilterClaimAcceptance(opts, appAddresses) + if err != nil { + return nil, err + } + defer itr.Close() + + var events []*iconsensus.IConsensusClaimAcceptance + for itr.Next() { + claimAcceptanceEvent := itr.Event + events = append(events, claimAcceptanceEvent) + } + err = itr.Error() + if err != nil { + return nil, err + } + return events, nil +} diff --git a/internal/evmreader/evmreader.go b/internal/evmreader/evmreader.go index dfa2e01d..6f4b7ad2 100644 --- a/internal/evmreader/evmreader.go +++ b/internal/evmreader/evmreader.go @@ -13,6 +13,7 @@ import ( "slices" . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/contracts/inputbox" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -25,7 +26,7 @@ import ( type InputSource interface { // Wrapper for FilterInputAdded(), which is automatically generated // by go-ethereum and cannot be used for testing - RetrieveInputs(opts *bind.FilterOpts, appContract []common.Address, index []*big.Int, + RetrieveInputs(opts *bind.FilterOpts, appAddresses []common.Address, index []*big.Int, ) ([]inputbox.InputBoxInputAdded, error) } @@ -39,6 +40,12 @@ type EvmReaderRepository interface { GetAllRunningApplications(ctx context.Context) ([]Application, error) GetNodeConfig(ctx context.Context) (*NodePersistentConfig, error) GetEpoch(ctx context.Context, indexKey uint64, appAddressKey Address) (*Epoch, error) + GetPreviousSubmittedClaims(ctx context.Context, app Address, lastBlock uint64) ([]Epoch, error) + StoreClaimsTransaction(ctx context.Context, + app Address, + claims []*Epoch, + mostRecentBlockNumber uint64, + ) error } // EthClient mimics part of ethclient.Client functions to narrow down the @@ -55,6 +62,8 @@ type EthWsClient interface { type ConsensusContract interface { GetEpochLength(opts *bind.CallOpts) (*big.Int, error) + RetrieveClaimAcceptanceEvents(opts *bind.FilterOpts, appAddresses []common.Address, + ) ([]*iconsensus.IConsensusClaimAcceptance, error) } type ApplicationContract interface { @@ -74,6 +83,14 @@ func (e *SubscriptionError) Error() string { return fmt.Sprintf("Subscription error : %v", e.Cause) } +// Internal struct to hold application and it's +// contracts together +type application struct { + Application + applicationContract ApplicationContract + consensusContract ConsensusContract +} + // EvmReader reads inputs from the blockchain type EvmReader struct { client EthClient @@ -128,7 +145,7 @@ func (r *EvmReader) Run(ctx context.Context, ready chan<- struct{}) error { } } -// Watch for new blocks and reads new inputs based on the +// watchForNewBlocks watches for new blocks and reads new inputs based on the // default block configuration, which have not been processed yet. func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{}) error { headers := make(chan *types.Header) @@ -147,113 +164,58 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} case err := <-sub.Err(): return &SubscriptionError{Cause: err} case <-headers: + // Every time a new block arrives - err = r.checkForNewInputs(ctx) + + // Get All Applications + runningApps, err := r.repository.GetAllRunningApplications(ctx) if err != nil { - slog.Error("Error checking for new inputs", + slog.Error("Error retrieving running applications for new inputs", "error", err, ) + continue } - } - } -} - -// Check if is there new Inputs for all running Applications -func (r *EvmReader) checkForNewInputs(ctx context.Context) error { - - slog.Debug("Checking for new inputs") - - // Get All Applications - apps, err := r.repository.GetAllRunningApplications(ctx) - if err != nil { - return err - } - - if len(apps) == 0 { - slog.Info("No running applications") - return nil - } - - groupedApps := r.classifyApplicationsByLastProcessedInput(apps) - - for lastProcessedBlock, apps := range groupedApps { - - appAddresses := appToAddresses(apps) - - // Safeguard: Only check blocks starting from the block where the InputBox - // contract was deployed as Inputs can be added to that same block - if lastProcessedBlock < r.inputBoxDeploymentBlock { - lastProcessedBlock = r.inputBoxDeploymentBlock - 1 - } - - mostRecentHeader, err := r.fetchMostRecentHeader( - ctx, - r.defaultBlock, - ) - if err != nil { - slog.Error("Error fetching most recent block", - "default block", r.defaultBlock, - "error", err) - continue - } - mostRecentBlockNumber := mostRecentHeader.Number.Uint64() - - if mostRecentBlockNumber > lastProcessedBlock { + // Build Contracts + var apps []application + for _, app := range runningApps { + applicationContract, consensusContract, err := r.getAppContracts(app) + if err != nil { + slog.Error("Error retrieving application contracts", "app", app, "error", err) + continue + } + apps = append(apps, application{Application: app, + applicationContract: applicationContract, + consensusContract: consensusContract}) + } - slog.Info("Checking inputs for applications", - "apps", appAddresses, - "last processed block", lastProcessedBlock, - "most recent block", mostRecentBlockNumber, - ) + if len(apps) == 0 { + slog.Info("No running consistent applications") + continue + } - err = r.readAndStoreInputs(ctx, - lastProcessedBlock+1, - mostRecentBlockNumber, - apps, + mostRecentHeader, err := r.fetchMostRecentHeader( + ctx, + r.defaultBlock, ) if err != nil { - slog.Error("Error reading inputs", - "apps", appAddresses, - "last processed block", lastProcessedBlock, - "most recent block", mostRecentBlockNumber, - "error", err, - ) + slog.Error("Error fetching most recent block", + "default block", r.defaultBlock, + "error", err) continue } - } else if mostRecentBlockNumber < lastProcessedBlock { - slog.Warn( - "Most recent block is lower than the last processed one", - "apps", appAddresses, - "last processed block", lastProcessedBlock, - "most recent block", mostRecentBlockNumber, - ) - } else { - slog.Info("Already checked the most recent blocks", - "apps", appAddresses, - "last processed block", lastProcessedBlock, - "most recent block", mostRecentBlockNumber, - ) - } - } + mostRecentBlockNumber := mostRecentHeader.Number.Uint64() - return nil -} + r.checkForNewInputs(ctx, apps, mostRecentBlockNumber) -// Group Applications that have processed til the same block height -func (r *EvmReader) classifyApplicationsByLastProcessedInput( - apps []Application, -) map[uint64][]Application { - result := make(map[uint64][]Application) - for _, app := range apps { - result[app.LastProcessedBlock] = append(result[app.LastProcessedBlock], app) - } + r.checkForClaimStatus(ctx, apps, mostRecentBlockNumber) - return result + } + } } -// Fetch the most recent header up till the +// fetchMostRecentHeader fetches the most recent header up till the // given default block func (r *EvmReader) fetchMostRecentHeader( ctx context.Context, @@ -288,202 +250,13 @@ func (r *EvmReader) fetchMostRecentHeader( return header, nil } -// Read and store inputs from the InputSource given specific filter options. -func (r *EvmReader) readAndStoreInputs( - ctx context.Context, - startBlock uint64, - endBlock uint64, - apps []Application, -) error { - appsToProcess := []common.Address{} - - for _, app := range apps { - - // Get App EpochLength - err := r.addAppEpochLengthIntoCache(app) - if err != nil { - slog.Error("Error adding epoch length into cache", - "app", app.ContractAddress, - "error", err) - continue - } - - appsToProcess = append(appsToProcess, app.ContractAddress) - - } - - if len(appsToProcess) == 0 { - slog.Warn("No valid running applications") - return nil - } - - // Retrieve Inputs from blockchain - appInputsMap, err := r.readInputsFromBlockchain(ctx, appsToProcess, startBlock, endBlock) - if err != nil { - return fmt.Errorf("failed to read inputs from block %v to block %v. %w", - startBlock, - endBlock, - err) - } - - // Index Inputs into epochs and handle epoch finalization - for address, inputs := range appInputsMap { - - epochLength := r.epochLengthCache[address] - - // Retrieves last open epoch from DB - currentEpoch, err := r.repository.GetEpoch(ctx, - calculateEpochIndex(epochLength, startBlock), address) - if err != nil { - slog.Error("Error retrieving existing current epoch", - "app", address, - "error", err, - ) - continue - } - - // Check current epoch status - if currentEpoch != nil && currentEpoch.Status != EpochStatusOpen { - slog.Error("Current epoch is not open", - "app", address, - "epoch-index", currentEpoch.Index, - "status", currentEpoch.Status, - ) - continue - } - - // Initialize epochs inputs map - var epochInputMap = make(map[*Epoch][]Input) - - // Index Inputs into epochs - for _, input := range inputs { - - inputEpochIndex := calculateEpochIndex(epochLength, input.BlockNumber) - - // If input belongs into a new epoch, close the previous known one - if currentEpoch != nil && currentEpoch.Index != inputEpochIndex { - currentEpoch.Status = EpochStatusClosed - slog.Info("Closing epoch", - "app", currentEpoch.AppAddress, - "epoch-index", currentEpoch.Index, - "start", currentEpoch.FirstBlock, - "end", currentEpoch.LastBlock) - // Add it to inputMap, so it will be stored - epochInputMap[currentEpoch] = []Input{} - currentEpoch = nil - } - if currentEpoch == nil { - currentEpoch = &Epoch{ - Index: inputEpochIndex, - FirstBlock: inputEpochIndex * epochLength, - LastBlock: (inputEpochIndex * epochLength) + epochLength - 1, - Status: EpochStatusOpen, - AppAddress: address, - } - } - - slog.Info("Indexing new Input into epoch", - "app", address, - "index", input.Index, - "block", input.BlockNumber, - "epoch-index", inputEpochIndex) - - currentInputs, ok := epochInputMap[currentEpoch] - if !ok { - currentInputs = []Input{} - } - epochInputMap[currentEpoch] = append(currentInputs, *input) - - } - - // Indexed all inputs. Check if it is time to close this epoch - if currentEpoch != nil && endBlock >= currentEpoch.LastBlock { - currentEpoch.Status = EpochStatusClosed - slog.Info("Closing epoch", - "app", currentEpoch.AppAddress, - "epoch-index", currentEpoch.Index, - "start", currentEpoch.FirstBlock, - "end", currentEpoch.LastBlock) - // Add to inputMap so it is stored - _, ok := epochInputMap[currentEpoch] - if !ok { - epochInputMap[currentEpoch] = []Input{} - } - } - - _, _, err = r.repository.StoreEpochAndInputsTransaction( - ctx, - epochInputMap, - endBlock, - address, - ) - if err != nil { - slog.Error("Error storing inputs and epochs", - "app", address, - "error", err, - ) - continue - } - - // Store everything - if len(epochInputMap) > 0 { - - slog.Debug("Inputs and epochs stored successfully", - "app", address, - "start-block", startBlock, - "end-block", endBlock, - "total epochs", len(epochInputMap), - "total inputs", len(inputs), - ) - } else { - slog.Debug("No inputs or epochs to store") - } - - } - - return nil -} - -// Checks the epoch length cache and read epoch length from IConsensus -// and add it to the cache if needed -func (r *EvmReader) addAppEpochLengthIntoCache(app Application) error { - - epochLength, ok := r.epochLengthCache[app.ContractAddress] - if !ok { - - consensus, err := r.getIConsensus(app) - if err != nil { - return errors.Join( - fmt.Errorf("error retrieving IConsensus contract for app: %s", - app.ContractAddress), - err) - } - - epochLength, err = r.getEpochLengthFromContract(consensus) - if err != nil { - return errors.Join( - fmt.Errorf("error retrieving epoch length from contracts for app %s", - app.ContractAddress), - err) - } - r.epochLengthCache[app.ContractAddress] = epochLength - slog.Info("Got epoch length from IConsensus", - "app", app.ContractAddress, - "epoch length", epochLength) - } else { - slog.Debug("Got epoch length from cache", - "app", app.ContractAddress, - "epoch length", epochLength) - } - - return nil -} - -// Retrieve ConsensusContract for a given Application -func (r *EvmReader) getIConsensus(app Application) (ConsensusContract, error) { +// getAppContracts retrieves the ApplicationContract and ConsensusContract for a given Application. +// Also validates if IConsensus configuration matches the blockchain registered one +func (r *EvmReader) getAppContracts(app Application, +) (ApplicationContract, ConsensusContract, error) { applicationContract, err := r.contractFactory.NewApplication(app.ContractAddress) if err != nil { - return nil, errors.Join( + return nil, nil, errors.Join( fmt.Errorf("error building application contract"), err, ) @@ -491,14 +264,14 @@ func (r *EvmReader) getIConsensus(app Application) (ConsensusContract, error) { } consensusAddress, err := applicationContract.GetConsensus(nil) if err != nil { - return nil, errors.Join( + return nil, nil, errors.Join( fmt.Errorf("error retrieving application consensus"), err, ) } if app.IConsensusAddress != consensusAddress { - return nil, + return nil, nil, fmt.Errorf("IConsensus addresses do not match. Deployed: %s. Configured: %s", consensusAddress, app.IConsensusAddress) @@ -506,17 +279,17 @@ func (r *EvmReader) getIConsensus(app Application) (ConsensusContract, error) { consensus, err := r.contractFactory.NewIConsensus(consensusAddress) if err != nil { - return nil, errors.Join( + return nil, nil, errors.Join( fmt.Errorf("error building consensus contract"), err, ) } - return consensus, nil + return applicationContract, consensus, nil } -// Reads the application epoch length given it's consesus contract -func (r *EvmReader) getEpochLengthFromContract(consensus ConsensusContract) (uint64, error) { +// getEpochLengthFromContract reads the application epoch length given it's consensus contract +func getEpochLengthFromContract(consensus ConsensusContract) (uint64, error) { epochLengthRaw, err := consensus.GetEpochLength(nil) if err != nil { @@ -529,57 +302,15 @@ func (r *EvmReader) getEpochLengthFromContract(consensus ConsensusContract) (uin return epochLengthRaw.Uint64(), nil } -// Read inputs from the blockchain ordered by Input index -func (r *EvmReader) readInputsFromBlockchain( - ctx context.Context, - appsAddresses []Address, - startBlock, endBlock uint64, -) (map[Address][]*Input, error) { - - // Initialize app input map - var appInputsMap = make(map[Address][]*Input) - for _, appsAddress := range appsAddresses { - appInputsMap[appsAddress] = []*Input{} - } - - opts := bind.FilterOpts{ - Context: ctx, - Start: startBlock, - End: &endBlock, - } - inputsEvents, err := r.inputSource.RetrieveInputs(&opts, appsAddresses, nil) - if err != nil { - return nil, err - } - - // Order inputs as order is not enforced by RetrieveInputs method nor the APIs - for _, event := range inputsEvents { - slog.Debug("Received input", - "app", event.AppContract, - "index", event.Index, - "block", event.Raw.BlockNumber) - input := &Input{ - Index: event.Index.Uint64(), - CompletionStatus: InputStatusNone, - RawData: event.Input, - BlockNumber: event.Raw.BlockNumber, - AppAddress: event.AppContract, - } - - // Insert Sorted - appInputsMap[event.AppContract] = insertSorted(appInputsMap[event.AppContract], input) - } - return appInputsMap, nil -} - // Util functions -// Calculates the epoch index given the input block number +// calculateEpochIndex calculates the epoch index given the input block number +// and epoch length func calculateEpochIndex(epochLength uint64, blockNumber uint64) uint64 { return blockNumber / epochLength } -func appToAddresses(apps []Application) []Address { +func appToAddresses(apps []application) []Address { var addresses []Address for _, app := range apps { addresses = append(addresses, app.ContractAddress) @@ -587,15 +318,34 @@ func appToAddresses(apps []Application) []Address { return addresses } +// sortByInputIndex is a compare function that orders Inputs +// by it's index field. It is intended to be used with +// `insertSorted`, see insertSorted() +func sortByInputIndex(a, b *Input) int { + return cmp.Compare(a.Index, b.Index) +} + // insertSorted inserts the received input in the slice at the position defined // by its index property. -func insertSorted(inputs []*Input, input *Input) []*Input { +func insertSorted[T any](compare func(a, b *T) int, slice []*T, item *T) []*T { // Insert Sorted i, _ := slices.BinarySearchFunc( - inputs, - input, - func(a, b *Input) int { - return cmp.Compare(a.Index, b.Index) - }) - return slices.Insert(inputs, i, input) + slice, + item, + compare) + return slices.Insert(slice, i, item) +} + +// Index applications given a key extractor function +func indexApps[K comparable]( + keyExtractor func(application) K, + apps []application, +) map[K][]application { + + result := make(map[K][]application) + for _, item := range apps { + key := keyExtractor(item) + result[key] = append(result[key], item) + } + return result } diff --git a/internal/evmreader/evmreader_test.go b/internal/evmreader/evmreader_test.go index 5362f02f..216ee00d 100644 --- a/internal/evmreader/evmreader_test.go +++ b/internal/evmreader/evmreader_test.go @@ -9,11 +9,11 @@ import ( "encoding/json" "fmt" "math/big" - "sync" "testing" "time" . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/contracts/inputbox" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -167,209 +167,41 @@ func (s *EvmReaderSuite) TestItFailsToSubscribeForNewInputsOnStart() { s.client.AssertNumberOfCalls(s.T(), "SubscribeNewHead", 1) } -func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocks() { +func (s *EvmReaderSuite) TestItWrongIConsensus() { - waitGroup := sync.WaitGroup{} - wsClient := FakeWSEhtClient{} - wsClient.NewHeaders = []*types.Header{&header0, &header1} - wsClient.WaitGroup = &waitGroup - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) - - // Prepare repository - s.repository.Unset("GetAllRunningApplications") - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x00, - }}, nil).Once() - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x11, - }}, nil).Once() - - // Prepare Client - s.client.Unset("HeaderByNumber") - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header0, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header1, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header2, nil).Once() + consensusContract := &MockIConsensusContract{} - // Prepare sequence of inputs - s.inputBox.Unset("RetrieveInputs") - events_0 := []inputbox.InputBoxInputAdded{inputAddedEvent0} - mostRecentBlockNumber_0 := uint64(0x11) - retrieveInputsOpts_0 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x10, - End: &mostRecentBlockNumber_0, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_0, - mock.Anything, - mock.Anything, - ).Return(events_0, nil) + contractFactory := newEmvReaderContractFactory() - events_1 := []inputbox.InputBoxInputAdded{inputAddedEvent1} - mostRecentBlockNumber_1 := uint64(0x12) - retrieveInputsOpts_1 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x12, - End: &mostRecentBlockNumber_1, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_1, + contractFactory.Unset("NewIConsensus") + contractFactory.On("NewIConsensus", mock.Anything, - mock.Anything, - ).Return(events_1, nil) - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) + ).Return(consensusContract, nil) - waitGroup.Add(1) - go func() { - errChannel <- inputReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - waitGroup.Wait() - - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2) - s.repository.AssertNumberOfCalls( - s.T(), - "StoreEpochAndInputsTransaction", - 2, - ) -} - -func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocksWrongIConsensus() { - - waitGroup := sync.WaitGroup{} wsClient := FakeWSEhtClient{} - wsClient.NewHeaders = []*types.Header{&header0, &header1} - wsClient.WaitGroup = &waitGroup - inputReader := NewEvmReader( + + evmReader := NewEvmReader( s.client, &wsClient, s.inputBox, s.repository, 0x10, DefaultBlockStatusLatest, - s.contractFactory, + contractFactory, ) - // Prepare repository - s.repository.Unset("GetAllRunningApplications") - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), - LastProcessedBlock: 0x00, - }}, nil).Once() - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), - LastProcessedBlock: 0x11, - }}, nil).Once() + // Prepare consensus + claimEvent0 := &iconsensus.IConsensusClaimAcceptance{ + AppContract: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + LastProcessedBlockNumber: big.NewInt(3), + Claim: common.HexToHash("0xdeadbeef"), + } - // Prepare Client - s.client.Unset("HeaderByNumber") - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header0, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header1, nil).Once() - s.client.On( - "HeaderByNumber", + claimEvents := []*iconsensus.IConsensusClaimAcceptance{claimEvent0} + consensusContract.On("RetrieveClaimAcceptanceEvents", mock.Anything, mock.Anything, - ).Return(&header2, nil).Once() - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - waitGroup.Add(1) - go func() { - errChannel <- inputReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - waitGroup.Wait() - - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) - s.repository.AssertNumberOfCalls( - s.T(), - "StoreEpochAndInputsTransaction", - 0, - ) -} - -func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { - - waitGroup := sync.WaitGroup{} - wsClient := FakeWSEhtClient{} - wsClient.NewHeaders = []*types.Header{&header0, &header1} - wsClient.WaitGroup = &waitGroup - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + ).Return(claimEvents, nil).Once() // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -378,17 +210,9 @@ func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { mock.Anything, ).Return([]Application{{ ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), + IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), LastProcessedBlock: 0x00, }}, nil).Once() - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x11, - }}, nil).Once() // Prepare Client s.client.Unset("HeaderByNumber") @@ -397,150 +221,13 @@ func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { mock.Anything, mock.Anything, ).Return(&header0, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header1, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header2, nil).Once() - - // Prepare sequence of inputs - s.inputBox.Unset("RetrieveInputs") - events_0 := []inputbox.InputBoxInputAdded{} - mostRecentBlockNumber_0 := uint64(0x11) - retrieveInputsOpts_0 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x10, - End: &mostRecentBlockNumber_0, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_0, - mock.Anything, - mock.Anything, - ).Return(events_0, nil) - - events_1 := []inputbox.InputBoxInputAdded{} - mostRecentBlockNumber_1 := uint64(0x12) - retrieveInputsOpts_1 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x12, - End: &mostRecentBlockNumber_1, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_1, - mock.Anything, - mock.Anything, - ).Return(events_1, nil) - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - waitGroup.Add(1) - go func() { - errChannel <- inputReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - waitGroup.Wait() - - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2) - s.repository.AssertNumberOfCalls( - s.T(), - "StoreEpochAndInputsTransaction", - 2, - ) -} - -func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { - - waitGroup := sync.WaitGroup{} - wsClient := FakeWSEhtClient{} - wsClient.NewHeaders = []*types.Header{&header2} - wsClient.WaitGroup = &waitGroup - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) - - // Prepare Client - s.client.Unset("HeaderByNumber") - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header2, nil).Once() - - // Prepare sequence of inputs - s.inputBox.Unset("RetrieveInputs") - events_2 := []inputbox.InputBoxInputAdded{inputAddedEvent2, inputAddedEvent3} - mostRecentBlockNumber_2 := uint64(0x13) - retrieveInputsOpts_2 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x13, - End: &mostRecentBlockNumber_2, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_2, - mock.Anything, - mock.Anything, - ).Return(events_2, nil) - - // Prepare Repo - s.repository.Unset("GetAllRunningApplications") - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x12, - }}, nil).Once() - s.repository.Unset("StoreEpochAndInputsTransaction") - s.repository.On( - "StoreEpochAndInputsTransaction", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Once().Run(func(arguments mock.Arguments) { - var epochInputMap map[*Epoch][]Input - obj := arguments.Get(1) - epochInputMap, ok := obj.(map[*Epoch][]Input) - s.Require().True(ok) - s.Require().Equal(1, len(epochInputMap)) - for _, inputs := range epochInputMap { - s.Require().Equal(2, len(inputs)) - break - } - - }).Return(make(map[uint64]uint64), make(map[uint64][]uint64), nil) // Start service ready := make(chan struct{}, 1) errChannel := make(chan error, 1) - waitGroup.Add(1) go func() { - errChannel <- inputReader.Run(s.ctx, ready) + errChannel <- evmReader.Run(s.ctx, ready) }() select { @@ -550,73 +237,22 @@ func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { s.FailNow("unexpected error signal", err) } - waitGroup.Wait() + wsClient.fireNewHead(&header0) + time.Sleep(time.Second) - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 1) + // Should not advance input processing + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) s.repository.AssertNumberOfCalls( s.T(), "StoreEpochAndInputsTransaction", - 1, - ) -} - -func (s *EvmReaderSuite) TestItStartsWhenLasProcessedBlockIsTheMostRecentBlock() { - - waitGroup := sync.WaitGroup{} - wsClient := FakeWSEhtClient{} - wsClient.NewHeaders = []*types.Header{&header2} - wsClient.WaitGroup = &waitGroup - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, + 0, ) - // Prepare Client - s.client.Unset("HeaderByNumber") - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header0, nil).Once() - - // Prepare Repo - s.repository.Unset("GetAllRunningApplications") - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x11, - }}, nil).Once() - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - waitGroup.Add(1) - go func() { - errChannel <- inputReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - waitGroup.Wait() - - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) + // Should not advance claim acceptance processing + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveClaimAcceptanceEvents", 0) s.repository.AssertNumberOfCalls( s.T(), - "StoreEpochAndInputsTransaction", + "StoreClaimsTransaction", 0, ) } @@ -690,26 +326,21 @@ func (m *MockSubscription) Err() <-chan error { // FakeClient type FakeWSEhtClient struct { - NewHeaders []*types.Header - WaitGroup *sync.WaitGroup + ch chan<- *types.Header } func (f *FakeWSEhtClient) SubscribeNewHead( ctx context.Context, ch chan<- *types.Header, ) (ethereum.Subscription, error) { - go func() { - - for _, header := range f.NewHeaders { - ch <- header - } - //Give some time to headers to be processed - time.Sleep(1 * time.Second) - f.WaitGroup.Done() - }() + f.ch = ch return newMockSubscription(), nil } +func (f *FakeWSEhtClient) fireNewHead(header *types.Header) { + f.ch <- header +} + // Mock inputbox.InputBox type MockInputBox struct { mock.Mock @@ -806,6 +437,18 @@ func newMockRepository() *MockRepository { mock.Anything, mock.Anything).Return(1, nil) + repo.On("GetPreviousSubmittedClaims", + mock.Anything, + mock.Anything, + ).Return([]Epoch{}, nil) + + repo.On("StoreClaimsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(nil) + return repo } @@ -859,6 +502,24 @@ func (m *MockRepository) InsertEpoch( return args.Get(0).(uint64), args.Error(1) } +func (m *MockRepository) GetPreviousSubmittedClaims( + ctx context.Context, + app Address, + lastBlock uint64, +) ([]Epoch, error) { + args := m.Called(ctx, app, lastBlock) + return args.Get(0).([]Epoch), args.Error(1) + +} +func (m *MockRepository) StoreClaimsTransaction(ctx context.Context, + app Address, + epochs []*Epoch, + mostRecentBlockNumber uint64, +) error { + args := m.Called(ctx, epochs, mostRecentBlockNumber) + return args.Error(0) +} + type MockApplicationContract struct { mock.Mock } @@ -874,7 +535,7 @@ func (m *MockApplicationContract) Unset(methodName string) { func (m *MockApplicationContract) GetConsensus( opts *bind.CallOpts, ) (common.Address, error) { - args := m.Called(context.Background()) + args := m.Called(opts) return args.Get(0).(common.Address), args.Error(1) } @@ -885,10 +546,17 @@ type MockIConsensusContract struct { func (m *MockIConsensusContract) GetEpochLength( opts *bind.CallOpts, ) (*big.Int, error) { - args := m.Called(context.Background()) + args := m.Called(opts) return args.Get(0).(*big.Int), args.Error(1) } +func (m *MockIConsensusContract) RetrieveClaimAcceptanceEvents( + opts *bind.FilterOpts, appAddresses []common.Address, +) ([]*iconsensus.IConsensusClaimAcceptance, error) { + args := m.Called(opts, appAddresses) + return args.Get(0).([]*iconsensus.IConsensusClaimAcceptance), args.Error(1) +} + type MockEvmReaderContractFactory struct { mock.Mock } @@ -928,6 +596,11 @@ func newEmvReaderContractFactory() *MockEvmReaderContractFactory { consensusContract.On("GetEpochLength", mock.Anything).Return(big.NewInt(10), nil) + consensusContract.On("RetrieveClaimAcceptanceEvents", + mock.Anything, + mock.Anything, + ).Return([]*iconsensus.IConsensusClaimAcceptance{}, nil) + factory := &MockEvmReaderContractFactory{} factory.On("NewApplication", diff --git a/internal/evmreader/input.go b/internal/evmreader/input.go new file mode 100644 index 00000000..ab5768b7 --- /dev/null +++ b/internal/evmreader/input.go @@ -0,0 +1,308 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + "context" + "errors" + "fmt" + "log/slog" + + . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" +) + +// checkForNewInputs checks if is there new Inputs for all running Applications +func (r *EvmReader) checkForNewInputs( + ctx context.Context, + apps []application, + mostRecentBlockNumber uint64, +) { + + slog.Debug("Checking for new inputs") + + groupedApps := indexApps(byLastProcessedBlock, apps) + + for lastProcessedBlock, apps := range groupedApps { + + appAddresses := appToAddresses(apps) + + // Safeguard: Only check blocks starting from the block where the InputBox + // contract was deployed as Inputs can be added to that same block + if lastProcessedBlock < r.inputBoxDeploymentBlock { + lastProcessedBlock = r.inputBoxDeploymentBlock - 1 + } + + if mostRecentBlockNumber > lastProcessedBlock { + + slog.Info("Checking inputs for applications", + "apps", appAddresses, + "last processed block", lastProcessedBlock, + "most recent block", mostRecentBlockNumber, + ) + + err := r.readAndStoreInputs(ctx, + lastProcessedBlock+1, + mostRecentBlockNumber, + apps, + ) + if err != nil { + slog.Error("Error reading inputs", + "apps", appAddresses, + "last processed block", lastProcessedBlock, + "most recent block", mostRecentBlockNumber, + "error", err, + ) + continue + } + } else if mostRecentBlockNumber < lastProcessedBlock { + slog.Warn( + "Not reading inputs: most recent block is lower than the last processed one", + "apps", appAddresses, + "last processed block", lastProcessedBlock, + "most recent block", mostRecentBlockNumber, + ) + } else { + slog.Info("Not reading inputs: already checked the most recent blocks", + "apps", appAddresses, + "last processed block", lastProcessedBlock, + "most recent block", mostRecentBlockNumber, + ) + } + } +} + +// readAndStoreInputs reads, inputs from the InputSource given specific filter options, indexes +// them into epochs and store the indexed inputs and epochs +func (r *EvmReader) readAndStoreInputs( + ctx context.Context, + startBlock uint64, + endBlock uint64, + apps []application, +) error { + appsToProcess := []common.Address{} + + for _, app := range apps { + + // Get App EpochLength + err := r.addAppEpochLengthIntoCache(app) + if err != nil { + slog.Error("Error adding epoch length into cache", + "app", app.ContractAddress, + "error", err) + continue + } + + appsToProcess = append(appsToProcess, app.ContractAddress) + + } + + if len(appsToProcess) == 0 { + slog.Warn("No valid running applications") + return nil + } + + // Retrieve Inputs from blockchain + appInputsMap, err := r.readInputsFromBlockchain(ctx, appsToProcess, startBlock, endBlock) + if err != nil { + return fmt.Errorf("failed to read inputs from block %v to block %v. %w", + startBlock, + endBlock, + err) + } + + // Index Inputs into epochs and handle epoch finalization + for address, inputs := range appInputsMap { + + epochLength := r.epochLengthCache[address] + + // Retrieves last open epoch from DB + currentEpoch, err := r.repository.GetEpoch(ctx, + calculateEpochIndex(epochLength, startBlock), address) + if err != nil { + slog.Error("Error retrieving existing current epoch", + "app", address, + "error", err, + ) + continue + } + + // Check current epoch status + if currentEpoch != nil && currentEpoch.Status != EpochStatusOpen { + slog.Error("Current epoch is not open", + "app", address, + "epoch-index", currentEpoch.Index, + "status", currentEpoch.Status, + ) + continue + } + + // Initialize epochs inputs map + var epochInputMap = make(map[*Epoch][]Input) + + // Index Inputs into epochs + for _, input := range inputs { + + inputEpochIndex := calculateEpochIndex(epochLength, input.BlockNumber) + + // If input belongs into a new epoch, close the previous known one + if currentEpoch != nil && currentEpoch.Index != inputEpochIndex { + currentEpoch.Status = EpochStatusClosed + slog.Info("Closing epoch", + "app", currentEpoch.AppAddress, + "epoch-index", currentEpoch.Index, + "start", currentEpoch.FirstBlock, + "end", currentEpoch.LastBlock) + // Add it to inputMap, so it will be stored + epochInputMap[currentEpoch] = []Input{} + currentEpoch = nil + } + if currentEpoch == nil { + currentEpoch = &Epoch{ + Index: inputEpochIndex, + FirstBlock: inputEpochIndex * epochLength, + LastBlock: (inputEpochIndex * epochLength) + epochLength - 1, + Status: EpochStatusOpen, + AppAddress: address, + } + } + + slog.Info("Indexing new Input into epoch", + "app", address, + "index", input.Index, + "block", input.BlockNumber, + "epoch-index", inputEpochIndex) + + currentInputs, ok := epochInputMap[currentEpoch] + if !ok { + currentInputs = []Input{} + } + epochInputMap[currentEpoch] = append(currentInputs, *input) + + } + + // Indexed all inputs. Check if it is time to close this epoch + if currentEpoch != nil && endBlock >= currentEpoch.LastBlock { + currentEpoch.Status = EpochStatusClosed + slog.Info("Closing epoch", + "app", currentEpoch.AppAddress, + "epoch-index", currentEpoch.Index, + "start", currentEpoch.FirstBlock, + "end", currentEpoch.LastBlock) + // Add to inputMap so it is stored + _, ok := epochInputMap[currentEpoch] + if !ok { + epochInputMap[currentEpoch] = []Input{} + } + } + + _, _, err = r.repository.StoreEpochAndInputsTransaction( + ctx, + epochInputMap, + endBlock, + address, + ) + if err != nil { + slog.Error("Error storing inputs and epochs", + "app", address, + "error", err, + ) + continue + } + + // Store everything + if len(epochInputMap) > 0 { + + slog.Debug("Inputs and epochs stored successfully", + "app", address, + "start-block", startBlock, + "end-block", endBlock, + "total epochs", len(epochInputMap), + "total inputs", len(inputs), + ) + } else { + slog.Debug("No inputs or epochs to store") + } + + } + + return nil +} + +// addAppEpochLengthIntoCache checks the epoch length cache and read epoch length from IConsensus +// contract and add it to the cache if needed +func (r *EvmReader) addAppEpochLengthIntoCache(app application) error { + + epochLength, ok := r.epochLengthCache[app.ContractAddress] + if !ok { + + epochLength, err := getEpochLengthFromContract(app.consensusContract) + if err != nil { + return errors.Join( + fmt.Errorf("error retrieving epoch length from contracts for app %s", + app.ContractAddress), + err) + } + r.epochLengthCache[app.ContractAddress] = epochLength + slog.Info("Got epoch length from IConsensus", + "app", app.ContractAddress, + "epoch length", epochLength) + } else { + slog.Debug("Got epoch length from cache", + "app", app.ContractAddress, + "epoch length", epochLength) + } + + return nil +} + +// readInputsFromBlockchain read the inputs from the blockchain ordered by Input index +func (r *EvmReader) readInputsFromBlockchain( + ctx context.Context, + appsAddresses []Address, + startBlock, endBlock uint64, +) (map[Address][]*Input, error) { + + // Initialize app input map + var appInputsMap = make(map[Address][]*Input) + for _, appsAddress := range appsAddresses { + appInputsMap[appsAddress] = []*Input{} + } + + opts := bind.FilterOpts{ + Context: ctx, + Start: startBlock, + End: &endBlock, + } + inputsEvents, err := r.inputSource.RetrieveInputs(&opts, appsAddresses, nil) + if err != nil { + return nil, err + } + + // Order inputs as order is not enforced by RetrieveInputs method nor the APIs + for _, event := range inputsEvents { + slog.Debug("Received input", + "app", event.AppContract, + "index", event.Index, + "block", event.Raw.BlockNumber) + input := &Input{ + Index: event.Index.Uint64(), + CompletionStatus: InputStatusNone, + RawData: event.Input, + BlockNumber: event.Raw.BlockNumber, + AppAddress: event.AppContract, + } + + // Insert Sorted + appInputsMap[event.AppContract] = insertSorted( + sortByInputIndex, appInputsMap[event.AppContract], input) + } + return appInputsMap, nil +} + +// // byLastProcessedBlock key extractor function intended to be used with `indexApps` function +func byLastProcessedBlock(app application) uint64 { + return app.LastProcessedBlock +} diff --git a/internal/evmreader/input_test.go b/internal/evmreader/input_test.go new file mode 100644 index 00000000..a5776c99 --- /dev/null +++ b/internal/evmreader/input_test.go @@ -0,0 +1,461 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + "time" + + . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/pkg/contracts/inputbox" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/mock" +) + +func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocks() { + + wsClient := FakeWSEhtClient{} + + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x00, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x11, + }}, nil).Once() + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header1, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Prepare sequence of inputs + s.inputBox.Unset("RetrieveInputs") + events_0 := []inputbox.InputBoxInputAdded{inputAddedEvent0} + mostRecentBlockNumber_0 := uint64(0x11) + retrieveInputsOpts_0 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x10, + End: &mostRecentBlockNumber_0, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_0, + mock.Anything, + mock.Anything, + ).Return(events_0, nil) + + events_1 := []inputbox.InputBoxInputAdded{inputAddedEvent1} + mostRecentBlockNumber_1 := uint64(0x12) + retrieveInputsOpts_1 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x12, + End: &mostRecentBlockNumber_1, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_1, + mock.Anything, + mock.Anything, + ).Return(events_1, nil) + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + wsClient.fireNewHead(&header1) + time.Sleep(time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 2, + ) +} + +func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocksWrongIConsensus() { + + wsClient := FakeWSEhtClient{} + + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), + LastProcessedBlock: 0x00, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), + LastProcessedBlock: 0x11, + }}, nil).Once() + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header1, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + wsClient.fireNewHead(&header1) + time.Sleep(time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 0, + ) +} + +func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { + + wsClient := FakeWSEhtClient{} + + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x00, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x11, + }}, nil).Once() + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header1, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Prepare sequence of inputs + s.inputBox.Unset("RetrieveInputs") + events_0 := []inputbox.InputBoxInputAdded{} + mostRecentBlockNumber_0 := uint64(0x11) + retrieveInputsOpts_0 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x10, + End: &mostRecentBlockNumber_0, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_0, + mock.Anything, + mock.Anything, + ).Return(events_0, nil) + + events_1 := []inputbox.InputBoxInputAdded{} + mostRecentBlockNumber_1 := uint64(0x12) + retrieveInputsOpts_1 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x12, + End: &mostRecentBlockNumber_1, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_1, + mock.Anything, + mock.Anything, + ).Return(events_1, nil) + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + wsClient.fireNewHead(&header1) + time.Sleep(time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 2, + ) +} + +func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { + + wsClient := FakeWSEhtClient{} + + inputReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Prepare sequence of inputs + s.inputBox.Unset("RetrieveInputs") + events_2 := []inputbox.InputBoxInputAdded{inputAddedEvent2, inputAddedEvent3} + mostRecentBlockNumber_2 := uint64(0x13) + retrieveInputsOpts_2 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x13, + End: &mostRecentBlockNumber_2, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_2, + mock.Anything, + mock.Anything, + ).Return(events_2, nil) + + // Prepare Repo + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x12, + }}, nil).Once() + s.repository.Unset("StoreEpochAndInputsTransaction") + s.repository.On( + "StoreEpochAndInputsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + var epochInputMap map[*Epoch][]Input + obj := arguments.Get(1) + epochInputMap, ok := obj.(map[*Epoch][]Input) + s.Require().True(ok) + s.Require().Equal(1, len(epochInputMap)) + for _, inputs := range epochInputMap { + s.Require().Equal(2, len(inputs)) + break + } + + }).Return(make(map[uint64]uint64), make(map[uint64][]uint64), nil) + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- inputReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header2) + // Give a time for + time.Sleep(1 * time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 1) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 1, + ) +} + +func (s *EvmReaderSuite) TestItStartsWhenLasProcessedBlockIsTheMostRecentBlock() { + + wsClient := FakeWSEhtClient{} + inputReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + + // Prepare Repo + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x11, + }}, nil).Once() + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- inputReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header2) + time.Sleep(1 * time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 0, + ) +} diff --git a/internal/evmreader/retrypolicy/contratfactory.go b/internal/evmreader/retrypolicy/contratfactory.go index d9166115..69384886 100644 --- a/internal/evmreader/retrypolicy/contratfactory.go +++ b/internal/evmreader/retrypolicy/contratfactory.go @@ -7,7 +7,6 @@ import ( "github.com/cartesi/rollups-node/internal/evmreader" "github.com/cartesi/rollups-node/pkg/contracts/application" - "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" ) @@ -15,9 +14,10 @@ import ( // Builds contracts delegates that will // use retry policy on contract methods calls type EvmReaderContractFactory struct { - maxRetries uint64 - maxDelay time.Duration - ethClient *ethclient.Client + maxRetries uint64 + maxDelay time.Duration + ethClient *ethclient.Client + iConsensusCache map[common.Address]evmreader.ConsensusContract } func NewEvmReaderContractFactory( @@ -27,9 +27,10 @@ func NewEvmReaderContractFactory( ) *EvmReaderContractFactory { return &EvmReaderContractFactory{ - ethClient: ethClient, - maxRetries: maxRetries, - maxDelay: maxDelay, + ethClient: ethClient, + maxRetries: maxRetries, + maxDelay: maxDelay, + iConsensusCache: make(map[common.Address]evmreader.ConsensusContract), } } @@ -52,12 +53,19 @@ func (f *EvmReaderContractFactory) NewIConsensus( address common.Address, ) (evmreader.ConsensusContract, error) { - // Building a contract does not fail due to network errors. - // No need to retry this operation - consensusContract, err := iconsensus.NewIConsensus(address, f.ethClient) - if err != nil { - return nil, err + delegator, ok := f.iConsensusCache[address] + if !ok { + // Building a contract does not fail due to network errors. + // No need to retry this operation + consensus, err := evmreader.NewConsensusContractAdapter(address, f.ethClient) + if err != nil { + return nil, err + } + + delegator = NewConsensusWithRetryPolicy(consensus, f.maxRetries, f.maxDelay) + + f.iConsensusCache[address] = delegator } + return delegator, nil - return NewConsensusWithRetryPolicy(consensusContract, f.maxRetries, f.maxDelay), nil } diff --git a/internal/evmreader/retrypolicy/retrypolicy_application_delegator.go b/internal/evmreader/retrypolicy/retrypolicy_application_delegator.go index 6da7cb7d..54b0c318 100644 --- a/internal/evmreader/retrypolicy/retrypolicy_application_delegator.go +++ b/internal/evmreader/retrypolicy/retrypolicy_application_delegator.go @@ -18,10 +18,6 @@ type ApplicationRetryPolicyDelegator struct { delayBetweenCalls time.Duration } -type getConsensusArgs struct { - opts *bind.CallOpts -} - func NewApplicationWithRetryPolicy( delegate evmreader.ApplicationContract, maxRetries uint64, @@ -34,21 +30,12 @@ func NewApplicationWithRetryPolicy( } } -func (d *ApplicationRetryPolicyDelegator) GetConsensus( - opts *bind.CallOpts, +func (d *ApplicationRetryPolicyDelegator) GetConsensus(opts *bind.CallOpts, ) (common.Address, error) { - return retry.CallFunctionWithRetryPolicy(d.getConsensus, - getConsensusArgs{ - opts: opts, - }, + return retry.CallFunctionWithRetryPolicy(d.delegate.GetConsensus, + opts, d.maxRetries, d.delayBetweenCalls, "Consensus::GetEpochLength", ) } - -func (d *ApplicationRetryPolicyDelegator) getConsensus( - args getConsensusArgs, -) (common.Address, error) { - return d.delegate.GetConsensus(args.opts) -} diff --git a/internal/evmreader/retrypolicy/retrypolicy_consensus_delegator.go b/internal/evmreader/retrypolicy/retrypolicy_consensus_delegator.go index b24e643b..77158b3b 100644 --- a/internal/evmreader/retrypolicy/retrypolicy_consensus_delegator.go +++ b/internal/evmreader/retrypolicy/retrypolicy_consensus_delegator.go @@ -9,7 +9,9 @@ import ( "github.com/cartesi/rollups-node/internal/evmreader" "github.com/cartesi/rollups-node/internal/retry" + "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" ) // A Consensus Delegator that @@ -33,18 +35,12 @@ func NewConsensusWithRetryPolicy( } } -type getEpochLengthArgs struct { - opts *bind.CallOpts -} - func (d *ConsensusRetryPolicyDelegator) GetEpochLength( opts *bind.CallOpts, ) (*big.Int, error) { - return retry.CallFunctionWithRetryPolicy(d.getEpochLength, - getEpochLengthArgs{ - opts: opts, - }, + return retry.CallFunctionWithRetryPolicy(d.delegate.GetEpochLength, + opts, d.maxRetries, d.delayBetweenCalls, "Consensus::GetEpochLength", @@ -52,8 +48,24 @@ func (d *ConsensusRetryPolicyDelegator) GetEpochLength( } -func (d *ConsensusRetryPolicyDelegator) getEpochLength( - args getEpochLengthArgs, -) (*big.Int, error) { - return d.delegate.GetEpochLength(args.opts) +type retrieveClaimAcceptedEventsArgs struct { + opts *bind.FilterOpts + appAddresses []common.Address +} + +func (d *ConsensusRetryPolicyDelegator) RetrieveClaimAcceptanceEvents( + opts *bind.FilterOpts, appAddresses []common.Address, +) ([]*iconsensus.IConsensusClaimAcceptance, error) { + return retry.CallFunctionWithRetryPolicy(d.retrieveClaimAcceptanceEvents, + retrieveClaimAcceptedEventsArgs{ + opts: opts, + appAddresses: appAddresses, + }, d.maxRetries, + d.delayBetweenCalls, + "Consensus::RetrieveClaimAcceptedEvents") +} + +func (d *ConsensusRetryPolicyDelegator) retrieveClaimAcceptanceEvents( + args retrieveClaimAcceptedEventsArgs) ([]*iconsensus.IConsensusClaimAcceptance, error) { + return d.delegate.RetrieveClaimAcceptanceEvents(args.opts, args.appAddresses) } diff --git a/internal/node/model/models.go b/internal/node/model/models.go index ab11d1d9..f711ddf0 100644 --- a/internal/node/model/models.go +++ b/internal/node/model/models.go @@ -62,12 +62,13 @@ type NodePersistentConfig struct { } type Application struct { - Id uint64 - ContractAddress Address - TemplateHash Hash - LastProcessedBlock uint64 - Status ApplicationStatus - IConsensusAddress Address + Id uint64 + ContractAddress Address + TemplateHash Hash + LastProcessedBlock uint64 + Status ApplicationStatus + IConsensusAddress Address + LastClaimCheckBlock uint64 } type Epoch struct { diff --git a/internal/repository/base.go b/internal/repository/base.go index 10f688bc..822909da 100644 --- a/internal/repository/base.go +++ b/internal/repository/base.go @@ -90,21 +90,24 @@ func (pg *Database) InsertApplication( (contract_address, template_hash, last_processed_block, + last_claim_check_block, status, iconsensus_address) VALUES (@contractAddress, @templateHash, @lastProcessedBlock, + @lastClaimCheckBlock, @status, @iConsensusAddress)` args := pgx.NamedArgs{ - "contractAddress": app.ContractAddress, - "templateHash": app.TemplateHash, - "lastProcessedBlock": app.LastProcessedBlock, - "status": app.Status, - "iConsensusAddress": app.IConsensusAddress, + "contractAddress": app.ContractAddress, + "templateHash": app.TemplateHash, + "lastProcessedBlock": app.LastProcessedBlock, + "lastClaimCheckBlock": app.LastClaimCheckBlock, + "status": app.Status, + "iConsensusAddress": app.IConsensusAddress, } _, err := pg.db.Exec(ctx, query, args) @@ -343,12 +346,13 @@ func (pg *Database) GetApplication( appAddressKey Address, ) (*Application, error) { var ( - id uint64 - contractAddress Address - templateHash Hash - lastProcessedBlock uint64 - status ApplicationStatus - iconsensusAddress Address + id uint64 + contractAddress Address + templateHash Hash + lastProcessedBlock uint64 + lastClaimCheckBlock uint64 + status ApplicationStatus + iconsensusAddress Address ) query := ` @@ -357,6 +361,7 @@ func (pg *Database) GetApplication( contract_address, template_hash, last_processed_block, + last_claim_check_block, status, iconsensus_address FROM @@ -373,6 +378,7 @@ func (pg *Database) GetApplication( &contractAddress, &templateHash, &lastProcessedBlock, + &lastClaimCheckBlock, &status, &iconsensusAddress, ) @@ -387,12 +393,13 @@ func (pg *Database) GetApplication( } app := Application{ - Id: id, - ContractAddress: contractAddress, - TemplateHash: templateHash, - LastProcessedBlock: lastProcessedBlock, - Status: status, - IConsensusAddress: iconsensusAddress, + Id: id, + ContractAddress: contractAddress, + TemplateHash: templateHash, + LastProcessedBlock: lastProcessedBlock, + LastClaimCheckBlock: lastClaimCheckBlock, + Status: status, + IConsensusAddress: iconsensusAddress, } return &app, nil diff --git a/internal/repository/base_test.go b/internal/repository/base_test.go index 3caa6943..6556db69 100644 --- a/internal/repository/base_test.go +++ b/internal/repository/base_test.go @@ -96,7 +96,7 @@ func (s *RepositorySuite) SetupDatabase() { Id: 1, Index: 0, FirstBlock: 0, - LastBlock: 200, + LastBlock: 99, AppAddress: app.ContractAddress, ClaimHash: nil, TransactionHash: nil, @@ -109,8 +109,8 @@ func (s *RepositorySuite) SetupDatabase() { epoch2 := Epoch{ Id: 2, Index: 1, - FirstBlock: 201, - LastBlock: math.MaxUint64, + FirstBlock: 100, + LastBlock: 199, AppAddress: app.ContractAddress, ClaimHash: nil, TransactionHash: nil, @@ -120,6 +120,20 @@ func (s *RepositorySuite) SetupDatabase() { _, err = s.database.InsertEpoch(s.ctx, &epoch2) s.Require().Nil(err) + epoch3 := Epoch{ + Id: 3, + Index: 2, + FirstBlock: 200, + LastBlock: 299, + AppAddress: app.ContractAddress, + ClaimHash: nil, + TransactionHash: nil, + Status: EpochStatusClaimSubmitted, + } + + _, err = s.database.InsertEpoch(s.ctx, &epoch3) + s.Require().Nil(err) + input1 := Input{ Index: 1, CompletionStatus: InputStatusAccepted, @@ -384,7 +398,7 @@ func (s *RepositorySuite) TestEpochExists() { Status: EpochStatusOpen, Index: 0, FirstBlock: 0, - LastBlock: 200, + LastBlock: 99, TransactionHash: nil, ClaimHash: nil, AppAddress: common.HexToAddress("deadbeef"), diff --git a/internal/repository/evmreader.go b/internal/repository/evmreader.go index 52bbb307..d58914de 100644 --- a/internal/repository/evmreader.go +++ b/internal/repository/evmreader.go @@ -160,13 +160,14 @@ func (pg *Database) getAllApplicationsByStatus( criteria *ApplicationStatus, ) ([]Application, error) { var ( - id uint64 - contractAddress Address - templateHash Hash - lastProcessedBlock uint64 - status ApplicationStatus - iConsensusAddress Address - results []Application + id uint64 + contractAddress Address + templateHash Hash + lastProcessedBlock uint64 + lastClaimCheckBlock uint64 + status ApplicationStatus + iConsensusAddress Address + results []Application ) query := ` @@ -175,6 +176,7 @@ func (pg *Database) getAllApplicationsByStatus( contract_address, template_hash, last_processed_block, + last_claim_check_block, status, iconsensus_address FROM @@ -194,15 +196,16 @@ func (pg *Database) getAllApplicationsByStatus( _, err = pgx.ForEachRow(rows, []any{&id, &contractAddress, &templateHash, - &lastProcessedBlock, &status, &iConsensusAddress}, + &lastProcessedBlock, &lastClaimCheckBlock, &status, &iConsensusAddress}, func() error { app := Application{ - Id: id, - ContractAddress: contractAddress, - TemplateHash: templateHash, - LastProcessedBlock: lastProcessedBlock, - Status: status, - IConsensusAddress: iConsensusAddress, + Id: id, + ContractAddress: contractAddress, + TemplateHash: templateHash, + LastProcessedBlock: lastProcessedBlock, + LastClaimCheckBlock: lastClaimCheckBlock, + Status: status, + IConsensusAddress: iConsensusAddress, } results = append(results, app) return nil @@ -239,3 +242,132 @@ func (pg *Database) GetLastProcessedBlock( return block, nil } + +func (pg *Database) GetPreviousSubmittedClaims( + ctx context.Context, + app Address, + block uint64, +) ([]Epoch, error) { + query := ` + SELECT + id, + application_address, + index, + first_block, + last_block, + claim_hash, + transaction_hash, + status + FROM + epoch + WHERE + application_address=@appAddress AND status=@status AND last_block <= @block + ORDER BY + index ASC` + + args := pgx.NamedArgs{ + "appAddress": app, + "status": EpochStatusClaimSubmitted, + "block": block, + } + + rows, err := pg.db.Query(ctx, query, args) + if err != nil { + return nil, fmt.Errorf("GetProcessedEpochs failed: %w", err) + } + + var ( + id, index, firstBlock, lastBlock uint64 + appAddress Address + claimHash, transactionHash *Hash + status string + results []Epoch + ) + + scans := []any{ + &id, &appAddress, &index, &firstBlock, &lastBlock, &claimHash, &transactionHash, &status, + } + _, err = pgx.ForEachRow(rows, scans, func() error { + epoch := Epoch{ + Id: id, + Index: index, + AppAddress: appAddress, + FirstBlock: firstBlock, + LastBlock: lastBlock, + ClaimHash: claimHash, + TransactionHash: transactionHash, + Status: EpochStatus(status), + } + results = append(results, epoch) + return nil + }) + if err != nil { + return nil, fmt.Errorf("GetProcessedEpochs failed: %w", err) + } + return results, nil +} + +func (pg *Database) StoreClaimsTransaction( + ctx context.Context, + app Address, + claims []*Epoch, + blockNumber uint64, +) error { + + var errUpdateClaims = errors.New("unable to update claims") + + updateEpochQuery := ` + UPDATE epoch + SET + status = @status + WHERE + id = @id + ` + + tx, err := pg.db.Begin(ctx) + if err != nil { + return errors.Join(errUpdateClaims, err) + } + + for _, claim := range claims { + updateClaimArgs := pgx.NamedArgs{ + "status": claim.Status, + "id": claim.Id, + } + + tag, err := tx.Exec(ctx, updateEpochQuery, updateClaimArgs) + if err != nil { + return errors.Join(errUpdateClaims, err, tx.Rollback(ctx)) + } + if tag.RowsAffected() != 1 { + return errors.Join(errUpdateClaims, + fmt.Errorf("no rows affected when updating claim %d", claim.Index), + tx.Rollback(ctx)) + } + } + + // Update last processed block + updateLastBlockQuery := ` + UPDATE application + SET last_claim_check_block = @blockNumber + WHERE + contract_address=@contractAddress` + + updateLastBlockArgs := pgx.NamedArgs{ + "blockNumber": blockNumber, + "contractAddress": app, + } + + _, err = tx.Exec(ctx, updateLastBlockQuery, updateLastBlockArgs) + if err != nil { + return errors.Join(errUpdateClaims, err, tx.Rollback(ctx)) + } + + // Commit transaction + err = tx.Commit(ctx) + if err != nil { + return errors.Join(errUpdateClaims, err, tx.Rollback(ctx)) + } + + return nil +} diff --git a/internal/repository/evmreader_test.go b/internal/repository/evmreader_test.go index 63260aaf..e363bfdb 100644 --- a/internal/repository/evmreader_test.go +++ b/internal/repository/evmreader_test.go @@ -159,3 +159,42 @@ func (s *RepositorySuite) TestGetMostRecentBlock() { s.Require().Equal(block, response) } + +func (s *RepositorySuite) TestGetPreviousSubmittedClaims() { + response, err := s.database.GetPreviousSubmittedClaims( + s.ctx, common.HexToAddress("deadbeef"), 300) + + s.Require().Nil(err) + s.Require().NotNil(response) + s.Require().Equal(1, len(response)) + + epoch, err := s.database.GetEpoch(s.ctx, 2, common.HexToAddress("deadbeef")) + s.Require().Nil(err) + + s.Require().Equal(*epoch, response[0]) +} + +func (s *RepositorySuite) TestStoreClaimsTransaction() { + + claim, err := s.database.GetEpoch(s.ctx, 2, common.HexToAddress("deadbeef")) + s.Require().Nil(err) + s.Require().NotNil(claim) + + s.Require().Equal(EpochStatusClaimSubmitted, claim.Status) + claim.Status = EpochStatusClaimAccepted + + claims := []*Epoch{claim} + + err = s.database.StoreClaimsTransaction(s.ctx, common.HexToAddress("deadbeef"), claims, 499) + s.Require().Nil(err) + + claim, err = s.database.GetEpoch(s.ctx, 2, common.HexToAddress("deadbeef")) + s.Require().Nil(err) + s.Require().NotNil(claim) + s.Require().Equal(EpochStatusClaimAccepted, claim.Status) + + application, err := s.database.GetApplication(s.ctx, common.HexToAddress("deadbeef")) + s.Require().Nil(err) + s.Require().Equal(uint64(499), application.LastClaimCheckBlock) + +} diff --git a/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql index a4c6483f..0cf96781 100644 --- a/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql +++ b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql @@ -37,6 +37,7 @@ CREATE TABLE "application" "last_processed_block" NUMERIC(20,0) NOT NULL CHECK ("last_processed_block" >= 0 AND "last_processed_block" <= f_maxuint64()), "status" "ApplicationStatus" NOT NULL, "iconsensus_address" BYTEA NOT NULL, + "last_claim_check_block" NUMERIC(20,0) NOT NULL CHECK ("last_claim_check_block" >= 0 AND "last_claim_check_block" <= f_maxuint64()), CONSTRAINT "application_pkey" PRIMARY KEY ("id"), UNIQUE("contract_address") ); @@ -57,6 +58,7 @@ CREATE TABLE "epoch" ); CREATE INDEX "epoch_idx" ON "epoch"("index"); +CREATE INDEX "epoch_last_block_idx" ON "epoch"("last_block"); CREATE TABLE "input" ( diff --git a/internal/repository/validator.go b/internal/repository/validator.go index ca6748b7..8a998936 100644 --- a/internal/repository/validator.go +++ b/internal/repository/validator.go @@ -23,7 +23,7 @@ func (pg *Database) GetOutputsProducedInBlockRange( endBlock uint64, ) ([]Output, error) { query := ` - SELECT + SELECT o.id, o.index, o.raw_data, @@ -89,9 +89,9 @@ func (pg *Database) GetProcessedEpochs(ctx context.Context, application Address) claim_hash, transaction_hash, status - FROM + FROM epoch - WHERE + WHERE application_address=@appAddress AND status=@status ORDER BY index ASC` @@ -167,7 +167,7 @@ func (pg *Database) GetLastInputOutputsHash( query := ` SELECT outputs_hash - FROM + FROM input WHERE epoch_id = @id @@ -274,7 +274,7 @@ func (pg *Database) SetEpochClaimAndInsertProofsTransaction( SET claim_hash=@claimHash, status=@status - WHERE + WHERE id = @id ` @@ -303,12 +303,12 @@ func (pg *Database) SetEpochClaimAndInsertProofsTransaction( } query2 := ` - UPDATE + UPDATE output - SET + SET hash = @hash, output_hashes_siblings = @outputHashesSiblings - WHERE + WHERE id = @id `