Skip to content

Commit

Permalink
Merge pull request #3358 from iotaledger/reject-grace-delay
Browse files Browse the repository at this point in the history
Postpone recovery from rejections for several L1 milestones.
  • Loading branch information
jorgemmsilva authored Apr 23, 2024
2 parents 1efdea9 + 9edbfd2 commit 13579da
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 68 deletions.
1 change: 1 addition & 0 deletions components/chains/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func provide(c *dig.Container) error {
ParamsValidator.Address,
ParamsChains.DeriveAliasOutputByQuorum,
ParamsChains.PipeliningLimit,
ParamsChains.PostponeRecoveryMilestones,
ParamsChains.ConsensusDelay,
ParamsChains.RecoveryTimeout,
deps.NetworkProvider,
Expand Down
1 change: 1 addition & 0 deletions components/chains/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ParametersChains struct {
PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"`
DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."`
PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."`
PostponeRecoveryMilestones int `default:"3" usage:"number of milestones to wait until a chain transition is considered as rejected"`
ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."`
RecoveryTimeout time.Duration `default:"20s" usage:"Time after which another consensus attempt is made."`
RedeliveryPeriod time.Duration `default:"2s" usage:"the resend period for msg."`
Expand Down
102 changes: 61 additions & 41 deletions packages/chain/chainmanager/chain_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,28 +160,29 @@ type cmtLogInst struct {
}

type chainMgrImpl struct {
chainID isc.ChainID // This instance is responsible for this chain.
chainStore state.Store // Store of the chain state.
cmtLogs map[iotago.Ed25519Address]*cmtLogInst // All the committee log instances for this chain.
consensusStateRegistry cmt_log.ConsensusStateRegistry // Persistent store for log indexes.
latestActiveCmt *iotago.Ed25519Address // The latest active committee.
latestConfirmedAO *isc.AliasOutputWithID // The latest confirmed AO (follows Active AO).
activeNodesCB func() ([]*cryptolib.PublicKey, []*cryptolib.PublicKey) // All the nodes authorized for being access nodes (for the ActiveAO).
trackActiveStateCB func(ao *isc.AliasOutputWithID) // We will call this to set new AO for the active state.
savePreliminaryBlockCB func(block state.Block) // We will call this, when a preliminary block matching the tx signatures is received.
committeeUpdatedCB func(dkShare tcrypto.DKShare) // Will be called, when a committee changes.
needConsensus *NeedConsensus // Query for a consensus.
needPublishTX *shrinkingmap.ShrinkingMap[iotago.TransactionID, *NeedPublishTX] // Query to post TXes.
dkShareRegistryProvider registry.DKShareRegistryProvider // Source for DKShares.
varAccessNodeState VarAccessNodeState
output *Output
asGPA gpa.GPA
me gpa.NodeID
nodeIDFromPubKey func(pubKey *cryptolib.PublicKey) gpa.NodeID
deriveAOByQuorum bool // Config parameter.
pipeliningLimit int // Config parameter.
metrics *metrics.ChainCmtLogMetrics
log *logger.Logger
chainID isc.ChainID // This instance is responsible for this chain.
chainStore state.Store // Store of the chain state.
cmtLogs map[iotago.Ed25519Address]*cmtLogInst // All the committee log instances for this chain.
consensusStateRegistry cmt_log.ConsensusStateRegistry // Persistent store for log indexes.
latestActiveCmt *iotago.Ed25519Address // The latest active committee.
latestConfirmedAO *isc.AliasOutputWithID // The latest confirmed AO (follows Active AO).
activeNodesCB func() ([]*cryptolib.PublicKey, []*cryptolib.PublicKey) // All the nodes authorized for being access nodes (for the ActiveAO).
trackActiveStateCB func(ao *isc.AliasOutputWithID) // We will call this to set new AO for the active state.
savePreliminaryBlockCB func(block state.Block) // We will call this, when a preliminary block matching the tx signatures is received.
committeeUpdatedCB func(dkShare tcrypto.DKShare) // Will be called, when a committee changes.
needConsensus *NeedConsensus // Query for a consensus.
needPublishTX *shrinkingmap.ShrinkingMap[iotago.TransactionID, *NeedPublishTX] // Query to post TXes.
dkShareRegistryProvider registry.DKShareRegistryProvider // Source for DKShares.
varAccessNodeState VarAccessNodeState
output *Output
asGPA gpa.GPA
me gpa.NodeID
nodeIDFromPubKey func(pubKey *cryptolib.PublicKey) gpa.NodeID
deriveAOByQuorum bool // Config parameter.
pipeliningLimit int // Config parameter.
postponeRecoveryMilestones int // Config parameter.
metrics *metrics.ChainCmtLogMetrics
log *logger.Logger
}

var (
Expand All @@ -202,28 +203,30 @@ func New(
committeeUpdatedCB func(dkShare tcrypto.DKShare),
deriveAOByQuorum bool,
pipeliningLimit int,
postponeRecoveryMilestones int,
metrics *metrics.ChainCmtLogMetrics,
log *logger.Logger,
) (ChainMgr, error) {
cmi := &chainMgrImpl{
chainID: chainID,
chainStore: chainStore,
cmtLogs: map[iotago.Ed25519Address]*cmtLogInst{},
consensusStateRegistry: consensusStateRegistry,
activeNodesCB: activeNodesCB,
trackActiveStateCB: trackActiveStateCB,
savePreliminaryBlockCB: savePreliminaryBlockCB,
committeeUpdatedCB: committeeUpdatedCB,
needConsensus: nil,
needPublishTX: shrinkingmap.New[iotago.TransactionID, *NeedPublishTX](),
dkShareRegistryProvider: dkShareRegistryProvider,
varAccessNodeState: NewVarAccessNodeState(chainID, log.Named("VAS")),
me: me,
nodeIDFromPubKey: nodeIDFromPubKey,
deriveAOByQuorum: deriveAOByQuorum,
pipeliningLimit: pipeliningLimit,
metrics: metrics,
log: log,
chainID: chainID,
chainStore: chainStore,
cmtLogs: map[iotago.Ed25519Address]*cmtLogInst{},
consensusStateRegistry: consensusStateRegistry,
activeNodesCB: activeNodesCB,
trackActiveStateCB: trackActiveStateCB,
savePreliminaryBlockCB: savePreliminaryBlockCB,
committeeUpdatedCB: committeeUpdatedCB,
needConsensus: nil,
needPublishTX: shrinkingmap.New[iotago.TransactionID, *NeedPublishTX](),
dkShareRegistryProvider: dkShareRegistryProvider,
varAccessNodeState: NewVarAccessNodeState(chainID, log.Named("VAS")),
me: me,
nodeIDFromPubKey: nodeIDFromPubKey,
deriveAOByQuorum: deriveAOByQuorum,
pipeliningLimit: pipeliningLimit,
metrics: metrics,
postponeRecoveryMilestones: postponeRecoveryMilestones,
log: log,
}
cmi.output = &Output{cmi: cmi}
cmi.asGPA = gpa.NewOwnHandler(me, cmi)
Expand All @@ -248,6 +251,8 @@ func (cmi *chainMgrImpl) Input(input gpa.Input) gpa.OutMessages {
return cmi.handleInputConsensusOutputSkip(input)
case *inputConsensusTimeout:
return cmi.handleInputConsensusTimeout(input)
case *inputMilestoneReceived:
return cmi.handleInputMilestoneReceived()
case *inputCanPropose:
return cmi.handleInputCanPropose()
}
Expand Down Expand Up @@ -400,6 +405,13 @@ func (cmi *chainMgrImpl) handleInputConsensusTimeout(input *inputConsensusTimeou
})
}

// handleInputMilestoneReceived relays a milestone notification to every
// committee log instance of this chain and returns the messages they produce.
// A fresh cmt_log milestone input is created for each committee log.
func (cmi *chainMgrImpl) handleInputMilestoneReceived() gpa.OutMessages {
	cmi.log.Debugf("handleInputMilestoneReceived")
	notify := func(cmtLog gpa.GPA) gpa.OutMessages {
		milestone := cmt_log.NewInputMilestoneReceived()
		return cmtLog.Input(milestone)
	}
	return cmi.withAllCmtLogs(notify)
}

func (cmi *chainMgrImpl) handleInputCanPropose() gpa.OutMessages {
cmi.log.Debugf("handleInputCanPropose")
return cmi.withAllCmtLogs(func(cl gpa.GPA) gpa.OutMessages {
Expand Down Expand Up @@ -579,7 +591,15 @@ func (cmi *chainMgrImpl) ensureCmtLog(committeeAddr iotago.Ed25519Address) (*cmt
}

clInst, err := cmt_log.New(
cmi.me, cmi.chainID, dkShare, cmi.consensusStateRegistry, cmi.nodeIDFromPubKey, cmi.deriveAOByQuorum, cmi.pipeliningLimit, cmi.metrics,
cmi.me,
cmi.chainID,
dkShare,
cmi.consensusStateRegistry,
cmi.nodeIDFromPubKey,
cmi.deriveAOByQuorum,
cmi.pipeliningLimit,
cmi.postponeRecoveryMilestones,
cmi.metrics,
cmi.log.Named(fmt.Sprintf("CL-%v", dkShare.GetSharedPublic().AsEd25519Address().String()[:10])),
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/chainmanager/chain_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func testChainMgrBasic(t *testing.T, n, f int) {
}
cm, err := chainmanager.New(
nid, chainID, stores[nid], consensusStateRegistry, dkRegs[i], gpa.NodeIDFromPublicKey,
activeAccessNodesCB, trackActiveStateCB, savePreliminaryBlockCB, updateCommitteeNodesCB, true, -1, nil,
activeAccessNodesCB, trackActiveStateCB, savePreliminaryBlockCB, updateCommitteeNodesCB, true, -1, 1, nil,
log.Named(nid.ShortString()),
)
require.NoError(t, err)
Expand Down
15 changes: 15 additions & 0 deletions packages/chain/chainmanager/input_milestone_received.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package chainmanager

import "github.com/iotaledger/wasp/packages/gpa"

// inputMilestoneReceived is the chain manager input signalling that an L1
// milestone was received. It carries no data.
//
// This event is introduced to avoid too-fast recovery from the
// L1 rejections, because L1 sometimes reports them prematurely.
type inputMilestoneReceived struct{}

// NewInputMilestoneReceived creates the chain manager input that reports
// a received L1 milestone.
func NewInputMilestoneReceived() gpa.Input {
	return new(inputMilestoneReceived)
}

// String returns a fixed textual tag identifying this input.
func (*inputMilestoneReceived) String() string {
	const tag = "{chainMgr.inputMilestoneReceived}"
	return tag
}
13 changes: 12 additions & 1 deletion packages/chain/cmt_log/cmt_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func New(
nodeIDFromPubKey func(pubKey *cryptolib.PublicKey) gpa.NodeID,
deriveAOByQuorum bool,
pipeliningLimit int,
postponeRecoveryMilestones int,
cclMetrics *metrics.ChainCmtLogMetrics,
log *logger.Logger,
) (CmtLog, error) {
Expand Down Expand Up @@ -228,7 +229,9 @@ func New(
// Nothing to do, if we cannot persist this.
panic(fmt.Errorf("cannot persist the cmtLog state: %w", err))
}
}, log.Named("VO"))
},
postponeRecoveryMilestones,
log.Named("VO"))
cl.varLogIndex = NewVarLogIndex(nodeIDs, n, f, prevLI, cl.varOutput.LogIndexAgreed, cclMetrics, log.Named("VLI"))
cl.varLocalView = NewVarLocalView(pipeliningLimit, cl.varOutput.TipAOChanged, log.Named("VLV"))
cl.asGPA = gpa.NewOwnHandler(me, cl)
Expand Down Expand Up @@ -256,6 +259,9 @@ func (cl *cmtLogImpl) Input(input gpa.Input) gpa.OutMessages {
return cl.handleInputConsensusOutputRejected(input)
case *inputConsensusTimeout:
return cl.handleInputConsensusTimeout(input)
case *inputMilestoneReceived:
cl.handleInputMilestoneReceived()
return nil
case *inputCanPropose:
cl.handleInputCanPropose()
return nil
Expand Down Expand Up @@ -297,6 +303,7 @@ func (cl *cmtLogImpl) handleInputConsensusOutputConfirmed(input *inputConsensusO

// > ...
func (cl *cmtLogImpl) handleInputConsensusOutputRejected(input *inputConsensusOutputRejected) gpa.OutMessages {
cl.varOutput.HaveRejection()
msgs := gpa.NoMessages()
msgs.AddAll(cl.varLogIndex.ConsensusOutputReceived(input.logIndex)) // This should be superfluous, always follows handleInputConsensusOutputDone.
if _, tipUpdated := cl.varLocalView.AliasOutputRejected(input.aliasOutput); tipUpdated {
Expand Down Expand Up @@ -324,6 +331,10 @@ func (cl *cmtLogImpl) handleInputConsensusTimeout(input *inputConsensusTimeout)
return cl.varLogIndex.ConsensusRecoverReceived(input.logIndex)
}

// handleInputMilestoneReceived passes the milestone notification on to
// varOutput by calling HaveMilestone. It returns nothing; the Input
// dispatcher returns nil messages for this input.
func (cl *cmtLogImpl) handleInputMilestoneReceived() {
cl.varOutput.HaveMilestone()
}

func (cl *cmtLogImpl) handleInputCanPropose() {
cl.varOutput.CanPropose()
}
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/cmt_log/cmt_log_rapid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newCmtLogTestRapidSM(t *rapid.T) *cmtLogTestRapidSM {
dkShare, err := committeeKeyShares[i].LoadDKShare(committeeAddress)
require.NoError(t, err)
consensusStateRegistry := testutil.NewConsensusStateRegistry() // Empty store in this case.
cmtLogInst, err := cmt_log.New(gpaNodeIDs[i], sm.chainID, dkShare, consensusStateRegistry, gpa.NodeIDFromPublicKey, true, -1, nil, log.Named(fmt.Sprintf("N%v", i)))
cmtLogInst, err := cmt_log.New(gpaNodeIDs[i], sm.chainID, dkShare, consensusStateRegistry, gpa.NodeIDFromPublicKey, true, -1, 1, nil, log.Named(fmt.Sprintf("N%v", i)))
require.NoError(t, err)
gpaNodes[gpaNodeIDs[i]] = cmtLogInst.AsGPA()
}
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/cmt_log/cmt_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func testCmtLogBasic(t *testing.T, n, f int) {
dkShare, err := committeeKeyShares[i].LoadDKShare(committeeAddress)
require.NoError(t, err)
consensusStateRegistry := testutil.NewConsensusStateRegistry() // Empty store in this case.
cmtLogInst, err := cmt_log.New(gpaNodeIDs[i], chainID, dkShare, consensusStateRegistry, gpa.NodeIDFromPublicKey, true, -1, nil, log.Named(fmt.Sprintf("N%v", i)))
cmtLogInst, err := cmt_log.New(gpaNodeIDs[i], chainID, dkShare, consensusStateRegistry, gpa.NodeIDFromPublicKey, true, -1, 1, nil, log.Named(fmt.Sprintf("N%v", i)))
require.NoError(t, err)
gpaNodes[gpaNodeIDs[i]] = cmtLogInst.AsGPA()
}
Expand Down
15 changes: 15 additions & 0 deletions packages/chain/cmt_log/input_milestone_received.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cmt_log

import "github.com/iotaledger/wasp/packages/gpa"

// inputMilestoneReceived is the committee log input signalling that an L1
// milestone was received. It carries no data.
//
// This event is introduced to avoid too-fast recovery from the
// L1 rejections, because L1 sometimes reports them prematurely.
type inputMilestoneReceived struct{}

// NewInputMilestoneReceived creates the committee log input that reports
// a received L1 milestone.
func NewInputMilestoneReceived() gpa.Input {
	return new(inputMilestoneReceived)
}

// String returns a fixed textual tag identifying this input.
func (*inputMilestoneReceived) String() string {
	const tag = "{cmtLog.inputMilestoneReceived}"
	return tag
}
61 changes: 46 additions & 15 deletions packages/chain/cmt_log/var_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,35 @@ type VarOutput interface {
Value() *Output
LogIndexAgreed(li LogIndex)
TipAOChanged(ao *isc.AliasOutputWithID)
HaveRejection()
HaveMilestone()
CanPropose()
Suspended(suspended bool)
}

type varOutputImpl struct {
candidateLI LogIndex
candidateAO *isc.AliasOutputWithID
canPropose bool
suspended bool
outValue *Output
persistUsed func(li LogIndex)
log *logger.Logger
candidateLI LogIndex
candidateAO *isc.AliasOutputWithID
canPropose bool
milestonesToWait int
suspended bool
outValue *Output
persistUsed func(li LogIndex)
postponeRecoveryMilestones int
log *logger.Logger
}

func NewVarOutput(persistUsed func(li LogIndex), log *logger.Logger) VarOutput {
func NewVarOutput(persistUsed func(li LogIndex), postponeRecoveryMilestones int, log *logger.Logger) VarOutput {
return &varOutputImpl{
candidateLI: NilLogIndex(),
candidateAO: nil,
canPropose: true,
suspended: false,
outValue: nil,
persistUsed: persistUsed,
log: log,
candidateLI: NilLogIndex(),
candidateAO: nil,
canPropose: true,
milestonesToWait: 0,
suspended: false,
outValue: nil,
persistUsed: persistUsed,
postponeRecoveryMilestones: postponeRecoveryMilestones,
log: log,
}
}

Expand Down Expand Up @@ -63,6 +69,26 @@ func (vo *varOutputImpl) TipAOChanged(ao *isc.AliasOutputWithID) {
vo.tryOutput()
}

// HaveRejection is called when a rejection is reported. It (re)sets
// milestonesToWait to the configured postponeRecoveryMilestones and then
// re-evaluates the output via tryOutput.
//
// This works hand in hand with HaveMilestone, which counts milestonesToWait
// back down. While the counter is positive, tryOutput postpones the decision.
func (vo *varOutputImpl) HaveRejection() {
vo.milestonesToWait = vo.postponeRecoveryMilestones
vo.tryOutput()
}

// HaveMilestone counts down the grace delay started by a rejection.
//
// milestonesToWait is set on any rejection, but it is decreased only once the
// rejection is resolved completely, i.e. when both a candidate log index and
// a candidate alias output are present. This grace delay works around L1
// reporting rejections prematurely.
func (vo *varOutputImpl) HaveMilestone() {
	switch {
	case vo.milestonesToWait == 0:
		// No grace delay is pending.
		return
	case vo.candidateLI.IsNil(), vo.candidateAO == nil:
		// Candidate is incomplete, so leave the counter untouched.
		return
	}
	vo.milestonesToWait--
	vo.tryOutput()
}

func (vo *varOutputImpl) CanPropose() {
vo.canPropose = true
vo.tryOutput()
Expand All @@ -83,6 +109,11 @@ func (vo *varOutputImpl) tryOutput() {
// Keep output unchanged.
return
}
if vo.milestonesToWait > 0 {
// Postponed, wait for several milestones after a rejection.
vo.log.Debugf("TIP decision postponed, milestonesToWait=%v", vo.milestonesToWait)
return
}
//
// Output the new data.
vo.persistUsed(vo.candidateLI)
Expand Down
3 changes: 3 additions & 0 deletions packages/chain/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func New(
onChainDisconnect func(),
deriveAliasOutputByQuorum bool,
pipeliningLimit int,
postponeRecoveryMilestones int,
consensusDelay time.Duration,
recoveryTimeout time.Duration,
validatorAgentID isc.AgentID,
Expand Down Expand Up @@ -395,6 +396,7 @@ func New(
},
deriveAliasOutputByQuorum,
pipeliningLimit,
postponeRecoveryMilestones,
cni.chainMetrics.CmtLog,
cni.log.Named("CM"),
)
Expand Down Expand Up @@ -732,6 +734,7 @@ func (cni *chainNodeImpl) handleMilestoneTimestamp(timestamp time.Time) {
cni.log.Debugf("handleMilestoneTimestamp: %v", timestamp)
cni.tangleTime = timestamp
cni.mempool.TangleTimeUpdated(timestamp)
cni.sendMessages(cni.chainMgr.Input(chainmanager.NewInputMilestoneReceived()))
cni.consensusInsts.ForEach(func(address iotago.Ed25519Address, consensusInstances *shrinkingmap.ShrinkingMap[cmt_log.LogIndex, *consensusInst]) bool {
consensusInstances.ForEach(func(li cmt_log.LogIndex, consensusInstance *consensusInst) bool {
if consensusInstance.cancelFunc != nil {
Expand Down
1 change: 1 addition & 0 deletions packages/chain/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv {
nil,
true,
-1,
1,
10*time.Millisecond,
10*time.Second,
accounts.CommonAccount(),
Expand Down
Loading

0 comments on commit 13579da

Please sign in to comment.