Skip to content

Commit

Permalink
Add extended relationships filtering to bulk export API
Browse files Browse the repository at this point in the history
  • Loading branch information
josephschorr committed Mar 6, 2024
1 parent af919d9 commit 5f819f6
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 17 deletions.
56 changes: 39 additions & 17 deletions internal/services/v1/experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,12 @@ func (es *experimentalServer) BulkExportRelationships(
ds := datastoremw.MustFromContext(ctx)

var atRevision datastore.Revision
var curNamespace string
var cur dsoptions.Cursor

if req.OptionalCursor != nil {
var err error
atRevision, cur, err = decodeCursor(ds, req.OptionalCursor)
atRevision, curNamespace, cur, err = decodeCursor(ds, req.OptionalCursor)
if err != nil {
return es.rewriteError(ctx, err)
}
Expand Down Expand Up @@ -319,7 +320,7 @@ func (es *experimentalServer) BulkExportRelationships(
})

// Skip the namespaces that are already fully returned
for cur != nil && len(namespaces) > 0 && namespaces[0].Definition.Name < cur.ResourceAndRelation.Namespace {
for cur != nil && len(namespaces) > 0 && namespaces[0].Definition.Name < curNamespace {
namespaces = namespaces[1:]
}

Expand All @@ -346,10 +347,34 @@ func (es *experimentalServer) BulkExportRelationships(
}

emptyRels := make([]*v1.Relationship, limit)

for _, ns := range namespaces {
rels := emptyRels

// Reset the cursor between namespaces.
if ns.Definition.Name != curNamespace {
cur = nil
}

// Skip this namespace if a resource type filter was specified.
if req.OptionalRelationshipFilter != nil && req.OptionalRelationshipFilter.ResourceType != "" {
if ns.Definition.Name != req.OptionalRelationshipFilter.ResourceType {
continue
}
}

// Setup the filter to use for the relationships.
relationshipFilter := datastore.RelationshipsFilter{OptionalResourceType: ns.Definition.Name}
if req.OptionalRelationshipFilter != nil {
rf, err := datastore.RelationshipsFilterFromPublicFilter(req.OptionalRelationshipFilter)
if err != nil {
return es.rewriteError(ctx, err)
}

// Overload the namespace name with the one from the request, because each iteration is for a different namespace.
rf.OptionalResourceType = ns.Definition.Name
relationshipFilter = rf
}

// We want to keep iterating as long as we're sending full batches.
// To bootstrap this loop, we enter the first time with a full rels
// slice of dummy rels that were never sent.
Expand All @@ -366,7 +391,7 @@ func (es *experimentalServer) BulkExportRelationships(
cur, err = queryForEach(
ctx,
reader,
datastore.RelationshipsFilter{OptionalResourceType: ns.Definition.Name},
relationshipFilter,
tplFn,
dsoptions.WithLimit(&limit),
dsoptions.WithAfter(cur),
Expand All @@ -385,6 +410,7 @@ func (es *experimentalServer) BulkExportRelationships(
V1: &implv1.V1Cursor{
Revision: atRevision.String(),
Sections: []string{
ns.Definition.Name,
tuple.MustString(cur),
},
},
Expand All @@ -401,10 +427,6 @@ func (es *experimentalServer) BulkExportRelationships(
return es.rewriteError(ctx, err)
}
}

// Datastore namespace order might not be exactly the same as go namespace order
// so we shouldn't assume cursors are valid across namespaces
cur = nil
}

return nil
Expand Down Expand Up @@ -647,29 +669,29 @@ func queryForEach(
return cur, nil
}

func decodeCursor(ds datastore.Datastore, encoded *v1.Cursor) (datastore.Revision, *core.RelationTuple, error) {
func decodeCursor(ds datastore.Datastore, encoded *v1.Cursor) (datastore.Revision, string, *core.RelationTuple, error) {
decoded, err := cursor.Decode(encoded)
if err != nil {
return datastore.NoRevision, nil, err
return datastore.NoRevision, "", nil, err
}

if decoded.GetV1() == nil {
return datastore.NoRevision, nil, errors.New("malformed cursor: no V1 in OneOf")
return datastore.NoRevision, "", nil, errors.New("malformed cursor: no V1 in OneOf")
}

if len(decoded.GetV1().Sections) != 1 {
return datastore.NoRevision, nil, errors.New("malformed cursor: wrong number of components")
if len(decoded.GetV1().Sections) != 2 {
return datastore.NoRevision, "", nil, errors.New("malformed cursor: wrong number of components")
}

atRevision, err := ds.RevisionFromString(decoded.GetV1().Revision)
if err != nil {
return datastore.NoRevision, nil, err
return datastore.NoRevision, "", nil, err
}

cur := tuple.Parse(decoded.GetV1().GetSections()[0])
cur := tuple.Parse(decoded.GetV1().GetSections()[1])
if cur == nil {
return datastore.NoRevision, nil, errors.New("malformed cursor: invalid encoded relation tuple")
return datastore.NoRevision, "", nil, errors.New("malformed cursor: invalid encoded relation tuple")
}

return atRevision, cur, nil
return atRevision, decoded.GetV1().GetSections()[0], cur, nil
}
156 changes: 156 additions & 0 deletions internal/services/v1/experimental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
tf "github.com/authzed/spicedb/internal/testfixtures"
"github.com/authzed/spicedb/internal/testserver"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/genutil/mapz"
"github.com/authzed/spicedb/pkg/testutil"
"github.com/authzed/spicedb/pkg/tuple"
)
Expand Down Expand Up @@ -225,6 +226,161 @@ func TestBulkExportRelationships(t *testing.T) {
}
}

func TestBulkExportRelationshipsWithFilter(t *testing.T) {
testCases := []struct {
name string
filter *v1.RelationshipFilter
expectedCount int
}{
{
"basic filter",
&v1.RelationshipFilter{
ResourceType: tf.DocumentNS.Name,
},
500,
},
{
"filter by resource ID",
&v1.RelationshipFilter{
OptionalResourceId: "12",
},
1,
},
{
"filter by resource ID prefix",
&v1.RelationshipFilter{
OptionalResourceIdPrefix: "1",
},
111,
},
{
"filter by resource ID prefix and resource type",
&v1.RelationshipFilter{
ResourceType: tf.DocumentNS.Name,
OptionalResourceIdPrefix: "1",
},
55,
},
{
"filter by invalid resource type",
&v1.RelationshipFilter{
ResourceType: "invalid",
},
0,
},
}

batchSize := 14

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
require := require.New(t)

conn, cleanup, _, _ := testserver.NewTestServer(require, 0, memdb.DisableGC, true, tf.StandardDatastoreWithSchema)
client := v1.NewExperimentalServiceClient(conn)
t.Cleanup(cleanup)

nsAndRels := []struct {
namespace string
relation string
}{
{tf.DocumentNS.Name, "viewer"},
{tf.FolderNS.Name, "viewer"},
{tf.DocumentNS.Name, "owner"},
{tf.FolderNS.Name, "owner"},
{tf.DocumentNS.Name, "editor"},
{tf.FolderNS.Name, "editor"},
}

expectedRels := set.NewStringSetWithSize(1000)
batch := make([]*v1.Relationship, 1000)
for i := range batch {
nsAndRel := nsAndRels[i%len(nsAndRels)]
rel := rel(
nsAndRel.namespace,
strconv.Itoa(i),
nsAndRel.relation,
tf.UserNS.Name,
strconv.Itoa(i),
"",
)
batch[i] = rel

if tc.filter != nil {
filter, err := datastore.RelationshipsFilterFromPublicFilter(tc.filter)
require.NoError(err)
if !filter.Test(tuple.MustFromRelationship(rel)) {
continue
}
}

expectedRels.Add(tuple.MustStringRelationship(rel))
}

require.Equal(tc.expectedCount, expectedRels.Size())

ctx := context.Background()
writer, err := client.BulkImportRelationships(ctx)
require.NoError(err)

require.NoError(writer.Send(&v1.BulkImportRelationshipsRequest{
Relationships: batch,
}))

_, err = writer.CloseAndRecv()
require.NoError(err)

var totalRead uint64
remainingRels := expectedRels.Copy()
var cursor *v1.Cursor

foundRels := mapz.NewSet[string]()
for {
streamCtx, cancel := context.WithCancel(ctx)

stream, err := client.BulkExportRelationships(streamCtx, &v1.BulkExportRelationshipsRequest{
OptionalRelationshipFilter: tc.filter,
OptionalLimit: uint32(batchSize),
OptionalCursor: cursor,
})
require.NoError(err)

batch, err := stream.Recv()
if errors.Is(err, io.EOF) {
cancel()
break
}

require.NoError(err)
require.LessOrEqual(uint32(len(batch.Relationships)), uint32(batchSize))
require.NotNil(batch.AfterResultCursor)
require.NotEmpty(batch.AfterResultCursor.Token)

cursor = batch.AfterResultCursor
totalRead += uint64(len(batch.Relationships))

for _, rel := range batch.Relationships {
if tc.filter != nil {
filter, err := datastore.RelationshipsFilterFromPublicFilter(tc.filter)
require.NoError(err)
require.True(filter.Test(tuple.MustFromRelationship(rel)), "relationship did not match filter: %s", rel)
}

require.True(remainingRels.Has(tuple.MustStringRelationship(rel)), "relationship was not expected or was repeated: %s", rel)
remainingRels.Remove(tuple.MustStringRelationship(rel))
foundRels.Add(tuple.MustStringRelationship(rel))
}

cancel()
}

require.Equal(uint64(tc.expectedCount), totalRead, "found: %v", foundRels.AsSlice())
require.True(remainingRels.IsEmpty(), "rels were not exported %#v", remainingRels.List())
})
}
}

type bulkCheckTest struct {
req string
resp v1.CheckPermissionResponse_Permissionship
Expand Down

0 comments on commit 5f819f6

Please sign in to comment.