Skip to content

Commit

Permalink
fix(ydbcp): make conditional operation update
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Oct 3, 2024
1 parent e57ba3b commit de2a512
Show file tree
Hide file tree
Showing 14 changed files with 178 additions and 64 deletions.
84 changes: 78 additions & 6 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"path"
"reflect"
"time"

"ydbcp/internal/config"
Expand All @@ -14,6 +16,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
Expand All @@ -33,6 +36,11 @@ var (
),
table.CommitTx(),
)
writeTxNoCommit = table.TxControl(
table.BeginTx(
table.WithSerializableReadWrite(),
),
)
)

type DBConnector interface {
Expand All @@ -49,7 +57,7 @@ type DBConnector interface {
[]*types.BackupSchedule, error,
)
ActiveOperations(context.Context) ([]types.Operation, error)
UpdateOperation(context.Context, types.Operation) error
UpdateOperation(context.Context, types.Operation, types.OperationState) error
CreateOperation(context.Context, types.Operation) (string, error)
CreateBackup(context.Context, types.Backup) (string, error)
UpdateBackup(context context.Context, id string, backupState string) error
Expand Down Expand Up @@ -211,16 +219,80 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.W
}
err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) (err error) {
_, _, err = s.Execute(
var (
txControl *table.TransactionControl
statsOption options.ExecuteDataQueryOption
)

needToCheckQueryStats := len(queryFormat.ExpectedUpdateStats) > 0
if needToCheckQueryStats {
txControl = writeTxNoCommit
statsOption = options.WithCollectStatsModeBasic()
} else {
txControl = writeTx
statsOption = options.WithCollectStatsModeNone()
}

tx, res, err := s.Execute(
ctx,
writeTx,
txControl,
queryFormat.QueryText,
queryFormat.QueryParams,
statsOption,
)
if err != nil {
return err
}
return nil

defer func(res result.Result) {
err = res.Close()
if err != nil {
xlog.Error(ctx, "Error closing transaction result", zap.Error(err))
}
}(res) // result must be closed

if !needToCheckQueryStats {
return nil
}

if stats := res.Stats(); stats != nil {
updateStats := make(map[string]uint64)

for {
phaseStats, ok := stats.NextPhase()
if !ok {
break
}

for {
tableStats, ook := phaseStats.NextTableAccess()
if !ook {
break
}

_, tableName := path.Split(tableStats.Name)
// We can't receive modifications stats before commit.
// This query only contains modifications (upserts or updates),
// so we can be sure that all reads operations are related to updates.
// We can use this assumption to calculate updated rows count.
updateStats[tableName] += tableStats.Reads.Rows
}
}

if !reflect.DeepEqual(updateStats, queryFormat.ExpectedUpdateStats) {
xlog.Error(ctx, "Expected updated rows count does not match actual",
zap.Any("expected", queryFormat.ExpectedUpdateStats),
zap.Any("actual", updateStats),
)
return tx.Rollback(ctx)
}
} else {
xlog.Error(ctx, "Empty stats for upsert query")
return tx.Rollback(ctx)
}

_, err = tx.CommitTx(ctx)
return err
},
)
if err != nil {
Expand Down Expand Up @@ -303,15 +375,15 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) (
}

func (d *YdbConnector) UpdateOperation(
ctx context.Context, operation types.Operation,
ctx context.Context, operation types.Operation, prevState types.OperationState,
) error {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
operation.SetUpdatedAt(operation.GetAudit().CompletedAt)
} else {
operation.SetUpdatedAt(timestamppb.Now())
}

return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation))
return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation, prevState))
}

func (d *YdbConnector) CreateOperation(
Expand Down
9 changes: 7 additions & 2 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,20 @@ func (c *MockDBConnector) ActiveOperations(_ context.Context) (
}

func (c *MockDBConnector) UpdateOperation(
_ context.Context, op types.Operation,
_ context.Context, op types.Operation, prevState types.OperationState,
) error {
c.guard.Lock()
defer c.guard.Unlock()

if _, exist := c.operations[op.GetID()]; !exist {
return fmt.Errorf("update nonexistent operation %s", types.OperationToString(op))
}
c.operations[op.GetID()] = op.Copy()
if c.operations[op.GetID()].GetState() == prevState {
c.operations[op.GetID()] = op.Copy()
} else {
return fmt.Errorf("operation state was changed %s", types.OperationToString(op))
}

return nil
}

Expand Down
5 changes: 3 additions & 2 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ type QueryFilter struct {
}

type FormatQueryResult struct {
QueryText string
QueryParams *table.QueryParameters
QueryText string
QueryParams *table.QueryParameters
ExpectedUpdateStats map[string]uint64
}

type ReadTableQuery interface {
Expand Down
38 changes: 31 additions & 7 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type WriteTableQuery interface {
WithCreateOperation(operation types.Operation) WriteTableQuery
WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery
WithUpdateBackup(backup types.Backup) WriteTableQuery
WithUpdateOperation(operation types.Operation) WriteTableQuery
WithUpdateOperation(operation types.Operation, prevState types.OperationState) WriteTableQuery
WithUpdateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery
}

Expand All @@ -35,6 +35,8 @@ type WriteSingleTableQueryImpl struct {
upsertFields []string
tableQueryParams []table.ParameterOption
updateParam *table.ParameterOption
filterFields []string
filterParams []table.ParameterOption
}

type WriteTableQueryImplOption func(*WriteTableQueryImpl)
Expand Down Expand Up @@ -192,7 +194,7 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
return d
}

func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingleTableQueryImpl {
func BuildUpdateOperationQuery(operation types.Operation, index int, prevState types.OperationState) WriteSingleTableQueryImpl {
d := WriteSingleTableQueryImpl{
index: index,
tableName: "Operations",
Expand All @@ -217,6 +219,16 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
table_types.TimestampValueFromTime(operation.GetUpdatedAt().AsTime()),
)
}

d.filterFields = append(d.filterFields, "status")
d.filterParams = append(
d.filterParams,
table.ValueParam(
fmt.Sprintf("%s_%d", "$prev_status", d.index),
table_types.StringValueFromString(prevState.String()),
),
)

return d
}

Expand Down Expand Up @@ -381,9 +393,9 @@ func (d *WriteTableQueryImpl) WithUpdateBackup(backup types.Backup) WriteTableQu
return d
}

func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) WriteTableQuery {
func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation, prevState types.OperationState) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index))
d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index, prevState))
return d
}

Expand Down Expand Up @@ -439,25 +451,36 @@ func ProcessUpdateQuery(
for i := range t.upsertFields {
updates = append(updates, fmt.Sprintf("%s = %s", t.upsertFields[i], paramNames[i]))
}

filters := make([]string, 0)
filters = append(filters, keyParam)
for i := range t.filterFields {
filters = append(filters, fmt.Sprintf("%s = %s", t.filterFields[i], t.filterParams[i].Name()))
}

*queryStrings = append(
*queryStrings, fmt.Sprintf(
"UPDATE %s SET %s WHERE %s", t.tableName, strings.Join(updates, ", "), keyParam,
`UPDATE %s SET %s WHERE %s`,
t.tableName, strings.Join(updates, ", "), strings.Join(filters, " AND "),
),
)
*allParams = append(*allParams, *t.updateParam)
*allParams = append(*allParams, t.tableQueryParams...)
*allParams = append(*allParams, t.filterParams...)
return nil
}

func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) {
queryStrings := make([]string, 0)
allParams := make([]table.ParameterOption, 0)
expectedUpdateStats := make(map[string]uint64)
for _, t := range d.tableQueries {
var err error
if t.updateParam == nil {
err = ProcessUpsertQuery(&queryStrings, &allParams, &t)
} else {
err = ProcessUpdateQuery(&queryStrings, &allParams, &t)
expectedUpdateStats[t.tableName]++
}
if err != nil {
return nil, err
Expand All @@ -466,7 +489,8 @@ func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResu
res := strings.Join(queryStrings, ";\n")
xlog.Debug(ctx, "write query", zap.String("yql", res))
return &FormatQueryResult{
QueryText: res,
QueryParams: table.NewQueryParameters(allParams...),
QueryText: res,
QueryParams: table.NewQueryParameters(allParams...),
ExpectedUpdateStats: expectedUpdateStats,
}, nil
}
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQu
return w
}

func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery {
func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation, _ types.OperationState) WriteTableQuery {
w.Operation = operation
return w
}
Expand Down
11 changes: 9 additions & 2 deletions internal/connectors/db/yql/queries/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
func TestQueryBuilder_UpdateUpdate(t *testing.T) {
const (
queryString = `UPDATE Backups SET status = $status_0, message = $message_0 WHERE id = $id_0;
UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`
UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1 AND status = $prev_status_1`
)
opId := types.GenerateObjectID()
backupId := types.GenerateObjectID()
Expand All @@ -35,7 +35,7 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`
}
builder := NewWriteTableQuery().
WithUpdateBackup(backup).
WithUpdateOperation(&op)
WithUpdateOperation(&op, types.OperationStateRunning)
var (
queryParams = table.NewQueryParameters(
table.ValueParam("$id_0", table_types.StringValueFromString(backupId)),
Expand All @@ -44,15 +44,22 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`
table.ValueParam("$id_1", table_types.StringValueFromString(opId)),
table.ValueParam("$status_1", table_types.StringValueFromString("Done")),
table.ValueParam("$message_1", table_types.StringValueFromString("Abcde")),
table.ValueParam("$prev_status_1", table_types.StringValueFromString(types.OperationStateRunning.String())),
)
)
expectedUpdateStats := map[string]uint64{
"Backups": 1,
"Operations": 1,
}

query, err := builder.FormatQuery(context.Background())
assert.Empty(t, err)
assert.Equal(
t, queryString, query.QueryText,
"bad query format",
)
assert.Equal(t, queryParams, query.QueryParams, "bad query params")
assert.Equal(t, expectedUpdateStats, query.ExpectedUpdateStats, "bad update stats")
}

func TestQueryBuilder_CreateCreate(t *testing.T) {
Expand Down
11 changes: 6 additions & 5 deletions internal/handlers/delete_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ func DBOperationHandler(
Status: types.BackupStateUnknown,
}

prevState := operation.GetState()
if deadlineExceeded(dbOp.Audit.CreatedAt, config) {
backupToWrite.Status = types.BackupStateError
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
ctx, queryBuilderFactory().WithUpdateOperation(operation, prevState).WithUpdateBackup(backupToWrite),
)
}

Expand All @@ -84,14 +85,14 @@ func DBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Backup not found")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
return db.UpdateOperation(ctx, operation, prevState)
}

if backups[0].Status != types.BackupStateDeleting {
operation.SetState(types.OperationStateError)
operation.SetMessage(fmt.Sprintf("Unexpected backup status: %s", backups[0].Status))
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
return db.UpdateOperation(ctx, operation, prevState)
}

deleteBackup := func(pathPrefix string, bucket string) error {
Expand All @@ -118,7 +119,7 @@ func DBOperationHandler(
case types.OperationStatePending:
{
operation.SetState(types.OperationStateRunning)
err := db.UpdateOperation(ctx, operation)
err := db.UpdateOperation(ctx, operation, prevState)
if err != nil {
return fmt.Errorf("can't update operation: %v", err)
}
Expand All @@ -140,6 +141,6 @@ func DBOperationHandler(
}

return db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
ctx, queryBuilderFactory().WithUpdateOperation(operation, prevState).WithUpdateBackup(backupToWrite),
)
}
Loading

0 comments on commit de2a512

Please sign in to comment.