diff --git a/relayer/relays/parachain/config.go b/relayer/relays/parachain/config.go index 37ada37755..10cd20538c 100644 --- a/relayer/relays/parachain/config.go +++ b/relayer/relays/parachain/config.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/snowfork/snowbridge/relayer/config" + beaconconf "github.com/snowfork/snowbridge/relayer/relays/beacon/config" ) type Config struct { @@ -14,10 +15,11 @@ type Config struct { } type SourceConfig struct { - Polkadot config.PolkadotConfig `mapstructure:"polkadot"` - Parachain config.ParachainConfig `mapstructure:"parachain"` - Ethereum config.EthereumConfig `mapstructure:"ethereum"` - Contracts SourceContractsConfig `mapstructure:"contracts"` + Polkadot config.PolkadotConfig `mapstructure:"polkadot"` + Parachain config.ParachainConfig `mapstructure:"parachain"` + Ethereum config.EthereumConfig `mapstructure:"ethereum"` + Contracts SourceContractsConfig `mapstructure:"contracts"` + Beacon beaconconf.BeaconConfig `mapstructure:"beacon"` } type SourceContractsConfig struct { diff --git a/relayer/relays/parachain/main.go b/relayer/relays/parachain/main.go index 918937f8c5..086e8b255c 100644 --- a/relayer/relays/parachain/main.go +++ b/relayer/relays/parachain/main.go @@ -2,16 +2,27 @@ package parachain import ( "context" + "errors" "fmt" + "math/big" "time" "golang.org/x/sync/errgroup" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/snowfork/snowbridge/relayer/chain/ethereum" "github.com/snowfork/snowbridge/relayer/chain/parachain" "github.com/snowfork/snowbridge/relayer/chain/relaychain" + "github.com/snowfork/snowbridge/relayer/contracts" "github.com/snowfork/snowbridge/relayer/crypto/secp256k1" + "github.com/snowfork/snowbridge/relayer/relays/beacon/header" + "github.com/snowfork/snowbridge/relayer/relays/beacon/header/syncer/api" + "github.com/snowfork/snowbridge/relayer/relays/beacon/header/syncer/scale" + "github.com/snowfork/snowbridge/relayer/relays/beacon/protocol" + "github.com/snowfork/snowbridge/relayer/relays/beacon/store" + + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -23,6 +34,9 @@ type Relay struct { ethereumConnBeefy *ethereum.Connection ethereumChannelWriter *EthereumWriter beefyListener *BeefyListener + parachainWriter *parachain.ParachainWriter + beaconHeader *header.Header + headerCache *ethereum.HeaderCache } func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) { @@ -55,6 +69,28 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) { tasks, ) + parachainWriter := parachain.NewParachainWriter( + parachainConn, + 8, + ) + headerCache, err := ethereum.NewHeaderBlockCache( + ðereum.DefaultBlockLoader{Conn: ethereumConnWriter}, + ) + if err != nil { + return nil, err + } + p := protocol.New(config.Source.Beacon.Spec, 20) + store := store.New(config.Source.Beacon.DataStore.Location, config.Source.Beacon.DataStore.MaxEntries, *p) + store.Connect() + beaconAPI := api.NewBeaconClient(config.Source.Beacon.Endpoint, config.Source.Beacon.StateEndpoint) + beaconHeader := header.New( + parachainWriter, + beaconAPI, + config.Source.Beacon.Spec, + &store, + p, + 0, // setting is not used in the execution relay + ) return &Relay{ config: config, parachainConn: parachainConn, @@ -63,6 +99,9 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) { ethereumConnBeefy: ethereumConnBeefy, ethereumChannelWriter: ethereumChannelWriter, beefyListener: beefyListener, + parachainWriter: parachainWriter, + beaconHeader: &beaconHeader, + headerCache: headerCache, }, nil } @@ -99,7 +138,199 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error { return err } + err = relay.parachainWriter.Start(ctx, eg) + if err != nil { + return err + } + + err = relay.startDeliverProof(ctx, eg) + if err != nil { + return err + } + log.Info("Current relay's ID:", relay.config.Schedule.ID) return nil } + +func (relay *Relay) startDeliverProof(ctx context.Context, eg *errgroup.Group) error { + eg.Go(func() error { + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(60 * time.Second): + orders, err := relay.beefyListener.scanner.findOrderUndelivered(ctx) + if err != nil { + return fmt.Errorf("find undelivered order: %w", err) + } + for _, order := range orders { + event, err := relay.findEvent(ctx, order.Nonce) + if err != nil { + return fmt.Errorf("find event GatewayInboundMessageDispatched0: %w", err) + } + err = relay.doSubmit(ctx, event) + if err != nil { + return fmt.Errorf("submit delivery proof for GatewayInboundMessageDispatched0: %w", err) + } + } + } + } + }) + return nil +} + +// Todo: Improve scan algorithm +func (relay *Relay) findEvent( + ctx context.Context, + nonce uint64, +) (*contracts.GatewayInboundMessageDispatched0, error) { + + const BlocksPerQuery = 4096 + + var event *contracts.GatewayInboundMessageDispatched0 + + blockNumber, err := relay.ethereumConnWriter.Client().BlockNumber(ctx) + if err != nil { + return event, fmt.Errorf("get last block number: %w", err) + } + + done := false + + for { + var begin uint64 + if blockNumber < BlocksPerQuery { + begin = 0 + } else { + begin = blockNumber - BlocksPerQuery + } + + opts := bind.FilterOpts{ + Start: begin, + End: &blockNumber, + Context: ctx, + } + + iter, err := relay.ethereumChannelWriter.gateway.FilterInboundMessageDispatched0(&opts, []uint64{nonce}) + if err != nil { + return event, fmt.Errorf("iter dispatch event: %w", err) + } + + for { + more := iter.Next() + if !more { + err = iter.Error() + if err != nil { + return event, fmt.Errorf("iter dispatch event: %w", err) + } + break + } + if iter.Event.Nonce == nonce { + event = iter.Event + done = true + break + } + } + + if done { + iter.Close() + } + + blockNumber = begin + + if done || begin == 0 { + break + } + } + + return event, nil +} + +func (relay *Relay) makeInboundMessage( + ctx context.Context, + headerCache *ethereum.HeaderCache, + event *contracts.GatewayInboundMessageDispatched0, +) (*parachain.Message, error) { + receiptTrie, err := headerCache.GetReceiptTrie(ctx, event.Raw.BlockHash) + if err != nil { + log.WithFields(logrus.Fields{ + "blockHash": event.Raw.BlockHash.Hex(), + "blockNumber": event.Raw.BlockNumber, + "txHash": event.Raw.TxHash.Hex(), + }).WithError(err).Error("Failed to get receipt trie for event") + return nil, err + } + + msg, err := ethereum.MakeMessageFromEvent(&event.Raw, receiptTrie) + if err != nil { + log.WithFields(logrus.Fields{ + "address": event.Raw.Address.Hex(), + "blockHash": event.Raw.BlockHash.Hex(), + "blockNumber": event.Raw.BlockNumber, + "txHash": event.Raw.TxHash.Hex(), + }).WithError(err).Error("Failed to generate message from ethereum event") + return nil, err + } + + log.WithFields(logrus.Fields{ + "blockHash": event.Raw.BlockHash.Hex(), + "blockNumber": event.Raw.BlockNumber, + "txHash": event.Raw.TxHash.Hex(), + }).Info("found message") + + return msg, nil +} + +func (relay *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayInboundMessageDispatched0) error { + inboundMsg, err := relay.makeInboundMessage(ctx, relay.headerCache, ev) + if err != nil { + return fmt.Errorf("make outgoing message: %w", err) + } + + logger := log.WithFields(log.Fields{ + "ethNonce": ev.Nonce, + "msgNonce": ev.Nonce, + "address": ev.Raw.Address.Hex(), + "blockHash": ev.Raw.BlockHash.Hex(), + "blockNumber": ev.Raw.BlockNumber, + "txHash": ev.Raw.TxHash.Hex(), + "txIndex": ev.Raw.TxIndex, + }) + + nextBlockNumber := new(big.Int).SetUint64(ev.Raw.BlockNumber + 1) + + blockHeader, err := relay.ethereumConnWriter.Client().HeaderByNumber(ctx, nextBlockNumber) + if err != nil { + return fmt.Errorf("get block header: %w", err) + } + + proof, err := relay.beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, false) + if err != nil && !errors.Is(err, header.ErrBeaconHeaderNotFinalized) { + return fmt.Errorf("fetch execution header proof: %w", err) + } + + err = relay.writeToParachain(ctx, proof, inboundMsg) + if err != nil { + return fmt.Errorf("write to parachain: %w", err) + } + + logger.Info("inbound message executed successfully") + + return nil +} + +func (relay *Relay) writeToParachain(ctx context.Context, proof scale.ProofPayload, inboundMsg *parachain.Message) error { + inboundMsg.Proof.ExecutionProof = proof.HeaderPayload + + log.WithFields(logrus.Fields{ + "EventLog": inboundMsg.EventLog, + "Proof": inboundMsg.Proof, + }).Debug("Generated message from Ethereum log") + + err := relay.parachainWriter.WriteToParachainAndWatch(ctx, "EthereumOutboundQueueV2.submit_delivery_proof", inboundMsg) + if err != nil { + return fmt.Errorf("submit message to outbound queue v2: %w", err) + } + + return nil +} diff --git a/relayer/relays/parachain/scanner.go b/relayer/relays/parachain/scanner.go index 15ac33f134..32e0767be2 100644 --- a/relayer/relays/parachain/scanner.go +++ b/relayer/relays/parachain/scanner.go @@ -369,7 +369,6 @@ func fetchMessageProof( func (s *Scanner) isNonceRelayed(ctx context.Context, nonce uint64) (bool, error) { var isRelayed bool - // Fetch latest nonce in ethereum gateway gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway) gatewayContract, err := contracts.NewGateway( gatewayAddress, @@ -389,3 +388,41 @@ func (s *Scanner) isNonceRelayed(ctx context.Context, nonce uint64) (bool, error } return isRelayed, nil } + +func (s *Scanner) findOrderUndelivered( + ctx context.Context, +) ([]*PendingOrder, error) { + storageKey, err := types.CreateStorageKey(s.paraConn.Metadata(), "EthereumOutboundQueueV2", "PendingOrders", nil, nil) + if err != nil { + return nil, fmt.Errorf("create storage key for parachain outbound queue PendingOrders: %w", err) + } + keys, err := s.paraConn.API().RPC.State.GetKeysLatest(storageKey) + if err != nil { + return nil, fmt.Errorf("fetch nonces from PendingOrders start with key '%v': %w", storageKey, err) + } + var undeliveredOrders []*PendingOrder + for _, key := range keys { + var undeliveredOrder PendingOrder + value, err := s.paraConn.API().RPC.State.GetStorageRawLatest(key) + if err != nil { + return nil, fmt.Errorf("fetch value of pendingOrder with key '%v': %w", key, err) + } + decoder := scale.NewDecoder(bytes.NewReader(*value)) + err = decoder.Decode(&undeliveredOrder) + if err != nil { + return nil, fmt.Errorf("decode order error: %w", err) + } + isRelayed, err := s.isNonceRelayed(ctx, uint64(undeliveredOrder.Nonce)) + if err != nil { + return nil, fmt.Errorf("check nonce relayed: %w", err) + } + if isRelayed { + log.WithFields(log.Fields{ + "nonce": uint64(undeliveredOrder.Nonce), + }).Debug("Relayed but not delivered to BH") + undeliveredOrders = append(undeliveredOrders, &undeliveredOrder) + } + } + + return undeliveredOrders, nil +}