This repository has been archived by the owner on May 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
feat: cleanup worker #126
Merged
Merged
feat: cleanup worker #126
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
e9ef55a
feat: local store cleanup worker
elijaharita 8622e33
feat: expose flag and env var for cleanup interval, fix other issues
elijaharita e12d180
fix: start cleanup workers from Start()
elijaharita 0494189
fix: remove unnecessary log line from reader.go
elijaharita 34cc3ad
style: local cleanup interval -> cleanup interval
elijaharita f46d4a7
fix: eof check before buffer init
elijaharita 7675254
refactor: address code organization comments
elijaharita bf6e3cd
fix: better cleanup loop logic
elijaharita 5deacc1
fix: proper shutdown
elijaharita 6621ba9
fix: use rand reader instead of repeated byte pattern
elijaharita 90b9d0d
fix: address hannah's comments
elijaharita File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,8 @@ import ( | |
"path" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/data-preservation-programs/singularity/client/swagger/http/deal_schedule" | ||
"github.com/data-preservation-programs/singularity/client/swagger/http/file" | ||
|
@@ -34,7 +36,7 @@ type SingularityStore struct { | |
sourceName string | ||
toPack chan uint64 | ||
closing chan struct{} | ||
closed chan struct{} | ||
closed sync.WaitGroup | ||
} | ||
|
||
func NewStore(o ...Option) (*SingularityStore, error) { | ||
|
@@ -49,7 +51,6 @@ func NewStore(o ...Option) (*SingularityStore, error) { | |
sourceName: "source", | ||
toPack: make(chan uint64, 16), | ||
closing: make(chan struct{}), | ||
closed: make(chan struct{}), | ||
}, nil | ||
} | ||
|
||
|
@@ -194,6 +195,7 @@ func (l *SingularityStore) Start(ctx context.Context) error { | |
|
||
logger.Infof("Checking %v storage providers", len(l.storageProviders)) | ||
for _, sp := range l.storageProviders { | ||
logger.Infof("Checking storage provider %s", sp) | ||
var foundSchedule *models.ModelSchedule | ||
logger := logger.With("provider", sp) | ||
for _, schd := range listPreparationSchedulesRes.Payload { | ||
|
@@ -268,18 +270,21 @@ func (l *SingularityStore) Start(ctx context.Context) error { | |
} | ||
} | ||
go l.runPreparationJobs() | ||
go l.runCleanupWorker(context.Background()) | ||
return nil | ||
} | ||
|
||
func (l *SingularityStore) runPreparationJobs() { | ||
l.closed.Add(1) | ||
|
||
ctx := context.Background() | ||
ctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
defer func() { | ||
cancel() | ||
l.closed.Done() | ||
}() | ||
|
||
go func() { | ||
defer func() { | ||
close(l.closed) | ||
}() | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
|
@@ -306,15 +311,24 @@ func (l *SingularityStore) runPreparationJobs() { | |
} | ||
} | ||
}() | ||
<-l.closing | ||
} | ||
|
||
func (l *SingularityStore) Shutdown(ctx context.Context) error { | ||
close(l.closing) | ||
|
||
done := make(chan struct{}) | ||
go func() { | ||
l.closed.Wait() | ||
close(done) | ||
}() | ||
|
||
select { | ||
case <-ctx.Done(): | ||
case <-l.closed: | ||
case <-done: | ||
} | ||
|
||
logger.Infof("Singularity store shut down") | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -396,7 +410,6 @@ func (s *SingularityStore) Get(ctx context.Context, id blob.ID) (io.ReadSeekClos | |
return &SingularityReader{ | ||
client: s.singularityClient, | ||
fileID: fileID, | ||
offset: 0, | ||
size: getFileRes.Payload.Size, | ||
}, nil | ||
} | ||
|
@@ -457,3 +470,105 @@ func (s *SingularityStore) Describe(ctx context.Context, id blob.ID) (*blob.Desc | |
} | ||
return descriptor, nil | ||
} | ||
|
||
func (s *SingularityStore) runCleanupWorker(ctx context.Context) { | ||
s.closed.Add(1) | ||
defer s.closed.Done() | ||
|
||
// Run immediately once before starting ticker | ||
s.runCleanupJob() | ||
|
||
ticker := time.NewTicker(s.cleanupInterval) | ||
for { | ||
select { | ||
case <-ticker.C: | ||
s.runCleanupJob() | ||
case <-s.closing: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (s *SingularityStore) runCleanupJob() { | ||
if err := s.cleanup(context.Background()); err != nil { | ||
logger.Errorf("Local store cleanup failed: %w", err) | ||
} | ||
} | ||
|
||
func (s *SingularityStore) cleanup(ctx context.Context) error { | ||
|
||
logger.Infof("Starting local store cleanup...") | ||
|
||
dir, err := os.ReadDir(s.local.Dir()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am curious if using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's only scanning one layer of a directory, so i don't think so? unless i misunderstood how the local store is laid out |
||
if err != nil { | ||
return fmt.Errorf("failed to open local store directory: %w", err) | ||
} | ||
|
||
var binsToDelete []string | ||
|
||
binIteration: | ||
for _, entry := range dir { | ||
binFileName := entry.Name() | ||
|
||
id, entryIsBin := strings.CutSuffix(binFileName, ".bin") | ||
if !entryIsBin { | ||
continue | ||
} | ||
|
||
idFileName := id + ".id" | ||
idStream, err := os.Open(path.Join(s.local.Dir(), idFileName)) | ||
if err != nil { | ||
logger.Warnf("Failed to open ID map file for %s: %v", id, err) | ||
continue | ||
} | ||
fileIDString, err := io.ReadAll(idStream) | ||
if err != nil { | ||
logger.Warnf("Failed to read ID map file for %s: %v", id, err) | ||
continue | ||
} | ||
fileID, err := strconv.ParseUint(string(fileIDString), 10, 64) | ||
if err != nil { | ||
logger.Warnf("Failed to parse file ID %s as integer: %v", fileIDString, err) | ||
continue | ||
} | ||
|
||
getFileDealsRes, err := s.singularityClient.File.GetFileDeals(&file.GetFileDealsParams{ | ||
Context: ctx, | ||
ID: int64(fileID), | ||
}) | ||
if err != nil { | ||
logger.Warnf("Failed to get file deals for %v: %v", fileID, err) | ||
continue | ||
} | ||
|
||
// Make sure the file has at least 1 deal for every SP | ||
for _, sp := range s.storageProviders { | ||
var foundDealForSP bool | ||
for _, deal := range getFileDealsRes.Payload { | ||
if deal.Provider == sp.String() { | ||
foundDealForSP = true | ||
break | ||
} | ||
} | ||
|
||
if !foundDealForSP { | ||
// If no deal was found for this file and SP, the local bin file | ||
// cannot be deleted yet - continue to the next one | ||
continue binIteration | ||
} | ||
} | ||
|
||
// If deals have been made for all SPs, the local bin file can be | ||
// deleted | ||
binsToDelete = append(binsToDelete, binFileName) | ||
} | ||
|
||
for _, binFileName := range binsToDelete { | ||
if err := os.Remove(path.Join(s.local.Dir(), binFileName)); err != nil { | ||
logger.Warnf("Failed to delete local bin file %s that was staged for removal: %v", binFileName, err) | ||
} | ||
} | ||
logger.Infof("Cleaned up %v unneeded local files", len(binsToDelete)) | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move checks earlier in the function to fail fast?