Skip to content

Commit

Permalink
Fix experimental LookupResources2 to shear the tree earlier on indire…
Browse files Browse the repository at this point in the history
…ct permissions
  • Loading branch information
josephschorr committed Aug 5, 2024
1 parent 4429644 commit 5478cfc
Show file tree
Hide file tree
Showing 9 changed files with 616 additions and 119 deletions.
428 changes: 422 additions & 6 deletions internal/dispatch/graph/lookupresources2_test.go

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions internal/dispatch/graph/lookupresources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,12 @@ func TestMaxDepthLookup(t *testing.T) {
require.Error(err)
}

func joinTuples(first []*core.RelationTuple, second []*core.RelationTuple) []*core.RelationTuple {
return append(first, second...)
func joinTuples(first []*core.RelationTuple, others ...[]*core.RelationTuple) []*core.RelationTuple {
current := first
for _, second := range others {
current = append(current, second...)
}
return current
}

func genTuplesWithOffset(resourceName string, relation string, subjectName string, subjectID string, offset int, number int) []*core.RelationTuple {
Expand All @@ -357,11 +361,15 @@ func genSubjectTuples(resourceName string, relation string, subjectName string,
}

func genTuplesWithCaveat(resourceName string, relation string, subjectName string, subjectID string, caveatName string, context map[string]any, offset int, number int) []*core.RelationTuple {
return genTuplesWithCaveatAndSubjectRelation(resourceName, relation, subjectName, subjectID, "...", caveatName, context, offset, number)
}

func genTuplesWithCaveatAndSubjectRelation(resourceName string, relation string, subjectName string, subjectID string, subjectRelation string, caveatName string, context map[string]any, offset int, number int) []*core.RelationTuple {
tuples := make([]*core.RelationTuple, 0, number)
for i := 0; i < number; i++ {
tpl := &core.RelationTuple{
ResourceAndRelation: ONR(resourceName, fmt.Sprintf("%s-%d", resourceName, i+offset), relation),
Subject: ONR(subjectName, subjectID, "..."),
Subject: ONR(subjectName, subjectID, subjectRelation),
}
if caveatName != "" {
tpl = tuple.MustWithCaveat(tpl, caveatName, context)
Expand Down
4 changes: 2 additions & 2 deletions internal/graph/lookupresources2.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,8 @@ func (crr *CursoredLookupResources2) redispatchOrReport(
}, stream)
}

// Otherwise, we need to dispatch and filter results by batch checking along the way.
return runDispatchAndChecker(
// Otherwise, we need to filter results by batch checking along the way before dispatching.
return runCheckerAndDispatch(
ctx,
parentRequest,
foundResources,
Expand Down
205 changes: 114 additions & 91 deletions internal/graph/lr2streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graph

import (
"context"
"strconv"
"sync"

"github.com/authzed/spicedb/internal/dispatch"
Expand All @@ -15,10 +16,10 @@ import (
"github.com/authzed/spicedb/pkg/typesystem"
)

// runDispatchAndChecker runs the dispatch and checker for a lookup resources call, and publishes
// the results to the parent stream. This function is responsible for handling the dispatching
// of the lookup resources call, and then checking the results to filter them.
func runDispatchAndChecker(
// runCheckerAndDispatch runs the dispatch and checker for a lookup resources call, and publishes
// the results to the parent stream. This function is responsible for handling checking the
// results to filter them, and then dispatching those found.
func runCheckerAndDispatch(
ctx context.Context,
parentReq ValidatedLookupResources2Request,
foundResources dispatchableResourcesSubjectMap2,
Expand All @@ -35,13 +36,19 @@ func runDispatchAndChecker(
// Only allow max one dispatcher and one checker to run concurrently.
concurrencyLimit = min(concurrencyLimit, 2)

rdc := &dispatchAndCheckRunner{
currentCheckIndex, err := ci.integerSectionValue()
if err != nil {
return err
}

rdc := &checkAndDispatchRunner{
parentRequest: parentReq,
foundResources: foundResources,
ci: ci,
parentStream: parentStream,
newSubjectType: newSubjectType,
filteredSubjectIDs: filteredSubjectIDs,
currentCheckIndex: currentCheckIndex,
entrypoint: entrypoint,
lrDispatcher: lrDispatcher,
checkDispatcher: checkDispatcher,
Expand All @@ -53,87 +60,53 @@ func runDispatchAndChecker(
return rdc.runAndWait()
}

type dispatchAndCheckRunner struct {
parentRequest ValidatedLookupResources2Request
foundResources dispatchableResourcesSubjectMap2
ci cursorInformation
parentStream dispatch.LookupResources2Stream
newSubjectType *core.RelationReference
type checkAndDispatchRunner struct {
parentRequest ValidatedLookupResources2Request
foundResources dispatchableResourcesSubjectMap2
ci cursorInformation
parentStream dispatch.LookupResources2Stream
newSubjectType *core.RelationReference
entrypoint typesystem.ReachabilityEntrypoint
lrDispatcher dispatch.LookupResources2
checkDispatcher dispatch.Check
dispatchChunkSize uint16

filteredSubjectIDs []string
entrypoint typesystem.ReachabilityEntrypoint
lrDispatcher dispatch.LookupResources2
checkDispatcher dispatch.Check
dispatchChunkSize uint16
currentCheckIndex int

taskrunner *taskrunner.TaskRunner

lock *sync.Mutex
}

func (rdc *dispatchAndCheckRunner) dispatchAndCollect(ctx context.Context, cursor *v1.Cursor) ([]*v1.DispatchLookupResources2Response, error) {
collectingStream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResources2Response](ctx)
err := rdc.lrDispatcher.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
ResourceRelation: rdc.parentRequest.ResourceRelation,
SubjectRelation: rdc.newSubjectType,
SubjectIds: rdc.filteredSubjectIDs,
TerminalSubject: rdc.parentRequest.TerminalSubject,
Metadata: &v1.ResolverMeta{
AtRevision: rdc.parentRequest.Revision.String(),
DepthRemaining: rdc.parentRequest.Metadata.DepthRemaining - 1,
},
OptionalCursor: cursor,
OptionalLimit: uint32(rdc.dispatchChunkSize),
}, collectingStream)
return collectingStream.Results(), err
}

func (rdc *dispatchAndCheckRunner) runDispatch(ctx context.Context, cursor *v1.Cursor) error {
rdc.lock.Lock()
if rdc.ci.limits.hasExhaustedLimit() {
rdc.lock.Unlock()
return nil
}
rdc.lock.Unlock()

collected, err := rdc.dispatchAndCollect(ctx, cursor)
if err != nil {
return err
}

if len(collected) == 0 {
return nil
}

// Kick off a worker to filter the results via a check and then publish what was found.
func (rdc *checkAndDispatchRunner) runAndWait() error {
// Kick off a check at the current cursor, to filter a portion of the initial results set.
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runChecker(ctx, collected)
return rdc.runChecker(ctx, rdc.currentCheckIndex)
})

// Start another dispatch at the cursor of the last response, to run in the background
// and collect more results for filtering while the checker is running.
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runDispatch(ctx, collected[len(collected)-1].AfterResponseCursor)
})

return nil
return rdc.taskrunner.Wait()
}

func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*v1.DispatchLookupResources2Response) error {
func (rdc *checkAndDispatchRunner) runChecker(ctx context.Context, startingIndex int) error {
rdc.lock.Lock()
if rdc.ci.limits.hasExhaustedLimit() {
rdc.lock.Unlock()
return nil
}
rdc.lock.Unlock()

checkHints := make([]*v1.CheckHint, 0, len(collected))
resourceIDsToCheck := make([]string, 0, len(collected))
for _, resource := range collected {
resourceIDsToCheck = append(resourceIDsToCheck, resource.Resource.ResourceId)
endingIndex := min(startingIndex+int(rdc.dispatchChunkSize), len(rdc.filteredSubjectIDs))
resourceIDsToCheck := rdc.filteredSubjectIDs[startingIndex:endingIndex]
if len(resourceIDsToCheck) == 0 {
return nil
}

checkHints := make([]*v1.CheckHint, 0, len(resourceIDsToCheck))
for _, resourceID := range resourceIDsToCheck {
checkHint, err := hints.HintForEntrypoint(
rdc.entrypoint,
resource.Resource.ResourceId,
resourceID,
rdc.parentRequest.TerminalSubject,
&v1.ResourceCheckResult{
Membership: v1.ResourceCheckResult_MEMBER,
Expand All @@ -144,9 +117,10 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
checkHints = append(checkHints, checkHint)
}

// Batch check the results to filter to those visible and then publish just the visible resources.
// NOTE: we are checking the containing permission here, *not* the target relation, as
// the goal is to shear for the containing permission.
resultsByResourceID, checkMetadata, err := computed.ComputeBulkCheck(ctx, rdc.checkDispatcher, computed.CheckParameters{
ResourceType: rdc.parentRequest.ResourceRelation,
ResourceType: rdc.newSubjectType,
Subject: rdc.parentRequest.TerminalSubject,
CaveatContext: rdc.parentRequest.Context.AsMap(),
AtRevision: rdc.parentRequest.Revision,
Expand All @@ -158,10 +132,12 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
return err
}

// Publish any resources that are visible.
isFirstPublishCall := true
for _, resource := range collected {
result, ok := resultsByResourceID[resource.Resource.ResourceId]
adjustedResources := rdc.foundResources.cloneAsMutable()

// Dispatch any resources that are visible.
resourceIDToDispatch := make([]string, 0, len(resourceIDsToCheck))
for _, resourceID := range resourceIDsToCheck {
result, ok := resultsByResourceID[resourceID]
if !ok {
continue
}
Expand All @@ -171,19 +147,9 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
fallthrough

case v1.ResourceCheckResult_CAVEATED_MEMBER:
rdc.lock.Lock()
if err := publishResultToParentStream(resource, rdc.ci, rdc.foundResources, result.MissingExprFields, isFirstPublishCall, checkMetadata, rdc.parentStream); err != nil {
rdc.lock.Unlock()
return err
}

isFirstPublishCall = false

if rdc.ci.limits.hasExhaustedLimit() {
rdc.lock.Unlock()
return nil
}
rdc.lock.Unlock()
// Record any additional caveats missing from the check.
adjustedResources.withAdditionalMissingContextForDispatchedResourceID(resourceID, result.MissingExprFields)
resourceIDToDispatch = append(resourceIDToDispatch, resourceID)

case v1.ResourceCheckResult_NOT_MEMBER:
// Skip.
Expand All @@ -194,18 +160,74 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
}
}

if len(resourceIDToDispatch) > 0 {
// Schedule a dispatch of those resources.
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runDispatch(ctx, resourceIDToDispatch, adjustedResources.asReadOnly(), checkMetadata, startingIndex)
})
}

// Start the next check chunk (if applicable).
nextIndex := startingIndex + len(resourceIDsToCheck)
if nextIndex < len(rdc.filteredSubjectIDs) {
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runChecker(ctx, nextIndex)
})
}

return nil
}

func (rdc *dispatchAndCheckRunner) runAndWait() error {
currentCursor := rdc.ci.currentCursor
func (rdc *checkAndDispatchRunner) runDispatch(
ctx context.Context,
resourceIDsToDispatch []string,
adjustedResources dispatchableResourcesSubjectMap2,
checkMetadata *v1.ResponseMeta,
startingIndex int,
) error {
rdc.lock.Lock()
if rdc.ci.limits.hasExhaustedLimit() {
rdc.lock.Unlock()
return nil
}
rdc.lock.Unlock()

// Kick off a dispatch at the current cursor.
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runDispatch(ctx, currentCursor)
// NOTE: Since we extracted a custom section from the cursor at the beginning of this run, we have to add
// the starting index to the cursor to ensure that the next run starts from the correct place, and we have
// to use the *updated* cursor below on the dispatch.
updatedCi, err := rdc.ci.withOutgoingSection(strconv.Itoa(startingIndex))
if err != nil {
return err
}
responsePartialCursor := updatedCi.responsePartialCursor()

// Dispatch to the parent resource type and publish any results found.
isFirstPublishCall := true

wrappedStream := dispatch.NewHandlingDispatchStream(ctx, func(result *v1.DispatchLookupResources2Response) error {
if err := ctx.Err(); err != nil {
return err
}

if err := publishResultToParentStream(result, rdc.ci, responsePartialCursor, adjustedResources, nil, isFirstPublishCall, checkMetadata, rdc.parentStream); err != nil {
return err
}
isFirstPublishCall = false
return nil
})

return rdc.taskrunner.Wait()
return rdc.lrDispatcher.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
ResourceRelation: rdc.parentRequest.ResourceRelation,
SubjectRelation: rdc.newSubjectType,
SubjectIds: resourceIDsToDispatch,
TerminalSubject: rdc.parentRequest.TerminalSubject,
Metadata: &v1.ResolverMeta{
AtRevision: rdc.parentRequest.Revision.String(),
DepthRemaining: rdc.parentRequest.Metadata.DepthRemaining - 1,
},
OptionalCursor: updatedCi.currentCursor,
OptionalLimit: rdc.ci.limits.currentLimit,
}, wrappedStream)
}

// unfilteredLookupResourcesDispatchStreamForEntrypoint creates a new dispatch stream that wraps
Expand All @@ -227,7 +249,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
default:
}

if err := publishResultToParentStream(result, ci, foundResources, nil, isFirstPublishCall, emptyMetadata, parentStream); err != nil {
if err := publishResultToParentStream(result, ci, ci.responsePartialCursor(), foundResources, nil, isFirstPublishCall, emptyMetadata, parentStream); err != nil {
return err
}
isFirstPublishCall = false
Expand All @@ -242,6 +264,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
func publishResultToParentStream(
result *v1.DispatchLookupResources2Response,
ci cursorInformation,
responseCursor *v1.Cursor,
foundResources dispatchableResourcesSubjectMap2,
additionalMissingContext []string,
isFirstPublishCall bool,
Expand All @@ -261,7 +284,7 @@ func publishResultToParentStream(

// The cursor for the response is that of the parent response + the cursor from the result itself.
afterResponseCursor, err := combineCursors(
ci.responsePartialCursor(),
responseCursor,
result.AfterResponseCursor,
)
if err != nil {
Expand Down
Loading

0 comments on commit 5478cfc

Please sign in to comment.