Skip to content

Commit

Permalink
Revert "tune ap memory cache policy (#18852)" (#18915)
Browse files Browse the repository at this point in the history
Revert "tune ap memory cache policy (#18852)"

Approved by: @XuPeng-SH, @zhangxu19830126
  • Loading branch information
badboynt1 authored Sep 21, 2024
1 parent be8b64e commit 759a66e
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 131 deletions.
262 changes: 149 additions & 113 deletions pkg/pb/shard/shard.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,16 @@ func (e *Engine) Hints() (h engine.Hints) {
return
}

func determineScanType(relData engine.RelData, readerNum int) (scanType int) {
scanType = NORMAL
if relData.DataCnt() < readerNum*SMALLSCAN_THRESHOLD || readerNum == 1 {
scanType = SMALL
} else if (readerNum * LARGESCAN_THRESHOLD) <= relData.DataCnt() {
scanType = LARGE
}
return
}

func (e *Engine) BuildBlockReaders(
ctx context.Context,
p any,
Expand All @@ -657,6 +667,7 @@ func (e *Engine) BuildBlockReaders(
return nil, err
}

scanType := determineScanType(relData, newNum)
mod := blkCnt % newNum
divide := blkCnt / newNum
for i := 0; i < newNum; i++ {
Expand All @@ -683,6 +694,7 @@ func (e *Engine) BuildBlockReaders(
if err != nil {
return nil, err
}
rd.scanType = scanType
rds = append(rds, rd)
}
return rds, nil
Expand Down
6 changes: 1 addition & 5 deletions pkg/vm/engine/disttae/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,7 @@ func (r *reader) Read(
}

var policy fileservice.Policy
if r.readBlockCnt == 0 {
r.smallScanThreshHold = GetSmallScanThreshHold()
}
if r.readBlockCnt > r.smallScanThreshHold {
if r.scanType == LARGE || r.scanType == NORMAL {
policy = fileservice.SkipMemoryCacheWrites
}

Expand All @@ -384,7 +381,6 @@ func (r *reader) Read(
if err != nil {
return false, err
}
r.readBlockCnt++

if filter.Valid {
// we collect mem cache hit related statistics info for blk read here
Expand Down
3 changes: 3 additions & 0 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,7 @@ func (tbl *txnTable) BuildReaders(
}
}

scanType := determineScanType(relData, newNum)
def := tbl.GetTableDef(ctx)
mod := blkCnt % newNum
divide := blkCnt / newNum
Expand Down Expand Up @@ -1776,6 +1777,8 @@ func (tbl *txnTable) BuildReaders(
if err != nil {
return nil, err
}

rd.scanType = scanType
rds = append(rds, rd)
}
return rds, nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/vm/engine/disttae/txn_table_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ type shardingLocalReader struct {
//relation data to distribute to remote CN which holds shard's partition state.
remoteRelData engine.RelData
remoteTombApplyPolicy engine.TombstoneApplyPolicy
remoteScanType int
}

// TODO::
Expand Down Expand Up @@ -477,6 +478,7 @@ func (r *shardingLocalReader) Read(
func(param *shard.ReadParam) {
param.ReaderBuildParam.RelData = relData
param.ReaderBuildParam.Expr = expr
param.ReaderBuildParam.ScanType = int32(r.remoteScanType)
param.ReaderBuildParam.TombstoneApplyPolicy = int32(r.remoteTombApplyPolicy)
},
func(resp []byte) {
Expand Down Expand Up @@ -624,6 +626,7 @@ func (tbl *txnTableDelegate) BuildShardingReaders(
}
}

scanType := determineScanType(relData, newNum)
mod := blkCnt % newNum
divide := blkCnt / newNum
current := 0
Expand All @@ -644,6 +647,7 @@ func (tbl *txnTableDelegate) BuildShardingReaders(
tblDelegate: tbl,
//remoteRelData: remoteRelData,
remoteTombApplyPolicy: engine.Policy_SkipUncommitedInMemory | engine.Policy_SkipUncommitedS3,
remoteScanType: scanType,
}

if localRelData.DataCnt() > 0 {
Expand All @@ -668,6 +672,7 @@ func (tbl *txnTableDelegate) BuildShardingReaders(
if err != nil {
return nil, err
}
lrd.scanType = scanType
srd.lrd = lrd
}

Expand Down
27 changes: 15 additions & 12 deletions pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,12 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

func GetSmallScanThreshHold() uint64 {
if ncpu > 32 {
return 100
}
if ncpu > 16 {
return 200
}
return 400
}
const (
PREFETCH_THRESHOLD = 256
PREFETCH_ROUNDS = 24
SMALLSCAN_THRESHOLD = 100
LARGESCAN_THRESHOLD = 1500
)

const (
INSERT = iota
Expand Down Expand Up @@ -121,6 +118,12 @@ func noteSplitAlter(note string) (bool, int, uint64, string) {
panic("bad format of alter note")
}

const (
SMALL = iota
NORMAL
LARGE
)

const (
MO_DATABASE_ID_NAME_IDX = 1
MO_DATABASE_ID_ACCOUNT_IDX = 2
Expand Down Expand Up @@ -912,9 +915,9 @@ type reader struct {
isTombstone bool
source engine.DataSource

memFilter MemPKFilter
readBlockCnt uint64
smallScanThreshHold uint64
memFilter MemPKFilter

scanType int
}

type mergeReader struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/test/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ func Test_ShardingRemoteReader(t *testing.T) {
data, err := relData.MarshalBinary()
require.NoError(t, err)
readerBuildParam.ReaderBuildParam.RelData = data
readerBuildParam.ReaderBuildParam.ScanType = disttae.SMALL
readerBuildParam.ReaderBuildParam.TombstoneApplyPolicy =
int32(engine.Policy_SkipUncommitedInMemory | engine.Policy_SkipUncommitedS3)
res, err := disttae.HandleShardingReadBuildReader(
Expand Down
3 changes: 2 additions & 1 deletion proto/shard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ message KeyParam {
message ReaderBuildParam {
bytes relData = 1;
plan.Expr expr = 2;
int32 tombstoneApplyPolicy = 3;
int32 scanType = 3;
int32 tombstoneApplyPolicy = 4;
}

message ReaderBuildResult {
Expand Down

0 comments on commit 759a66e

Please sign in to comment.