Skip to content

Commit

Permalink
[Disk Manager]: Add test to reproduce hanging ydb requests (#2012)
Browse files Browse the repository at this point in the history
Test which reproduces the behavior mentioned in #501.
  • Loading branch information
jkuradobery authored Sep 19, 2024
1 parent cd838f8 commit 8384f0b
Showing 1 changed file with 189 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ import (
"os"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/common"
"golang.org/x/sync/errgroup"

"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/common"
dataplane_common "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/common"
snapshot_config "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/config"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/schema"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/test"
Expand Down Expand Up @@ -381,15 +385,15 @@ func createFixture(t *testing.T) *fixture {

////////////////////////////////////////////////////////////////////////////////

func makeChunk(chunkIndex uint32, chunkData string) common.Chunk {
return common.Chunk{
func makeChunk(chunkIndex uint32, chunkData string) dataplane_common.Chunk {
return dataplane_common.Chunk{
Index: chunkIndex,
Data: []byte(chunkData),
}
}

func makeZeroChunk(chunkIndex uint32) common.Chunk {
return common.Chunk{
func makeZeroChunk(chunkIndex uint32) dataplane_common.Chunk {
return dataplane_common.Chunk{
Index: chunkIndex,
Zero: true,
}
Expand Down Expand Up @@ -849,7 +853,7 @@ func TestShallowCopySnapshot(t *testing.T) {
chunk3 := makeChunk(10, "aaa")
chunkID3, err := f.storage.WriteChunk(f.ctx, "", "src", chunk3, testCase.useS3)
require.NoError(t, err)
chunk4 := common.Chunk{
chunk4 := dataplane_common.Chunk{
Index: 11,
Zero: true,
}
Expand Down Expand Up @@ -1033,10 +1037,10 @@ func TestShallowCopySnapshotWithRandomFailure(t *testing.T) {
defer wg.Done()

for j := i * chunksPerWorker; j < (i+1)*chunksPerWorker; j++ {
var chunk common.Chunk
var chunk dataplane_common.Chunk

data := []byte(fmt.Sprintf("chunk-%v", j))
chunk = common.Chunk{Index: j, Data: data}
chunk = dataplane_common.Chunk{Index: j, Data: data}

chunkID, err := f.storage.WriteChunk(f.ctx, "", "src", chunk, testCase.useS3)
require.NoError(t, err)
Expand Down Expand Up @@ -1204,7 +1208,7 @@ func TestFlattenChunkData(t *testing.T) {
f := createFixture(t)
defer f.teardown()

chunk := common.Chunk{
chunk := dataplane_common.Chunk{
Index: 0,
Data: []byte("abcdef"),
}
Expand Down Expand Up @@ -1234,7 +1238,7 @@ func TestReadChunk(t *testing.T) {
chunkMapEntries := readChunkMap(f, snapshotID)
require.Len(t, chunkMapEntries, 1)

readChunk := common.Chunk{
readChunk := dataplane_common.Chunk{
ID: chunkID,
Data: make([]byte, len(chunk.Data)),
StoredInS3: chunkMapEntries[0].storedInS3,
Expand All @@ -1252,7 +1256,7 @@ func TestReadNonExistentChunk(t *testing.T) {
f := createFixture(t)
defer f.teardown()

readChunk := common.Chunk{
readChunk := dataplane_common.Chunk{
ID: "nonexistent",
Data: make([]byte, 10000),
StoredInS3: testCase.useS3,
Expand Down Expand Up @@ -1351,7 +1355,7 @@ func TestChunkChecksumMismatch(t *testing.T) {

updateBlobChecksum(f, chunkID, 1111111111, testCase.useS3)

readChunk := common.Chunk{
readChunk := dataplane_common.Chunk{
ID: chunkID,
Data: make([]byte, len(chunk.Data)),
StoredInS3: testCase.useS3,
Expand All @@ -1373,7 +1377,7 @@ func TestS3BlobMetadataMissing(t *testing.T) {

clearS3BlobMetadata(f, chunkID)

readChunk := common.Chunk{
readChunk := dataplane_common.Chunk{
ID: chunkID,
Data: make([]byte, len(chunk.Data)),
StoredInS3: true,
Expand Down Expand Up @@ -1404,7 +1408,7 @@ func TestChunkCompression(t *testing.T) {
chunkID, err := f.storage.WriteChunk(f.ctx, "", "test", chunk, testCase.useS3)
require.NoError(t, err)

readChunk := common.Chunk{
readChunk := dataplane_common.Chunk{
ID: chunkID,
Data: make([]byte, len(chunk.Data)),
StoredInS3: testCase.useS3,
Expand Down Expand Up @@ -1446,7 +1450,7 @@ func TestGetDataChunkCount(t *testing.T) {
}

for i, data := range chunks {
var chunk common.Chunk
var chunk dataplane_common.Chunk

if len(data) != 0 {
chunk = makeChunk(uint32(i), data)
Expand Down Expand Up @@ -1500,7 +1504,7 @@ func TestCompressionMetricsCollection(t *testing.T) {
)
require.NoError(t, err)

readChunk := common.Chunk{
readChunk := dataplane_common.Chunk{
ID: chunkID,
Data: make([]byte, len(chunk.Data)),
StoredInS3: testCase.useS3,
Expand All @@ -1527,3 +1531,172 @@ func expectHistogramCalledOnce(registry *mocks.RegistryMock, name string, compre
),
).Return(nil).Once()
}

////////////////////////////////////////////////////////////////////////////////

type ydbTestFixture struct {
t *testing.T
ctx context.Context
cancel context.CancelFunc
db *persistence.YDBClient
folder string
}

func newYdbTestFixture(t *testing.T) *ydbTestFixture {
ctx, cancel := context.WithCancel(test.NewContext())
db, err := newYDB(ctx)
require.NoError(t, err)
folder := fmt.Sprintf("ydb_test/%v", t.Name())
return &ydbTestFixture{
t: t,
ctx: ctx,
cancel: cancel,
db: db,
folder: folder,
}
}

func tableDescription() persistence.CreateTableDescription {
optional := persistence.Optional
return persistence.NewCreateTableDescription(
persistence.WithColumn("shard_id", optional(persistence.TypeUint64)),
persistence.WithColumn("chunk_id", optional(persistence.TypeUTF8)),
persistence.WithColumn("data", optional(persistence.TypeString)),
persistence.WithPrimaryKeyColumn("shard_id", "chunk_id"),
persistence.WithUniformPartitions(5),
persistence.WithExternalBlobs("rotencrypted"),
)
}

func setupTestYdbRequestDoesNotHang(t *testing.T) {
f := newYdbTestFixture(t)
err := f.db.CreateOrAlterTable(
f.ctx,
f.folder,
"table",
tableDescription(),
false, // dropUnusedColumns
)
require.NoError(f.t, err)
}

func (f *ydbTestFixture) teardown() {
require.NoError(f.t, f.db.Close(f.ctx))
f.cancel()
}

func (f *ydbTestFixture) writeChunkData(
ctx context.Context,
chunkIndex int,
) error {

dataToWrite := make([]byte, 4096*1024)
_, err := f.db.ExecuteRW(ctx, fmt.Sprintf(`
--!syntax_v1
pragma TablePathPrefix = "%v";
declare $shard_id as Uint64;
declare $chunk_id as Utf8;
declare $data as String;
upsert into %v (shard_id, chunk_id, data)
values ($shard_id, $chunk_id, $data)
`, f.db.AbsolutePath(f.folder), "table"),
persistence.ValueParam(
"$shard_id", persistence.Uint64Value(uint64(chunkIndex))),
persistence.ValueParam(
"$chunk_id",
persistence.UTF8Value(fmt.Sprintf("chunk_%d", chunkIndex)),
),
persistence.ValueParam(
"$data",
persistence.StringValue(dataToWrite),
),
)
return err
}

////////////////////////////////////////////////////////////////////////////////

func TestYDBRequestDoesNotHang(t *testing.T) {
// Test that reproduces the issue with hanging transactions in ydb after
// parallel requests that write external blobs are cancelled.
// See: https://github.com/ydb-platform/ydb-go-sdk/issues/1025
// See: https://github.com/ydb-platform/nbs/issues/501
// We need 50 iteration to guarantee for the bug to reproduce,
// usually it takes less iterations.
setupTestYdbRequestDoesNotHang(t)
for i := 0; i < 50; i++ {
func() {
f := newYdbTestFixture(t)
defer f.teardown()
var errGrp errgroup.Group
ctx, cancel := context.WithCancel(f.ctx)
logging.Info(
ctx,
"Writing 100 chunks in parallel and cancelling the context",
)
for chunkIdex := 0; chunkIdex < 100; chunkIdex++ {
chunkIndex := chunkIdex
errGrp.Go(
func() error {
err := f.writeChunkData(ctx, chunkIndex)
if err == nil {
return nil
}
if strings.Contains(err.Error(), "context canceled") {
return nil
}

return err
},
)
}

time.Sleep(
common.RandomDuration(
10*time.Millisecond,
500*time.Millisecond,
),
)
cancel()
require.NoError(f.t, errGrp.Wait())

logging.Info(
f.ctx,
"Write 100 more chunks to ensure that transactions do not hang",
)
transactionTimeout := time.Minute * 3
ctx, cancel = context.WithTimeout(
f.ctx,
transactionTimeout,
)
defer cancel()

for chunkIdex := 100; chunkIdex < 200; chunkIdex++ {
chunkIndex := chunkIdex
errGrp.Go(
func() error {
err := f.writeChunkData(ctx, chunkIndex)
if err == nil {
return nil
}

errorIsTimeout := strings.Contains(
err.Error(),
"context deadline exceeded",
)
if errorIsTimeout {
return fmt.Errorf(
"request to ydb took more than %v",
transactionTimeout,
)
}

return err
},
)
}
require.NoError(f.t, errGrp.Wait())
}()
}
}

0 comments on commit 8384f0b

Please sign in to comment.