diff --git a/gossip_score.go b/fraudserv/gossip_score.go similarity index 98% rename from gossip_score.go rename to fraudserv/gossip_score.go index 2a97948..2f758fb 100644 --- a/gossip_score.go +++ b/fraudserv/gossip_score.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "math" diff --git a/helpers.go b/fraudserv/helpers.go similarity index 73% rename from helpers.go rename to fraudserv/helpers.go index 3ba55a1..99b73d5 100644 --- a/helpers.go +++ b/fraudserv/helpers.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -7,6 +7,8 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + + "github.com/celestiaorg/celestia-node/libs/fraud" ) func PubsubTopicID(fraudType, networkID string) string { @@ -17,8 +19,12 @@ func protocolID(networkID string) protocol.ID { return protocol.ID(fmt.Sprintf("/%s/fraud/v0.0.1", networkID)) } -func join(p *pubsub.PubSub, proofType ProofType, networkID string, - validate func(context.Context, ProofType, peer.ID, *pubsub.Message) pubsub.ValidationResult) (*pubsub.Topic, error) { +func join( + p *pubsub.PubSub, + proofType fraud.ProofType, + networkID string, + validate func(context.Context, fraud.ProofType, peer.ID, *pubsub.Message) pubsub.ValidationResult, +) (*pubsub.Topic, error) { topic := PubsubTopicID(proofType.String(), networkID) log.Infow("joining topic", "id", topic) t, err := p.Join(topic) diff --git a/pb/proof.pb.go b/fraudserv/pb/proof.pb.go similarity index 99% rename from pb/proof.pb.go rename to fraudserv/pb/proof.pb.go index 2d5b679..09bbc18 100644 --- a/pb/proof.pb.go +++ b/fraudserv/pb/proof.pb.go @@ -4,11 +4,12 @@ package fraud_pb import ( - fmt "fmt" - proto "github.com/gogo/protobuf/proto" - io "io" - math "math" + "fmt" + "io" + "math" math_bits "math/bits" + + "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/pb/proof.proto b/fraudserv/pb/proof.proto similarity index 100% rename from pb/proof.proto rename to fraudserv/pb/proof.proto diff --git a/requester.go b/fraudserv/requester.go similarity index 93% rename from requester.go rename to fraudserv/requester.go index 75a1d2d..682bf9b 100644 --- a/requester.go +++ b/fraudserv/requester.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -9,7 +9,7 @@ import ( "github.com/celestiaorg/go-libp2p-messenger/serde" - pb "github.com/celestiaorg/celestia-node/libs/fraud/pb" + pb "github.com/celestiaorg/celestia-node/libs/fraud/fraudserv/pb" ) const ( diff --git a/service.go b/fraudserv/service.go similarity index 82% rename from service.go rename to fraudserv/service.go index 1e68a80..631c390 100644 --- a/service.go +++ b/fraudserv/service.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "bytes" @@ -9,12 +9,21 @@ import ( "sync" "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + + "github.com/celestiaorg/celestia-node/libs/fraud" +) + +var ( + log = logging.Logger("fraudserv") + tracer = otel.Tracer("fraudserv") ) // fraudRequests is the amount of external requests that will be tried to get fraud proofs from @@ -30,14 +39,14 @@ type ProofService struct { cancel context.CancelFunc topicsLk sync.RWMutex - topics map[ProofType]*pubsub.Topic + topics map[fraud.ProofType]*pubsub.Topic storesLk sync.RWMutex - stores map[ProofType]datastore.Datastore + stores map[fraud.ProofType]datastore.Datastore pubsub *pubsub.PubSub host host.Host - getter headerFetcher + getter fraud.HeaderFetcher ds datastore.Datastore syncerEnabled bool @@ -46,7 +55,7 @@ type ProofService struct { func NewProofService( p *pubsub.PubSub, host host.Host, - getter headerFetcher, + getter fraud.HeaderFetcher, ds datastore.Datastore, syncerEnabled bool, networkID string, @@ -55,8 +64,8 @@ func NewProofService( pubsub: p, host: host, getter: getter, - topics: make(map[ProofType]*pubsub.Topic), - stores: make(map[ProofType]datastore.Datastore), + topics: make(map[fraud.ProofType]*pubsub.Topic), + stores: make(map[fraud.ProofType]datastore.Datastore), ds: ds, networkID: networkID, syncerEnabled: syncerEnabled, @@ -64,7 +73,7 @@ func NewProofService( } // registerProofTopics registers proofTypes as pubsub topics to be joined. -func (f *ProofService) registerProofTopics(proofTypes ...ProofType) error { +func (f *ProofService) registerProofTopics(proofTypes ...fraud.ProofType) error { for _, proofType := range proofTypes { t, err := join(f.pubsub, proofType, f.networkID, f.processIncoming) if err != nil { @@ -81,7 +90,7 @@ func (f *ProofService) registerProofTopics(proofTypes ...ProofType) error { // if syncer is enabled. func (f *ProofService) Start(context.Context) error { f.ctx, f.cancel = context.WithCancel(context.Background()) - if err := f.registerProofTopics(registeredProofTypes()...); err != nil { + if err := f.registerProofTopics(fraud.Registered()...); err != nil { return err } id := protocolID(f.networkID) @@ -95,13 +104,13 @@ func (f *ProofService) Start(context.Context) error { } // Stop removes the stream handler and cancels the underlying ProofService -func (f *ProofService) Stop(context.Context) error { +func (f *ProofService) Stop(context.Context) (err error) { f.host.RemoveStreamHandler(protocolID(f.networkID)) f.cancel() - return nil + return } -func (f *ProofService) Subscribe(proofType ProofType) (_ Subscription, err error) { +func (f *ProofService) Subscribe(proofType fraud.ProofType) (_ fraud.Subscription, err error) { f.topicsLk.Lock() defer f.topicsLk.Unlock() t, ok := f.topics[proofType] @@ -115,7 +124,7 @@ func (f *ProofService) Subscribe(proofType ProofType) (_ Subscription, err error return &subscription{subs}, nil } -func (f *ProofService) Broadcast(ctx context.Context, p Proof) error { +func (f *ProofService) Broadcast(ctx context.Context, p fraud.Proof) error { bin, err := p.MarshalBinary() if err != nil { return err @@ -132,7 +141,7 @@ func (f *ProofService) Broadcast(ctx context.Context, p Proof) error { // processIncoming encompasses the logic for validating fraud proofs. func (f *ProofService) processIncoming( ctx context.Context, - proofType ProofType, + proofType fraud.ProofType, from peer.ID, msg *pubsub.Message, ) pubsub.ValidationResult { @@ -143,10 +152,10 @@ func (f *ProofService) processIncoming( // unmarshal message to the Proof. // Peer will be added to black list if unmarshalling fails. - proof, err := Unmarshal(proofType, msg.Data) + proof, err := fraud.Unmarshal(proofType, msg.Data) if err != nil { log.Errorw("unmarshalling failed", "err", err) - if !errors.Is(err, &errNoUnmarshaler{}) { + if !errors.Is(err, &fraud.ErrNoUnmarshaler{}) { f.pubsub.BlacklistPeer(from) } span.RecordError(err) @@ -201,7 +210,7 @@ func (f *ProofService) processIncoming( return pubsub.ValidationAccept } -func (f *ProofService) Get(ctx context.Context, proofType ProofType) ([]Proof, error) { +func (f *ProofService) Get(ctx context.Context, proofType fraud.ProofType) ([]fraud.Proof, error) { f.storesLk.Lock() store, ok := f.stores[proofType] if !ok { @@ -214,7 +223,7 @@ func (f *ProofService) Get(ctx context.Context, proofType ProofType) ([]Proof, e } // put adds a fraud proof to the local storage. -func (f *ProofService) put(ctx context.Context, proofType ProofType, hash string, data []byte) error { +func (f *ProofService) put(ctx context.Context, proofType fraud.ProofType, hash string, data []byte) error { f.storesLk.Lock() store, ok := f.stores[proofType] if !ok { @@ -226,7 +235,7 @@ func (f *ProofService) put(ctx context.Context, proofType ProofType, hash string } // verifyLocal checks if a fraud proof has been stored locally. -func (f *ProofService) verifyLocal(ctx context.Context, proofType ProofType, hash string, data []byte) bool { +func (f *ProofService) verifyLocal(ctx context.Context, proofType fraud.ProofType, hash string, data []byte) bool { f.storesLk.RLock() storage, ok := f.stores[proofType] f.storesLk.RUnlock() diff --git a/fraudserv/service_test.go b/fraudserv/service_test.go new file mode 100644 index 0000000..f297ab9 --- /dev/null +++ b/fraudserv/service_test.go @@ -0,0 +1,196 @@ +package fraudserv + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/go-header" + "github.com/celestiaorg/go-header/headertest" + + "github.com/celestiaorg/celestia-node/libs/fraud/fraudtest" +) + +func TestService_SubscribeBroadcastValid(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + + serv := newTestService(ctx, t, false) + require.NoError(t, serv.Start(ctx)) + + fraud := fraudtest.NewValidProof() + sub, err := serv.Subscribe(fraud.Type()) + require.NoError(t, err) + defer sub.Cancel() + + require.NoError(t, serv.Broadcast(ctx, fraud)) + _, err = sub.Proof(ctx) + require.NoError(t, err) +} + +func TestService_SubscribeBroadcastInvalid(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + + serv := newTestService(ctx, t, false) + require.NoError(t, serv.Start(ctx)) + + fraud := fraudtest.NewInvalidProof() + sub, err := serv.Subscribe(fraud.Type()) + require.NoError(t, err) + defer sub.Cancel() + + err = serv.Broadcast(ctx, fraud) + require.Error(t, err) + + ctx2, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + t.Cleanup(cancel) + + _, err = sub.Proof(ctx2) + require.Error(t, err) +} + +func TestService_ReGossiping(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + // create mock network + net, err := mocknet.FullMeshLinked(3) + require.NoError(t, err) + + // create services + servA := newTestServiceWithHost(ctx, t, net.Hosts()[0], false) + servB := newTestServiceWithHost(ctx, t, net.Hosts()[1], false) + servC := newTestServiceWithHost(ctx, t, net.Hosts()[2], false) + + // preconnect peers: A -> B -> C, so A and C are not connected to each other + addrB := host.InfoFromHost(net.Hosts()[1]) // -> B + require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) // host[0] is A + require.NoError(t, net.Hosts()[2].Connect(ctx, *addrB)) // host[2] is C + + // start services + require.NoError(t, servA.Start(ctx)) + require.NoError(t, servB.Start(ctx)) + require.NoError(t, servC.Start(ctx)) + + fraud := fraudtest.NewValidProof() + subsA, err := servA.Subscribe(fraud.Type()) + require.NoError(t, err) + defer subsA.Cancel() + + subsB, err := servB.Subscribe(fraud.Type()) + require.NoError(t, err) + defer subsB.Cancel() + + subsC, err := servC.Subscribe(fraud.Type()) + require.NoError(t, err) + defer subsC.Cancel() + + // give some time for subscriptions to land + // this mitigates flakiness + time.Sleep(time.Millisecond * 100) + + // and only after broadcaster + err = servA.Broadcast(ctx, fraud) + require.NoError(t, err) + + _, err = subsA.Proof(ctx) // subscriptions of subA should also receive the proof + require.NoError(t, err) + + _, err = subsB.Proof(ctx) + require.NoError(t, err) + + _, err = subsC.Proof(ctx) + require.NoError(t, err) +} + +func TestService_Get(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + serv := newTestService(ctx, t, false) + require.NoError(t, serv.Start(ctx)) + + fraud := fraudtest.NewValidProof() + _, err := serv.Get(ctx, fraud.Type()) // try to fetch proof + require.Error(t, err) // storage is empty so should error + + sub, err := serv.Subscribe(fraud.Type()) + require.NoError(t, err) + defer sub.Cancel() + + // subscription needs some time and love to avoid flakes + time.Sleep(time.Millisecond * 100) + + err = serv.Broadcast(ctx, fraud) // broadcast stores the fraud as well + require.NoError(t, err) + + _, err = serv.Get(ctx, fraud.Type()) + require.NoError(t, err) +} + +func TestService_Sync(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + net, err := mocknet.FullMeshLinked(2) + require.NoError(t, err) + + servA := newTestServiceWithHost(ctx, t, net.Hosts()[0], false) + require.NoError(t, servA.Start(ctx)) + + fraud := fraudtest.NewValidProof() + err = servA.Broadcast(ctx, fraud) // broadcasting ensures the fraud gets stored on servA + require.NoError(t, err) + + servB := newTestServiceWithHost(ctx, t, net.Hosts()[1], true) // start servB + require.NoError(t, servB.Start(ctx)) + + sub, err := servB.Subscribe(fraud.Type()) // subscribe + require.NoError(t, err) + defer sub.Cancel() + + addrB := host.InfoFromHost(net.Hosts()[1]) + require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) // connect A to B + + _, err = sub.Proof(ctx) // heck that we get it from subscription by syncing from servA + require.NoError(t, err) +} + +func newTestService(ctx context.Context, t *testing.T, enabledSyncer bool) *ProofService { + net, err := mocknet.FullMeshLinked(1) + require.NoError(t, err) + return newTestServiceWithHost(ctx, t, net.Hosts()[0], enabledSyncer) +} + +func newTestServiceWithHost(ctx context.Context, t *testing.T, host host.Host, enabledSyncer bool) *ProofService { + ps, err := pubsub.NewFloodSub(ctx, host, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) + require.NoError(t, err) + + store := headertest.NewDummyStore(t) + serv := NewProofService( + ps, + host, + func(ctx context.Context, u uint64) (header.Header, error) { + return store.GetByHeight(ctx, u) + }, + sync.MutexWrap(datastore.NewMapDatastore()), + enabledSyncer, + "private", + ) + + t.Cleanup(func() { + err := serv.Stop(ctx) + if err != nil { + t.Fatal(err) + } + }) + return serv +} diff --git a/store.go b/fraudserv/store.go similarity index 79% rename from store.go rename to fraudserv/store.go index ff812c4..dfd71b0 100644 --- a/store.go +++ b/fraudserv/store.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -9,6 +9,8 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" q "github.com/ipfs/go-datastore/query" + + "github.com/celestiaorg/celestia-node/libs/fraud" ) var ( @@ -36,7 +38,7 @@ func getByHash(ctx context.Context, ds datastore.Datastore, hash string) ([]byte } // getAll queries all Fraud Proofs by their type. -func getAll(ctx context.Context, ds datastore.Datastore, proofType ProofType) ([]Proof, error) { +func getAll(ctx context.Context, ds datastore.Datastore, proofType fraud.ProofType) ([]fraud.Proof, error) { entries, err := query(ctx, ds, q.Query{}) if err != nil { return nil, err @@ -44,11 +46,11 @@ func getAll(ctx context.Context, ds datastore.Datastore, proofType ProofType) ([ if len(entries) == 0 { return nil, datastore.ErrNotFound } - proofs := make([]Proof, 0) + proofs := make([]fraud.Proof, 0) for _, data := range entries { - proof, err := Unmarshal(proofType, data.Value) + proof, err := fraud.Unmarshal(proofType, data.Value) if err != nil { - if errors.Is(err, &errNoUnmarshaler{}) { + if errors.Is(err, &fraud.ErrNoUnmarshaler{}) { return nil, err } log.Warn(err) @@ -62,10 +64,10 @@ func getAll(ctx context.Context, ds datastore.Datastore, proofType ProofType) ([ return proofs, nil } -func initStore(topic ProofType, ds datastore.Datastore) datastore.Datastore { +func initStore(topic fraud.ProofType, ds datastore.Datastore) datastore.Datastore { return namespace.Wrap(ds, makeKey(topic)) } -func makeKey(topic ProofType) datastore.Key { +func makeKey(topic fraud.ProofType) datastore.Key { return datastore.NewKey(fmt.Sprintf("%s/%s", storePrefix, topic)) } diff --git a/store_test.go b/fraudserv/store_test.go similarity index 88% rename from store_test.go rename to fraudserv/store_test.go index 2556b8c..90a2f84 100644 --- a/store_test.go +++ b/fraudserv/store_test.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -9,13 +9,15 @@ import ( "github.com/ipfs/go-datastore/namespace" ds_sync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/libs/fraud/fraudtest" ) func TestStore_Put(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer t.Cleanup(cancel) - p := newValidProof() + p := fraudtest.NewValidProof() bin, err := p.MarshalBinary() require.NoError(t, err) ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) @@ -25,10 +27,10 @@ func TestStore_Put(t *testing.T) { } func TestStore_GetAll(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer t.Cleanup(cancel) - proof := newValidProof() + proof := fraudtest.NewValidProof() bin, err := proof.MarshalBinary() require.NoError(t, err) ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) @@ -41,14 +43,13 @@ func TestStore_GetAll(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, proofs) require.NoError(t, proof.Validate(nil)) - } func Test_GetAllFailed(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer t.Cleanup(cancel) - proof := newValidProof() + proof := fraudtest.NewValidProof() ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) store := namespace.Wrap(ds, makeKey(proof.Type())) @@ -59,10 +60,10 @@ func Test_GetAllFailed(t *testing.T) { } func Test_getByHash(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer t.Cleanup(cancel) - proof := newValidProof() + proof := fraudtest.NewValidProof() ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) store := namespace.Wrap(ds, makeKey(proof.Type())) bin, err := proof.MarshalBinary() diff --git a/subscription.go b/fraudserv/subscription.go similarity index 75% rename from subscription.go rename to fraudserv/subscription.go index e9cef33..fbf86ba 100644 --- a/subscription.go +++ b/fraudserv/subscription.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -6,6 +6,8 @@ import ( "reflect" pubsub "github.com/libp2p/go-libp2p-pubsub" + + "github.com/celestiaorg/celestia-node/libs/fraud" ) // subscription wraps pubsub subscription and handles Fraud Proof from the pubsub topic. @@ -13,7 +15,7 @@ type subscription struct { subscription *pubsub.Subscription } -func (s *subscription) Proof(ctx context.Context) (Proof, error) { +func (s *subscription) Proof(ctx context.Context) (fraud.Proof, error) { if s.subscription == nil { panic("fraud: subscription is not created") } @@ -21,7 +23,7 @@ func (s *subscription) Proof(ctx context.Context) (Proof, error) { if err != nil { return nil, err } - proof, ok := data.ValidatorData.(Proof) + proof, ok := data.ValidatorData.(fraud.Proof) if !ok { panic(fmt.Sprintf("fraud: unexpected type received %s", reflect.TypeOf(data.ValidatorData))) } diff --git a/sync.go b/fraudserv/sync.go similarity index 93% rename from sync.go rename to fraudserv/sync.go index e82e33f..3167593 100644 --- a/sync.go +++ b/fraudserv/sync.go @@ -1,4 +1,4 @@ -package fraud +package fraudserv import ( "context" @@ -15,7 +15,8 @@ import ( "github.com/celestiaorg/go-libp2p-messenger/serde" - pb "github.com/celestiaorg/celestia-node/libs/fraud/pb" + "github.com/celestiaorg/celestia-node/libs/fraud" + pb "github.com/celestiaorg/celestia-node/libs/fraud/fraudserv/pb" ) // syncFraudProofs encompasses the behavior for fetching fraud proofs from other peers. @@ -83,10 +84,10 @@ func (f *ProofService) syncFraudProofs(ctx context.Context, id protocol.ID) { log.Debugw("got fraud proofs from peer", "pid", pid) for _, data := range respProofs { f.topicsLk.RLock() - topic, ok := f.topics[ProofType(data.Type)] + topic, ok := f.topics[fraud.ProofType(data.Type)] f.topicsLk.RUnlock() if !ok { - log.Errorf("topic for %s does not exist", ProofType(data.Type)) + log.Errorf("topic for %s does not exist", fraud.ProofType(data.Type)) continue } for _, val := range data.Value { @@ -129,7 +130,7 @@ func (f *ProofService) handleFraudMessageRequest(stream network.Stream) { resp.Proofs = make([]*pb.ProofResponse, 0, len(req.RequestedProofType)) // retrieve fraud proofs as provided by the FraudMessageRequest proofTypes. for _, p := range req.RequestedProofType { - proofs, err := f.Get(f.ctx, ProofType(p)) + proofs, err := f.Get(f.ctx, fraud.ProofType(p)) if err != nil { if err != datastore.ErrNotFound { log.Error(err) diff --git a/fraudtest/dummy_proof.go b/fraudtest/dummy_proof.go new file mode 100644 index 0000000..a5b9f43 --- /dev/null +++ b/fraudtest/dummy_proof.go @@ -0,0 +1,53 @@ +package fraudtest + +import ( + "encoding/json" + "errors" + + "github.com/celestiaorg/go-header" + + "github.com/celestiaorg/celestia-node/libs/fraud" +) + +func init() { + fraud.Register(&DummyProof{}) +} + +type DummyProof struct { + Valid bool +} + +func NewValidProof() *DummyProof { + return &DummyProof{true} +} + +func NewInvalidProof() *DummyProof { + return &DummyProof{false} +} + +func (m *DummyProof) Type() fraud.ProofType { + return "DummyProof" +} + +func (m *DummyProof) HeaderHash() []byte { + return []byte("hash") +} + +func (m *DummyProof) Height() uint64 { + return 1 +} + +func (m *DummyProof) Validate(header.Header) error { + if !m.Valid { + return errors.New("DummyProof: proof is not valid") + } + return nil +} + +func (m *DummyProof) MarshalBinary() (data []byte, err error) { + return json.Marshal(m) +} + +func (m *DummyProof) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, m) +} diff --git a/fraudtest/dummy_service.go b/fraudtest/dummy_service.go new file mode 100644 index 0000000..800f1b9 --- /dev/null +++ b/fraudtest/dummy_service.go @@ -0,0 +1,29 @@ +package fraudtest + +import ( + "context" + + "github.com/celestiaorg/celestia-node/libs/fraud" +) + +type DummyService struct{} + +func (d *DummyService) Broadcast(context.Context, fraud.Proof) error { + return nil +} + +func (d *DummyService) Subscribe(fraud.ProofType) (fraud.Subscription, error) { + return &subscription{}, nil +} + +func (d *DummyService) Get(context.Context, fraud.ProofType) ([]fraud.Proof, error) { + return nil, nil +} + +type subscription struct{} + +func (s *subscription) Proof(context.Context) (fraud.Proof, error) { + return nil, nil +} + +func (s *subscription) Cancel() {} diff --git a/interface.go b/interface.go index 1ae8b09..1b31827 100644 --- a/interface.go +++ b/interface.go @@ -3,15 +3,11 @@ package fraud import ( "context" - logging "github.com/ipfs/go-log/v2" - "github.com/celestiaorg/go-header" ) -var log = logging.Logger("fraud") - -// headerFetcher aliases a function that is used to fetch an ExtendedHeader from store by height. -type headerFetcher func(context.Context, uint64) (header.Header, error) +// HeaderFetcher aliases a function that is used to fetch an ExtendedHeader from store by height. +type HeaderFetcher func(context.Context, uint64) (header.Header, error) // ProofUnmarshaler aliases a function that parses data to `Proof`. type ProofUnmarshaler func([]byte) (Proof, error) @@ -27,8 +23,9 @@ type Service interface { // Broadcaster is a generic interface that sends a `Proof` to all nodes subscribed on the // Broadcaster's topic. type Broadcaster interface { - // Broadcast takes a fraud `Proof` data structure that implements standard BinaryMarshal - // interface and broadcasts it to all subscribed peers. + // Broadcast takes a fraud `Proof` data structure interface and broadcasts it to local + // subscriptions and peers. It may additionally cache/persist Proofs for future + // access via Getter and to serve Proof requests to peers in the network. Broadcast(context.Context, Proof) error } diff --git a/metrics.go b/metrics.go index 74127c2..1b28101 100644 --- a/metrics.go +++ b/metrics.go @@ -5,10 +5,13 @@ import ( "github.com/ipfs/go-datastore" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/unit" ) +var meter = global.MeterProvider().Meter("fraud") + // WithMetrics enables metrics to monitor fraud proofs. func WithMetrics(store Getter) { proofTypes := registeredProofTypes() diff --git a/proof.go b/proof.go index d8d0a81..0666cc0 100644 --- a/proof.go +++ b/proof.go @@ -5,33 +5,9 @@ import ( "encoding" "fmt" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric/global" - "github.com/celestiaorg/go-header" ) -var ( - meter = global.MeterProvider().Meter("fraud") - tracer = otel.Tracer("fraud") -) - -type ErrFraudExists struct { - Proof []Proof -} - -func (e *ErrFraudExists) Error() string { - return fmt.Sprintf("fraud: %s proof exists\n", e.Proof[0].Type()) -} - -type errNoUnmarshaler struct { - proofType ProofType -} - -func (e *errNoUnmarshaler) Error() string { - return fmt.Sprintf("fraud: unmarshaler for %s type is not registered", e.proofType) -} - // ProofType type defines a unique proof type string. type ProofType string @@ -63,7 +39,6 @@ type Proof interface { func OnProof(ctx context.Context, subscriber Subscriber, p ProofType, handle func(proof Proof)) { subscription, err := subscriber.Subscribe(p) if err != nil { - log.Error(err) return } defer subscription.Cancel() @@ -72,9 +47,6 @@ func OnProof(ctx context.Context, subscriber Subscriber, p ProofType, handle fun // so there is no need to call Validate. proof, err := subscription.Proof(ctx) if err != nil { - if err != context.Canceled { - log.Errorw("reading next proof failed", "err", err) - } return } @@ -87,7 +59,23 @@ func Unmarshal(proofType ProofType, msg []byte) (Proof, error) { defer unmarshalersLk.RUnlock() unmarshaler, ok := defaultUnmarshalers[proofType] if !ok { - return nil, &errNoUnmarshaler{proofType: proofType} + return nil, &ErrNoUnmarshaler{proofType: proofType} } return unmarshaler(msg) } + +type ErrFraudExists struct { + Proof []Proof +} + +func (e *ErrFraudExists) Error() string { + return fmt.Sprintf("fraud: %s proof exists\n", e.Proof[0].Type()) +} + +type ErrNoUnmarshaler struct { + proofType ProofType +} + +func (e *ErrNoUnmarshaler) Error() string { + return fmt.Sprintf("fraud: unmarshaler for %s type is not registered", e.proofType) +} diff --git a/service_test.go b/service_test.go deleted file mode 100644 index 2237454..0000000 --- a/service_test.go +++ /dev/null @@ -1,239 +0,0 @@ -package fraud - -import ( - "context" - "encoding/hex" - "testing" - "time" - - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-datastore/sync" - pubsub "github.com/libp2p/go-libp2p-pubsub" - pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p/core/host" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/go-header" - "github.com/celestiaorg/go-header/test" -) - -func TestService_Subscribe(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) - t.Cleanup(cancel) - s, _ := CreateTestService(t, false) - proof := newValidProof() - require.NoError(t, s.Start(ctx)) - _, err := s.Subscribe(proof.Type()) - require.NoError(t, err) -} - -func TestService_SubscribeFails(t *testing.T) { - s, _ := CreateTestService(t, false) - proof := newValidProof() - _, err := s.Subscribe(proof.Type()) - require.Error(t, err) -} - -func TestService_BroadcastFails(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) - t.Cleanup(cancel) - s, _ := CreateTestService(t, false) - p := newValidProof() - require.Error(t, s.Broadcast(ctx, p)) -} - -func TestService_Broadcast(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) - t.Cleanup(cancel) - - s, _ := CreateTestService(t, false) - proof := newValidProof() - require.NoError(t, s.Start(ctx)) - subs, err := s.Subscribe(proof.Type()) - require.NoError(t, err) - - require.NoError(t, s.Broadcast(ctx, proof)) - _, err = subs.Proof(ctx) - require.NoError(t, err) - require.NoError(t, nil) -} - -func TestService_processIncoming(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - // create mock network - net, err := mocknet.FullMeshLinked(2) - require.NoError(t, err) - - var tests = []struct { - precondition func() - proof *mockProof - validationResult pubsub.ValidationResult - }{ - { - nil, - newValidProof(), - pubsub.ValidationAccept, - }, - { - nil, - newInvalidProof(), - pubsub.ValidationReject, - }, - { - func() { - delete(defaultUnmarshalers, mockProofType) - }, - newValidProof(), - pubsub.ValidationReject, - }, - } - for _, test := range tests { - bin, err := test.proof.MarshalBinary() - require.NoError(t, err) - // create first fraud service that will broadcast incorrect Fraud Proof - service, _ := createTestServiceWithHost(ctx, t, net.Hosts()[0], false) - msg := &pubsub.Message{ - Message: &pubsubpb.Message{ - Data: bin, - }, - ReceivedFrom: net.Hosts()[1].ID(), - } - if test.precondition != nil { - test.precondition() - } - require.NoError(t, service.Start(ctx)) - res := service.processIncoming(ctx, test.proof.Type(), net.Hosts()[1].ID(), msg) - require.True(t, res == test.validationResult) - } -} - -func TestService_ReGossiping(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - t.Cleanup(cancel) - // create mock network - net, err := mocknet.FullMeshLinked(3) - require.NoError(t, err) - - // create first fraud service that will broadcast incorrect Fraud Proof - pserviceA, _ := createTestServiceWithHost(ctx, t, net.Hosts()[0], false) - require.NoError(t, err) - // create pub sub in order to listen for Fraud Proof - psB, err := pubsub.NewGossipSub(ctx, net.Hosts()[1], // -> B - pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) - require.NoError(t, err) - // create second service that will receive and validate Fraud Proof - pserviceB := NewProofService( - psB, - net.Hosts()[1], - func(ctx context.Context, u uint64) (header.Header, error) { - return &test.DummyHeader{}, nil - }, - sync.MutexWrap(datastore.NewMapDatastore()), - false, - "private", - ) - addrB := host.InfoFromHost(net.Hosts()[1]) // -> B - - // create pub sub in order to listen for Fraud Proof - psC, err := pubsub.NewGossipSub(ctx, net.Hosts()[2], // -> C - pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) - require.NoError(t, err) - pserviceC := NewProofService( - psC, - net.Hosts()[2], - func(ctx context.Context, u uint64) (header.Header, error) { - return &test.DummyHeader{}, nil - }, - sync.MutexWrap(datastore.NewMapDatastore()), - false, - "private", - ) - // establish connections - // connect peers: A -> B -> C, so A and C are not connected to each other - require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) // host[0] is A - require.NoError(t, net.Hosts()[2].Connect(ctx, *addrB)) // host[2] is C - - befp := newValidProof() - bin, err := befp.MarshalBinary() - require.NoError(t, err) - require.NoError(t, pserviceA.Start(ctx)) - require.NoError(t, pserviceB.Start(ctx)) - require.NoError(t, pserviceC.Start(ctx)) - subsA, err := pserviceA.Subscribe(mockProofType) - require.NoError(t, err) - defer subsA.Cancel() - - subsB, err := pserviceB.Subscribe(mockProofType) - require.NoError(t, err) - defer subsB.Cancel() - - subsC, err := pserviceC.Subscribe(mockProofType) - require.NoError(t, err) - defer subsC.Cancel() - // we cannot avoid sleep because it helps to avoid flakiness - time.Sleep(time.Millisecond * 100) - - err = pserviceA.topics[mockProofType].Publish(ctx, bin, pubsub.WithReadiness(pubsub.MinTopicSize(1))) - require.NoError(t, err) - - newCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100) - t.Cleanup(cancel) - - _, err = subsB.Proof(newCtx) - require.NoError(t, err) - - _, err = subsC.Proof(ctx) - require.NoError(t, err) - // we cannot avoid sleep because it helps to avoid flakiness - time.Sleep(time.Millisecond * 100) -} - -func TestService_Get(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - proof := newValidProof() - bin, err := proof.MarshalBinary() - require.NoError(t, err) - pService, _ := CreateTestService(t, false) - // try to fetch proof - _, err = pService.Get(ctx, proof.Type()) - // error is expected here because storage is empty - require.Error(t, err) - - // create store - store := initStore(proof.Type(), pService.ds) - // add proof to storage - require.NoError(t, put(ctx, store, hex.EncodeToString(proof.HeaderHash()), bin)) - // fetch proof - _, err = pService.Get(ctx, proof.Type()) - require.NoError(t, err) -} - -func TestService_Sync(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - t.Cleanup(cancel) - // create mock network - net, err := mocknet.FullMeshLinked(2) - require.NoError(t, err) - - pserviceA, _ := createTestServiceWithHost(ctx, t, net.Hosts()[0], false) - pserviceB, _ := createTestServiceWithHost(ctx, t, net.Hosts()[1], true) - proof := newValidProof() - require.NoError(t, pserviceA.Start(ctx)) - require.NoError(t, pserviceB.Start(ctx)) - subs, err := pserviceB.Subscribe(mockProofType) - require.NoError(t, err) - bin, err := proof.MarshalBinary() - require.NoError(t, err) - store := namespace.Wrap(pserviceA.ds, makeKey(mockProofType)) - require.NoError(t, put(ctx, store, string(proof.HeaderHash()), bin)) - - addrB := host.InfoFromHost(net.Hosts()[1]) - require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) - - _, err = subs.Proof(ctx) - require.NoError(t, err) -} diff --git a/testing.go b/testing.go deleted file mode 100644 index 9e027dd..0000000 --- a/testing.go +++ /dev/null @@ -1,149 +0,0 @@ -package fraud - -import ( - "context" - "encoding/json" - "errors" - "testing" - "time" - - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/sync" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/host" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/go-header" - - "github.com/celestiaorg/celestia-node/header/headertest" -) - -type DummyService struct { -} - -func (d *DummyService) Broadcast(context.Context, Proof) error { - return nil -} - -func (d *DummyService) Subscribe(ProofType) (Subscription, error) { - return &subscription{}, nil -} - -func (d *DummyService) Get(context.Context, ProofType) ([]Proof, error) { - return nil, nil -} - -type mockStore struct { - headers map[int64]header.Header - headHeight int64 -} - -// createStore creates a mock store and adds several random -// headers. -func createStore(t *testing.T, numHeaders int) *mockStore { - store := &mockStore{ - headers: make(map[int64]header.Header), - headHeight: 0, - } - - suite := headertest.NewTestSuite(t, numHeaders) - - for i := 0; i < numHeaders; i++ { - header := suite.GenExtendedHeader() - store.headers[header.Height()] = header - - if header.Height() > store.headHeight { - store.headHeight = header.Height() - } - } - return store -} - -func (m *mockStore) GetByHeight(_ context.Context, height uint64) (header.Header, error) { - return m.headers[int64(height)], nil -} - -func (m *mockStore) Close() error { return nil } - -const ( - mockProofType ProofType = "mockProof" -) - -type mockProof struct { - Valid bool -} - -func newValidProof() *mockProof { - return newMockProof(true) -} - -func newInvalidProof() *mockProof { - return newMockProof(false) -} - -func newMockProof(valid bool) *mockProof { - p := &mockProof{valid} - if _, ok := defaultUnmarshalers[p.Type()]; !ok { - Register(&mockProof{}) - } - return p -} - -func (m *mockProof) Type() ProofType { - return mockProofType -} - -func (m *mockProof) HeaderHash() []byte { - return []byte("hash") -} - -func (m *mockProof) Height() uint64 { - return 1 -} - -func (m *mockProof) Validate(header.Header) error { - if !m.Valid { - return errors.New("mockProof: proof is not valid") - } - return nil -} - -func (m *mockProof) MarshalBinary() (data []byte, err error) { - return json.Marshal(m) -} - -func (m *mockProof) UnmarshalBinary(data []byte) error { - return json.Unmarshal(data, m) -} - -func CreateTestService(t *testing.T, enabledSyncer bool) (*ProofService, *mockStore) { //nolint:revive - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) - t.Cleanup(cancel) - - // create mock network - net, err := mocknet.FullMeshLinked(1) - require.NoError(t, err) - return createTestServiceWithHost(ctx, t, net.Hosts()[0], enabledSyncer) -} - -func createTestServiceWithHost( - ctx context.Context, - t *testing.T, - host host.Host, - enabledSyncer bool, -) (*ProofService, *mockStore) { - // create pubsub for host - ps, err := pubsub.NewGossipSub(ctx, host, - pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) - require.NoError(t, err) - store := createStore(t, 10) - return NewProofService( - ps, - host, - store.GetByHeight, - sync.MutexWrap(datastore.NewMapDatastore()), - enabledSyncer, - "private", - ), store -}