feat: update lassie to sync Retriever
* Retriever#Retrieve() calls are now synchronous, so the direct return
  value and error can be awaited inline (see the sketch below)
* Change the AwaitBlock call order and make it cancellable
* Make the provider context-cancel aware for cleaner shutdown
* Other minor fixes and adaptations to the new Lassie code
rvagg committed Jan 25, 2023
1 parent 0eeef5d commit f7ec395
Showing 7 changed files with 112 additions and 176 deletions.
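
Before the per-file diffs, a minimal sketch of the new synchronous flow described above, written as a hypothetical helper. Names and signatures (types.NewRetrievalID, blockManager.AwaitBlock, the now-synchronous retriever.Retrieve) are taken from the bitswap/provider.go diff below; error handling, metrics, and the worker-loop plumbing are trimmed, so treat it as illustrative rather than a drop-in excerpt.

// retrieveOnce is a hypothetical helper showing the synchronous call order:
// watch the blockstore, then block on Retrieve and read its result directly.
func retrieveOnce(ctx context.Context, provider *Provider, c cid.Cid) error {
	retrievalId, err := types.NewRetrievalID()
	if err != nil {
		return err
	}

	// Watch the local blockstore first. AwaitBlock reports whether the block
	// was already present; otherwise the callback fires when it arrives, and
	// cancelling blockCtx silently drops the waiter.
	blockCtx, blockCancel := context.WithCancel(ctx)
	defer blockCancel()
	if provider.blockManager.AwaitBlock(blockCtx, c, func(block blocks.Block, err error) {
		// ... queue the block (or a dont_have) for the requesting peer ...
	}) {
		return nil // already local, no retrieval needed
	}

	// Retrieve is now synchronous: the stats and error come back directly
	// instead of arriving through a registered listener.
	result, err := provider.retriever.Retrieve(ctx, retrievalId, c)
	if err != nil {
		return err
	}
	log.Infof("retrieved %s in %s (%d blocks)", c, result.Duration, result.Blocks)
	return nil
}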
24 changes: 11 additions & 13 deletions autoretrieve.go
@@ -24,12 +24,12 @@ import (
"github.com/application-research/autoretrieve/paychannelmanager"
lassieclient "github.com/filecoin-project/lassie/pkg/client"
lassieeventrecorder "github.com/filecoin-project/lassie/pkg/eventrecorder"
"github.com/filecoin-project/lassie/pkg/indexerlookup"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
rpcstmgr "github.com/filecoin-project/lotus/chain/stmgr/rpc"
"github.com/filecoin-project/lotus/chain/wallet"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/paychmgr"
"github.com/ipfs/go-cid"
ipfsdatastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
flatfs "github.com/ipfs/go-ds-flatfs"
@@ -154,27 +154,23 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
// Initialize Filecoin retriever
var retriever *lassieretriever.Retriever
if !cfg.DisableRetrieval {
var ep lassieretriever.Endpoint
var candidateFinder lassieretriever.CandidateFinder
switch cfg.LookupEndpointType {
case EndpointTypeEstuary:
logger.Infof("Using Estuary endpoint type")
ep = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
logger.Infof("Using Estuary candidate finder type")
candidateFinder = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
case EndpointTypeIndexer:
logger.Infof("Using indexer endpoint type")
ep = endpoint.NewIndexerEndpoint(cfg.LookupEndpointURL)
logger.Infof("Using indexer candidate finder type")
candidateFinder = indexerlookup.NewCandidateFinder(cfg.LookupEndpointURL)
default:
return nil, errors.New("unrecognized endpoint type")
return nil, errors.New("unrecognized candidate finder type")
}

retrieverCfg, err := cfg.ExtractFilecoinRetrieverConfig(cctx.Context, minerPeerGetter)
if err != nil {
return nil, err
}

confirmer := func(c cid.Cid) (bool, error) {
return blockManager.Has(cctx.Context, c)
}

// Instantiate client
retrievalClient, err := lassieclient.NewClient(
blockstore,
@@ -190,17 +186,19 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
return nil, err
}

retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, ep, confirmer)
retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, candidateFinder)
if err != nil {
return nil, err
}
<-retriever.Start()
if cfg.EventRecorderEndpointURL != "" {
logger.Infof("Reporting retrieval events to %v", cfg.EventRecorderEndpointURL)
eventRecorderEndpointAuthorization, err := loadEventRecorderAuth(dataDirPath(cctx))
if err != nil {
return nil, err
}
retriever.RegisterListener(lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization))
eventRecorder := lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization)
retriever.RegisterSubscriber(eventRecorder.RecordEvent)
}
}

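The listener-to-subscriber switch above also means retrieval events can be observed by any plain function, not just the event recorder. A hedged sketch of registering an extra subscriber; the func(types.RetrievalEvent) shape is an assumption inferred from eventRecorder.RecordEvent being passed in directly:

// Hypothetical second subscriber alongside the event recorder; assumes
// subscribers are plain funcs taking a types.RetrievalEvent.
retriever.RegisterSubscriber(func(event types.RetrievalEvent) {
	logger.Debugf("retrieval event: %v", event)
})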
86 changes: 48 additions & 38 deletions bitswap/provider.go
@@ -7,7 +7,9 @@ import (

"github.com/application-research/autoretrieve/blocks"
"github.com/application-research/autoretrieve/metrics"
"github.com/dustin/go-humanize"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-bitswap/message"
bitswap_message_pb "github.com/ipfs/go-bitswap/message/pb"
"github.com/ipfs/go-bitswap/network"
@@ -133,15 +135,15 @@ func NewProvider(
provider.network.Start(provider)

for i := 0; i < int(config.RequestWorkers); i++ {
go provider.handleRequests()
go provider.handleRequests(ctx)
}

for i := 0; i < int(config.ResponseWorkers); i++ {
go provider.handleResponses()
go provider.handleResponses(ctx)
}

for i := 0; i < int(config.RetrievalWorkers); i++ {
go provider.handleRetrievals()
go provider.handleRetrievals(ctx)
}

return provider, nil
@@ -160,10 +162,8 @@ func (provider *Provider) ReceiveMessage(ctx context.Context, sender peer.ID, in
provider.requestQueue.PushTasks(sender, tasks...)
}

func (provider *Provider) handleRequests() {
ctx := context.Background()

for {
func (provider *Provider) handleRequests(ctx context.Context) {
for ctx.Err() == nil {
peerID, tasks, _ := provider.requestQueue.PopTasks(100)
if len(tasks) == 0 {
time.Sleep(time.Millisecond * 250)
@@ -256,10 +256,8 @@ func (provider *Provider) handleRequest(
return nil
}

func (provider *Provider) handleResponses() {
ctx := context.Background()

for {
func (provider *Provider) handleResponses(ctx context.Context) {
for ctx.Err() == nil {
peerID, tasks, _ := provider.responseQueue.PopTasks(targetMessageSize)
if len(tasks) == 0 {
time.Sleep(time.Millisecond * 250)
@@ -325,10 +323,8 @@ func (provider *Provider) handleResponses() {
}
}

func (provider *Provider) handleRetrievals() {
ctx := context.Background()

for {
func (provider *Provider) handleRetrievals(ctx context.Context) {
for ctx.Err() == nil {
peerID, tasks, _ := provider.retrievalQueue.PopTasks(1)
if len(tasks) == 0 {
time.Sleep(time.Millisecond * 250)
@@ -344,38 +340,52 @@ func (provider *Provider) handleRetrievals() {
continue
}

log.Debugf("Requesting retrieval for %s", cid)

// Try to start a new retrieval (if it's already running then no
// need to error, just continue on to await block)
if err := provider.retriever.Request(cid); err != nil {
if !errors.As(err, &lassieretriever.ErrRetrievalAlreadyRunning{}) {
if errors.Is(err, lassieretriever.ErrNoCandidates) {
// Just do a debug print if there were no candidates because this happens a lot
log.Debugf("No candidates for %s", cid)
} else {
// Otherwise, there was a real failure, print with more importance
log.Errorf("Request for %s failed: %v", cid, err)
}
} else {
log.Debugf("Retrieval already running for %s, no new one will be started", cid)
}
} else {
log.Infof("Started retrieval for %s", cid)
retrievalId, err := types.NewRetrievalID()
if err != nil {
log.Errorf("Failed to create retrieval ID: %s", err.Error())
}

// TODO: if retriever.Request() is changed to be blocking, make
// blockManager.AwaitBlock() cancellable and cancel it after the
// request finishes if there's an error
provider.blockManager.AwaitBlock(ctx, cid, func(block blocks.Block, err error) {
log.Debugf("Starting retrieval for %s (%s)", cid, retrievalId)

// Start a background blockstore fetch with a callback to send the block
// to the peer once it's available.
blockCtx, blockCancel := context.WithCancel(ctx)
if provider.blockManager.AwaitBlock(blockCtx, cid, func(block blocks.Block, err error) {
if err != nil {
log.Debugf("Async block load failed: %s", err)
provider.queueSendDontHave(peerID, task.Priority, block.Cid, "failed_block_load")
} else {
log.Debugf("Async block load completed: %s", block.Cid)
provider.queueSendBlock(peerID, task.Priority, block.Cid, block.Size)
}
})
blockCancel()
}) {
// If the block was already in the blockstore then we don't need to
// start a retrieval.
blockCancel()
continue
}

// Try to start a new retrieval (if it's already running then no
// need to error, just continue on to await block)
result, err := provider.retriever.Retrieve(ctx, retrievalId, cid)
if err != nil {
if errors.Is(err, lassieretriever.ErrRetrievalAlreadyRunning) {
log.Debugf("Retrieval already running for %s, no new one will be started", cid)
continue // Don't send dont_have or run blockCancel(), let it async load
} else if errors.Is(err, lassieretriever.ErrNoCandidates) {
// Just do a debug print if there were no candidates because this happens a lot
log.Debugf("No candidates for %s (%s)", cid, retrievalId)
} else {
// Otherwise, there was a real failure, print with more importance
log.Errorf("Retrieval for %s (%s) failed: %v", cid, retrievalId, err)
}
provider.queueSendDontHave(peerID, task.Priority, cid, "not_available")
} else {
log.Infof("Retrieval for %s (%s) completed (duration: %s, bytes: %s, blocks: %d)", cid, retrievalId, result.Duration, humanize.IBytes(result.Size), result.Blocks)
}

blockCancel()
}

provider.retrievalQueue.TasksDone(peerID, tasks...)
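Because all three worker loops above now poll ctx.Err(), a clean shutdown is just a matter of cancelling the context handed to NewProvider. A minimal sketch of the wiring, assuming the context is NewProvider's first argument (the full signature is not visible in this hunk):

ctx, cancel := context.WithCancel(context.Background())
provider, err := NewProvider(ctx /* , config, blockManager, retriever, ... */)
if err != nil {
	return err
}

// On shutdown: cancelling ctx lets handleRequests, handleResponses and
// handleRetrievals fall out of their "for ctx.Err() == nil" loops after
// finishing the tasks they already popped.
cancel()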
36 changes: 25 additions & 11 deletions blocks/manager.go
@@ -33,6 +33,7 @@ type Manager struct {
}

type waitListEntry struct {
ctx context.Context
callback func(Block, error)
registeredAt time.Time
}
@@ -55,7 +56,13 @@ func NewManager(inner blockstore.Blockstore, getAwaitTimeout time.Duration) *Man
return mgr
}

func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) {
// AwaitBlock waits for a block to be added to the blockstore and then calls
// the callback with it. If the block is already in the blockstore, the
// callback runs immediately and AwaitBlock returns true. Otherwise it returns
// false and the callback runs later, when the block arrives or the wait times
// out; if the context is cancelled first, the callback is never invoked.
func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) bool {
// We need to lock the blockstore here to make sure the requested block
// doesn't get added while being added to the waitlist
mgr.waitListLk.Lock()
@@ -68,22 +75,25 @@ func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(B
if !ipld.IsNotFound(err) {
mgr.waitListLk.Unlock()
callback(Block{}, err)
return
return false
}

mgr.waitList[cid] = append(mgr.waitList[cid], waitListEntry{
ctx: ctx,
callback: callback,
registeredAt: time.Now(),
})

mgr.waitListLk.Unlock()
return
return false
}

mgr.waitListLk.Unlock()

// Otherwise, we can immediately run the callback
// Otherwise, we can immediately run the callback and notify the caller of
// success
callback(Block{cid, size}, nil)
return true
}

func (mgr *Manager) Put(ctx context.Context, block blocks.Block) error {
@@ -138,7 +148,9 @@ func (mgr *Manager) notifyWaitCallbacks(block Block) {
mgr.waitListLk.Unlock()
if ok {
for _, entry := range entries {
entry.callback(block, nil)
if entry.ctx.Err() == nil { // not cancelled
entry.callback(block, nil)
}
}
}
}
@@ -149,20 +161,22 @@ func (mgr *Manager) startPollCleanup() {
for cid := range mgr.waitList {
// For each element in the slice for this CID...
for i := 0; i < len(mgr.waitList[cid]); i++ {
// ...check if it's timed out...
if time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout {
// ...check whether the waiter context was cancelled or it's been in the
// list too long...
if mgr.waitList[cid][i].ctx.Err() != nil || time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout {
// ...and if so, delete this element by replacing it with
// the last element of the slice and shrinking the length by
// 1, and step the index back
mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout)
// 1, and step the index back.
if mgr.waitList[cid][i].ctx.Err() == nil {
mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout)
}
mgr.waitList[cid][i] = mgr.waitList[cid][len(mgr.waitList[cid])-1]
mgr.waitList[cid] = mgr.waitList[cid][:len(mgr.waitList[cid])-1]
i--
}
}

// If the slice is empty now, remove it entirely from the waitList
// map
// If the slice is empty now, remove it entirely from the waitList map
if len(mgr.waitList[cid]) == 0 {
delete(mgr.waitList, cid)
}
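To restate the AwaitBlock contract introduced above: true means the block was already local and the callback has already run; false means the callback is deferred until the block arrives or the wait times out with ErrWaitTimeout, and a cancelled waiter context drops the callback entirely. A short usage sketch with names from this diff:

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancelling removes the waiter without invoking the callback

found := mgr.AwaitBlock(ctx, c, func(block blocks.Block, err error) {
	if err != nil {
		log.Debugf("await for %s failed: %s", c, err) // e.g. ErrWaitTimeout
		return
	}
	log.Debugf("block %s arrived (%d bytes)", block.Cid, block.Size)
})
if found {
	// The block was already in the blockstore and the callback ran inline.
}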
8 changes: 4 additions & 4 deletions endpoint/estuary.go
@@ -11,7 +11,7 @@ import (

"github.com/application-research/autoretrieve/minerpeergetter"
"github.com/filecoin-project/go-address"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
)

@@ -39,7 +39,7 @@ func NewEstuaryEndpoint(url string, mpg *minerpeergetter.MinerPeerGetter) *Estua
}
}

func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]lassieretriever.RetrievalCandidate, error) {
func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) {
// Create URL with CID
endpointURL, err := url.Parse(ee.url)
if err != nil {
@@ -63,13 +63,13 @@ func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]l
return nil, ErrEndpointBodyInvalid
}

converted := make([]lassieretriever.RetrievalCandidate, 0, len(unfiltered))
converted := make([]types.RetrievalCandidate, 0, len(unfiltered))
for _, original := range unfiltered {
minerPeer, err := ee.mpg.MinerPeer(ctx, original.Miner)
if err != nil {
return nil, fmt.Errorf("%w: failed to get miner peer: %v", ErrEndpointRequestFailed, err)
}
converted = append(converted, lassieretriever.RetrievalCandidate{
converted = append(converted, types.RetrievalCandidate{
MinerPeer: minerPeer,
RootCid: original.RootCid,
})
(diffs for the 3 remaining changed files were not loaded in this view)
