Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

feat: cleanup worker #126

Merged
merged 11 commits into from
Sep 20, 2023
Merged

feat: cleanup worker #126

merged 11 commits into from
Sep 20, 2023

Conversation

elijaharita
Copy link
Contributor

@elijaharita elijaharita commented Sep 15, 2023

branched off feat/singularity-retrieval, will rebase off main when it merges

i'm not really sure how to test this. any recommendations @hannahhoward?

closes #112

@elijaharita elijaharita marked this pull request as ready for review September 15, 2023 23:16
integration/singularity/store.go Outdated Show resolved Hide resolved
@@ -39,6 +40,7 @@ type (
totalDealSize string
maxPendingDealSize string
maxPendingDealNumber int
localCleanupInterval time.Duration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only cleanup is local right? Remove local prefix in variable names?

}

func (r *SingularityReader) Read(p []byte) (int, error) {
logger.Infof("buffer size: %v", len(p))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Change to debug level as this can be noisy?
  • Use w variation instead of format for nicer JSON log formatting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, this was supposed to be removed. i will remove it.

logger.Infof("buffer size: %v", len(p))

buf := bytes.NewBuffer(p)
buf.Reset()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious why buffer is being reset? it's a fresh buffer right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the buffer is constructed from the byte array passed into the function, but creating a new buffer like that starts the cursor at the end of the byte array, so buf.Reset() puts the cursor back at the start so the array will be overwritten instead of extended from the end

buf := bytes.NewBuffer(p)
buf.Reset()

if r.offset >= r.size {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move checks earlier in the function to fail fast?

}
}()
case <-s.closing:
break cleanupLoop
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
break cleanupLoop
return

}()

ticker := time.NewTicker(s.localCleanupInterval)
cleanupLoop:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cleanupLoop:


func (s *SingularityStore) cleanup(ctx context.Context) error {
if s.cleanupActive.Load() {
return errCleanupAlreadyRunning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning error here means if a cycle is running, the next one that may get started will be skipped. In the worst case, it means it can take up to 2X the configured interval for cleanup to run.

We should document this behaviour on the options, or alternatively use a mutex instead that locks and unlocks the cycle, such that the next cycle starts immediately. i.e. average cleanup interval will converge to the configured value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, this made me realize that a mutex is not even necessary, since the worker is already inside a goroutine


logger.Infof("Starting local store cleanup...")

dir, err := os.ReadDir(s.local.Dir())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious if using filepath.Walk would make the search logic simpler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's only scanning one layer of a directory, so i don't think so? unless i misunderstood how the local store is laid out

wantBlob := []byte("fish")
buf := bytes.NewBuffer(wantBlob)
buf := new(bytes.Buffer)
for i := 0; i < 10000000; i++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use rand.Read to generate random bytes of desired length?

Copy link
Contributor

@hannahhoward hannahhoward left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the SP deal iteration loop needs tweaking

docker-compose-local-dev.yml Outdated Show resolved Hide resolved
integration/singularity/store.go Outdated Show resolved Hide resolved
@hannahhoward
Copy link
Contributor

merging

@hannahhoward hannahhoward merged commit 20d6046 into main Sep 20, 2023
7 checks passed
@hannahhoward hannahhoward deleted the feat/cleanup-process branch September 20, 2023 23:07
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Local data deletion once data is stored on Filecoin
3 participants