Skip to content

Commit

Permalink
feat(evm-reader): Read output execution statuses
Browse files Browse the repository at this point in the history
  • Loading branch information
fmoura committed Sep 9, 2024
1 parent 43932d3 commit bba7722
Show file tree
Hide file tree
Showing 14 changed files with 1,022 additions and 129 deletions.
54 changes: 54 additions & 0 deletions internal/evmreader/application_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package evmreader

import (
appcontract "github.com/cartesi/rollups-node/pkg/contracts/application"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
)

// IConsensus Wrapper
type ApplicationContractAdapter struct {
application *appcontract.Application
}

func NewApplicationContractAdapter(
appAddress common.Address,
client *ethclient.Client,
) (*ApplicationContractAdapter, error) {
applicationContract, err := appcontract.NewApplication(appAddress, client)
if err != nil {
return nil, err
}
return &ApplicationContractAdapter{
application: applicationContract,
}, nil
}

func (a *ApplicationContractAdapter) GetConsensus(opts *bind.CallOpts) (common.Address, error) {
return a.application.GetConsensus(opts)
}

func (a *ApplicationContractAdapter) RetrieveOutputExecutionEvents(
opts *bind.FilterOpts,
) ([]*appcontract.ApplicationOutputExecuted, error) {

itr, err := a.application.FilterOutputExecuted(opts)
if err != nil {
return nil, err
}
defer itr.Close()

var events []*appcontract.ApplicationOutputExecuted
for itr.Next() {
outputExecutedEvent := itr.Event
events = append(events, outputExecutedEvent)
}
if err = itr.Error(); err != nil {
return nil, err
}
return events, nil
}
15 changes: 14 additions & 1 deletion internal/evmreader/evmreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/big"

. "github.com/cartesi/rollups-node/internal/node/model"
appcontract "github.com/cartesi/rollups-node/pkg/contracts/application"
"github.com/cartesi/rollups-node/pkg/contracts/iconsensus"
"github.com/cartesi/rollups-node/pkg/contracts/inputbox"
"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -47,6 +48,12 @@ type EvmReaderRepository interface {
claims []*Epoch,
mostRecentBlockNumber uint64,
) error
GetOutput(
ctx context.Context, indexKey uint64, appAddressKey Address,
) (*Output, error)
UpdateOutputExecutionTransaction(
ctx context.Context, app Address, executedOutputs []*Output, blockNumber uint64,
) error
}

// EthClient mimics part of ethclient.Client functions to narrow down the
Expand All @@ -71,6 +78,9 @@ type ConsensusContract interface {

type ApplicationContract interface {
GetConsensus(opts *bind.CallOpts) (Address, error)
RetrieveOutputExecutionEvents(
opts *bind.FilterOpts,
) ([]*appcontract.ApplicationOutputExecuted, error)
}

type ContractFactory interface {
Expand All @@ -93,7 +103,8 @@ type application struct {
consensusContract ConsensusContract
}

// EvmReader reads Input Added and Claim Submitted events from the blockchain
// EvmReader reads Input Added, Claim Submitted and
// Output Executed events from the blockchain
type EvmReader struct {
client EthClient
wsClient EthWsClient
Expand Down Expand Up @@ -213,6 +224,8 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{}

r.checkForClaimStatus(ctx, apps, mostRecentBlockNumber)

r.checkForOutputExecution(ctx, apps, mostRecentBlockNumber)

}
}
}
Expand Down
51 changes: 51 additions & 0 deletions internal/evmreader/evmreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

. "github.com/cartesi/rollups-node/internal/node/model"
appcontract "github.com/cartesi/rollups-node/pkg/contracts/application"
"github.com/cartesi/rollups-node/pkg/contracts/iconsensus"
"github.com/cartesi/rollups-node/pkg/contracts/inputbox"
"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -449,6 +450,28 @@ func newMockRepository() *MockRepository {
mock.Anything,
).Return(nil)

repo.On("UpdateOutputExecutionTransaction",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)

outputHash := common.HexToHash("0xAABBCCDDEE")
repo.On("GetOutput",
mock.Anything,
0,
common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E")).Return(
&Output{
Id: 1,
Index: 0,
RawData: common.Hex2Bytes("0xdeadbeef"),
Hash: &outputHash,
InputId: 1,
OutputHashesSiblings: nil,
TransactionHash: nil,
},
)

return repo

}
Expand Down Expand Up @@ -617,6 +640,24 @@ func (m *MockRepository) UpdateEpochs(ctx context.Context,
return args.Error(0)
}

func (m *MockRepository) GetOutput(
ctx context.Context, indexKey uint64, appAddressKey Address,
) (*Output, error) {
args := m.Called(ctx, indexKey, appAddressKey)
obj := args.Get(0)
if obj == nil {
return nil, args.Error(1)
}
return obj.(*Output), args.Error(1)
}

func (m *MockRepository) UpdateOutputExecutionTransaction(
ctx context.Context, app Address, executedOutputs []*Output, blockNumber uint64,
) error {
args := m.Called(ctx, app, executedOutputs, blockNumber)
return args.Error(0)
}

type MockApplicationContract struct {
mock.Mock
}
Expand All @@ -636,6 +677,13 @@ func (m *MockApplicationContract) GetConsensus(
return args.Get(0).(common.Address), args.Error(1)
}

func (m *MockApplicationContract) RetrieveOutputExecutionEvents(
opts *bind.FilterOpts,
) ([]*appcontract.ApplicationOutputExecuted, error) {
args := m.Called(opts)
return args.Get(0).([]*appcontract.ApplicationOutputExecuted), args.Error(1)
}

type MockIConsensusContract struct {
mock.Mock
}
Expand Down Expand Up @@ -688,6 +736,9 @@ func newEmvReaderContractFactory() *MockEvmReaderContractFactory {
mock.Anything,
).Return(common.HexToAddress("0xdeadbeef"), nil)

applicationContract.On("RetrieveOutputExecutionEvents",
mock.Anything).Return([]*appcontract.ApplicationOutputExecuted{}, nil)

consensusContract := &MockIConsensusContract{}

consensusContract.On("GetEpochLength",
Expand Down
78 changes: 0 additions & 78 deletions internal/evmreader/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,84 +121,6 @@ func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocks() {
)
}

func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocksWrongIConsensus() {

wsClient := FakeWSEhtClient{}

evmReader := NewEvmReader(
s.client,
&wsClient,
s.inputBox,
s.repository,
0x10,
DefaultBlockStatusLatest,
s.contractFactory,
)

// Prepare repository
s.repository.Unset("GetAllRunningApplications")
s.repository.On(
"GetAllRunningApplications",
mock.Anything,
).Return([]Application{{
ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"),
IConsensusAddress: common.HexToAddress("0xFFFFFFFF"),
LastProcessedBlock: 0x00,
}}, nil).Once()
s.repository.On(
"GetAllRunningApplications",
mock.Anything,
).Return([]Application{{
ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"),
IConsensusAddress: common.HexToAddress("0xFFFFFFFF"),
LastProcessedBlock: 0x11,
}}, nil).Once()

// Prepare Client
s.client.Unset("HeaderByNumber")
s.client.On(
"HeaderByNumber",
mock.Anything,
mock.Anything,
).Return(&header0, nil).Once()
s.client.On(
"HeaderByNumber",
mock.Anything,
mock.Anything,
).Return(&header1, nil).Once()
s.client.On(
"HeaderByNumber",
mock.Anything,
mock.Anything,
).Return(&header2, nil).Once()

// Start service
ready := make(chan struct{}, 1)
errChannel := make(chan error, 1)

go func() {
errChannel <- evmReader.Run(s.ctx, ready)
}()

select {
case <-ready:
break
case err := <-errChannel:
s.FailNow("unexpected error signal", err)
}

wsClient.fireNewHead(&header0)
wsClient.fireNewHead(&header1)
time.Sleep(time.Second)

s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0)
s.repository.AssertNumberOfCalls(
s.T(),
"StoreEpochAndInputsTransaction",
0,
)
}

func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() {

wsClient := FakeWSEhtClient{}
Expand Down
113 changes: 113 additions & 0 deletions internal/evmreader/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package evmreader

import (
"bytes"
"context"
"log/slog"

. "github.com/cartesi/rollups-node/internal/node/model"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
)

func (r *EvmReader) checkForOutputExecution(
ctx context.Context,
apps []application,
mostRecentBlockNumber uint64,
) {

appAddresses := appsToAddresses(apps)

slog.Debug("Checking for new Output Executed Events", "apps", appAddresses)

for _, app := range apps {

LastOutputCheck := app.LastOutputCheckBlock

// Safeguard: Only check blocks starting from the block where the InputBox
// contract was deployed as Inputs can be added to that same block
if LastOutputCheck < r.inputBoxDeploymentBlock {
LastOutputCheck = r.inputBoxDeploymentBlock
}

if mostRecentBlockNumber > LastOutputCheck {

slog.Info("Checking output execution for application",
"app", app.ContractAddress,
"last output check block", LastOutputCheck,
"most recent block", mostRecentBlockNumber)

r.readAndUpdateOutputs(ctx, app, LastOutputCheck, mostRecentBlockNumber)

} else if mostRecentBlockNumber < LastOutputCheck {
slog.Warn(
"Not reading output execution: most recent block is lower than the last processed one", //nolint:lll
"app", app.ContractAddress,
"last output check block", LastOutputCheck,
"most recent block", mostRecentBlockNumber,
)
} else {
slog.Info("Not reading output execution: already checked the most recent blocks",
"app", app.ContractAddress,
"last output check block", LastOutputCheck,
"most recent block", mostRecentBlockNumber,
)
}
}

}

func (r *EvmReader) readAndUpdateOutputs(
ctx context.Context, app application, lastOutputCheck, mostRecentBlockNumber uint64) {

contract := app.applicationContract

opts := &bind.FilterOpts{
Start: lastOutputCheck + 1,
End: &mostRecentBlockNumber,
}

outputExecutedEvents, err := contract.RetrieveOutputExecutionEvents(opts)
if err != nil {
slog.Error("Error reading output events", "app", app.ContractAddress, "error", err)
return
}

// Should we check the output hash??
var executedOutputs []*Output
for _, event := range outputExecutedEvents {

// Compare output to check it is the correct one
output, err := r.repository.GetOutput(ctx, event.OutputIndex, app.ContractAddress)
if err != nil {
slog.Error("Error retrieving output",
"app", app.ContractAddress, "index", event.OutputIndex, "error", err)
return
}

if !bytes.Equal(output.RawData, event.Output) {
slog.Debug("Output mismatch",
"app", app.ContractAddress, "index", event.OutputIndex,
"actual", output.RawData, "event's", event.Output)

slog.Error("Output mismatch. Application is in an invalid state",
"app", app.ContractAddress,
"index", event.OutputIndex)

return
}

slog.Info("Output executed", "app", app.ContractAddress, "index", event.OutputIndex)
output.TransactionHash = &event.Raw.TxHash
executedOutputs = append(executedOutputs, output)
}

err = r.repository.UpdateOutputExecutionTransaction(
ctx, app.ContractAddress, executedOutputs, mostRecentBlockNumber)
if err != nil {
slog.Error("Error storing output execution statuses", "app", app, "error", err)
}

}
Loading

0 comments on commit bba7722

Please sign in to comment.