Skip to content

Commit

Permalink
Merge pull request #11046 from filecoin-project/fix/fsm-commfail-loop
Browse files Browse the repository at this point in the history
fix: sealing: Fix RetryCommitWait loop when sector cron activation fails
  • Loading branch information
magik6k authored Aug 7, 2023
2 parents 683432e + f620310 commit 8842466
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 8 deletions.
14 changes: 9 additions & 5 deletions itests/kit/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ func NewDealHarness(t *testing.T, client *TestFullNode, main *TestMiner, market
//
// TODO: convert input parameters to struct, and add size as an input param.
func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, params MakeFullDealParams) (deal *cid.Cid, res *api.ImportRes, path string) {
deal, res, path = dh.StartRandomDeal(ctx, params)

fmt.Printf("WAIT DEAL SEALEDS START\n")
dh.WaitDealSealed(ctx, deal, false, false, nil)
fmt.Printf("WAIT DEAL SEALEDS END\n")
return deal, res, path
}

func (dh *DealHarness) StartRandomDeal(ctx context.Context, params MakeFullDealParams) (deal *cid.Cid, res *api.ImportRes, path string) {
if params.UseCARFileForStorageDeal {
res, _, path = dh.client.ClientImportCARFile(ctx, params.Rseed, 200)
} else {
Expand All @@ -107,11 +116,6 @@ func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, params MakeFullDealPa
dp.FastRetrieval = params.FastRet
deal = dh.StartDeal(ctx, dp)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
fmt.Printf("WAIT DEAL SEALEDS START\n")
dh.WaitDealSealed(ctx, deal, false, false, nil)
fmt.Printf("WAIT DEAL SEALEDS END\n")
return deal, res, path
}

Expand Down
1 change: 1 addition & 0 deletions itests/kit/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func QuietMiningLogs() {
_ = logging.SetLogLevel("pubsub", "ERROR")
_ = logging.SetLogLevel("gen", "ERROR")
_ = logging.SetLogLevel("rpc", "ERROR")
_ = logging.SetLogLevel("consensus-common", "ERROR")
_ = logging.SetLogLevel("dht/RtRefreshManager", "ERROR")
}

Expand Down
9 changes: 8 additions & 1 deletion itests/kit/node_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func OwnerAddr(wk *key.Key) NodeOpt {
// the node.
func ConstructorOpts(extra ...node.Option) NodeOpt {
return func(opts *nodeOpts) error {
opts.extraNodeOpts = extra
opts.extraNodeOpts = append(opts.extraNodeOpts, extra...)
return nil
}
}
Expand Down Expand Up @@ -290,6 +290,13 @@ func SplitstoreMessges() NodeOpt {
})
}

func SplitstoreDisable() NodeOpt {
return WithCfgOpt(func(cfg *config.FullNode) error {
cfg.Chainstore.EnableSplitstore = false
return nil
})
}

func WithEthRPC() NodeOpt {
return WithCfgOpt(func(cfg *config.FullNode) error {
cfg.Fevm.EnableEthRPC = true
Expand Down
80 changes: 80 additions & 0 deletions itests/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,3 +730,83 @@ waitForProof:
require.NoError(t, params.UnmarshalCBOR(bytes.NewBuffer(slmsg.Params)))
require.Equal(t, abi.RegisteredPoStProof_StackedDrgWindow2KiBV1_1, params.Proofs[0].PoStProof)
}

func TestWorkerPledgeExpireCommit(t *testing.T) {
kit.QuietMiningLogs()
_ = logging.SetLogLevel("sectors", "debug")

var tasksNoC2 = kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTDataCid, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2,
sealtasks.TTUnseal, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed})

fc := config.DefaultStorageMiner().Fees
fc.MaxCommitGasFee = types.FIL(abi.NewTokenAmount(10000)) // 10000 attofil, way too low for anything to land

ctx := context.Background()
client, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
kit.MutateSealingConfig(func(sc *config.SealingConfig) {
sc.BatchPreCommits = false
sc.AggregateCommits = true
}),
kit.ConstructorOpts(
node.Override(new(*sealing.Sealing), modules.SealingPipeline(fc)),
),
kit.SplitstoreDisable(), // disable splitstore because messages which take a long time may get dropped
tasksNoC2) // no mock proofs

ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)

e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)

dh := kit.NewDealHarness(t, client, miner, miner)

startEpoch := abi.ChainEpoch(4 << 10)

dh.StartRandomDeal(ctx, kit.MakeFullDealParams{
Rseed: 7,
StartEpoch: startEpoch,
})

var sn abi.SectorNumber

require.Eventually(t, func() bool {
s, err := miner.SectorsListNonGenesis(ctx)
require.NoError(t, err)
if len(s) == 0 {
return false
}
if len(s) > 1 {
t.Fatalf("expected 1 sector, got %d", len(s))
}
sn = s[0]
return true
}, 30*time.Second, 1*time.Second)

t.Log("sector", sn)

t.Log("sector committing")

// wait until after startEpoch
client.WaitTillChain(ctx, kit.HeightAtLeast(startEpoch+20))

t.Log("after start")

sstate, err := miner.SectorsStatus(ctx, sn, false)
require.NoError(t, err)
require.Equal(t, api.SectorState(sealing.SubmitCommitAggregate), sstate.State)

_, err = miner.SectorCommitFlush(ctx)
require.NoError(t, err)

require.Eventually(t, func() bool {
sstate, err := miner.SectorsStatus(ctx, sn, false)
require.NoError(t, err)

t.Logf("sector state: %s", sstate.State)

return sstate.State == api.SectorState(sealing.Removed)
}, 30*time.Second, 1*time.Second)

t.Log("sector removed")
}
17 changes: 15 additions & 2 deletions storage/pipeline/states_failed.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,21 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo

switch mw.Receipt.ExitCode {
case exitcode.Ok:
// API error in CcommitWait
return ctx.Send(SectorRetryCommitWait{})
si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSet)
if err != nil {
// API error
if err := failedCooldown(ctx, sector); err != nil {
return err
}

return ctx.Send(SectorRetryCommitWait{})
}
if si != nil {
// API error in CommitWait?
return ctx.Send(SectorRetryCommitWait{})
}
// if si == nil, something else went wrong; Likely expired deals, we'll
// find out in checkCommit
case exitcode.SysErrOutOfGas:
// API error in CommitWait AND gas estimator guessed a wrong number in SubmitCommit
return ctx.Send(SectorRetrySubmitCommit{})
Expand Down

0 comments on commit 8842466

Please sign in to comment.