Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ydbcp): check status before update operation #72

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 88 additions & 6 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"path"
"reflect"
"time"

"ydbcp/internal/config"
Expand All @@ -14,6 +16,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
Expand All @@ -33,6 +36,11 @@ var (
),
table.CommitTx(),
)
writeTxNoCommit = table.TxControl(
table.BeginTx(
table.WithSerializableReadWrite(),
),
)
)

type DBConnector interface {
Expand All @@ -49,7 +57,7 @@ type DBConnector interface {
[]*types.BackupSchedule, error,
)
ActiveOperations(context.Context) ([]types.Operation, error)
UpdateOperation(context.Context, types.Operation) error
UpdateOperation(context.Context, types.Operation, types.OperationState) error
CreateOperation(context.Context, types.Operation) (string, error)
CreateBackup(context.Context, types.Backup) (string, error)
UpdateBackup(context context.Context, id string, backupState string) error
Expand Down Expand Up @@ -209,24 +217,98 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.W
if err != nil {
return err
}

txWasRolledBack := false
err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) (err error) {
_, _, err = s.Execute(
var (
txControl *table.TransactionControl
statsOption options.ExecuteDataQueryOption
)

needToCheckQueryStats := len(queryFormat.ExpectedUpdateStats) > 0
if needToCheckQueryStats {
txControl = writeTxNoCommit
statsOption = options.WithCollectStatsModeBasic()
} else {
txControl = writeTx
statsOption = options.WithCollectStatsModeNone()
}

tx, res, err := s.Execute(
ctx,
writeTx,
txControl,
queryFormat.QueryText,
queryFormat.QueryParams,
statsOption,
)
if err != nil {
return err
}
return nil

defer func(res result.Result) {
err = res.Close()
if err != nil {
xlog.Error(ctx, "Error closing transaction result", zap.Error(err))
}
}(res) // result must be closed

if !needToCheckQueryStats {
return nil
}

if stats := res.Stats(); stats != nil {
updateStats := make(map[string]uint64)

for {
phaseStats, ok := stats.NextPhase()
if !ok {
break
}

for {
tableStats, ook := phaseStats.NextTableAccess()
if !ook {
break
}

_, tableName := path.Split(tableStats.Name)
// We can't receive modifications stats before commit.
// This query only contains modifications (upserts or updates),
// so we can be sure that all reads operations are related to updates.
// We can use this assumption to calculate updated rows count.
if tableStats.Reads.Rows > 0 {
updateStats[tableName] += tableStats.Reads.Rows
}
}
}

if !reflect.DeepEqual(updateStats, queryFormat.ExpectedUpdateStats) {
xlog.Error(ctx, "Expected updated rows count does not match actual",
zap.Any("expected", queryFormat.ExpectedUpdateStats),
zap.Any("actual", updateStats),
)
txWasRolledBack = true
return tx.Rollback(ctx)
}
} else {
xlog.Error(ctx, "Empty stats for upsert query")
txWasRolledBack = true
return tx.Rollback(ctx)
}

_, err = tx.CommitTx(ctx)
return err
},
)
if err != nil {
xlog.Error(ctx, "Error executing query", zap.Error(err))
return err
}

if txWasRolledBack {
return errors.New("transaction wasn't committed")
}
return nil
}

Expand Down Expand Up @@ -303,15 +385,15 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) (
}

func (d *YdbConnector) UpdateOperation(
ctx context.Context, operation types.Operation,
ctx context.Context, operation types.Operation, prevState types.OperationState,
) error {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
operation.SetUpdatedAt(operation.GetAudit().CompletedAt)
} else {
operation.SetUpdatedAt(timestamppb.Now())
}

return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation))
return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation, prevState))
}

func (d *YdbConnector) CreateOperation(
Expand Down
9 changes: 7 additions & 2 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,20 @@ func (c *MockDBConnector) ActiveOperations(_ context.Context) (
}

func (c *MockDBConnector) UpdateOperation(
_ context.Context, op types.Operation,
_ context.Context, op types.Operation, prevState types.OperationState,
) error {
c.guard.Lock()
defer c.guard.Unlock()

if _, exist := c.operations[op.GetID()]; !exist {
return fmt.Errorf("update nonexistent operation %s", types.OperationToString(op))
}
c.operations[op.GetID()] = op.Copy()
if c.operations[op.GetID()].GetState() == prevState {
c.operations[op.GetID()] = op.Copy()
} else {
return fmt.Errorf("operation state was changed %s", types.OperationToString(op))
}

return nil
}

Expand Down
5 changes: 3 additions & 2 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ type QueryFilter struct {
}

type FormatQueryResult struct {
QueryText string
QueryParams *table.QueryParameters
QueryText string
QueryParams *table.QueryParameters
ExpectedUpdateStats map[string]uint64
}

type ReadTableQuery interface {
Expand Down
38 changes: 31 additions & 7 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type WriteTableQuery interface {
WithCreateOperation(operation types.Operation) WriteTableQuery
WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery
WithUpdateBackup(backup types.Backup) WriteTableQuery
WithUpdateOperation(operation types.Operation) WriteTableQuery
WithUpdateOperation(operation types.Operation, prevState types.OperationState) WriteTableQuery
WithUpdateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery
}

Expand All @@ -35,6 +35,8 @@ type WriteSingleTableQueryImpl struct {
upsertFields []string
tableQueryParams []table.ParameterOption
updateParam *table.ParameterOption
filterFields []string
filterParams []table.ParameterOption
}

type WriteTableQueryImplOption func(*WriteTableQueryImpl)
Expand Down Expand Up @@ -192,7 +194,7 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
return d
}

func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingleTableQueryImpl {
func BuildUpdateOperationQuery(operation types.Operation, index int, prevState types.OperationState) WriteSingleTableQueryImpl {
d := WriteSingleTableQueryImpl{
index: index,
tableName: "Operations",
Expand All @@ -217,6 +219,16 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
table_types.TimestampValueFromTime(operation.GetUpdatedAt().AsTime()),
)
}

d.filterFields = append(d.filterFields, "status")
d.filterParams = append(
d.filterParams,
table.ValueParam(
fmt.Sprintf("%s_%d", "$prev_status", d.index),
table_types.StringValueFromString(prevState.String()),
),
)

return d
}

Expand Down Expand Up @@ -381,9 +393,9 @@ func (d *WriteTableQueryImpl) WithUpdateBackup(backup types.Backup) WriteTableQu
return d
}

func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) WriteTableQuery {
func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation, prevState types.OperationState) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index))
d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index, prevState))
return d
}

Expand Down Expand Up @@ -439,25 +451,36 @@ func ProcessUpdateQuery(
for i := range t.upsertFields {
updates = append(updates, fmt.Sprintf("%s = %s", t.upsertFields[i], paramNames[i]))
}

filters := make([]string, 0)
filters = append(filters, keyParam)
for i := range t.filterFields {
filters = append(filters, fmt.Sprintf("%s = %s", t.filterFields[i], t.filterParams[i].Name()))
}

*queryStrings = append(
*queryStrings, fmt.Sprintf(
"UPDATE %s SET %s WHERE %s", t.tableName, strings.Join(updates, ", "), keyParam,
`UPDATE %s SET %s WHERE %s`,
t.tableName, strings.Join(updates, ", "), strings.Join(filters, " AND "),
),
)
*allParams = append(*allParams, *t.updateParam)
*allParams = append(*allParams, t.tableQueryParams...)
*allParams = append(*allParams, t.filterParams...)
return nil
}

func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) {
queryStrings := make([]string, 0)
allParams := make([]table.ParameterOption, 0)
expectedUpdateStats := make(map[string]uint64)
for _, t := range d.tableQueries {
var err error
if t.updateParam == nil {
err = ProcessUpsertQuery(&queryStrings, &allParams, &t)
} else {
err = ProcessUpdateQuery(&queryStrings, &allParams, &t)
expectedUpdateStats[t.tableName]++
}
if err != nil {
return nil, err
Expand All @@ -466,7 +489,8 @@ func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResu
res := strings.Join(queryStrings, ";\n")
xlog.Debug(ctx, "write query", zap.String("yql", res))
return &FormatQueryResult{
QueryText: res,
QueryParams: table.NewQueryParameters(allParams...),
QueryText: res,
QueryParams: table.NewQueryParameters(allParams...),
ExpectedUpdateStats: expectedUpdateStats,
}, nil
}
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQu
return w
}

func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery {
func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation, _ types.OperationState) WriteTableQuery {
w.Operation = operation
return w
}
Expand Down
11 changes: 9 additions & 2 deletions internal/connectors/db/yql/queries/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
func TestQueryBuilder_UpdateUpdate(t *testing.T) {
const (
queryString = `UPDATE Backups SET status = $status_0, message = $message_0 WHERE id = $id_0;
UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`
UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1 AND status = $prev_status_1`
)
opId := types.GenerateObjectID()
backupId := types.GenerateObjectID()
Expand All @@ -35,7 +35,7 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`
}
builder := NewWriteTableQuery().
WithUpdateBackup(backup).
WithUpdateOperation(&op)
WithUpdateOperation(&op, types.OperationStateRunning)
var (
queryParams = table.NewQueryParameters(
table.ValueParam("$id_0", table_types.StringValueFromString(backupId)),
Expand All @@ -44,15 +44,22 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`
table.ValueParam("$id_1", table_types.StringValueFromString(opId)),
table.ValueParam("$status_1", table_types.StringValueFromString("Done")),
table.ValueParam("$message_1", table_types.StringValueFromString("Abcde")),
table.ValueParam("$prev_status_1", table_types.StringValueFromString(types.OperationStateRunning.String())),
)
)
expectedUpdateStats := map[string]uint64{
"Backups": 1,
"Operations": 1,
}

query, err := builder.FormatQuery(context.Background())
assert.Empty(t, err)
assert.Equal(
t, queryString, query.QueryText,
"bad query format",
)
assert.Equal(t, queryParams, query.QueryParams, "bad query params")
assert.Equal(t, expectedUpdateStats, query.ExpectedUpdateStats, "bad update stats")
}

func TestQueryBuilder_CreateCreate(t *testing.T) {
Expand Down
11 changes: 6 additions & 5 deletions internal/handlers/delete_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ func DBOperationHandler(
Status: types.BackupStateUnknown,
}

prevState := operation.GetState()
if deadlineExceeded(dbOp.Audit.CreatedAt, config) {
backupToWrite.Status = types.BackupStateError
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
ctx, queryBuilderFactory().WithUpdateOperation(operation, prevState).WithUpdateBackup(backupToWrite),
)
}

Expand All @@ -84,14 +85,14 @@ func DBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Backup not found")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
return db.UpdateOperation(ctx, operation, prevState)
}

if backups[0].Status != types.BackupStateDeleting {
operation.SetState(types.OperationStateError)
operation.SetMessage(fmt.Sprintf("Unexpected backup status: %s", backups[0].Status))
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
return db.UpdateOperation(ctx, operation, prevState)
}

deleteBackup := func(pathPrefix string, bucket string) error {
Expand All @@ -118,7 +119,7 @@ func DBOperationHandler(
case types.OperationStatePending:
{
operation.SetState(types.OperationStateRunning)
err := db.UpdateOperation(ctx, operation)
err := db.UpdateOperation(ctx, operation, prevState)
if err != nil {
return fmt.Errorf("can't update operation: %v", err)
}
Expand All @@ -140,6 +141,6 @@ func DBOperationHandler(
}

return db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
ctx, queryBuilderFactory().WithUpdateOperation(operation, prevState).WithUpdateBackup(backupToWrite),
)
}
Loading
Loading