Skip to content

Commit

Permalink
created memory with delete
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoxuan Wang <[email protected]>
  • Loading branch information
wangxiaoxuan273 committed Sep 6, 2023
1 parent 6a4bc19 commit 7731e12
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 85 deletions.
6 changes: 3 additions & 3 deletions content/oci/deletableOci.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type DeletableStore struct {

storage *Storage
tagResolver *resolver.Memory
graph *graph.Memory
graph *graph.MemoryWithDelete
}

// NewDeletableStore returns a new DeletableStore.
Expand All @@ -82,7 +82,7 @@ func NewDeletableStoreWithContext(ctx context.Context, root string) (*DeletableS
indexPath: filepath.Join(rootAbs, ociImageIndexFile),
storage: storage,
tagResolver: resolver.NewMemory(),
graph: graph.NewMemory(),
graph: graph.NewMemoryWithDelete(),
}

if err := ensureDir(filepath.Join(rootAbs, ociBlobsDir)); err != nil {
Expand Down Expand Up @@ -294,7 +294,7 @@ func (ds *DeletableStore) loadIndexFile(ctx context.Context) error {
return fmt.Errorf("failed to decode index file: %w", err)
}
ds.index = &index
return loadIndex(ctx, ds.index, ds.storage, ds.tagResolver, ds.graph)
return loadIndexWithMemoryWithDelete(ctx, ds.index, ds.storage, ds.tagResolver, ds.graph)
}

// SaveIndex writes the `index.json` file to the file system.
Expand Down
19 changes: 19 additions & 0 deletions content/oci/readonlyoci.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,25 @@ func loadIndex(ctx context.Context, index *ocispec.Index, fetcher content.Fetche
return nil
}

// loadIndex loads index into memory.
func loadIndexWithMemoryWithDelete(ctx context.Context, index *ocispec.Index, fetcher content.Fetcher, tagger content.Tagger, graph *graph.MemoryWithDelete) error {
for _, desc := range index.Manifests {
if err := tagger.Tag(ctx, deleteAnnotationRefName(desc), desc.Digest.String()); err != nil {
return err
}
if ref := desc.Annotations[ocispec.AnnotationRefName]; ref != "" {
if err := tagger.Tag(ctx, desc, ref); err != nil {
return err
}
}
plain := descriptor.Plain(desc)
if err := graph.IndexAll(ctx, fetcher, plain); err != nil {
return err
}
}
return nil
}

// resolveBlob returns a descriptor describing the blob identified by dgst.
func resolveBlob(fsys fs.FS, dgst string) (ocispec.Descriptor, error) {
path, err := blobPath(digest.Digest(dgst))
Expand Down
43 changes: 3 additions & 40 deletions internal/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
// Memory is a memory based PredecessorFinder.
type Memory struct {
predecessors sync.Map // map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor
successors sync.Map // map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor
indexed sync.Map // map[descriptor.Descriptor]any
}

Expand Down Expand Up @@ -117,54 +116,18 @@ func (m *Memory) Predecessors(_ context.Context, node ocispec.Descriptor) ([]oci
return res, nil
}

// RemoveFromIndex removes the node from its predecessors and successors.
func (m *Memory) RemoveFromIndex(ctx context.Context, node ocispec.Descriptor) error {
nodeKey := descriptor.FromOCI(node)
// remove the node from its successors' predecessor list
value, _ := m.successors.Load(nodeKey)
successors := value.(*sync.Map)
successors.Range(func(key, _ interface{}) bool {
value, _ = m.predecessors.Load(key)
predecessors := value.(*sync.Map)
predecessors.Delete(nodeKey)
return true
})
m.removeFromMemory(ctx, node)
return nil
}

// index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) index(ctx context.Context, node ocispec.Descriptor, successors []ocispec.Descriptor) {
m.indexIntoMemory(ctx, node)
if len(successors) == 0 {
return
}
predecessorKey := descriptor.FromOCI(node)
for _, successor := range successors {
successorKey := descriptor.FromOCI(successor)
// store in m.predecessors, memory.predecessors[successorKey].Store(node)
pred, _ := m.predecessors.LoadOrStore(successorKey, &sync.Map{})
predecessorsMap := pred.(*sync.Map)
predecessorsMap.Store(predecessorKey, node)
// store in m.successors, memory.successors[predecessorKey].Store(successor)
succ, _ := m.successors.Load(predecessorKey)
successorsMap := succ.(*sync.Map)
successorsMap.Store(successorKey, successor)
value, _ := m.predecessors.LoadOrStore(successorKey, &sync.Map{})
predecessors := value.(*sync.Map)
predecessors.Store(predecessorKey, node)
}
}

func (m *Memory) indexIntoMemory(ctx context.Context, node ocispec.Descriptor) {
key := descriptor.FromOCI(node)
m.predecessors.LoadOrStore(key, &sync.Map{})
m.successors.LoadOrStore(key, &sync.Map{})
m.indexed.LoadOrStore(key, &sync.Map{})
}

func (m *Memory) removeFromMemory(ctx context.Context, node ocispec.Descriptor) {
key := descriptor.FromOCI(node)
m.predecessors.Delete(key)
m.successors.Delete(key)
m.indexed.Delete(key)
}
170 changes: 170 additions & 0 deletions internal/graph/memoryWithDelete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package graph

import (
"context"
"errors"
"sync"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/descriptor"
"oras.land/oras-go/v2/internal/status"
"oras.land/oras-go/v2/internal/syncutil"
)

// MemoryWithDelete is a MemoryWithDelete based PredecessorFinder.
type MemoryWithDelete struct {
predecessors sync.Map // map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor
successors sync.Map // map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor
indexed sync.Map // map[descriptor.Descriptor]any
}

// NewMemoryWithDelete creates a new MemoryWithDelete PredecessorFinder.
func NewMemoryWithDelete() *MemoryWithDelete {
return &MemoryWithDelete{}
}

// Index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *MemoryWithDelete) Index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
successors, err := content.Successors(ctx, fetcher, node)
if err != nil {
return err
}

m.index(ctx, node, successors)
return nil
}

// Index indexes predecessors for all the successors of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *MemoryWithDelete) IndexAll(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
// track content status
tracker := status.NewTracker()

var fn syncutil.GoFunc[ocispec.Descriptor]
fn = func(ctx context.Context, region *syncutil.LimitedRegion, desc ocispec.Descriptor) error {
// skip the node if other go routine is working on it
_, committed := tracker.TryCommit(desc)
if !committed {
return nil
}

// skip the node if it has been indexed
key := descriptor.FromOCI(desc)
_, exists := m.indexed.Load(key)
if exists {
return nil
}

successors, err := content.Successors(ctx, fetcher, desc)
if err != nil {
if errors.Is(err, errdef.ErrNotFound) {
// skip the node if it does not exist
return nil
}
return err
}
m.index(ctx, desc, successors)
m.indexed.Store(key, nil)

if len(successors) > 0 {
// traverse and index successors
return syncutil.Go(ctx, nil, fn, successors...)
}
return nil
}
return syncutil.Go(ctx, nil, fn, node)
}

// Predecessors returns the nodes directly pointing to the current node.
// Predecessors returns nil without error if the node does not exists in the
// store.
// Like other operations, calling Predecessors() is go-routine safe. However,
// it does not necessarily correspond to any consistent snapshot of the stored
// contents.
func (m *MemoryWithDelete) Predecessors(_ context.Context, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
key := descriptor.FromOCI(node)
value, exists := m.predecessors.Load(key)
if !exists {
return nil, nil
}
predecessors := value.(*sync.Map)

var res []ocispec.Descriptor
predecessors.Range(func(key, value interface{}) bool {
res = append(res, value.(ocispec.Descriptor))
return true
})
return res, nil
}

// RemoveFromIndex removes the node from its predecessors and successors.
func (m *MemoryWithDelete) RemoveFromIndex(ctx context.Context, node ocispec.Descriptor) error {
nodeKey := descriptor.FromOCI(node)
// remove the node from its successors' predecessor list
value, _ := m.successors.Load(nodeKey)
successors := value.(*sync.Map)
successors.Range(func(key, _ interface{}) bool {
value, _ = m.predecessors.Load(key)
predecessors := value.(*sync.Map)
predecessors.Delete(nodeKey)
return true
})
m.removeFromMemoryWithDelete(ctx, node)
return nil
}

// index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *MemoryWithDelete) index(ctx context.Context, node ocispec.Descriptor, successors []ocispec.Descriptor) {
m.indexIntoMemoryWithDelete(ctx, node)
if len(successors) == 0 {
return
}
predecessorKey := descriptor.FromOCI(node)
for _, successor := range successors {
successorKey := descriptor.FromOCI(successor)
// store in m.predecessors, MemoryWithDelete.predecessors[successorKey].Store(node)
pred, _ := m.predecessors.LoadOrStore(successorKey, &sync.Map{})
predecessorsMap := pred.(*sync.Map)
predecessorsMap.Store(predecessorKey, node)
// store in m.successors, MemoryWithDelete.successors[predecessorKey].Store(successor)
succ, _ := m.successors.Load(predecessorKey)
successorsMap := succ.(*sync.Map)
successorsMap.Store(successorKey, successor)
}
}

func (m *MemoryWithDelete) indexIntoMemoryWithDelete(ctx context.Context, node ocispec.Descriptor) {
key := descriptor.FromOCI(node)
m.predecessors.LoadOrStore(key, &sync.Map{})
m.successors.LoadOrStore(key, &sync.Map{})
m.indexed.LoadOrStore(key, &sync.Map{})
}

func (m *MemoryWithDelete) removeFromMemoryWithDelete(ctx context.Context, node ocispec.Descriptor) {
key := descriptor.FromOCI(node)
m.predecessors.Delete(key)
m.successors.Delete(key)
m.indexed.Delete(key)
}
Loading

0 comments on commit 7731e12

Please sign in to comment.