feat: update lassie to sync Retriever (before provider rewrite revert) #167
base: master
Conversation
Force-pushed from d6c4231 to 8814195
Force-pushed from 8814195 to f7ec395
Codecov Report
Base: 4.87% // Head: 5.24% // Increases project coverage by +0.36%

Additional details and impacted files:

@@ Coverage Diff @@
##           master    #167      +/-   ##
=========================================
+ Coverage    4.87%    5.24%    +0.36%
=========================================
  Files          15       14        -1
  Lines        1723     1697       -26
=========================================
+ Hits           84       89        +5
+ Misses       1634     1603       -31
  Partials        5        5
Force-pushed from 0d024b6 to e3a5629
* Retriever#Retrieve() calls are now synchronous, so we get to wait for the direct return value and error synchronously
* Change the AwaitGet call order and make it cancellable
* Make the provider context-cancel aware for cleaner shutdown
* Other minor fixes and adaptations to the new Lassie code
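As a rough illustration of the control flow this enables (not the actual lassie API; the `retrieveFn` type and all names below are placeholders), a synchronous retrieval call lets the caller handle the result and error directly, while the context bounds how long the call can block:

```go
package provider // hypothetical package name, for illustration only

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retrieveFn stands in for a synchronous Retriever#Retrieve-style call; the
// real lassie signature may differ, this only illustrates the control flow.
type retrieveFn func(ctx context.Context, cid string) (sizeBytes int, err error)

// retrieveWithTimeout blocks until the retrieval finishes, fails, or the
// context is cancelled, then returns the outcome directly to the caller.
func retrieveWithTimeout(parent context.Context, retrieve retrieveFn, cid string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	size, err := retrieve(ctx, cid) // synchronous: we wait here for the result
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("retrieval of %s cancelled: %w", cid, err)
	}
	if err != nil {
		return fmt.Errorf("retrieval of %s failed: %w", cid, err)
	}
	fmt.Printf("retrieved %s (%d bytes)\n", cid, size)
	return nil
}
```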
Force-pushed from e3a5629 to b6db210
These are just some possibilities. I don't know whether they fix underlying issues though
blockManager:           blockManager,
retriever:              retriever,
requestQueue:           peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)),
requestQueueSignalChan: make(chan struct{}, 10),
so here's my recommendation for these signals:
- make them buffer 1
- when writing, call:
select {
case provider.requestQueueSignalChan <- struct{}{}:
default:
}
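For illustration, a minimal self-contained sketch of that pattern (the `signaller` type is hypothetical; only the buffer-1 channel and the non-blocking send are the point):

```go
package main

import "fmt"

// signaller wraps the suggested pattern: a channel with buffer 1 plus a
// non-blocking send, so repeated signals coalesce instead of blocking the sender.
type signaller struct {
	ch chan struct{}
}

func newSignaller() *signaller {
	return &signaller{ch: make(chan struct{}, 1)} // buffer of exactly 1
}

// signal never blocks: if a wakeup is already pending, the extra one is dropped.
func (s *signaller) signal() {
	select {
	case s.ch <- struct{}{}:
	default:
	}
}

func main() {
	s := newSignaller()
	s.signal()
	s.signal() // coalesces with the first; still only one pending wakeup
	<-s.ch     // a single receive observes it
	fmt.Println("woke up once for two signals")
}
```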
oh, nice, so if it blocks then bail, I didn't think of that!
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 250):
case <-provider.responseQueueSignalChan:
when len(tasks) != 0, you had better still optionally drain the signal chan, i.e.

if len(tasks) == 0 {
	// ...
	continue
}
select {
case <-provider.responseQueueSignalChan:
default:
}
// ...
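Putting the suggestion together, here is a sketch of the loop shape being described (the queue, signal channel, and task handling are abstracted into function parameters; the 250ms timeout mirrors the diff above):

```go
package provider // hypothetical; only the wait/drain structure is the point

import (
	"context"
	"time"
)

// drainLoop waits on the signal channel only when the queue is empty, and
// otherwise drains a possibly stale pending signal so it cannot trigger a
// pointless extra idle pass later. popTasks and handle are placeholders for
// the provider's queue and processing code.
func drainLoop(ctx context.Context, popTasks func() []int, signal <-chan struct{}, handle func([]int)) {
	for ctx.Err() == nil {
		tasks := popTasks()
		if len(tasks) == 0 {
			// Nothing to do: sleep until cancelled, timed out, or signalled.
			select {
			case <-ctx.Done():
			case <-time.After(250 * time.Millisecond):
			case <-signal:
			}
			continue
		}
		// Tasks were ready without waiting, so consume any pending wakeup now.
		select {
		case <-signal:
		default:
		}
		handle(tasks)
	}
}
```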
I wouldn't be surprised if you're getting stuck cause of this.
@@ -325,13 +339,15 @@ func (provider *Provider) handleResponses() {
}
why are we calling TasksDone twice when an error occurs sending a message?
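For reference, a hedged sketch of one way to guarantee the per-batch cleanup runs exactly once on both the success and the error path (handleBatch, send, and markDone are placeholders, not the provider's actual functions):

```go
package provider // hypothetical; only the defer-once pattern is the point

import "log"

// handleBatch marks a popped batch done exactly once, whether or not sending
// the message fails, by deferring the cleanup call.
func handleBatch(peer string, tasks []int, send func(string, []int) error, markDone func(string, []int)) {
	defer markDone(peer, tasks) // runs once, on success and on error
	if err := send(peer, tasks); err != nil {
		log.Printf("sending message to %s failed: %v", peer, err)
	}
}
```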
case <-ctx.Done():
case <-time.After(time.Millisecond * 250):
case <-provider.retrievalQueueSignalChan:
}
continue
}
same, better drain the signal queue
func (provider *Provider) handleRetrievals(ctx context.Context) {
-	for {
+	for ctx.Err() == nil {
		peerID, tasks, _ := provider.retrievalQueue.PopTasks(1)
now that retrieval is synchronous, this appears to limit things to one retrieval per worker queue, no?
yes, the default is 8 workers, but I haven't seen a reason to increase it (yet) because the pipe of incoming requests appears to be so small; maybe I'm not seeing it right, though
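To make that concurrency bound explicit, here is a rough sketch of the worker model being discussed (startRetrievalWorkers and the handle parameter are placeholders; only the point that synchronous retrieval caps in-flight work at the worker count is taken from the thread):

```go
package provider // hypothetical

import (
	"context"
	"sync"
)

// startRetrievalWorkers launches workerCount goroutines that each loop over
// the shared retrieval queue. With a synchronous Retrieve, at most workerCount
// retrievals can be in flight at any time. handle stands in for the provider's
// handleRetrievals-style loop and is expected to return once ctx is cancelled.
func startRetrievalWorkers(ctx context.Context, workerCount int, handle func(context.Context)) {
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			handle(ctx)
		}()
	}
	wg.Wait() // returns after cancellation, once every worker has exited
}
```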
DRAFT for now because it's using filecoin-project/lassie#41.

@elijaharita I'd like your eyes on this for the synchronous call in the provider: `provider.retriever.Retrieve()` now blocks, holding up one of the goroutines while a retrieval attempt happens. Most of them still fail at the indexer lookup, but the ones that do attempt a retrieval may cause a backlog. Is there any mechanism in here to prevent an excessive backlog of tasks on the `retrievalQueue` that we can't process fast enough to keep up with incoming requests? I don't see one, but I might be missing something. I'm also not seeing a huge queue while running this locally, but that might change on a faster open network.
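If a limit does turn out to be needed, one possible shape (purely a sketch, not existing code; boundedQueue and maxPending are hypothetical) is to track the number of pending retrieval tasks and refuse new ones past a cap:

```go
package provider // hypothetical

import "sync"

// boundedQueue tracks how many retrieval tasks are pending and rejects new
// ones once a cap is reached, so a slow synchronous Retrieve cannot let the
// backlog grow without bound.
type boundedQueue struct {
	mu         sync.Mutex
	pending    int
	maxPending int
}

// tryEnqueue reports whether the task was accepted; the caller should drop or
// defer the incoming request when it returns false.
func (q *boundedQueue) tryEnqueue() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.pending >= q.maxPending {
		return false // backlog cap reached
	}
	q.pending++
	return true
}

// done is called when a retrieval task finishes, successfully or not.
func (q *boundedQueue) done() {
	q.mu.Lock()
	q.pending--
	q.mu.Unlock()
}
```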