Use datastore in top level Index (#118)
* use datastore in top-level Index

* fix test race

* fix test race

* fix test race

* namespace inverted index ds

* changes as per review

* last review

* remove dedup map

* log test
aarshkshah1992 authored Feb 21, 2022
1 parent e07b050 commit eac7733
Showing 10 changed files with 178 additions and 206 deletions.
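For orientation before the per-file diffs: the top-level inverted index (multihash → shard keys) is now backed by any go-datastore `ds.Batching` implementation instead of `go-indexer-core`, and its methods take a `context.Context`. Below is a minimal sketch of how a caller might wire this up after the change; the LevelDB backing store, the paths, and the startup call are illustrative assumptions, not part of this commit.

```go
package main

import (
	"context"
	"log"

	"github.com/filecoin-project/dagstore"
	"github.com/filecoin-project/dagstore/index"
	"github.com/filecoin-project/dagstore/mount"
	levelds "github.com/ipfs/go-ds-leveldb"
)

func main() {
	// Back the inverted index with LevelDB (go-ds-leveldb implements ds.Batching).
	store, err := levelds.NewDatastore("/tmp/inverted-index", nil) // path is illustrative
	if err != nil {
		log.Fatal(err)
	}

	// NewInverted now takes a datastore; keys are namespaced under /inverted/index internally.
	inv := index.NewInverted(store)

	dagst, err := dagstore.NewDAGStore(dagstore.Config{
		MountRegistry: mount.NewRegistry(),
		TransientsDir: "/tmp/transients", // illustrative
		TopLevelIndex: inv,               // leave nil to get the in-memory default shown in dagstore.go below
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := dagst.Start(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```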
3 changes: 2 additions & 1 deletion .circleci/config.yml
@@ -18,4 +18,5 @@ jobs:
- go/test:
covermode: atomic
failfast: true
race: true
race: true
verbose: true
10 changes: 3 additions & 7 deletions dagstore.go
@@ -7,14 +7,10 @@ import (
"os"
"sync"

"github.com/libp2p/go-libp2p-core/peer"

mh "github.com/multiformats/go-multihash"

carindex "github.com/ipld/go-car/v2/index"

"github.com/filecoin-project/go-indexer-core/store/memory"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
@@ -209,7 +205,7 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {

if cfg.TopLevelIndex == nil {
log.Info("using in-memory inverted index")
cfg.TopLevelIndex = index.NewInverted(memory.New(), peer.ID(""))
cfg.TopLevelIndex = index.NewInverted(dssync.MutexWrap(ds.NewMapDatastore()))
}

// handle the datastore.
@@ -351,8 +347,8 @@ func (d *DAGStore) GetIterableIndex(key shard.Key) (carindex.IterableIndex, erro
return ii, nil
}

func (d *DAGStore) ShardsContainingMultihash(h mh.Multihash) ([]shard.Key, error) {
return d.TopLevelIndex.GetShardsForMultihash(h)
func (d *DAGStore) ShardsContainingMultihash(ctx context.Context, h mh.Multihash) ([]shard.Key, error) {
return d.TopLevelIndex.GetShardsForMultihash(ctx, h)
}

type RegisterOpts struct {
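Call sites of `ShardsContainingMultihash` now need to supply a context, which flows down to the datastore lookup. A small sketch of an updated caller; the wrapper function and names are assumptions for illustration, not part of the diff.

```go
package example

import (
	"context"

	"github.com/filecoin-project/dagstore"
	"github.com/filecoin-project/dagstore/shard"
	"github.com/multiformats/go-multihash"
)

// shardsFor resolves which shards index the given multihash.
// dagst is assumed to be an initialized *dagstore.DAGStore.
func shardsFor(ctx context.Context, dagst *dagstore.DAGStore, h multihash.Multihash) ([]shard.Key, error) {
	// The caller's context is threaded down to the TopLevelIndex datastore lookup.
	return dagst.ShardsContainingMultihash(ctx, h)
}
```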
2 changes: 1 addition & 1 deletion dagstore_async.go
@@ -141,7 +141,7 @@ func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Moun
iterableIdx, ok := idx.(carindex.IterableIndex)
if ok {
mhIter := &mhIdx{iterableIdx: iterableIdx}
if err := d.TopLevelIndex.AddMultihashesForShard(mhIter, s.key); err != nil {
if err := d.TopLevelIndex.AddMultihashesForShard(ctx, mhIter, s.key); err != nil {
log.Errorw("failed to add shard multihashes to the inverted index", "shard", s.key, "error", err)
}
} else {
6 changes: 2 additions & 4 deletions dagstore_test.go
@@ -133,6 +133,7 @@ func TestRegisterCarV1(t *testing.T) {
}

func TestRegisterCarV2(t *testing.T) {
ctx := context.Background()
dagst, err := NewDAGStore(Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
@@ -168,7 +169,7 @@ func TestRegisterCarV2(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, ii)
err = ii.ForEach(func(h multihash.Multihash, _ uint64) error {
k2, err := dagst.ShardsContainingMultihash(h)
k2, err := dagst.ShardsContainingMultihash(ctx, h)
if err != nil {
return err
}
@@ -207,9 +208,6 @@ func TestRegisterConcurrentShards(t *testing.T) {
t.Run("8", func(t *testing.T) { run(t, 8) })
t.Run("16", func(t *testing.T) { run(t, 16) })
t.Run("32", func(t *testing.T) { run(t, 32) })
t.Run("64", func(t *testing.T) { run(t, 64) })
t.Run("128", func(t *testing.T) { run(t, 128) })
t.Run("256", func(t *testing.T) { run(t, 256) })
}

func TestAcquireInexistentShard(t *testing.T) {
6 changes: 4 additions & 2 deletions go.mod
@@ -3,17 +3,19 @@ module github.com/filecoin-project/dagstore
go 1.16

require (
github.com/filecoin-project/go-indexer-core v0.2.4
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipld/go-car/v2 v2.1.1
github.com/libp2p/go-libp2p-core v0.9.0
github.com/libp2p/go-libp2p-core v0.9.0 // indirect
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61
github.com/multiformats/go-multihash v0.1.0
github.com/stretchr/testify v1.7.0
github.com/syndtr/goleveldb v1.0.0
github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158
golang.org/x/exp v0.0.0-20210714144626-1041f73d31d8
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
86 changes: 11 additions & 75 deletions go.sum

Large diffs are not rendered by default.

135 changes: 88 additions & 47 deletions index/inverted_index_impl.go
@@ -1,83 +1,124 @@
package index

import (
"errors"
"context"
"encoding/json"
"fmt"
"sync"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/ipfs/go-datastore/namespace"

"github.com/filecoin-project/go-indexer-core"
ds "github.com/ipfs/go-datastore"

"github.com/multiformats/go-multihash"

"github.com/filecoin-project/dagstore/shard"
)

var InvertedIndexErrNotFound = errors.New("multihash not found in Index")
var _ Inverted = (*invertedIndexImpl)(nil)

var _ Inverted = (*indexerCoreIndex)(nil)

type indexerCoreIndex struct {
is indexer.Interface
selfPeerID peer.ID
type invertedIndexImpl struct {
mu sync.Mutex
ds ds.Batching
}

// NewInverted returns a new inverted index that uses `go-indexer-core`
// as it's storage backend. We use `go-indexer-core` as the backend here
// as it's been optimized to store (multihash -> Value) kind of data and
// supports bulk updates via context ID and metadata-deduplication which are useful properties for our use case here.
func NewInverted(is indexer.Interface, selfPeerID peer.ID) *indexerCoreIndex {
return &indexerCoreIndex{
is: is,
selfPeerID: selfPeerID,
func NewInverted(dts ds.Batching) *invertedIndexImpl {
dts = namespace.Wrap(dts, ds.NewKey("/inverted/index"))
return &invertedIndexImpl{
ds: dts,
}
}

func (d *indexerCoreIndex) AddMultihashesForShard(mhIter MultihashIterator, s shard.Key) error {
return mhIter.ForEach(func(mh multihash.Multihash) error {
// go-indexer-core appends values to the existing values we already have for the key
// it also takes care of de-duplicating values.
return d.is.Put(valueForShardKey(s, d.selfPeerID), mh)
})
}

func (d *indexerCoreIndex) DeleteMultihashesForShard(sk shard.Key, mhIter MultihashIterator) error {
return mhIter.ForEach(func(mh multihash.Multihash) error {
// remove the given value i.e. shard key from the index for the given multihash.
return d.is.Remove(valueForShardKey(sk, d.selfPeerID), mh)
})
}
func (d *invertedIndexImpl) AddMultihashesForShard(ctx context.Context, mhIter MultihashIterator, s shard.Key) error {
d.mu.Lock()
defer d.mu.Unlock()

func (d *indexerCoreIndex) GetShardsForMultihash(mh multihash.Multihash) ([]shard.Key, error) {
values, found, err := d.is.Get(mh)
batch, err := d.ds.Batch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to lookup index for multihash %s, err: %w", mh, err)
return fmt.Errorf("failed to create ds batch: %w", err)
}
if !found || len(values) == 0 {
return nil, fmt.Errorf("cid not found, multihash=%s, err: %w", mh, InvertedIndexErrNotFound)

if err := mhIter.ForEach(func(mh multihash.Multihash) error {
key := ds.NewKey(string(mh))
// do we already have an entry for this multihash ?
val, err := d.ds.Get(ctx, key)
if err != nil && err != ds.ErrNotFound {
return fmt.Errorf("failed to get value for multihash %s, err: %w", mh, err)
}

// if we don't have an existing entry for this mh, create one
if err == ds.ErrNotFound {
s := []shard.Key{s}
bz, err := json.Marshal(s)
if err != nil {
return fmt.Errorf("failed to marshal shard list to bytes: %w", err)
}
if err := batch.Put(ctx, key, bz); err != nil {
return fmt.Errorf("failed to put mh=%s, err=%w", mh, err)
}
return nil
}

// else , append the shard key to the existing list
var es []shard.Key
if err := json.Unmarshal(val, &es); err != nil {
return fmt.Errorf("failed to unmarshal shard keys: %w", err)
}

// if we already have the shard key indexed for the multihash, nothing to do here.
if has(es, s) {
return nil
}

es = append(es, s)
bz, err := json.Marshal(es)
if err != nil {
return fmt.Errorf("failed to marshal shard keys: %w", err)
}
if err := batch.Put(ctx, key, bz); err != nil {
return fmt.Errorf("failed to put mh=%s, err%w", mh, err)
}

return nil
}); err != nil {
return fmt.Errorf("failed to add index entry: %w", err)
}

shardKeys := make([]shard.Key, 0, len(values))
for _, v := range values {
shardKeys = append(shardKeys, shardKeyFromValue(v))
if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}

return shardKeys, nil
}
if err := d.ds.Sync(ctx, ds.Key{}); err != nil {
return fmt.Errorf("failed to sync puts: %w", err)
}

func (d *indexerCoreIndex) Size() (int64, error) {
return d.is.Size()
return nil
}

func shardKeyFromValue(val indexer.Value) shard.Key {
str := string(val.ContextID)
return shard.KeyFromString(str)
func (d *invertedIndexImpl) GetShardsForMultihash(ctx context.Context, mh multihash.Multihash) ([]shard.Key, error) {
key := ds.NewKey(string(mh))
sbz, err := d.ds.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to lookup index for mh %s, err: %w", mh, err)
}

var shardKeys []shard.Key
if err := json.Unmarshal(sbz, &shardKeys); err != nil {
return nil, fmt.Errorf("failed to unmarshal shard keys for mh=%s, err=%w", mh, err)
}

return shardKeys, nil
}

func valueForShardKey(key shard.Key, selfPeerID peer.ID) indexer.Value {
return indexer.Value{
ProviderID: selfPeerID,
ContextID: []byte(key.String()),
MetadataBytes: []byte("N/A"),
func has(es []shard.Key, k shard.Key) bool {
for _, s := range es {
if s == k {
return true
}
}
return false
}
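To make the new storage layout concrete: every multihash becomes a datastore key under the `/inverted/index` namespace, and its value is a JSON-encoded `[]shard.Key` that grows, without duplicates, as shards are indexed. A usage sketch against the new implementation; the slice-backed iterator and sample data are assumptions for illustration.

```go
package main

import (
	"context"
	"fmt"

	"github.com/filecoin-project/dagstore/index"
	"github.com/filecoin-project/dagstore/shard"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/multiformats/go-multihash"
)

// sliceMHIterator adapts an in-memory slice to the MultihashIterator interface (illustrative helper).
type sliceMHIterator []multihash.Multihash

func (s sliceMHIterator) ForEach(f func(multihash.Multihash) error) error {
	for _, mh := range s {
		if err := f(mh); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ctx := context.Background()

	// Same construction as the new in-memory default in NewDAGStore.
	inv := index.NewInverted(dssync.MutexWrap(ds.NewMapDatastore()))

	mh, _ := multihash.Sum([]byte("some block"), multihash.SHA2_256, -1)
	sk := shard.KeyFromString("shard-1")

	// Stores a JSON-encoded []shard.Key{sk} under the key derived from mh.
	if err := inv.AddMultihashesForShard(ctx, sliceMHIterator{mh}, sk); err != nil {
		panic(err)
	}

	// Adding the same shard again is a no-op thanks to the dedup check in AddMultihashesForShard.
	_ = inv.AddMultihashesForShard(ctx, sliceMHIterator{mh}, sk)

	keys, err := inv.GetShardsForMultihash(ctx, mh)
	if err != nil {
		panic(err)
	}
	fmt.Println(keys) // a single shard key
}
```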