From 0449479576b5059174a0929502e143a8c2561c75 Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Wed, 12 Apr 2023 10:53:49 +0200 Subject: [PATCH] refactor(libs/fraud): fraud tests overhaul (#2039) The main goal here is to prepare everything for extraction. There was a cyclic dependency and dependence on ExtendedHeader inside of fraud, which blocked the extraction. This patch fixes this in conjunction with cleanings for testing utilities and actual tests. List of other notable changes: - Updates go-header and - migrates to the new API for testing in header - Restructures fraud pkg and extract fraud.Service and related logic into sub pkg fraudserv - Keeping only interface definitions inside of the root fraud - Extracts testing utilities into fraudtest - Adds sprinkles of documentation --- gossip_score.go => fraudserv/gossip_score.go | 2 +- helpers.go => fraudserv/helpers.go | 12 +- {pb => fraudserv/pb}/proof.pb.go | 9 +- {pb => fraudserv/pb}/proof.proto | 0 requester.go => fraudserv/requester.go | 4 +- service.go => fraudserv/service.go | 47 ++-- fraudserv/service_test.go | 196 +++++++++++++++ store.go => fraudserv/store.go | 16 +- store_test.go => fraudserv/store_test.go | 21 +- subscription.go => fraudserv/subscription.go | 8 +- sync.go => fraudserv/sync.go | 11 +- fraudtest/dummy_proof.go | 53 ++++ fraudtest/dummy_service.go | 29 +++ interface.go | 13 +- metrics.go | 3 + proof.go | 46 ++-- service_test.go | 239 ------------------- testing.go | 149 ------------ 18 files changed, 379 insertions(+), 479 deletions(-) rename gossip_score.go => fraudserv/gossip_score.go (98%) rename helpers.go => fraudserv/helpers.go (73%) rename {pb => fraudserv/pb}/proof.pb.go (99%) rename {pb => fraudserv/pb}/proof.proto (100%) rename requester.go => fraudserv/requester.go (93%) rename service.go => fraudserv/service.go (82%) create mode 100644 fraudserv/service_test.go rename store.go => fraudserv/store.go (79%) rename store_test.go => fraudserv/store_test.go (88%) rename subscription.go => fraudserv/subscription.go (75%) rename sync.go => fraudserv/sync.go (93%) create mode 100644 fraudtest/dummy_proof.go create mode 100644 fraudtest/dummy_service.go delete mode 100644 service_test.go delete mode 100644 testing.go diff --git a/gossip_score.go b/fraudserv/gossip_score.go similarity index 98% rename from gossip_score.go rename to fraudserv/gossip_score.go index 2a97948..2f758fb 100644 --- a/gossip_score.go +++ b/fraudserv/gossip_score.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "math" diff --git a/helpers.go b/fraudserv/helpers.go similarity index 73% rename from helpers.go rename to fraudserv/helpers.go index 3ba55a1..99b73d5 100644 --- a/helpers.go +++ b/fraudserv/helpers.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -7,6 +7,8 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + + "github.com/celestiaorg/celestia-node/libs/fraud" ) func PubsubTopicID(fraudType, networkID string) string { @@ -17,8 +19,12 @@ func protocolID(networkID string) protocol.ID { return protocol.ID(fmt.Sprintf("/%s/fraud/v0.0.1", networkID)) } -func join(p *pubsub.PubSub, proofType ProofType, networkID string, - validate func(context.Context, ProofType, peer.ID, *pubsub.Message) pubsub.ValidationResult) (*pubsub.Topic, error) { +func join( + p *pubsub.PubSub, + proofType fraud.ProofType, + networkID string, + validate func(context.Context, fraud.ProofType, peer.ID, *pubsub.Message) pubsub.ValidationResult, +) (*pubsub.Topic, error) { topic := PubsubTopicID(proofType.String(), networkID) log.Infow("joining topic", "id", topic) t, err := p.Join(topic) diff --git a/pb/proof.pb.go b/fraudserv/pb/proof.pb.go similarity index 99% rename from pb/proof.pb.go rename to fraudserv/pb/proof.pb.go index 2d5b679..09bbc18 100644 --- a/pb/proof.pb.go +++ b/fraudserv/pb/proof.pb.go @@ -4,11 +4,12 @@ package fraud_pb import ( - fmt "fmt" - proto "github.com/gogo/protobuf/proto" - io "io" - math "math" + "fmt" + "io" + "math" math_bits "math/bits" + + "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/pb/proof.proto b/fraudserv/pb/proof.proto similarity index 100% rename from pb/proof.proto rename to fraudserv/pb/proof.proto diff --git a/requester.go b/fraudserv/requester.go similarity index 93% rename from requester.go rename to fraudserv/requester.go index 75a1d2d..682bf9b 100644 --- a/requester.go +++ b/fraudserv/requester.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -9,7 +9,7 @@ import ( "github.com/celestiaorg/go-libp2p-messenger/serde" - pb "github.com/celestiaorg/celestia-node/libs/fraud/pb" + pb "github.com/celestiaorg/celestia-node/libs/fraud/fraudserv/pb" ) const ( diff --git a/service.go b/fraudserv/service.go similarity index 82% rename from service.go rename to fraudserv/service.go index 1e68a80..631c390 100644 --- a/service.go +++ b/fraudserv/service.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "bytes" @@ -9,12 +9,21 @@ import ( "sync" "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + + "github.com/celestiaorg/celestia-node/libs/fraud" +) + +var ( + log = logging.Logger("fraudserv") + tracer = otel.Tracer("fraudserv") ) // fraudRequests is the amount of external requests that will be tried to get fraud proofs from @@ -30,14 +39,14 @@ type ProofService struct { cancel context.CancelFunc topicsLk sync.RWMutex - topics map[ProofType]*pubsub.Topic + topics map[fraud.ProofType]*pubsub.Topic storesLk sync.RWMutex - stores map[ProofType]datastore.Datastore + stores map[fraud.ProofType]datastore.Datastore pubsub *pubsub.PubSub host host.Host - getter headerFetcher + getter fraud.HeaderFetcher ds datastore.Datastore syncerEnabled bool @@ -46,7 +55,7 @@ type ProofService struct { func NewProofService( p *pubsub.PubSub, host host.Host, - getter headerFetcher, + getter fraud.HeaderFetcher, ds datastore.Datastore, syncerEnabled bool, networkID string, @@ -55,8 +64,8 @@ func NewProofService( pubsub: p, host: host, getter: getter, - topics: make(map[ProofType]*pubsub.Topic), - stores: make(map[ProofType]datastore.Datastore), + topics: make(map[fraud.ProofType]*pubsub.Topic), + stores: make(map[fraud.ProofType]datastore.Datastore), ds: ds, networkID: networkID, syncerEnabled: syncerEnabled, @@ -64,7 +73,7 @@ func NewProofService( } // registerProofTopics registers proofTypes as pubsub topics to be joined. -func (f *ProofService) registerProofTopics(proofTypes ...ProofType) error { +func (f *ProofService) registerProofTopics(proofTypes ...fraud.ProofType) error { for _, proofType := range proofTypes { t, err := join(f.pubsub, proofType, f.networkID, f.processIncoming) if err != nil { @@ -81,7 +90,7 @@ func (f *ProofService) registerProofTopics(proofTypes ...ProofType) error { // if syncer is enabled. func (f *ProofService) Start(context.Context) error { f.ctx, f.cancel = context.WithCancel(context.Background()) - if err := f.registerProofTopics(registeredProofTypes()...); err != nil { + if err := f.registerProofTopics(fraud.Registered()...); err != nil { return err } id := protocolID(f.networkID) @@ -95,13 +104,13 @@ func (f *ProofService) Start(context.Context) error { } // Stop removes the stream handler and cancels the underlying ProofService -func (f *ProofService) Stop(context.Context) error { +func (f *ProofService) Stop(context.Context) (err error) { f.host.RemoveStreamHandler(protocolID(f.networkID)) f.cancel() - return nil + return } -func (f *ProofService) Subscribe(proofType ProofType) (_ Subscription, err error) { +func (f *ProofService) Subscribe(proofType fraud.ProofType) (_ fraud.Subscription, err error) { f.topicsLk.Lock() defer f.topicsLk.Unlock() t, ok := f.topics[proofType] @@ -115,7 +124,7 @@ func (f *ProofService) Subscribe(proofType ProofType) (_ Subscription, err error return &subscription{subs}, nil } -func (f *ProofService) Broadcast(ctx context.Context, p Proof) error { +func (f *ProofService) Broadcast(ctx context.Context, p fraud.Proof) error { bin, err := p.MarshalBinary() if err != nil { return err @@ -132,7 +141,7 @@ func (f *ProofService) Broadcast(ctx context.Context, p Proof) error { // processIncoming encompasses the logic for validating fraud proofs. func (f *ProofService) processIncoming( ctx context.Context, - proofType ProofType, + proofType fraud.ProofType, from peer.ID, msg *pubsub.Message, ) pubsub.ValidationResult { @@ -143,10 +152,10 @@ func (f *ProofService) processIncoming( // unmarshal message to the Proof. // Peer will be added to black list if unmarshalling fails. - proof, err := Unmarshal(proofType, msg.Data) + proof, err := fraud.Unmarshal(proofType, msg.Data) if err != nil { log.Errorw("unmarshalling failed", "err", err) - if !errors.Is(err, &errNoUnmarshaler{}) { + if !errors.Is(err, &fraud.ErrNoUnmarshaler{}) { f.pubsub.BlacklistPeer(from) } span.RecordError(err) @@ -201,7 +210,7 @@ func (f *ProofService) processIncoming( return pubsub.ValidationAccept } -func (f *ProofService) Get(ctx context.Context, proofType ProofType) ([]Proof, error) { +func (f *ProofService) Get(ctx context.Context, proofType fraud.ProofType) ([]fraud.Proof, error) { f.storesLk.Lock() store, ok := f.stores[proofType] if !ok { @@ -214,7 +223,7 @@ func (f *ProofService) Get(ctx context.Context, proofType ProofType) ([]Proof, e } // put adds a fraud proof to the local storage. -func (f *ProofService) put(ctx context.Context, proofType ProofType, hash string, data []byte) error { +func (f *ProofService) put(ctx context.Context, proofType fraud.ProofType, hash string, data []byte) error { f.storesLk.Lock() store, ok := f.stores[proofType] if !ok { @@ -226,7 +235,7 @@ func (f *ProofService) put(ctx context.Context, proofType ProofType, hash string } // verifyLocal checks if a fraud proof has been stored locally. -func (f *ProofService) verifyLocal(ctx context.Context, proofType ProofType, hash string, data []byte) bool { +func (f *ProofService) verifyLocal(ctx context.Context, proofType fraud.ProofType, hash string, data []byte) bool { f.storesLk.RLock() storage, ok := f.stores[proofType] f.storesLk.RUnlock() diff --git a/fraudserv/service_test.go b/fraudserv/service_test.go new file mode 100644 index 0000000..f297ab9 --- /dev/null +++ b/fraudserv/service_test.go @@ -0,0 +1,196 @@ +package fraudserv + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/go-header" + "github.com/celestiaorg/go-header/headertest" + + "github.com/celestiaorg/celestia-node/libs/fraud/fraudtest" +) + +func TestService_SubscribeBroadcastValid(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + + serv := newTestService(ctx, t, false) + require.NoError(t, serv.Start(ctx)) + + fraud := fraudtest.NewValidProof() + sub, err := serv.Subscribe(fraud.Type()) + require.NoError(t, err) + defer sub.Cancel() + + require.NoError(t, serv.Broadcast(ctx, fraud)) + _, err = sub.Proof(ctx) + require.NoError(t, err) +} + +func TestService_SubscribeBroadcastInvalid(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + + serv := newTestService(ctx, t, false) + require.NoError(t, serv.Start(ctx)) + + fraud := fraudtest.NewInvalidProof() + sub, err := serv.Subscribe(fraud.Type()) + require.NoError(t, err) + defer sub.Cancel() + + err = serv.Broadcast(ctx, fraud) + require.Error(t, err) + + ctx2, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + t.Cleanup(cancel) + + _, err = sub.Proof(ctx2) + require.Error(t, err) +} + +func TestService_ReGossiping(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + // create mock network + net, err := mocknet.FullMeshLinked(3) + require.NoError(t, err) + + // create services + servA := newTestServiceWithHost(ctx, t, net.Hosts()[0], false) + servB := newTestServiceWithHost(ctx, t, net.Hosts()[1], false) + servC := newTestServiceWithHost(ctx, t, net.Hosts()[2], false) + + // preconnect peers: A -> B -> C, so A and C are not connected to each other + addrB := host.InfoFromHost(net.Hosts()[1]) // -> B + require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) // host[0] is A + require.NoError(t, net.Hosts()[2].Connect(ctx, *addrB)) // host[2] is C + + // start services + require.NoError(t, servA.Start(ctx)) + require.NoError(t, servB.Start(ctx)) + require.NoError(t, servC.Start(ctx)) + + fraud := fraudtest.NewValidProof() + subsA, err := servA.Subscribe(fraud.Type()) + require.NoError(t, err) + defer subsA.Cancel() + + subsB, err := servB.Subscribe(fraud.Type()) + require.NoError(t, err) + defer subsB.Cancel() + + subsC, err := servC.Subscribe(fraud.Type()) + require.NoError(t, err) + defer subsC.Cancel() + + // give some time for subscriptions to land + // this mitigates flakiness + time.Sleep(time.Millisecond * 100) + + // and only after broadcaster + err = servA.Broadcast(ctx, fraud) + require.NoError(t, err) + + _, err = subsA.Proof(ctx) // subscriptions of subA should also receive the proof + require.NoError(t, err) + + _, err = subsB.Proof(ctx) + require.NoError(t, err) + + _, err = subsC.Proof(ctx) + require.NoError(t, err) +} + +func TestService_Get(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + serv := newTestService(ctx, t, false) + require.NoError(t, serv.Start(ctx)) + + fraud := fraudtest.NewValidProof() + _, err := serv.Get(ctx, fraud.Type()) // try to fetch proof + require.Error(t, err) // storage is empty so should error + + sub, err := serv.Subscribe(fraud.Type()) + require.NoError(t, err) + defer sub.Cancel() + + // subscription needs some time and love to avoid flakes + time.Sleep(time.Millisecond * 100) + + err = serv.Broadcast(ctx, fraud) // broadcast stores the fraud as well + require.NoError(t, err) + + _, err = serv.Get(ctx, fraud.Type()) + require.NoError(t, err) +} + +func TestService_Sync(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + net, err := mocknet.FullMeshLinked(2) + require.NoError(t, err) + + servA := newTestServiceWithHost(ctx, t, net.Hosts()[0], false) + require.NoError(t, servA.Start(ctx)) + + fraud := fraudtest.NewValidProof() + err = servA.Broadcast(ctx, fraud) // broadcasting ensures the fraud gets stored on servA + require.NoError(t, err) + + servB := newTestServiceWithHost(ctx, t, net.Hosts()[1], true) // start servB + require.NoError(t, servB.Start(ctx)) + + sub, err := servB.Subscribe(fraud.Type()) // subscribe + require.NoError(t, err) + defer sub.Cancel() + + addrB := host.InfoFromHost(net.Hosts()[1]) + require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) // connect A to B + + _, err = sub.Proof(ctx) // heck that we get it from subscription by syncing from servA + require.NoError(t, err) +} + +func newTestService(ctx context.Context, t *testing.T, enabledSyncer bool) *ProofService { + net, err := mocknet.FullMeshLinked(1) + require.NoError(t, err) + return newTestServiceWithHost(ctx, t, net.Hosts()[0], enabledSyncer) +} + +func newTestServiceWithHost(ctx context.Context, t *testing.T, host host.Host, enabledSyncer bool) *ProofService { + ps, err := pubsub.NewFloodSub(ctx, host, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) + require.NoError(t, err) + + store := headertest.NewDummyStore(t) + serv := NewProofService( + ps, + host, + func(ctx context.Context, u uint64) (header.Header, error) { + return store.GetByHeight(ctx, u) + }, + sync.MutexWrap(datastore.NewMapDatastore()), + enabledSyncer, + "private", + ) + + t.Cleanup(func() { + err := serv.Stop(ctx) + if err != nil { + t.Fatal(err) + } + }) + return serv +} diff --git a/store.go b/fraudserv/store.go similarity index 79% rename from store.go rename to fraudserv/store.go index ff812c4..dfd71b0 100644 --- a/store.go +++ b/fraudserv/store.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -9,6 +9,8 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" q "github.com/ipfs/go-datastore/query" + + "github.com/celestiaorg/celestia-node/libs/fraud" ) var ( @@ -36,7 +38,7 @@ func getByHash(ctx context.Context, ds datastore.Datastore, hash string) ([]byte } // getAll queries all Fraud Proofs by their type. -func getAll(ctx context.Context, ds datastore.Datastore, proofType ProofType) ([]Proof, error) { +func getAll(ctx context.Context, ds datastore.Datastore, proofType fraud.ProofType) ([]fraud.Proof, error) { entries, err := query(ctx, ds, q.Query{}) if err != nil { return nil, err @@ -44,11 +46,11 @@ func getAll(ctx context.Context, ds datastore.Datastore, proofType ProofType) ([ if len(entries) == 0 { return nil, datastore.ErrNotFound } - proofs := make([]Proof, 0) + proofs := make([]fraud.Proof, 0) for _, data := range entries { - proof, err := Unmarshal(proofType, data.Value) + proof, err := fraud.Unmarshal(proofType, data.Value) if err != nil { - if errors.Is(err, &errNoUnmarshaler{}) { + if errors.Is(err, &fraud.ErrNoUnmarshaler{}) { return nil, err } log.Warn(err) @@ -62,10 +64,10 @@ func getAll(ctx context.Context, ds datastore.Datastore, proofType ProofType) ([ return proofs, nil } -func initStore(topic ProofType, ds datastore.Datastore) datastore.Datastore { +func initStore(topic fraud.ProofType, ds datastore.Datastore) datastore.Datastore { return namespace.Wrap(ds, makeKey(topic)) } -func makeKey(topic ProofType) datastore.Key { +func makeKey(topic fraud.ProofType) datastore.Key { return datastore.NewKey(fmt.Sprintf("%s/%s", storePrefix, topic)) } diff --git a/store_test.go b/fraudserv/store_test.go similarity index 88% rename from store_test.go rename to fraudserv/store_test.go index 2556b8c..90a2f84 100644 --- a/store_test.go +++ b/fraudserv/store_test.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -9,13 +9,15 @@ import ( "github.com/ipfs/go-datastore/namespace" ds_sync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/libs/fraud/fraudtest" ) func TestStore_Put(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer t.Cleanup(cancel) - p := newValidProof() + p := fraudtest.NewValidProof() bin, err := p.MarshalBinary() require.NoError(t, err) ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) @@ -25,10 +27,10 @@ func TestStore_Put(t *testing.T) { } func TestStore_GetAll(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer t.Cleanup(cancel) - proof := newValidProof() + proof := fraudtest.NewValidProof() bin, err := proof.MarshalBinary() require.NoError(t, err) ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) @@ -41,14 +43,13 @@ func TestStore_GetAll(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, proofs) require.NoError(t, proof.Validate(nil)) - } func Test_GetAllFailed(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer t.Cleanup(cancel) - proof := newValidProof() + proof := fraudtest.NewValidProof() ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) store := namespace.Wrap(ds, makeKey(proof.Type())) @@ -59,10 +60,10 @@ func Test_GetAllFailed(t *testing.T) { } func Test_getByHash(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer t.Cleanup(cancel) - proof := newValidProof() + proof := fraudtest.NewValidProof() ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) store := namespace.Wrap(ds, makeKey(proof.Type())) bin, err := proof.MarshalBinary() diff --git a/subscription.go b/fraudserv/subscription.go similarity index 75% rename from subscription.go rename to fraudserv/subscription.go index e9cef33..fbf86ba 100644 --- a/subscription.go +++ b/fraudserv/subscription.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -6,6 +6,8 @@ import ( "reflect" pubsub "github.com/libp2p/go-libp2p-pubsub" + + "github.com/celestiaorg/celestia-node/libs/fraud" ) // subscription wraps pubsub subscription and handles Fraud Proof from the pubsub topic. @@ -13,7 +15,7 @@ type subscription struct { subscription *pubsub.Subscription } -func (s *subscription) Proof(ctx context.Context) (Proof, error) { +func (s *subscription) Proof(ctx context.Context) (fraud.Proof, error) { if s.subscription == nil { panic("fraud: subscription is not created") } @@ -21,7 +23,7 @@ func (s *subscription) Proof(ctx context.Context) (Proof, error) { if err != nil { return nil, err } - proof, ok := data.ValidatorData.(Proof) + proof, ok := data.ValidatorData.(fraud.Proof) if !ok { panic(fmt.Sprintf("fraud: unexpected type received %s", reflect.TypeOf(data.ValidatorData))) } diff --git a/sync.go b/fraudserv/sync.go similarity index 93% rename from sync.go rename to fraudserv/sync.go index e82e33f..3167593 100644 --- a/sync.go +++ b/fraudserv/sync.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -15,7 +15,8 @@ import ( "github.com/celestiaorg/go-libp2p-messenger/serde" - pb "github.com/celestiaorg/celestia-node/libs/fraud/pb" + "github.com/celestiaorg/celestia-node/libs/fraud" + pb "github.com/celestiaorg/celestia-node/libs/fraud/fraudserv/pb" ) // syncFraudProofs encompasses the behavior for fetching fraud proofs from other peers. @@ -83,10 +84,10 @@ func (f *ProofService) syncFraudProofs(ctx context.Context, id protocol.ID) { log.Debugw("got fraud proofs from peer", "pid", pid) for _, data := range respProofs { f.topicsLk.RLock() - topic, ok := f.topics[ProofType(data.Type)] + topic, ok := f.topics[fraud.ProofType(data.Type)] f.topicsLk.RUnlock() if !ok { - log.Errorf("topic for %s does not exist", ProofType(data.Type)) + log.Errorf("topic for %s does not exist", fraud.ProofType(data.Type)) continue } for _, val := range data.Value { @@ -129,7 +130,7 @@ func (f *ProofService) handleFraudMessageRequest(stream network.Stream) { resp.Proofs = make([]*pb.ProofResponse, 0, len(req.RequestedProofType)) // retrieve fraud proofs as provided by the FraudMessageRequest proofTypes. for _, p := range req.RequestedProofType { - proofs, err := f.Get(f.ctx, ProofType(p)) + proofs, err := f.Get(f.ctx, fraud.ProofType(p)) if err != nil { if err != datastore.ErrNotFound { log.Error(err) diff --git a/fraudtest/dummy_proof.go b/fraudtest/dummy_proof.go new file mode 100644 index 0000000..a5b9f43 --- /dev/null +++ b/fraudtest/dummy_proof.go @@ -0,0 +1,53 @@ +package fraudtest + +import ( + "encoding/json" + "errors" + + "github.com/celestiaorg/go-header" + + "github.com/celestiaorg/celestia-node/libs/fraud" +) + +func init() { + fraud.Register(&DummyProof{}) +} + +type DummyProof struct { + Valid bool +} + +func NewValidProof() *DummyProof { + return &DummyProof{true} +} + +func NewInvalidProof() *DummyProof { + return &DummyProof{false} +} + +func (m *DummyProof) Type() fraud.ProofType { + return "DummyProof" +} + +func (m *DummyProof) HeaderHash() []byte { + return []byte("hash") +} + +func (m *DummyProof) Height() uint64 { + return 1 +} + +func (m *DummyProof) Validate(header.Header) error { + if !m.Valid { + return errors.New("DummyProof: proof is not valid") + } + return nil +} + +func (m *DummyProof) MarshalBinary() (data []byte, err error) { + return json.Marshal(m) +} + +func (m *DummyProof) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, m) +} diff --git a/fraudtest/dummy_service.go b/fraudtest/dummy_service.go new file mode 100644 index 0000000..800f1b9 --- /dev/null +++ b/fraudtest/dummy_service.go @@ -0,0 +1,29 @@ +package fraudtest + +import ( + "context" + + "github.com/celestiaorg/celestia-node/libs/fraud" +) + +type DummyService struct{} + +func (d *DummyService) Broadcast(context.Context, fraud.Proof) error { + return nil +} + +func (d *DummyService) Subscribe(fraud.ProofType) (fraud.Subscription, error) { + return &subscription{}, nil +} + +func (d *DummyService) Get(context.Context, fraud.ProofType) ([]fraud.Proof, error) { + return nil, nil +} + +type subscription struct{} + +func (s *subscription) Proof(context.Context) (fraud.Proof, error) { + return nil, nil +} + +func (s *subscription) Cancel() {} diff --git a/interface.go b/interface.go index 1ae8b09..1b31827 100644 --- a/interface.go +++ b/interface.go @@ -3,15 +3,11 @@ package fraud import ( "context" - logging "github.com/ipfs/go-log/v2" - "github.com/celestiaorg/go-header" ) -var log = logging.Logger("fraud") - -// headerFetcher aliases a function that is used to fetch an ExtendedHeader from store by height. -type headerFetcher func(context.Context, uint64) (header.Header, error) +// HeaderFetcher aliases a function that is used to fetch an ExtendedHeader from store by height. +type HeaderFetcher func(context.Context, uint64) (header.Header, error) // ProofUnmarshaler aliases a function that parses data to `Proof`. type ProofUnmarshaler func([]byte) (Proof, error) @@ -27,8 +23,9 @@ type Service interface { // Broadcaster is a generic interface that sends a `Proof` to all nodes subscribed on the // Broadcaster's topic. type Broadcaster interface { - // Broadcast takes a fraud `Proof` data structure that implements standard BinaryMarshal - // interface and broadcasts it to all subscribed peers. + // Broadcast takes a fraud `Proof` data structure interface and broadcasts it to local + // subscriptions and peers. It may additionally cache/persist Proofs for future + // access via Getter and to serve Proof requests to peers in the network. Broadcast(context.Context, Proof) error } diff --git a/metrics.go b/metrics.go index 74127c2..1b28101 100644 --- a/metrics.go +++ b/metrics.go @@ -5,10 +5,13 @@ import ( "github.com/ipfs/go-datastore" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/unit" ) +var meter = global.MeterProvider().Meter("fraud") + // WithMetrics enables metrics to monitor fraud proofs. func WithMetrics(store Getter) { proofTypes := registeredProofTypes() diff --git a/proof.go b/proof.go index d8d0a81..0666cc0 100644 --- a/proof.go +++ b/proof.go @@ -5,33 +5,9 @@ import ( "encoding" "fmt" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric/global" - "github.com/celestiaorg/go-header" ) -var ( - meter = global.MeterProvider().Meter("fraud") - tracer = otel.Tracer("fraud") -) - -type ErrFraudExists struct { - Proof []Proof -} - -func (e *ErrFraudExists) Error() string { - return fmt.Sprintf("fraud: %s proof exists\n", e.Proof[0].Type()) -} - -type errNoUnmarshaler struct { - proofType ProofType -} - -func (e *errNoUnmarshaler) Error() string { - return fmt.Sprintf("fraud: unmarshaler for %s type is not registered", e.proofType) -} - // ProofType type defines a unique proof type string. type ProofType string @@ -63,7 +39,6 @@ type Proof interface { func OnProof(ctx context.Context, subscriber Subscriber, p ProofType, handle func(proof Proof)) { subscription, err := subscriber.Subscribe(p) if err != nil { - log.Error(err) return } defer subscription.Cancel() @@ -72,9 +47,6 @@ func OnProof(ctx context.Context, subscriber Subscriber, p ProofType, handle fun // so there is no need to call Validate. proof, err := subscription.Proof(ctx) if err != nil { - if err != context.Canceled { - log.Errorw("reading next proof failed", "err", err) - } return } @@ -87,7 +59,23 @@ func Unmarshal(proofType ProofType, msg []byte) (Proof, error) { defer unmarshalersLk.RUnlock() unmarshaler, ok := defaultUnmarshalers[proofType] if !ok { - return nil, &errNoUnmarshaler{proofType: proofType} + return nil, &ErrNoUnmarshaler{proofType: proofType} } return unmarshaler(msg) } + +type ErrFraudExists struct { + Proof []Proof +} + +func (e *ErrFraudExists) Error() string { + return fmt.Sprintf("fraud: %s proof exists\n", e.Proof[0].Type()) +} + +type ErrNoUnmarshaler struct { + proofType ProofType +} + +func (e *ErrNoUnmarshaler) Error() string { + return fmt.Sprintf("fraud: unmarshaler for %s type is not registered", e.proofType) +} diff --git a/service_test.go b/service_test.go deleted file mode 100644 index 2237454..0000000 --- a/service_test.go +++ /dev/null @@ -1,239 +0,0 @@ -package fraud - -import ( - "context" - "encoding/hex" - "testing" - "time" - - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-datastore/sync" - pubsub "github.com/libp2p/go-libp2p-pubsub" - pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p/core/host" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/go-header" - "github.com/celestiaorg/go-header/test" -) - -func TestService_Subscribe(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) - t.Cleanup(cancel) - s, _ := CreateTestService(t, false) - proof := newValidProof() - require.NoError(t, s.Start(ctx)) - _, err := s.Subscribe(proof.Type()) - require.NoError(t, err) -} - -func TestService_SubscribeFails(t *testing.T) { - s, _ := CreateTestService(t, false) - proof := newValidProof() - _, err := s.Subscribe(proof.Type()) - require.Error(t, err) -} - -func TestService_BroadcastFails(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) - t.Cleanup(cancel) - s, _ := CreateTestService(t, false) - p := newValidProof() - require.Error(t, s.Broadcast(ctx, p)) -} - -func TestService_Broadcast(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) - t.Cleanup(cancel) - - s, _ := CreateTestService(t, false) - proof := newValidProof() - require.NoError(t, s.Start(ctx)) - subs, err := s.Subscribe(proof.Type()) - require.NoError(t, err) - - require.NoError(t, s.Broadcast(ctx, proof)) - _, err = subs.Proof(ctx) - require.NoError(t, err) - require.NoError(t, nil) -} - -func TestService_processIncoming(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - // create mock network - net, err := mocknet.FullMeshLinked(2) - require.NoError(t, err) - - var tests = []struct { - precondition func() - proof *mockProof - validationResult pubsub.ValidationResult - }{ - { - nil, - newValidProof(), - pubsub.ValidationAccept, - }, - { - nil, - newInvalidProof(), - pubsub.ValidationReject, - }, - { - func() { - delete(defaultUnmarshalers, mockProofType) - }, - newValidProof(), - pubsub.ValidationReject, - }, - } - for _, test := range tests { - bin, err := test.proof.MarshalBinary() - require.NoError(t, err) - // create first fraud service that will broadcast incorrect Fraud Proof - service, _ := createTestServiceWithHost(ctx, t, net.Hosts()[0], false) - msg := &pubsub.Message{ - Message: &pubsubpb.Message{ - Data: bin, - }, - ReceivedFrom: net.Hosts()[1].ID(), - } - if test.precondition != nil { - test.precondition() - } - require.NoError(t, service.Start(ctx)) - res := service.processIncoming(ctx, test.proof.Type(), net.Hosts()[1].ID(), msg) - require.True(t, res == test.validationResult) - } -} - -func TestService_ReGossiping(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - t.Cleanup(cancel) - // create mock network - net, err := mocknet.FullMeshLinked(3) - require.NoError(t, err) - - // create first fraud service that will broadcast incorrect Fraud Proof - pserviceA, _ := createTestServiceWithHost(ctx, t, net.Hosts()[0], false) - require.NoError(t, err) - // create pub sub in order to listen for Fraud Proof - psB, err := pubsub.NewGossipSub(ctx, net.Hosts()[1], // -> B - pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) - require.NoError(t, err) - // create second service that will receive and validate Fraud Proof - pserviceB := NewProofService( - psB, - net.Hosts()[1], - func(ctx context.Context, u uint64) (header.Header, error) { - return &test.DummyHeader{}, nil - }, - sync.MutexWrap(datastore.NewMapDatastore()), - false, - "private", - ) - addrB := host.InfoFromHost(net.Hosts()[1]) // -> B - - // create pub sub in order to listen for Fraud Proof - psC, err := pubsub.NewGossipSub(ctx, net.Hosts()[2], // -> C - pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) - require.NoError(t, err) - pserviceC := NewProofService( - psC, - net.Hosts()[2], - func(ctx context.Context, u uint64) (header.Header, error) { - return &test.DummyHeader{}, nil - }, - sync.MutexWrap(datastore.NewMapDatastore()), - false, - "private", - ) - // establish connections - // connect peers: A -> B -> C, so A and C are not connected to each other - require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) // host[0] is A - require.NoError(t, net.Hosts()[2].Connect(ctx, *addrB)) // host[2] is C - - befp := newValidProof() - bin, err := befp.MarshalBinary() - require.NoError(t, err) - require.NoError(t, pserviceA.Start(ctx)) - require.NoError(t, pserviceB.Start(ctx)) - require.NoError(t, pserviceC.Start(ctx)) - subsA, err := pserviceA.Subscribe(mockProofType) - require.NoError(t, err) - defer subsA.Cancel() - - subsB, err := pserviceB.Subscribe(mockProofType) - require.NoError(t, err) - defer subsB.Cancel() - - subsC, err := pserviceC.Subscribe(mockProofType) - require.NoError(t, err) - defer subsC.Cancel() - // we cannot avoid sleep because it helps to avoid flakiness - time.Sleep(time.Millisecond * 100) - - err = pserviceA.topics[mockProofType].Publish(ctx, bin, pubsub.WithReadiness(pubsub.MinTopicSize(1))) - require.NoError(t, err) - - newCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100) - t.Cleanup(cancel) - - _, err = subsB.Proof(newCtx) - require.NoError(t, err) - - _, err = subsC.Proof(ctx) - require.NoError(t, err) - // we cannot avoid sleep because it helps to avoid flakiness - time.Sleep(time.Millisecond * 100) -} - -func TestService_Get(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - proof := newValidProof() - bin, err := proof.MarshalBinary() - require.NoError(t, err) - pService, _ := CreateTestService(t, false) - // try to fetch proof - _, err = pService.Get(ctx, proof.Type()) - // error is expected here because storage is empty - require.Error(t, err) - - // create store - store := initStore(proof.Type(), pService.ds) - // add proof to storage - require.NoError(t, put(ctx, store, hex.EncodeToString(proof.HeaderHash()), bin)) - // fetch proof - _, err = pService.Get(ctx, proof.Type()) - require.NoError(t, err) -} - -func TestService_Sync(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - t.Cleanup(cancel) - // create mock network - net, err := mocknet.FullMeshLinked(2) - require.NoError(t, err) - - pserviceA, _ := createTestServiceWithHost(ctx, t, net.Hosts()[0], false) - pserviceB, _ := createTestServiceWithHost(ctx, t, net.Hosts()[1], true) - proof := newValidProof() - require.NoError(t, pserviceA.Start(ctx)) - require.NoError(t, pserviceB.Start(ctx)) - subs, err := pserviceB.Subscribe(mockProofType) - require.NoError(t, err) - bin, err := proof.MarshalBinary() - require.NoError(t, err) - store := namespace.Wrap(pserviceA.ds, makeKey(mockProofType)) - require.NoError(t, put(ctx, store, string(proof.HeaderHash()), bin)) - - addrB := host.InfoFromHost(net.Hosts()[1]) - require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) - - _, err = subs.Proof(ctx) - require.NoError(t, err) -} diff --git a/testing.go b/testing.go deleted file mode 100644 index 9e027dd..0000000 --- a/testing.go +++ /dev/null @@ -1,149 +0,0 @@ -package fraud - -import ( - "context" - "encoding/json" - "errors" - "testing" - "time" - - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/sync" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/host" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/go-header" - - "github.com/celestiaorg/celestia-node/header/headertest" -) - -type DummyService struct { -} - -func (d *DummyService) Broadcast(context.Context, Proof) error { - return nil -} - -func (d *DummyService) Subscribe(ProofType) (Subscription, error) { - return &subscription{}, nil -} - -func (d *DummyService) Get(context.Context, ProofType) ([]Proof, error) { - return nil, nil -} - -type mockStore struct { - headers map[int64]header.Header - headHeight int64 -} - -// createStore creates a mock store and adds several random -// headers. -func createStore(t *testing.T, numHeaders int) *mockStore { - store := &mockStore{ - headers: make(map[int64]header.Header), - headHeight: 0, - } - - suite := headertest.NewTestSuite(t, numHeaders) - - for i := 0; i < numHeaders; i++ { - header := suite.GenExtendedHeader() - store.headers[header.Height()] = header - - if header.Height() > store.headHeight { - store.headHeight = header.Height() - } - } - return store -} - -func (m *mockStore) GetByHeight(_ context.Context, height uint64) (header.Header, error) { - return m.headers[int64(height)], nil -} - -func (m *mockStore) Close() error { return nil } - -const ( - mockProofType ProofType = "mockProof" -) - -type mockProof struct { - Valid bool -} - -func newValidProof() *mockProof { - return newMockProof(true) -} - -func newInvalidProof() *mockProof { - return newMockProof(false) -} - -func newMockProof(valid bool) *mockProof { - p := &mockProof{valid} - if _, ok := defaultUnmarshalers[p.Type()]; !ok { - Register(&mockProof{}) - } - return p -} - -func (m *mockProof) Type() ProofType { - return mockProofType -} - -func (m *mockProof) HeaderHash() []byte { - return []byte("hash") -} - -func (m *mockProof) Height() uint64 { - return 1 -} - -func (m *mockProof) Validate(header.Header) error { - if !m.Valid { - return errors.New("mockProof: proof is not valid") - } - return nil -} - -func (m *mockProof) MarshalBinary() (data []byte, err error) { - return json.Marshal(m) -} - -func (m *mockProof) UnmarshalBinary(data []byte) error { - return json.Unmarshal(data, m) -} - -func CreateTestService(t *testing.T, enabledSyncer bool) (*ProofService, *mockStore) { //nolint:revive - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) - t.Cleanup(cancel) - - // create mock network - net, err := mocknet.FullMeshLinked(1) - require.NoError(t, err) - return createTestServiceWithHost(ctx, t, net.Hosts()[0], enabledSyncer) -} - -func createTestServiceWithHost( - ctx context.Context, - t *testing.T, - host host.Host, - enabledSyncer bool, -) (*ProofService, *mockStore) { - // create pubsub for host - ps, err := pubsub.NewGossipSub(ctx, host, - pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) - require.NoError(t, err) - store := createStore(t, 10) - return NewProofService( - ps, - host, - store.GetByHeight, - sync.MutexWrap(datastore.NewMapDatastore()), - enabledSyncer, - "private", - ), store -}