-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
integrate ACP-118 #408
integrate ACP-118 #408
Changes from 46 commits
c46fc41
650c987
a9c7768
9db9153
557262a
b56e5d3
adb8fb2
f399cff
fa3dbc0
c771ca9
fa60e03
0108081
cf689e5
b186d1c
95797af
b8735b0
0be46eb
e3cefff
79132a1
b5cff9e
2f337c9
bce634d
811c97a
30a718d
9f81123
e9eea93
29cac30
7fd79dc
58af3de
e1edf7e
70666da
77262bc
ec5b830
0498f4a
440a52d
91c5016
47cfef6
b87bcc1
5852f6c
7f9488c
0648123
ae43d72
ca06934
6cd8245
b04df21
3d7707d
2ba7b61
576a375
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,9 @@ import ( | |
|
||
"github.com/ava-labs/avalanchego/ids" | ||
"github.com/ava-labs/avalanchego/message" | ||
networkP2P "github.com/ava-labs/avalanchego/network/p2p" | ||
"github.com/ava-labs/avalanchego/proto/pb/p2p" | ||
"github.com/ava-labs/avalanchego/proto/pb/sdk" | ||
"github.com/ava-labs/avalanchego/subnets" | ||
"github.com/ava-labs/avalanchego/utils/constants" | ||
"github.com/ava-labs/avalanchego/utils/crypto/bls" | ||
|
@@ -26,9 +28,10 @@ import ( | |
"github.com/ava-labs/awm-relayer/signature-aggregator/aggregator/cache" | ||
"github.com/ava-labs/awm-relayer/signature-aggregator/metrics" | ||
"github.com/ava-labs/awm-relayer/utils" | ||
coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message" | ||
corethMsg "github.com/ava-labs/coreth/plugin/evm/message" | ||
msg "github.com/ava-labs/subnet-evm/plugin/evm/message" | ||
"go.uber.org/zap" | ||
"google.golang.org/protobuf/proto" | ||
) | ||
|
||
type blsSignatureBuf [bls.SignatureLen]byte | ||
|
@@ -42,8 +45,8 @@ const ( | |
) | ||
|
||
var ( | ||
codec = msg.Codec | ||
coreEthCodec = coreEthMsg.Codec | ||
codec = msg.Codec | ||
corethCodec = corethMsg.Codec | ||
|
||
// Errors | ||
errNotEnoughSignatures = errors.New("failed to collect a threshold of signatures") | ||
|
@@ -60,6 +63,7 @@ type SignatureAggregator struct { | |
subnetsMapLock sync.RWMutex | ||
metrics *metrics.SignatureAggregatorMetrics | ||
cache *cache.Cache | ||
etnaTime time.Time | ||
} | ||
|
||
func NewSignatureAggregator( | ||
|
@@ -68,6 +72,7 @@ func NewSignatureAggregator( | |
signatureCacheSize uint64, | ||
metrics *metrics.SignatureAggregatorMetrics, | ||
messageCreator message.Creator, | ||
etnaTime time.Time, | ||
) (*SignatureAggregator, error) { | ||
cache, err := cache.NewCache(signatureCacheSize, logger) | ||
if err != nil { | ||
|
@@ -84,13 +89,15 @@ func NewSignatureAggregator( | |
messageCreator: messageCreator, | ||
currentRequestID: atomic.Uint32{}, | ||
cache: cache, | ||
etnaTime: etnaTime, | ||
} | ||
sa.currentRequestID.Store(rand.Uint32()) | ||
return &sa, nil | ||
} | ||
|
||
func (s *SignatureAggregator) CreateSignedMessage( | ||
unsignedMessage *avalancheWarp.UnsignedMessage, | ||
justification []byte, | ||
inputSigningSubnet ids.ID, | ||
quorumPercentage uint64, | ||
) (*avalancheWarp.Message, error) { | ||
|
@@ -176,19 +183,7 @@ func (s *SignatureAggregator) CreateSignedMessage( | |
)) | ||
} | ||
|
||
// TODO: remove this special handling and replace with ACP-118 interface once available | ||
var reqBytes []byte | ||
if sourceSubnet == constants.PrimaryNetworkID { | ||
req := coreEthMsg.MessageSignatureRequest{ | ||
MessageID: unsignedMessage.ID(), | ||
} | ||
reqBytes, err = coreEthMsg.RequestToBytes(coreEthCodec, req) | ||
} else { | ||
req := msg.MessageSignatureRequest{ | ||
MessageID: unsignedMessage.ID(), | ||
} | ||
reqBytes, err = msg.RequestToBytes(codec, req) | ||
} | ||
reqBytes, err := s.marshalRequest(unsignedMessage, justification, sourceSubnet) | ||
if err != nil { | ||
msg := "Failed to marshal request bytes" | ||
s.logger.Error( | ||
|
@@ -524,14 +519,13 @@ func (s *SignatureAggregator) isValidSignatureResponse( | |
return blsSignatureBuf{}, false | ||
} | ||
|
||
var sigResponse msg.SignatureResponse | ||
if _, err := msg.Codec.Unmarshal(appResponse.AppBytes, &sigResponse); err != nil { | ||
signature, err := s.unmarshalResponse(appResponse.AppBytes) | ||
if err != nil { | ||
s.logger.Error( | ||
"Error unmarshaling signature response", | ||
zap.Error(err), | ||
) | ||
} | ||
signature := sigResponse.Signature | ||
|
||
// If the node returned an empty signature, then it has not yet seen the warp message. Retry later. | ||
emptySignature := blsSignatureBuf{} | ||
|
@@ -543,6 +537,15 @@ func (s *SignatureAggregator) isValidSignatureResponse( | |
return blsSignatureBuf{}, false | ||
} | ||
|
||
if len(signature) != bls.SignatureLen { | ||
s.logger.Debug( | ||
"Response signature has incorrect length", | ||
zap.Int("actual", len(signature)), | ||
zap.Int("expected", bls.SignatureLen), | ||
) | ||
return blsSignatureBuf{}, false | ||
} | ||
|
||
sig, err := bls.SignatureFromBytes(signature[:]) | ||
if err != nil { | ||
s.logger.Debug( | ||
|
@@ -590,3 +593,61 @@ func (s *SignatureAggregator) aggregateSignatures( | |
} | ||
return aggSig, vdrBitSet, nil | ||
} | ||
|
||
// TODO: refactor this to remove special handling based on etnaTime | ||
minghinmatthewlam marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// after Etna release, along with related config and testing code | ||
func (s *SignatureAggregator) marshalRequest( | ||
unsignedMessage *avalancheWarp.UnsignedMessage, | ||
justification []byte, | ||
sourceSubnet ids.ID, | ||
) ([]byte, error) { | ||
if !s.etnaTime.IsZero() && s.etnaTime.Before(time.Now()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice to combine these two conditions into a helper There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in 576a375 |
||
// Post-Etna case | ||
messageBytes, err := proto.Marshal( | ||
&sdk.SignatureRequest{ | ||
Message: unsignedMessage.Bytes(), | ||
Justification: justification, | ||
}, | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return networkP2P.PrefixMessage( | ||
networkP2P.ProtocolPrefix(networkP2P.SignatureRequestHandlerID), | ||
messageBytes, | ||
), nil | ||
} else { | ||
// Pre-Etna case | ||
if sourceSubnet == constants.PrimaryNetworkID { | ||
req := corethMsg.MessageSignatureRequest{ | ||
MessageID: unsignedMessage.ID(), | ||
} | ||
return corethMsg.RequestToBytes(corethCodec, req) | ||
} else { | ||
req := msg.MessageSignatureRequest{ | ||
MessageID: unsignedMessage.ID(), | ||
} | ||
return msg.RequestToBytes(codec, req) | ||
} | ||
} | ||
} | ||
|
||
func (s *SignatureAggregator) unmarshalResponse(responseBytes []byte) (blsSignatureBuf, error) { | ||
if !s.etnaTime.IsZero() && s.etnaTime.Before(time.Now()) { | ||
// Post-Etna case | ||
var sigResponse sdk.SignatureResponse | ||
err := proto.Unmarshal(responseBytes, &sigResponse) | ||
if err != nil { | ||
return blsSignatureBuf{}, err | ||
} | ||
return blsSignatureBuf(sigResponse.Signature), nil | ||
} else { | ||
// Pre-Etna case | ||
var sigResponse msg.SignatureResponse | ||
_, err := msg.Codec.Unmarshal(responseBytes, &sigResponse) | ||
if err != nil { | ||
return blsSignatureBuf{}, err | ||
} | ||
return sigResponse.Signature, nil | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package aggregator | |
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
"github.com/ava-labs/avalanchego/message" | ||
|
@@ -43,6 +44,8 @@ func instantiateAggregator(t *testing.T) ( | |
1024, | ||
sigAggMetrics, | ||
messageCreator, | ||
// Setting the etnaTime to a minute ago so that the post-etna code path is used in the test | ||
time.Now().Add(-1*time.Minute), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the purpose of the -1 minute? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A comment clarifying this would be helpful There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
) | ||
require.Equal(t, err, nil) | ||
return aggregator, mockNetwork | ||
|
@@ -61,7 +64,7 @@ func TestCreateSignedMessageFailsWithNoValidators(t *testing.T) { | |
}, | ||
nil, | ||
) | ||
_, err = aggregator.CreateSignedMessage(msg, ids.Empty, 80) | ||
_, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80) | ||
require.ErrorContains(t, err, "no signatures") | ||
} | ||
|
||
|
@@ -78,7 +81,7 @@ func TestCreateSignedMessageFailsWithoutSufficientConnectedStake(t *testing.T) { | |
}, | ||
nil, | ||
) | ||
_, err = aggregator.CreateSignedMessage(msg, ids.Empty, 80) | ||
_, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80) | ||
require.ErrorContains( | ||
t, | ||
err, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could take in a string as well and use
time.Parse
in a date foramtThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that the
v.GetTime()
call will handle both RFC3339 format as well as UNIX timestamps cleanly and output it unambiguously when being written out.With
time.Parse
we would need to manually handle conversion when writing it out to disk and reading it separately.