Skip to content

Commit

Permalink
optimise sync committee contribution signature verification (#12537)
Browse files Browse the repository at this point in the history
Co-authored-by: shota.silagadze <[email protected]>
  • Loading branch information
shotasilagadze and shotasilagadzetaal authored Nov 12, 2024
1 parent 7dbb5f4 commit 02cb4fb
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 95 deletions.
37 changes: 18 additions & 19 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,33 +398,32 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri
return
}
failures := []poolingFailure{}
var err error
for idx, v := range msgs {
if bytes.Equal(v.Message.Contribution.AggregationBits, make([]byte, len(v.Message.Contribution.AggregationBits))) {
continue // skip empty contributions
}
if err = a.syncContributionAndProofsService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {

var signedContributionAndProofWithGossipData cltypes.SignedContributionAndProofWithGossipData
signedContributionAndProofWithGossipData.SignedContributionAndProof = v
signedContributionAndProofWithGossipData.ImmediateVerification = true

encodedSSZ, err := signedContributionAndProofWithGossipData.SignedContributionAndProof.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Warn("[Beacon REST] failed to encode aggregate and proof", "err", err)
return
}

signedContributionAndProofWithGossipData.GossipData = &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameSyncCommitteeContributionAndProof,
}

if err = a.syncContributionAndProofsService.ProcessMessage(r.Context(), nil, &signedContributionAndProofWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process sync contribution", "err", err)
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
continue
}
// Broadcast to gossip
if a.sentinel != nil {
encodedSSZ, err := v.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Warn("[Beacon REST] failed to encode sync contribution", "err", err)
return
}
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameSyncCommitteeContributionAndProof,
}); err != nil {
log.Warn("[Beacon REST] failed to publish gossip", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}

if len(failures) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
return h.syncMessagePool.AddSyncCommitteeMessage(postState, *subnetID, msg.SyncCommitteeMessage)
}).AnyTimes()

syncContributionService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedContributionAndProof) error {
return h.syncMessagePool.AddSyncContribution(postState, msg.Message.Contribution)
syncContributionService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedContributionAndProofWithGossipData) error {
return h.syncMessagePool.AddSyncContribution(postState, msg.SignedContributionAndProof.Message.Contribution)
}).AnyTimes()
aggregateAndProofsService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedAggregateAndProofData) error {
opPool.AttestationsPool.Insert(msg.SignedAggregateAndProof.Message.Aggregate.Signature, msg.SignedAggregateAndProof.Message.Aggregate)
Expand Down
7 changes: 7 additions & 0 deletions cl/cltypes/contribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ func (a *ContributionAndProof) HashSSZ() ([32]byte, error) {
return merkle_tree.HashTreeRoot(a.AggregatorIndex, a.Contribution, a.SelectionProof[:])
}

// SignedContributionAndProofWithGossipData type represents SignedContributionAndProof with the gossip data where it's coming from.
type SignedContributionAndProofWithGossipData struct {
SignedContributionAndProof *SignedContributionAndProof
GossipData *sentinel.GossipData
ImmediateVerification bool
}

type SignedContributionAndProof struct {
Message *ContributionAndProof `json:"message"`
Signature libcommon.Bytes96 `json:"signature"`
Expand Down
7 changes: 5 additions & 2 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,11 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
log.Debug("Received block via gossip", "slot", obj.Block.Slot)
return g.blockService.ProcessMessage(ctx, data.SubnetId, obj)
case gossip.TopicNameSyncCommitteeContributionAndProof:
obj := &cltypes.SignedContributionAndProof{}
if err := obj.DecodeSSZ(data.Data, int(version)); err != nil {
obj := &cltypes.SignedContributionAndProofWithGossipData{
GossipData: copyOfSentinelData(data),
SignedContributionAndProof: &cltypes.SignedContributionAndProof{},
}
if err := obj.SignedContributionAndProof.DecodeSSZ(data.Data, int(version)); err != nil {
return err
}
return g.syncContributionService.ProcessMessage(ctx, data.SubnetId, obj)
Expand Down
7 changes: 7 additions & 0 deletions cl/phase1/network/services/batch_signature_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type BatchSignatureVerifier struct {
attVerifyAndExecute chan *AggregateVerificationData
aggregateProofVerify chan *AggregateVerificationData
blsToExecutionChangeVerify chan *AggregateVerificationData
syncContributionVerify chan *AggregateVerificationData
syncCommitteeMessage chan *AggregateVerificationData
voluntaryExitVerify chan *AggregateVerificationData
ctx context.Context
Expand All @@ -51,6 +52,7 @@ func NewBatchSignatureVerifier(ctx context.Context, sentinel sentinel.SentinelCl
attVerifyAndExecute: make(chan *AggregateVerificationData, 1024),
aggregateProofVerify: make(chan *AggregateVerificationData, 1024),
blsToExecutionChangeVerify: make(chan *AggregateVerificationData, 1024),
syncContributionVerify: make(chan *AggregateVerificationData, 1024),
syncCommitteeMessage: make(chan *AggregateVerificationData, 1024),
voluntaryExitVerify: make(chan *AggregateVerificationData, 1024),
}
Expand All @@ -69,6 +71,10 @@ func (b *BatchSignatureVerifier) AsyncVerifyBlsToExecutionChange(data *Aggregate
b.blsToExecutionChangeVerify <- data
}

func (b *BatchSignatureVerifier) AsyncVerifySyncContribution(data *AggregateVerificationData) {
b.syncContributionVerify <- data
}

func (b *BatchSignatureVerifier) AsyncVerifySyncCommitteeMessage(data *AggregateVerificationData) {
b.syncCommitteeMessage <- data
}
Expand All @@ -86,6 +92,7 @@ func (b *BatchSignatureVerifier) Start() {
go b.start(b.attVerifyAndExecute)
go b.start(b.aggregateProofVerify)
go b.start(b.blsToExecutionChangeVerify)
go b.start(b.syncContributionVerify)
go b.start(b.syncCommitteeMessage)
go b.start(b.voluntaryExitVerify)
}
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/network/services/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type BlobSidecarsService Service[*cltypes.BlobSidecar]
type SyncCommitteeMessagesService Service[*cltypes.SyncCommitteeMessageWithGossipData]

//go:generate mockgen -typed=true -destination=./mock_services/sync_contribution_service_mock.go -package=mock_services . SyncContributionService
type SyncContributionService Service[*cltypes.SignedContributionAndProof]
type SyncContributionService Service[*cltypes.SignedContributionAndProofWithGossipData]

//go:generate mockgen -typed=true -destination=./mock_services/aggregate_and_proof_service_mock.go -package=mock_services . AggregateAndProofService
type AggregateAndProofService Service[*cltypes.SignedAggregateAndProofData]
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

138 changes: 83 additions & 55 deletions cl/phase1/network/services/sync_contribution_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type syncContributionService struct {
seenSyncCommitteeContributions map[seenSyncCommitteeContribution]struct{}
emitters *beaconevents.EventEmitter
ethClock eth_clock.EthereumClock
batchSignatureVerifier *BatchSignatureVerifier
test bool

mu sync.Mutex
Expand All @@ -65,6 +66,7 @@ func NewSyncContributionService(
syncContributionPool sync_contribution_pool.SyncContributionPool,
ethClock eth_clock.EthereumClock,
emitters *beaconevents.EventEmitter,
batchSignatureVerifier *BatchSignatureVerifier,
test bool,
) SyncContributionService {
return &syncContributionService{
Expand All @@ -74,16 +76,17 @@ func NewSyncContributionService(
seenSyncCommitteeContributions: make(map[seenSyncCommitteeContribution]struct{}),
ethClock: ethClock,
emitters: emitters,
batchSignatureVerifier: batchSignatureVerifier,
test: test,
}
}

// ProcessMessage processes a sync contribution message
func (s *syncContributionService) ProcessMessage(ctx context.Context, subnet *uint64, signedContribution *cltypes.SignedContributionAndProof) error {
func (s *syncContributionService) ProcessMessage(ctx context.Context, subnet *uint64, signedContribution *cltypes.SignedContributionAndProofWithGossipData) error {
s.mu.Lock()
defer s.mu.Unlock()

contributionAndProof := signedContribution.Message
contributionAndProof := signedContribution.SignedContributionAndProof.Message
selectionProof := contributionAndProof.SelectionProof
aggregationBits := contributionAndProof.Contribution.AggregationBits

Expand Down Expand Up @@ -129,32 +132,74 @@ func (s *syncContributionService) ProcessMessage(ctx context.Context, subnet *ui
return ErrIgnore
}

// [REJECT] The contribution_and_proof.selection_proof is a valid signature of the SyncAggregatorSelectionData derived from the contribution by the validator with index contribution_and_proof.aggregator_index.
if err := verifySyncContributionSelectionProof(headState, contributionAndProof); !s.test && err != nil {
return err
}
// [REJECT] The aggregator signature, signed_contribution_and_proof.signature, is valid.
if err := verifyAggregatorSignatureForSyncContribution(headState, signedContribution); !s.test && err != nil {
// aggregate signatures for later verification
aggregateVerificationData, err := s.GetSignaturesOnContributionSignatures(headState, contributionAndProof, signedContribution, subcommiteePubsKeys)
if err != nil {
return err
}
// [REJECT] The aggregate signature is valid for the message beacon_block_root and aggregate pubkey derived
// from the participation info in aggregation_bits for the subcommittee specified by the contribution.subcommittee_index.
if err := verifySyncContributionProofAggregatedSignature(headState, contributionAndProof.Contribution, subcommiteePubsKeys); !s.test && err != nil {
return err

aggregateVerificationData.GossipData = signedContribution.GossipData

// further processing will be done after async signature verification
aggregateVerificationData.F = func() {

// mark the valid contribution as seen
s.markContributionAsSeen(contributionAndProof)

// emit contribution_and_proof

s.emitters.Operation().SendContributionProof(signedContribution.SignedContributionAndProof)
// add the contribution to the pool
err = s.syncContributionPool.AddSyncContribution(headState, contributionAndProof.Contribution)
if errors.Is(err, sync_contribution_pool.ErrIsSuperset) {
return
}
}
// mark the valid contribution as seen
s.markContributionAsSeen(contributionAndProof)

// emit contribution_and_proof
s.emitters.Operation().SendContributionProof(signedContribution)
// add the contribution to the pool
err = s.syncContributionPool.AddSyncContribution(headState, contributionAndProof.Contribution)
if errors.Is(err, sync_contribution_pool.ErrIsSuperset) {
return ErrIgnore

if signedContribution.ImmediateVerification {
return s.batchSignatureVerifier.ImmediateVerification(aggregateVerificationData)
}
return err

// push the signatures to verify asynchronously and run final functions after that.
s.batchSignatureVerifier.AsyncVerifySyncContribution(aggregateVerificationData)

// As the logic goes, if we return ErrIgnore there will be no peer banning and further publishing
// gossip data into the network by the gossip manager. That's what we want because we will be doing that ourselves
// in BatchVerification function. After validating signatures, if they are valid we will publish the
// gossip ourselves or ban the peer which sent that particular invalid signature.
return ErrIgnore
})
}

func (s *syncContributionService) GetSignaturesOnContributionSignatures(
headState *state.CachingBeaconState,
contributionAndProof *cltypes.ContributionAndProof,
signedContribution *cltypes.SignedContributionAndProofWithGossipData,
subcommiteePubsKeys []libcommon.Bytes48) (*AggregateVerificationData, error) {

// [REJECT] The contribution_and_proof.selection_proof is a valid signature of the SyncAggregatorSelectionData derived from the contribution by the validator with index contribution_and_proof.aggregator_index.
signature1, signatureRoot1, pubKey1, err := verifySyncContributionSelectionProof(headState, contributionAndProof)
if !s.test && err != nil {
return nil, err
}

// [REJECT] The aggregator signature, signed_contribution_and_proof.signature, is valid.
signature2, signatureRoot2, pubKey2, err := verifyAggregatorSignatureForSyncContribution(headState, signedContribution.SignedContributionAndProof)
if !s.test && err != nil {
return nil, err
}
// [REJECT] The aggregate signature is valid for the message beacon_block_root and aggregate pubkey derived
// from the participation info in aggregation_bits for the subcommittee specified by the contribution.subcommittee_index.
signature3, signatureRoot3, pubKey3, err := verifySyncContributionProofAggregatedSignature(headState, contributionAndProof.Contribution, subcommiteePubsKeys)
if !s.test && err != nil {
return nil, err
}

return &AggregateVerificationData{
Signatures: [][]byte{signature1, signature2, signature3},
SignRoots: [][]byte{signatureRoot1, signatureRoot2, signatureRoot3},
Pks: [][]byte{pubKey1, pubKey2, pubKey3},
}, nil
}

// def get_sync_subcommittee_pubkeys(state: BeaconState, subcommittee_index: uint64) -> Sequence[BLSPubkey]:
Expand Down Expand Up @@ -207,7 +252,7 @@ func (s *syncContributionService) markContributionAsSeen(contribution *cltypes.C
}

// verifySyncContributionProof verifies the sync contribution proof.
func verifySyncContributionSelectionProof(st *state.CachingBeaconState, contributionAndProof *cltypes.ContributionAndProof) error {
func verifySyncContributionSelectionProof(st *state.CachingBeaconState, contributionAndProof *cltypes.ContributionAndProof) ([]byte, []byte, []byte, error) {
syncAggregatorSelectionData := &cltypes.SyncAggregatorSelectionData{
Slot: contributionAndProof.Contribution.Slot,
SubcommitteeIndex: contributionAndProof.Contribution.SubcommitteeIndex,
Expand All @@ -216,39 +261,32 @@ func verifySyncContributionSelectionProof(st *state.CachingBeaconState, contribu

aggregatorPubKey, err := st.ValidatorPublicKey(int(contributionAndProof.AggregatorIndex))
if err != nil {
return err
return nil, nil, nil, err
}

domain, err := st.GetDomain(st.BeaconConfig().DomainSyncCommitteeSelectionProof, state.GetEpochAtSlot(st.BeaconConfig(), contributionAndProof.Contribution.Slot))
if err != nil {
return err
return nil, nil, nil, err
}

selectionDataRoot, err := fork.ComputeSigningRoot(syncAggregatorSelectionData, domain)
if err != nil {
return err
return nil, nil, nil, err
}

valid, err := bls.Verify(selectionProof[:], selectionDataRoot[:], aggregatorPubKey[:])
if err != nil {
return err
}
if !valid {
return errors.New("invalid selectionProof signature")
}
return nil
return selectionProof[:], selectionDataRoot[:], aggregatorPubKey[:], nil
}

// verifySyncContributionProof verifies the contribution aggregated signature.
func verifySyncContributionProofAggregatedSignature(s *state.CachingBeaconState, contribution *cltypes.Contribution, subCommitteeKeys []libcommon.Bytes48) error {
func verifySyncContributionProofAggregatedSignature(s *state.CachingBeaconState, contribution *cltypes.Contribution, subCommitteeKeys []libcommon.Bytes48) ([]byte, []byte, []byte, error) {
domain, err := s.GetDomain(s.BeaconConfig().DomainSyncCommittee, state.Epoch(s))
if err != nil {
return err
return nil, nil, nil, err
}

msg := utils.Sha256(contribution.BeaconBlockRoot[:], domain)
if err != nil {
return err
return nil, nil, nil, err
}
// only use the ones pertaining to the aggregation bits
subCommitteePubsKeys := make([][]byte, 0, len(subCommitteeKeys))
Expand All @@ -258,38 +296,28 @@ func verifySyncContributionProofAggregatedSignature(s *state.CachingBeaconState,
}
}

valid, err := bls.VerifyAggregate(contribution.Signature[:], msg[:], subCommitteePubsKeys)
pubKeys, err := bls.AggregatePublickKeys(subCommitteePubsKeys)
if err != nil {
return err
return nil, nil, nil, err
}

if !valid {
return errors.New("invalid signature for aggregate sync contribution")
}
return nil
return contribution.Signature[:], msg[:], pubKeys, nil
}

func verifyAggregatorSignatureForSyncContribution(s *state.CachingBeaconState, signedContributionAndProof *cltypes.SignedContributionAndProof) error {
func verifyAggregatorSignatureForSyncContribution(s *state.CachingBeaconState, signedContributionAndProof *cltypes.SignedContributionAndProof) ([]byte, []byte, []byte, error) {
contribution := signedContributionAndProof.Message.Contribution
domain, err := s.GetDomain(s.BeaconConfig().DomainContributionAndProof, contribution.Slot/s.BeaconConfig().SlotsPerEpoch)
if err != nil {
return err
return nil, nil, nil, err
}

signingRoot, err := fork.ComputeSigningRoot(signedContributionAndProof.Message, domain)
if err != nil {
return err
return nil, nil, nil, err
}
aggregatorPubKey, err := s.ValidatorPublicKey(int(signedContributionAndProof.Message.AggregatorIndex))
if err != nil {
return err
}
valid, err := bls.Verify(signedContributionAndProof.Signature[:], signingRoot[:], aggregatorPubKey[:])
if err != nil {
return err
}
if !valid {
return errors.New("invalid aggregator signature")
return nil, nil, nil, err
}
return nil
return signedContributionAndProof.Signature[:], signingRoot[:], aggregatorPubKey[:], nil
}
Loading

0 comments on commit 02cb4fb

Please sign in to comment.