Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/client-go into cache-last-node
Browse files Browse the repository at this point in the history
  • Loading branch information
ekexium committed Jul 31, 2024
2 parents bc66d00 + eec8198 commit 48ae3db
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 24 deletions.
8 changes: 4 additions & 4 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ func (s *RegionRequestSender) SendReqCtx(
if err := s.replicaSelector.backoffOnNoCandidate(bo); err != nil {
return nil, nil, retryTimes, err
}
if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout {
if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 {
s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout)
}
}
Expand Down Expand Up @@ -870,7 +870,7 @@ func (s *RegionRequestSender) SendReqCtx(
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
req.IsRetryRequest = true
if err != nil {
if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout {
if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 {
msg := fmt.Sprintf("send request failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout)
}
Expand Down Expand Up @@ -906,7 +906,7 @@ func (s *RegionRequestSender) SendReqCtx(
if regionErr != nil {
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
if err != nil {
if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout {
if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 {
msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout)
}
Expand All @@ -916,7 +916,7 @@ func (s *RegionRequestSender) SendReqCtx(
retryTimes++
continue
}
if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout {
if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 {
s.logSendReqError(bo, "send request meet region error without retry", regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout)
}
} else {
Expand Down
6 changes: 4 additions & 2 deletions internal/unionstore/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ type MemDB struct {
missCount atomic.Uint64
}

// unlimitedSize is the sentinel limit value meaning "no limit"; it is used
// for both the per-entry size limit and the total buffer size limit.
const unlimitedSize = math.MaxUint64

func newMemDB() *MemDB {
db := new(MemDB)
db.allocator.init()
db.root = nullAddr
db.stages = make([]MemDBCheckpoint, 0, 2)
db.entrySizeLimit = math.MaxUint64
db.bufferSizeLimit = math.MaxUint64
db.entrySizeLimit = unlimitedSize
db.bufferSizeLimit = unlimitedSize
db.vlog.memdb = db
db.skipMutex = false
db.lastTraversedNode.Store(&nullNodeAddr)
Expand Down
31 changes: 16 additions & 15 deletions internal/unionstore/pipelined_memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ type PipelinedMemDB struct {
// Like MemDB, this RWMutex only used to ensure memdbSnapGetter.Get will not race with
// concurrent memdb.Set, memdb.SetWithFlags, memdb.Delete and memdb.UpdateFlags.
sync.RWMutex
onFlushing atomic.Bool
errCh chan error
flushFunc FlushFunc
bufferBatchGetter BufferBatchGetter
memDB *MemDB
flushingMemDB *MemDB // the flushingMemDB is not wrapped by a mutex, because there is no data race in it.
len, size int // len and size records the total flushed and onflushing memdb.
generation uint64
entryLimit, bufferLimit uint64
flushOption flushOption
onFlushing atomic.Bool
errCh chan error
flushFunc FlushFunc
bufferBatchGetter BufferBatchGetter
memDB *MemDB
flushingMemDB *MemDB // the flushingMemDB is not wrapped by a mutex, because there is no data race in it.
len, size int // len and size records the total flushed and onflushing memdb.
generation uint64
entryLimit uint64
flushOption flushOption
// prefetchCache is used to cache the result of BatchGet, it's invalidated when Flush.
// the values are wrapped by util.Option.
// None -> not found
Expand Down Expand Up @@ -113,7 +113,6 @@ func NewPipelinedMemDB(bufferBatchGetter BufferBatchGetter, flushFunc FlushFunc)
generation: 0,
// keep entryLimit and bufferLimit same with the memdb's default values.
entryLimit: memdb.entrySizeLimit,
bufferLimit: memdb.bufferSizeLimit,
flushOption: flushOpt,
startTime: time.Now(),
}
Expand Down Expand Up @@ -313,7 +312,8 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) {
p.missCount += p.memDB.missCount.Load()
p.hitCount += p.memDB.hitCount.Load()
p.memDB = newMemDB()
p.memDB.SetEntrySizeLimit(p.entryLimit, p.bufferLimit)
// buffer size is limited by ForceFlushMemSizeThreshold. Do not set bufferLimit
p.memDB.SetEntrySizeLimit(p.entryLimit, unlimitedSize)
p.memDB.setSkipMutex(true)
p.generation++
go func(generation uint64) {
Expand Down Expand Up @@ -411,9 +411,10 @@ func (p *PipelinedMemDB) IterReverse([]byte, []byte) (Iterator, error) {
}

// SetEntrySizeLimit sets the size limit for each entry and total buffer.
func (p *PipelinedMemDB) SetEntrySizeLimit(entryLimit, bufferLimit uint64) {
p.entryLimit, p.bufferLimit = entryLimit, bufferLimit
p.memDB.SetEntrySizeLimit(entryLimit, bufferLimit)
// SetEntrySizeLimit sets the per-entry size limit of the underlying memdb.
// The second (buffer limit) argument is intentionally ignored: the pipelined
// buffer's total size is bounded by ForceFlushMemSizeThreshold instead, so the
// memdb's buffer limit is always left at unlimitedSize.
func (p *PipelinedMemDB) SetEntrySizeLimit(entryLimit, _ uint64) {
	// Persist the limit so memdbs created later (e.g. after a Flush) can
	// inherit it as well.
	p.entryLimit = entryLimit
	p.memDB.SetEntrySizeLimit(entryLimit, unlimitedSize)
}

func (p *PipelinedMemDB) Len() int {
Expand Down
5 changes: 2 additions & 3 deletions internal/unionstore/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ package unionstore

import (
"context"
"math"
"time"

tikverr "github.com/tikv/client-go/v2/error"
Expand Down Expand Up @@ -155,10 +154,10 @@ func (us *KVUnionStore) UnmarkPresumeKeyNotExists(k []byte) {
// SetEntrySizeLimit sets the size limit for each entry and total buffer.
// A limit of zero means "no limit" and is normalized to unlimitedSize before
// being handed to the member buffer.
func (us *KVUnionStore) SetEntrySizeLimit(entryLimit, bufferLimit uint64) {
	// Map the zero sentinel to the effective "unlimited" value.
	norm := func(limit uint64) uint64 {
		if limit == 0 {
			return unlimitedSize
		}
		return limit
	}
	us.memBuffer.SetEntrySizeLimit(norm(entryLimit), norm(bufferLimit))
}
Expand Down

0 comments on commit 48ae3db

Please sign in to comment.