Skip to content

Commit

Permalink
add some unit tests for payment
Browse files Browse the repository at this point in the history
  • Loading branch information
forcodedancing committed Jun 19, 2023
1 parent 2c2bca4 commit 695fb68
Show file tree
Hide file tree
Showing 5 changed files with 470 additions and 28 deletions.
26 changes: 26 additions & 0 deletions x/payment/keeper/auto_resume_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
"github.com/cosmos/cosmos-sdk/store/prefix"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/bnb-chain/greenfield/x/payment/types"
Expand Down Expand Up @@ -52,3 +53,28 @@ func (k Keeper) RemoveAutoResumeRecord(
addr,
))
}

// ExistsAutoResumeRecord checks whether there exists a autoResumeRecord
func (k Keeper) ExistsAutoResumeRecord(
ctx sdk.Context,
timestamp int64,
addr sdk.AccAddress,
) bool {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.AutoResumeRecordKeyPrefix)
iterator := storetypes.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()

exists := false
for ; iterator.Valid(); iterator.Next() {
val := types.ParseAutoResumeRecordKey(iterator.Key())
if val.Timestamp > timestamp {
break
}
if sdk.MustAccAddressFromHex(val.Addr).Equals(addr) {
exists = true
break
}
}

return exists
}
19 changes: 18 additions & 1 deletion x/payment/keeper/out_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,24 @@ func (k Keeper) SetOutFlow(ctx sdk.Context, addr sdk.AccAddress, outFlow *types.
), bz)
}

// SetOutFlow set a specific OutFlow in the store from its index
// GetOutFlow get a specific OutFlow in the store from its index
func (k Keeper) GetOutFlow(ctx sdk.Context, addr sdk.AccAddress, status types.OutFlowStatus, toAddr sdk.AccAddress) *types.OutFlow {
key := types.OutFlowKey(addr, status, toAddr)
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.OutFlowKeyPrefix)

value := store.Get(key)
if value == nil {
return nil
}

return &types.OutFlow{
ToAddress: toAddr.String(),
Rate: types.ParseOutFlowValue(value),
Status: status,
}
}

// GetOutFlows get OutFlows for a specific from address
func (k Keeper) GetOutFlows(ctx sdk.Context, addr sdk.AccAddress) []types.OutFlow {
key := types.OutFlowKey(addr, types.OUT_FLOW_STATUS_ACTIVE, nil)
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.OutFlowKeyPrefix)
Expand Down
71 changes: 48 additions & 23 deletions x/payment/keeper/stream_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,12 @@ func (k Keeper) UpdateStreamRecordByAddr(ctx sdk.Context, change *types.StreamRe

func (k Keeper) ForceSettle(ctx sdk.Context, streamRecord *types.StreamRecord) error {
totalBalance := streamRecord.StaticBalance.Add(streamRecord.BufferBalance)
change := types.NewDefaultStreamRecordChangeWithAddr(types.GovernanceAddress).WithStaticBalanceChange(totalBalance)
_, err := k.UpdateStreamRecordByAddr(ctx, change)
if err != nil {
return fmt.Errorf("update governance stream record failed: %w", err)
if totalBalance.IsPositive() {
change := types.NewDefaultStreamRecordChangeWithAddr(types.GovernanceAddress).WithStaticBalanceChange(totalBalance)
_, err := k.UpdateStreamRecordByAddr(ctx, change)
if err != nil {
return fmt.Errorf("update governance stream record failed: %w", err)
}
}
// force settle
streamRecord.StaticBalance = sdkmath.ZeroInt()
Expand All @@ -278,14 +280,14 @@ func (k Keeper) AutoSettle(ctx sdk.Context) {
if count >= max {
return
}
val := types.ParseAutoSettleRecordKey(iterator.Key())
addr := sdk.MustAccAddressFromHex(val.Addr)
if val.Timestamp > currentTimestamp {
record := types.ParseAutoSettleRecordKey(iterator.Key())
addr := sdk.MustAccAddressFromHex(record.Addr)
if record.Timestamp > currentTimestamp {
return
}
streamRecord, found := k.GetStreamRecord(ctx, addr)
if !found {
ctx.Logger().Error("stream record not found", "addr", val.Addr)
ctx.Logger().Error("stream record not found", "addr", record.Addr)
panic("stream record not found")
}

Expand All @@ -294,11 +296,18 @@ func (k Keeper) AutoSettle(ctx sdk.Context) {
if err != nil {
panic(err)
}
count++ // add one for a stream record
k.SetStreamRecord(ctx, streamRecord)
if streamRecord.Status == types.STREAM_ACCOUNT_STATUS_ACTIVE {
k.SetStreamRecord(ctx, streamRecord)
continue
}
count++ // add one for a stream record
if count >= max {
return
}
}

if k.ExistsAutoResumeRecord(ctx, record.Timestamp, addr) { // this check should be cheap for usually
continue //skip the one if the stream account is in resuming
}

activeFlowKey := types.OutFlowKey(addr, types.OUT_FLOW_STATUS_ACTIVE, nil)
Expand All @@ -307,32 +316,45 @@ func (k Keeper) AutoSettle(ctx sdk.Context) {
defer flowIterator.Close()

totalRate := sdk.ZeroInt()
finished := false
for ; flowIterator.Valid(); flowIterator.Next() {
if count >= max {
break
}
_, outFlow := types.ParseOutFlowKey(flowIterator.Key())
if outFlow.Status == types.OUT_FLOW_STATUS_FROZEN {
finished = true
break
}
rate := types.ParseOutFlowValue(flowIterator.Value())
outFlow.Rate = types.ParseOutFlowValue(flowIterator.Value())

toAddr := sdk.MustAccAddressFromHex(outFlow.ToAddress)
flowChange := types.NewDefaultStreamRecordChangeWithAddr(toAddr).WithRateChange(rate)
flowChange := types.NewDefaultStreamRecordChangeWithAddr(toAddr).WithRateChange(outFlow.Rate.Neg())
_, err := k.UpdateStreamRecordByAddr(ctx, flowChange)
if err != nil {
panic(fmt.Sprintf("update %s stream record failed: %s", outFlow.ToAddress, err.Error()))
ctx.Logger().Error("update stream record failed", "address", outFlow.ToAddress, "rate", outFlow.Rate.Neg())
}

flowStore.Delete(flowIterator.Key())
outFlow.Status = types.OUT_FLOW_STATUS_FROZEN
k.SetOutFlow(ctx, addr, &outFlow)

totalRate = totalRate.Add(rate)
totalRate = totalRate.Add(outFlow.Rate)
count++
}
streamRecord.NetflowRate = streamRecord.NetflowRate.Add(totalRate)
streamRecord.FrozenNetflowRate = totalRate.Add(streamRecord.FrozenNetflowRate)
if streamRecord.FrozenNetflowRate.IsNil() {
streamRecord.FrozenNetflowRate = sdkmath.ZeroInt()
}
streamRecord.FrozenNetflowRate = streamRecord.FrozenNetflowRate.Add(totalRate.Neg())

if finished || !flowIterator.Valid() {
if !streamRecord.NetflowRate.IsZero() {
panic("should not happen") // TODO: assertion for fail quick, remove later
}
k.RemoveAutoSettleRecord(ctx, record.Timestamp, addr)
}

k.SetStreamRecord(ctx, streamRecord)
}
}
Expand All @@ -342,7 +364,7 @@ func (k Keeper) TryResumeStreamRecord(ctx sdk.Context, streamRecord *types.Strea
return fmt.Errorf("stream account %s status is not frozen", streamRecord.Account)
}

if !streamRecord.NetflowRate.IsZero() { // the account is resuming
if !streamRecord.NetflowRate.IsNil() && !streamRecord.NetflowRate.IsZero() { // the account is resuming
return fmt.Errorf("stream account %s status is resuming, although it is frozen now", streamRecord.Account)
}

Expand All @@ -352,23 +374,24 @@ func (k Keeper) TryResumeStreamRecord(ctx sdk.Context, streamRecord *types.Strea

totalRate := streamRecord.NetflowRate.Add(streamRecord.FrozenNetflowRate)
streamRecord.StaticBalance = streamRecord.StaticBalance.Add(depositBalance)
expectedBalanceToResume := totalRate.Mul(sdkmath.NewIntFromUint64(reserveTime))
expectedBalanceToResume := totalRate.Neg().Mul(sdkmath.NewIntFromUint64(reserveTime))
if streamRecord.StaticBalance.LT(expectedBalanceToResume) {
// deposit balance is not enough to resume, only add static balance
k.SetStreamRecord(ctx, streamRecord)
return nil
}

now := ctx.BlockTime().Unix()
streamRecord.SettleTimestamp = now + streamRecord.StaticBalance.Quo(totalRate).Int64() - int64(forcedSettleTime)
streamRecord.BufferBalance = expectedBalanceToResume
streamRecord.StaticBalance = streamRecord.StaticBalance.Sub(expectedBalanceToResume)
streamRecord.CrudTimestamp = now

ctx.Logger().Debug("try to resume stream account", "streamRecord.OutFlowCount", streamRecord.OutFlowCount, "params.MaxAutoResumeFlowCount", params.MaxAutoResumeFlowCount)
if streamRecord.OutFlowCount <= params.MaxAutoResumeFlowCount { //only rough judgement, resume directly
streamRecord.Status = types.STREAM_ACCOUNT_STATUS_ACTIVE
streamRecord.SettleTimestamp = now + streamRecord.StaticBalance.Quo(totalRate).Int64() - int64(forcedSettleTime)
streamRecord.NetflowRate = totalRate
streamRecord.FrozenNetflowRate = sdkmath.ZeroInt()
streamRecord.BufferBalance = expectedBalanceToResume
streamRecord.StaticBalance = streamRecord.StaticBalance.Sub(expectedBalanceToResume)
streamRecord.CrudTimestamp = now

addr := sdk.MustAccAddressFromHex(streamRecord.Account)
frozenFlowKey := types.OutFlowKey(addr, types.OUT_FLOW_STATUS_FROZEN, nil)
Expand All @@ -395,6 +418,8 @@ func (k Keeper) TryResumeStreamRecord(ctx sdk.Context, streamRecord *types.Strea
k.UpdateAutoSettleRecord(ctx, sdk.MustAccAddressFromHex(streamRecord.Account), 0, streamRecord.SettleTimestamp)
return nil
} else { //enqueue for resume in end block
k.SetStreamRecord(ctx, streamRecord)
k.UpdateAutoSettleRecord(ctx, sdk.MustAccAddressFromHex(streamRecord.Account), 0, streamRecord.SettleTimestamp)
k.SetAutoResumeRecord(ctx, &types.AutoResumeRecord{
Timestamp: now,
Addr: streamRecord.Account,
Expand Down Expand Up @@ -449,14 +474,14 @@ func (k Keeper) AutoResume(ctx sdk.Context) {
}

streamRecord.NetflowRate = streamRecord.NetflowRate.Add(totalRate.Neg())
streamRecord.FrozenNetflowRate = streamRecord.NetflowRate.Add(totalRate)
streamRecord.FrozenNetflowRate = streamRecord.FrozenNetflowRate.Add(totalRate)
if !flowIterator.Valid() {
if !streamRecord.FrozenNetflowRate.IsZero() {
panic("should not happen") // TODO: assertion for fail quick, remove later
}
streamRecord.Status = types.STREAM_ACCOUNT_STATUS_ACTIVE
change := types.NewDefaultStreamRecordChangeWithAddr(addr)
_, err := k.UpdateStreamRecordByAddr(ctx, change)
err := k.UpdateStreamRecord(ctx, streamRecord, change)
if err != nil {
panic(fmt.Sprintf("update %s stream record failed: %s", addr, err.Error()))
}
Expand Down
Loading

0 comments on commit 695fb68

Please sign in to comment.