Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
claravanstaden committed Oct 2, 2024
1 parent f4f305c commit b94052b
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 32 deletions.
29 changes: 29 additions & 0 deletions relayer/relays/beacon/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,35 @@ func (h *Header) FetchExecutionProof(blockRoot common.Hash, instantVerification

}

func (h *Header) CheckHeaderFinalized(blockRoot common.Hash, instantVerification bool) error {
header, err := h.syncer.Client.GetHeaderByBlockRoot(blockRoot)
if err != nil {
return fmt.Errorf("get beacon header by blockRoot: %w", err)
}
lastFinalizedHeaderState, err := h.writer.GetLastFinalizedHeaderState()
if err != nil {
return fmt.Errorf("fetch last finalized header state: %w", err)
}

// The latest finalized header on-chain is older than the header containing the message, so we need to sync the
// finalized header with the message.
finalizedHeader, err := h.syncer.GetFinalizedHeader()
if err != nil {
return err
}

// If the header is not finalized yet, we can't do anything further.
if header.Slot > uint64(finalizedHeader.Slot) {
return fmt.Errorf("chain not finalized yet: %w", ErrBeaconHeaderNotFinalized)
}

if header.Slot > lastFinalizedHeaderState.BeaconSlot && !instantVerification {
return fmt.Errorf("on-chain header not recent enough and instantVerification is off: %w", ErrBeaconHeaderNotFinalized)
}

return nil
}

func (h *Header) isInitialSyncPeriod() bool {
initialPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.InitialCheckpointSlot)
lastFinalizedPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.Finalized.LastSyncedSlot)
Expand Down
98 changes: 66 additions & 32 deletions relayer/relays/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,20 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {

blockNumber, err := ethconn.Client().BlockNumber(ctx)
if err != nil {
return fmt.Errorf("geOkayt last block number: %w", err)
return fmt.Errorf("get last block number: %w", err)
}

events, err := r.findEvents(ctx, blockNumber, paraNonce+1)
if err != nil {
return fmt.Errorf("find events: %w", err)
events, err := r.findFinalizedEvents(ctx, blockNumber, paraNonce+1)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
log.WithField("nonce", events[0].Nonce).Info("beacon header not finalized yet")
continue
} else if err != nil {
return fmt.Errorf("find finalized events: %w", err)
}

for _, ev := range events {
err = r.waitAndSend(ctx, ev)
switch {
case errors.Is(err, header.ErrBeaconHeaderNotFinalized):
log.WithField("nonce", ev.Nonce).Info("beacon header not finalized yet")
break
case err != nil:
err := r.waitAndSend(ctx, ev)
if err != nil {
return fmt.Errorf("submit message: %w", err)
}
}
Expand Down Expand Up @@ -237,6 +236,19 @@ func (r *Relay) fetchEthereumNonce(ctx context.Context) (uint64, error) {

const BlocksPerQuery = 4096

func (r *Relay) findFinalizedEvents(
ctx context.Context,
blockNumber uint64,
start uint64,
) ([]*contracts.GatewayOutboundMessageAccepted, error) {
events, err := r.findEvents(ctx, blockNumber, start)
if err != nil {
return []*contracts.GatewayOutboundMessageAccepted{}, fmt.Errorf("find events: %w", err)
}

return events, r.isInFinalizedBlock(ctx, events)
}

func (r *Relay) findEvents(
ctx context.Context,
latestFinalizedBlockNumber uint64,
Expand Down Expand Up @@ -359,37 +371,31 @@ func (r *Relay) makeInboundMessage(
}

func (r *Relay) waitAndSend(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) (err error) {
var paraNonce uint64
ethNonce := ev.Nonce
waitingPeriod := (ethNonce + r.config.Schedule.TotalRelayerCount - r.config.Schedule.ID) % r.config.Schedule.TotalRelayerCount
log.WithFields(logrus.Fields{
"ethNonce": ethNonce,
"relayerCount": r.config.Schedule.TotalRelayerCount,
"relayerID": r.config.Schedule.ID,
"waitingPeriod": waitingPeriod,
}).Info("relayer decentralization details")
}).Info("relayer waiting period")

var cnt uint64
for {
log.Info("checking if message should be processed")
paraNonce, err = r.fetchLatestParachainNonce()
// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
isProcessed, err := r.isMessageProcessed(ev.Nonce)
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
return fmt.Errorf("is message procssed: %w", err)
}
if ethNonce <= paraNonce {
log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped")
// If the message is already processed we shouldn't try to submit it again
if isProcessed {
return nil
}
if cnt == waitingPeriod {
log.WithField("cnt", cnt).Info("waiting period done")
break
}
log.Info("sleeping...")
log.Info(fmt.Sprintf("sleeping for %d seconds.", time.Duration(r.config.Schedule.SleepInterval)))

time.Sleep(time.Duration(r.config.Schedule.SleepInterval) * time.Second)
log.Info("done sleeping...")
cnt++
}

err = r.doSubmit(ctx, ev)
if err != nil {
return fmt.Errorf("submit inbound message: %w", err)
Expand All @@ -399,7 +405,6 @@ func (r *Relay) waitAndSend(ctx context.Context, ev *contracts.GatewayOutboundMe
}

func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) error {
log.Info("getting messages")
inboundMsg, err := r.makeInboundMessage(ctx, r.headerCache, ev)
if err != nil {
return fmt.Errorf("make outgoing message: %w", err)
Expand All @@ -424,7 +429,6 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa
}

// ParentBeaconRoot in https://eips.ethereum.org/EIPS/eip-4788 from Deneb onward
log.Info("getting execution proof")
proof, err := r.beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
return err
Expand All @@ -433,13 +437,13 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa
return fmt.Errorf("fetch execution header proof: %w", err)
}

paraNonce, err := r.fetchLatestParachainNonce()
// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
isProcessed, err := r.isMessageProcessed(ev.Nonce)
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
return fmt.Errorf("is message procssed: %w", err)
}
// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
if ev.Nonce <= paraNonce {
log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped")
// If the message is already processed we shouldn't try to submit it again
if isProcessed {
return nil
}

Expand All @@ -448,7 +452,7 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa
return fmt.Errorf("write to parachain: %w", err)
}

paraNonce, err = r.fetchLatestParachainNonce()
paraNonce, err := r.fetchLatestParachainNonce()
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
}
Expand All @@ -459,3 +463,33 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa

return nil
}

func (r *Relay) isMessageProcessed(eventNonce uint64) (bool, error) {
paraNonce, err := r.fetchLatestParachainNonce()
if err != nil {
return false, fmt.Errorf("fetch latest parachain nonce: %w", err)
}
// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
if eventNonce <= paraNonce {
log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped")
return true, nil
}

return false, nil
}

func (r *Relay) isInFinalizedBlock(ctx context.Context, events []*contracts.GatewayOutboundMessageAccepted) error {
if len(events) > 0 {
return nil
}
firstEvent := events[0]

nextBlockNumber := new(big.Int).SetUint64(firstEvent.Raw.BlockNumber + 1)

blockHeader, err := r.ethconn.Client().HeaderByNumber(ctx, nextBlockNumber)
if err != nil {
return fmt.Errorf("get block header: %w", err)
}

return r.beaconHeader.CheckHeaderFinalized(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
}

0 comments on commit b94052b

Please sign in to comment.