diff --git a/CHANGELOG.md b/CHANGELOG.md index ef0bfbd4f..496147ab6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +* Fixed error with incompleted data returen from transaction.ReadQueryResult method +* Added option WithResponsePartLimitSizeBytes for queries with query service + ## v3.92.2 * Added `table/options.WithShardNodesInfo()` experimental option to get shard nodeId for describe table call diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index e3de1803b..3db10cad9 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -31,6 +31,7 @@ type executeSettings interface { CallOptions() []grpc.CallOption RetryOpts() []retry.Option ResourcePool() string + ResponsePartLimitSizeBytes() int64 } type executeScriptConfig interface { @@ -77,6 +78,7 @@ func executeQueryRequest(a *allocator.Allocator, sessionID, q string, cfg execut request.StatsMode = Ydb_Query.StatsMode(cfg.StatsMode()) request.ConcurrentResultSets = false request.PoolId = cfg.ResourcePool() + request.ResponsePartLimitBytes = cfg.ResponsePartLimitSizeBytes() return request, cfg.CallOptions() } diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index 76110d93a..5489b4a21 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -27,15 +27,16 @@ type ( // executeSettings is a holder for execute settings executeSettings struct { - syntax Syntax - params params.Parameters - execMode ExecMode - statsMode StatsMode - resourcePool string - statsCallback func(queryStats stats.QueryStats) - callOptions []grpc.CallOption - txControl *tx.Control - retryOptions []retry.Option + syntax Syntax + params params.Parameters + execMode ExecMode + statsMode StatsMode + resourcePool string + statsCallback func(queryStats stats.QueryStats) + callOptions []grpc.CallOption + txControl *tx.Control + retryOptions []retry.Option + responsePartLimitBytes int64 } // Execute is an interface for execute method options @@ -58,7 +59,8 @@ type ( mode StatsMode callback func(stats.QueryStats) } - execModeOption = ExecMode + execModeOption = ExecMode + responsePartLimitBytes int64 ) func (poolID resourcePool) applyExecuteOption(s *executeSettings) { @@ -175,6 +177,10 @@ func (s *executeSettings) Params() *params.Parameters { return &s.params } +func (s *executeSettings) ResponsePartLimitSizeBytes() int64 { + return s.responsePartLimitBytes +} + func WithParameters(parameters *params.Parameters) parametersOption { return parametersOption(*parameters) } @@ -201,6 +207,14 @@ func WithExecMode(mode ExecMode) execModeOption { return mode } +func WithResponsePartLimitSizeBytes(size int64) responsePartLimitBytes { + return responsePartLimitBytes(size) +} + +func (size responsePartLimitBytes) applyExecuteOption(s *executeSettings) { + s.responsePartLimitBytes = int64(size) +} + func WithSyntax(syntax Syntax) syntaxOption { return syntax } diff --git a/internal/query/result.go b/internal/query/result.go index 067eca262..315d71e27 100644 --- a/internal/query/result.go +++ b/internal/query/result.go @@ -19,9 +19,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) -var ( - errReadNextResultSet = xerrors.Wrap(errors.New("ydb: stop read the result set because see part of next result set")) -) +var errReadNextResultSet = xerrors.Wrap(errors.New("ydb: stop read the result set because see part of next result set")) var ( _ result.Result = (*streamResult)(nil) diff --git a/query/execute_options.go b/query/execute_options.go index a8d2ac770..0420403d2 100644 --- a/query/execute_options.go +++ b/query/execute_options.go @@ -55,6 +55,12 @@ func WithStatsMode(mode options.StatsMode, callback func(Stats)) options.Execute return options.WithStatsMode(mode, callback) } +// WithResponsePartLimitSizeBytes limit size of each part (data portion) in stream for query service resoponse +// it isn't limit total size of answer +func WithResponsePartLimitSizeBytes(size int64) options.Execute { + return options.WithResponsePartLimitSizeBytes(size) +} + func WithCallOptions(opts ...grpc.CallOption) options.Execute { return options.WithCallOptions(opts...) } diff --git a/tests/integration/query_execute_test.go b/tests/integration/query_execute_test.go index dd7583240..d3b7de485 100644 --- a/tests/integration/query_execute_test.go +++ b/tests/integration/query_execute_test.go @@ -409,3 +409,94 @@ func TestIssue1456TooManyUnknownTransactions(t *testing.T) { wg.Wait() }) } + +func TestQueryHelperQueryResultSet(t *testing.T) { + t.Run("FailOnSecondResultSet", func(t *testing.T) { + scope := newScope(t) + + var secondRowError error + err := scope.Driver().Query().DoTx(scope.Ctx, func(ctx context.Context, tx query.TxActor) error { + rs, err := tx.QueryResultSet(ctx, "SELECT 1; SELECT 2") + if err != nil { + return err + } + + _, err = rs.NextRow(ctx) + if err != nil { + return err + } + + _, secondRowError = rs.NextRow(ctx) + + return nil + }) + require.NoError(t, err) + require.Error(t, secondRowError) + require.NotErrorIs(t, secondRowError, io.EOF) + }) +} + +func TestQueryPartLimiter(t *testing.T) { + scope := newScope(t) + + var readPartCount int + scope.Driver(ydb.WithTraceQuery(trace.Query{ + OnResultNextPart: func(info trace.QueryResultNextPartStartInfo) func(info trace.QueryResultNextPartDoneInfo) { + return func(info trace.QueryResultNextPartDoneInfo) { + if info.Error == nil { + readPartCount++ + } + } + }, + })) + + targetCount := 1000 + items := make([]types.Value, 0, targetCount) + for i := 0; i < targetCount; i++ { + item := types.StructValue( + types.StructFieldValue("val", types.Int64Value(int64(i))), + ) + items = append(items, item) + } + + getPartCount := func(partSize int64) int { + partCount := 0 + err := scope.Driver().Query().DoTx(scope.Ctx, func(ctx context.Context, tx query.TxActor) error { + oldParts := readPartCount + rs, err := tx.QueryResultSet(ctx, ` +DECLARE $arg AS List>; + +SELECT * FROM AS_TABLE($arg); +`, + query.WithParameters(ydb.ParamsBuilder().Param("$arg").Any(types.ListValue(items...)).Build()), + query.WithResponsePartLimitSizeBytes(partSize), + ) + if err != nil { + return err + } + + rowCount := 0 + for { + _, err = rs.NextRow(scope.Ctx) + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + rowCount++ + } + require.Equal(t, targetCount, rowCount) + + partCount = readPartCount - oldParts + return nil + }) + + require.NoError(t, err) + return partCount + } + + partsWithBigSize := getPartCount(1000000) + partsWithLittleSize := getPartCount(100) + + require.Equal(t, 1, partsWithBigSize) + require.Greater(t, partsWithLittleSize, 1) +} diff --git a/tests/integration/query_regression_test.go b/tests/integration/query_regression_test.go index e4a7cd0ec..95c8408f3 100644 --- a/tests/integration/query_regression_test.go +++ b/tests/integration/query_regression_test.go @@ -5,9 +5,10 @@ package integration import ( "context" + "errors" "fmt" + "io" "os" - "slices" "strconv" "strings" "testing" @@ -16,11 +17,13 @@ import ( "github.com/stretchr/testify/require" "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" "github.com/ydb-platform/ydb-go-sdk/v3/query" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) func TestUUIDSerializationQueryServiceIssue1501(t *testing.T) { @@ -311,9 +314,19 @@ SELECT CAST($arg1 AS Utf8) AS v1, CAST($arg2 AS Utf8) AS v2 func TestReadTwoPartsIntoMemoryIssue1559(t *testing.T) { scope := newScope(t) + var readPartCount int + scope.Driver(ydb.WithTraceQuery(trace.Query{ + OnResultNextPart: func(info trace.QueryResultNextPartStartInfo) func(info trace.QueryResultNextPartDoneInfo) { + return func(info trace.QueryResultNextPartDoneInfo) { + if info.Error == nil { + readPartCount++ + } + } + }, + })) + // prepare data - const targetCount = 100000 // must be more then returned in one part - const batchSize = 20000 + const targetCount = 1000 // must be more then returned in one part items := make([]types.Value, 0, targetCount) for i := 0; i < targetCount; i++ { item := types.StructValue( @@ -323,14 +336,10 @@ func TestReadTwoPartsIntoMemoryIssue1559(t *testing.T) { items = append(items, item) } - t.Log("inserting items to a table") - batches := slices.Chunk(items, batchSize) - for batch := range batches { - err := scope.Driver().Table().Do(scope.Ctx, func(ctx context.Context, s table.Session) error { - return s.BulkUpsert(ctx, scope.TablePath(), types.ListValue(batch...)) - }) - require.NoError(t, err) - } + err := scope.Driver().Table().Do(scope.Ctx, func(ctx context.Context, s table.Session) error { + return s.BulkUpsert(ctx, scope.TablePath(), types.ListValue(items...)) + }) + require.NoError(t, err) q := fmt.Sprintf("SELECT COUNT(*) FROM `%s`", scope.TablePath()) @@ -347,21 +356,29 @@ func TestReadTwoPartsIntoMemoryIssue1559(t *testing.T) { var rows []query.Row // reproduce the problem + var partReaded int scope.Driver().Query().DoTx(scope.Ctx, func(ctx context.Context, tx query.TxActor) error { - rs, err := tx.QueryResultSet(scope.Ctx, q) + oldCOunt := readPartCount + rs, err := tx.QueryResultSet(scope.Ctx, q, options.WithResponsePartLimitSizeBytes(100)) if err != nil { return err } rows = make([]query.Row, 0, targetCount) - for row, err := range rs.Rows(scope.Ctx) { + for { + row, err := rs.NextRow(ctx) + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) rows = append(rows, row) } + partReaded = readPartCount - oldCOunt return nil }) require.NoError(t, err) - require.Equal(t, targetCount, len(rows)) + require.Greater(t, partReaded, 1) }