This repository has been archived by the owner on May 15, 2024. It is now read-only.

feat: add cleanup test, and isolate id mapping code and cleanup code #218

Merged · 8 commits · Nov 11, 2023
51 changes: 51 additions & 0 deletions blob/local_store.go
@@ -8,6 +8,8 @@ import (
"os"
"path/filepath"

"strings"

"github.com/gammazero/fsutil/disk"
)

@@ -128,3 +130,52 @@ func (l *LocalStore) Describe(ctx context.Context, id ID) (*Descriptor, error) {
ModificationTime: stat.ModTime(),
}, nil
}

// Lists all locally stored blob IDs. Files in the local store directory which
// do not end in ".bin", or which cannot be parsed into a blob ID, will not be
// included.
func (l *LocalStore) List(ctx context.Context) ([]ID, error) {
var ids []ID

dir, err := os.ReadDir(l.dir)
if err != nil {
return nil, fmt.Errorf("failed to read local store directory: %w", err)
}

for _, binFileEntry := range dir {
if binFileEntry.IsDir() {
// Do not include directories
continue
}

idString, isBin := strings.CutSuffix(binFileEntry.Name(), ".bin")
if !isBin {
// Do not include files that don't end in .bin
continue
}

var id ID
if err := id.Decode(idString); err != nil {
// Do not include files that cannot be parsed into a blob ID
continue
}

ids = append(ids, id)
}

return ids, nil
}

// Removes the blob. Errors with ErrBlobNotFound if the blob does not exist.
func (l *LocalStore) Remove(ctx context.Context, id ID) error {
binFileName := id.String() + ".bin"
if err := os.Remove(filepath.Join(l.dir, binFileName)); err != nil {
if errors.Is(err, os.ErrNotExist) {
return ErrBlobNotFound
}

return fmt.Errorf("failed to remove bin file '%s': %w", binFileName, err)
}

return nil
}
118 changes: 118 additions & 0 deletions integration/singularity/cleanup.go
@@ -0,0 +1,118 @@
package singularity

import (
"context"
"fmt"
"sync"
"time"

"github.com/filecoin-project/motion/blob"
)

type cleanupSchedulerConfig struct {
interval time.Duration
}

// This is run by the cleanup scheduler to determine whether to clean up a local
// file.
type cleanupReadyCallback func(ctx context.Context, blobID blob.ID) (bool, error)

type cleanupScheduler struct {
cfg cleanupSchedulerConfig
local *blob.LocalStore
cleanupReady cleanupReadyCallback
closing chan struct{}
closed sync.WaitGroup
}

func newCleanupScheduler(
cfg cleanupSchedulerConfig,
local *blob.LocalStore,
cleanupReady cleanupReadyCallback,
) *cleanupScheduler {
return &cleanupScheduler{
cfg: cfg,
local: local,
cleanupReady: cleanupReady,
closing: make(chan struct{}),
}
}

func (cs *cleanupScheduler) start(ctx context.Context) {
cs.closed.Add(1)

go func() {
defer cs.closed.Done()

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-cs.closing
cancel()
}()

ticker := time.NewTicker(cs.cfg.interval)
Collaborator

Move this to after the first call to cs.cleanup(ctx), to line 57, so that the timer does not start until after the first cleanup is done.

Contributor Author

I did it in this order because, after the initial call, the ticker ticks the configured duration after the start of the previous cleanup, not after the end of it. This way the behavior stays consistent with the regular loop, and cleanups happen at predictable times.

If we want to start waiting for the duration after the previous cleanup finishes instead of immediately after it starts, we would probably have to replace the ticker with a timer that is manually rescheduled at the end of each cleanup. Let me know what you think.
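A minimal sketch of the timer-based variant described above (illustration only, not part of this PR; it reuses the surrounding cs.cleanup and cs.cfg.interval names and assumes the same goroutine structure):

// Run once immediately on startup.
cs.cleanup(ctx)

// The timer is created only after the first cleanup, so every wait begins
// once the previous cleanup has finished rather than when it started.
timer := time.NewTimer(cs.cfg.interval)
defer timer.Stop()

for {
	select {
	case <-ctx.Done():
		return
	case <-timer.C:
		cs.cleanup(ctx)
		// Resetting after the cleanup completes means the next run happens
		// a full interval after this one finished.
		timer.Reset(cs.cfg.interval)
	}
}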

Collaborator

OK, I see.

It would be nice to put the defer ticker.Stop() right after the ticker is created, to make clear it is stopped at exit.

defer ticker.Stop()

// Run once immediately on startup
cs.cleanup(ctx)

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cs.cleanup(ctx)
}
}
}()
}

func (cs *cleanupScheduler) stop(ctx context.Context) error {
close(cs.closing)

done := make(chan struct{})
go func() {
cs.closed.Wait()
close(done)
}()

select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
}

func (cs *cleanupScheduler) cleanup(ctx context.Context) error {
logger.Info("Starting cleanup")

ids, err := cs.local.List(ctx)
if err != nil {
return fmt.Errorf("failed to list local blob IDs: %w", err)
}

var removals []blob.ID
for _, id := range ids {
cleanupReady, err := cs.cleanupReady(ctx, id)
if err != nil {
logger.Warnw("failed to check if blob is ready for cleanup, skipping for this cleanup cycle", "err", err)
continue
}
if cleanupReady {
removals = append(removals, id)
}
}

for _, blobID := range removals {
if err := cs.local.Remove(ctx, blobID); err != nil {
logger.Warnw("Failed to remove local blob during cleanup", "id", blobID, "err", err)
}
}

if len(removals) > 0 {
logger.Infow("Cleaned up unneeded local files", "count", len(removals))
} else {
logger.Info("Did not find any local files to clean up")
}

return nil
}
113 changes: 113 additions & 0 deletions integration/singularity/cleanup_test.go
@@ -0,0 +1,113 @@
package singularity

import (
"bytes"
"context"
"crypto/rand"
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/filecoin-project/motion/blob"
"github.com/stretchr/testify/require"
)

func TestCleanupScheduler(t *testing.T) {
localDir := filepath.Join(t.TempDir(), "motion-cleanup-test")
require.NoError(t, os.MkdirAll(localDir, 0777))
local := blob.NewLocalStore(localDir)

var lock sync.Mutex
var cleanupNotReadyBlobs []blob.ID
var cleanupReadyBlobs []blob.ID

// Pushes a new blob and records its ID as either cleanup-ready or not,
// depending on shouldRemove
add := func(shouldRemove bool) {
data := make([]byte, 16)
_, err := rand.Reader.Read(data)
require.NoError(t, err)

desc, err := local.Put(context.Background(), io.NopCloser(bytes.NewReader(data)))
require.NoError(t, err)

if shouldRemove {
cleanupReadyBlobs = append(cleanupReadyBlobs, desc.ID)
} else {
cleanupNotReadyBlobs = append(cleanupNotReadyBlobs, desc.ID)
}
}

// Intersperse ready and non-ready blobs
for i := 0; i < 100; i++ {
add(false)
add(true)
}

// Returns true if the blob ID is present in the cleanupReadyBlobs slice
callback := func(ctx context.Context, blobID blob.ID) (bool, error) {
lock.Lock()
defer lock.Unlock()
for _, other := range cleanupReadyBlobs {
if blobID == other {
return true, nil
}
}
return false, nil
}

cfg := cleanupSchedulerConfig{
interval: time.Second,
}

// Before starting the cleanup scheduler, make sure all blobs are present
listBefore, err := local.List(context.Background())
t.Logf("length before: %v", len(listBefore))
require.NoError(t, err)
lock.Lock()
for _, blob := range cleanupNotReadyBlobs {
require.Contains(t, listBefore, blob)
}
for _, blob := range cleanupReadyBlobs {
require.Contains(t, listBefore, blob)
}
lock.Unlock()

// Start cleanup scheduler and check that all cleanup ready blobs are
// already removed before the first cleanup tick, since there should be one
// iteration immediately on startup
cleanupScheduler := newCleanupScheduler(cfg, local, callback)
cleanupScheduler.start(context.Background())

time.Sleep(cfg.interval / 2)

listAfterStart, err := local.List(context.Background())
t.Logf("length after start: %v", len(listAfterStart))
require.NoError(t, err)
lock.Lock()
for _, blob := range cleanupNotReadyBlobs {
require.Contains(t, listAfterStart, blob)
}
for _, blob := range cleanupReadyBlobs {
require.NotContains(t, listAfterStart, blob)
}
lock.Unlock()

// Add the rest of the blobs to cleanup ready, wait for 1 more tick, and
// make sure that no more blobs are left
lock.Lock()
cleanupReadyBlobs = append(cleanupReadyBlobs, cleanupNotReadyBlobs...)
lock.Unlock()

time.Sleep(cfg.interval)

listAfterTick, err := local.List(context.Background())
t.Logf("length after tick: %v", len(listAfterTick))
require.NoError(t, err)
require.Empty(t, listAfterTick)

require.NoError(t, cleanupScheduler.stop(context.Background()))
}
85 changes: 85 additions & 0 deletions integration/singularity/id_map.go
@@ -0,0 +1,85 @@
package singularity

import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"

"github.com/filecoin-project/motion/blob"
)

type idMap struct {
dir string
}

func newIDMap(dir string) *idMap {
return &idMap{
dir: dir,
}
}

func (im *idMap) path(blobID blob.ID) string {
return filepath.Join(im.dir, blobID.String()+".id")
}

// Inserts a blob ID to Singularity ID mapping.
func (im *idMap) insert(blobID blob.ID, singularityID int64) error {
idFile, err := os.CreateTemp(im.dir, "motion_local_store_*.id.temp")
if err != nil {
return fmt.Errorf("failed to create temporary file: %w", err)
}
defer func() {
if err := idFile.Close(); err != nil {
logger.Debugw("Failed to close temporary file", "err", err)
}
}()
_, err = idFile.Write([]byte(strconv.FormatUint(uint64(singularityID), 10)))
if err != nil {
if err := os.Remove(idFile.Name()); err != nil {
logger.Debugw("Failed to remove temporary file", "path", idFile.Name(), "err", err)
}
return fmt.Errorf("failed to write ID file: %w", err)
}
if err = os.Rename(idFile.Name(), im.path(blobID)); err != nil {
return fmt.Errorf("failed to move ID file to store: %w", err)
}

return nil
}

// Maps blob ID to Singularity ID. Returns blob.ErrBlobNotFound if no mapping
// exists.
func (im *idMap) get(blobID blob.ID) (int64, error) {
fileIDString, err := os.ReadFile(im.path(blobID))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return 0, blob.ErrBlobNotFound
}

return 0, fmt.Errorf("could not read ID file: %w", err)
}

fileID, err := strconv.ParseUint(string(fileIDString), 10, 64)
if err != nil {
return 0, fmt.Errorf("could not parse Singularity file ID '%s' of ID file for blob '%s': %w", fileIDString, blobID, err)
}

return int64(fileID), nil
}

// TODO: currently commented out to silence the unused-function lint warning
// // Removes blob ID to Singularity ID mapping. If no ID file existed,
// // blob.ErrBlobNotFound will be returned.
// func (im *idMap) remove(blobID blob.ID) error {
// if err := os.Remove(im.path(blobID)); err != nil {
// if errors.Is(err, os.ErrNotExist) {
// return blob.ErrBlobNotFound
Collaborator

Would it fix the warning to not return an error here? It is reasonable not to return a file-not-found error when removing a file, since it is already removed.

Contributor Author

The warning isn't related to the return value; the function just isn't called anywhere yet, and the linter doesn't like private functions that are unused.

Contributor Author

Alternatively, I could make this struct and its methods public to get rid of the warning? Does it make sense for it to be public, though?

Collaborator

Let's not export (make public) anything unless it is really needed. Otherwise, it risks becoming an API we need to support.

Fine as it is, then.

// }

// return fmt.Errorf("could not remove ID file: %w", err)
// }

// return nil
// }
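As an aside on the unused-warning discussion above: assuming this repository lints with golangci-lint (an assumption, not something stated in the PR), the method could also be kept uncommented and the warning suppressed with a nolint directive rather than exporting anything. A sketch, reusing the commented-out body above:

// remove deletes the blob ID to Singularity ID mapping. If no ID file
// existed, blob.ErrBlobNotFound is returned.
//
//nolint:unused // not called anywhere yet
func (im *idMap) remove(blobID blob.ID) error {
	if err := os.Remove(im.path(blobID)); err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return blob.ErrBlobNotFound
		}
		return fmt.Errorf("could not remove ID file: %w", err)
	}
	return nil
}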