Skip to content

Commit

Permalink
fix for deadlock in db concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
hexoscott committed Nov 14, 2024
1 parent 1e2977c commit b42c234
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 22 deletions.
71 changes: 49 additions & 22 deletions erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type MdbxOpts struct {
// must be in the range from 12.5% (almost empty) to 50% (half empty)
// which corresponds to the range from 8192 and to 32768 in units respectively
log log.Logger
roTxsLimiter *semaphore.Weighted
readTxLimiter *semaphore.Weighted
writeTxLimiter *semaphore.Weighted
bucketsCfg TableCfgFunc
path string
syncPeriod time.Duration
Expand Down Expand Up @@ -109,7 +110,7 @@ func (opts MdbxOpts) DirtySpace(s uint64) MdbxOpts {
}

// RoTxsLimiter sets the semaphore used to limit concurrent read-only
// transactions and returns the updated options. The receiver is a value, so
// the caller's MdbxOpts is left untouched and the returned copy must be used.
// If no limiter is supplied, Open creates a default one sized from GOMAXPROCS.
// NOTE(review): the two assignments below look like the removed and added
// sides of the same diff line (roTxsLimiter renamed to readTxLimiter) — confirm.
func (opts MdbxOpts) RoTxsLimiter(l *semaphore.Weighted) MdbxOpts {
opts.roTxsLimiter = l
opts.readTxLimiter = l
return opts
}

Expand Down Expand Up @@ -386,20 +387,26 @@ func (opts MdbxOpts) Open(ctx context.Context) (kv.RwDB, error) {
// return nil, err
//}

if opts.roTxsLimiter == nil {
if opts.readTxLimiter == nil {
targetSemCount := int64(runtime.GOMAXPROCS(-1) * 16)
opts.roTxsLimiter = semaphore.NewWeighted(targetSemCount) // 1 less than max to allow unlocking to happen
opts.readTxLimiter = semaphore.NewWeighted(targetSemCount) // 1 less than max to allow unlocking to happen
}

if opts.writeTxLimiter == nil {
targetSemCount := int64(runtime.GOMAXPROCS(-1)) - 1
opts.writeTxLimiter = semaphore.NewWeighted(targetSemCount) // 1 less than max to allow unlocking to happen
}

txsCountMutex := &sync.Mutex{}

db := &MdbxKV{
opts: opts,
env: env,
log: opts.log,
buckets: kv.TableCfg{},
txSize: dirtyPagesLimit * opts.pageSize,
roTxsLimiter: opts.roTxsLimiter,
opts: opts,
env: env,
log: opts.log,
buckets: kv.TableCfg{},
txSize: dirtyPagesLimit * opts.pageSize,
readTxLimiter: opts.readTxLimiter,
writeTxLimiter: opts.writeTxLimiter,

txsCountMutex: txsCountMutex,
txsAllDoneOnCloseCond: sync.NewCond(txsCountMutex),
Expand Down Expand Up @@ -468,14 +475,15 @@ func (opts MdbxOpts) MustOpen() kv.RwDB {
}

type MdbxKV struct {
log log.Logger
env *mdbx.Env
buckets kv.TableCfg
roTxsLimiter *semaphore.Weighted // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor)
opts MdbxOpts
txSize uint64
closed atomic.Bool
path string
log log.Logger
env *mdbx.Env
buckets kv.TableCfg
readTxLimiter *semaphore.Weighted // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor)
writeTxLimiter *semaphore.Weighted
opts MdbxOpts
txSize uint64
closed atomic.Bool
path string

txsCount uint
txsCountMutex *sync.Mutex
Expand Down Expand Up @@ -748,7 +756,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
}

// Acquire may still succeed (nil error) even when ctx is already cancelled, as long as a slot is free
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
if semErr := db.readTxLimiter.Acquire(ctx, 1); semErr != nil {
db.trackTxEnd()
return nil, fmt.Errorf("mdbx.MdbxKV.BeginRo: roTxsLimiter error %w", semErr)
}
Expand All @@ -757,7 +765,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
if txn == nil {
// on error, or if there is whatever reason that we don't return a tx,
// we need to free up the limiter slot, otherwise it could lead to deadlocks
db.roTxsLimiter.Release(1)
db.readTxLimiter.Release(1)
db.trackTxEnd()
}
}()
Expand All @@ -784,17 +792,34 @@ func (db *MdbxKV) BeginRwNosync(ctx context.Context) (kv.RwTx, error) {
}

func (db *MdbxKV) beginRw(ctx context.Context, flags uint) (txn kv.RwTx, err error) {
if db.closed.Load() {
return nil, fmt.Errorf("db closed")
}

select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

// Acquire may still succeed (nil error) even when ctx is already cancelled, as long as a slot is free
if semErr := db.writeTxLimiter.Acquire(ctx, 1); semErr != nil {
return nil, semErr
}

if !db.trackTxBegin() {
return nil, fmt.Errorf("db closed")
}

runtime.LockOSThread()
defer func() {
if txn == nil {
// on error, or if there is whatever reason that we don't return a tx,
// we need to free up the limiter slot, otherwise it could lead to deadlocks
db.writeTxLimiter.Release(1)
runtime.UnlockOSThread()
}
}()
tx, err := db.env.BeginTxn(nil, flags)
if err != nil {
runtime.UnlockOSThread() // unlock only in case of error. normal flow is "defer .Rollback()"
Expand Down Expand Up @@ -1048,8 +1073,9 @@ func (tx *MdbxTx) Commit() error {
tx.tx = nil
tx.db.trackTxEnd()
if tx.readOnly {
tx.db.roTxsLimiter.Release(1)
tx.db.readTxLimiter.Release(1)
} else {
tx.db.writeTxLimiter.Release(1)
runtime.UnlockOSThread()
}
tx.db.leakDetector.Del(tx.id)
Expand Down Expand Up @@ -1099,8 +1125,9 @@ func (tx *MdbxTx) Rollback() {
tx.tx = nil
tx.db.trackTxEnd()
if tx.readOnly {
tx.db.roTxsLimiter.Release(1)
tx.db.readTxLimiter.Release(1)
} else {
tx.db.writeTxLimiter.Release(1)
runtime.UnlockOSThread()
}
tx.db.leakDetector.Del(tx.id)
Expand Down
39 changes: 39 additions & 0 deletions erigon-lib/kv/mdbx/kv_mdbx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/order"
"sync"
)

func BaseCaseDB(t *testing.T) kv.RwDB {
Expand Down Expand Up @@ -1087,3 +1088,41 @@ func TestDB_BatchTime(t *testing.T) {
t.Fatal(err)
}
}

// TestDeadlock starts a large number of goroutines that each open and roll
// back a transaction (every fifth one read-write, the rest read-only). The
// test passes when every goroutine finishes; a hang in wg.Wait means the
// transaction limiters deadlocked.
func TestDeadlock(t *testing.T) {
	path := t.TempDir()
	logger := log.New()
	table := "Table"
	db := NewMDBX(logger).InMem(path).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
		return kv.TableCfg{
			table:       kv.TableCfgItem{Flags: kv.DupSort},
			kv.Sequence: kv.TableCfgItem{},
		}
	}).MapSize(128 * datasize.MB).MustOpen()
	t.Cleanup(db.Close)

	var wg sync.WaitGroup
	for i := 0; i < 300_000; i++ {
		wg.Add(1)
		go func(idx int) {
			// Registered first so it runs last: the WaitGroup is only released
			// after the transaction below has been rolled back, and it is
			// released on every exit path, including failures.
			defer wg.Done()

			ctx := context.Background()

			// create a write transaction every X requests
			if idx%5 == 0 {
				tx, err := db.BeginRw(ctx)
				if err != nil {
					// t.Fatal must only be called from the goroutine running
					// the test; report with t.Error and bail out instead.
					t.Error(err)
					return
				}
				defer tx.Rollback()
				return
			}

			tx, err := db.BeginRo(ctx)
			if err != nil {
				t.Error(err)
				return
			}
			defer tx.Rollback()
		}(i)
	}

	wg.Wait()
}

0 comments on commit b42c234

Please sign in to comment.