Skip to content

Commit

Permalink
fix(ydbcp): make conditional operation update
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Oct 1, 2024
1 parent fe60c5d commit cc352cc
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 35 deletions.
120 changes: 111 additions & 9 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
Expand All @@ -33,6 +34,11 @@ var (
),
table.CommitTx(),
)
writeTxNoCommit = table.TxControl(
table.BeginTx(
table.WithSerializableReadWrite(),
),
)
)

type DBConnector interface {
Expand All @@ -47,11 +53,12 @@ type DBConnector interface {
[]*types.BackupSchedule, error,
)
ActiveOperations(context.Context) ([]types.Operation, error)
UpdateOperation(context.Context, types.Operation) error
UpdateOperation(context.Context, types.Operation, types.OperationState) error
CreateOperation(context.Context, types.Operation) (string, error)
CreateBackup(context.Context, types.Backup) (string, error)
UpdateBackup(context context.Context, id string, backupState string) error
ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error
ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error
Close(context.Context)
}

Expand Down Expand Up @@ -202,14 +209,89 @@ func DoInterfaceSelect[T any](
return entities, nil
}

func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error {
func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery, checkStats bool) error {
queryFormat, err := queryBuilder.FormatQuery(ctx)
if err != nil {
return err
}
err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) (err error) {
_, _, err = s.Execute(
var (
txControl *table.TransactionControl
statsOption options.ExecuteDataQueryOption
affectedRows uint64
)

if checkStats {
txControl = writeTxNoCommit
statsOption = options.WithCollectStatsModeBasic()
} else {
txControl = writeTx
statsOption = options.WithCollectStatsModeNone()
}

tx, res, err := s.Execute(
ctx,
txControl,
queryFormat.QueryText,
queryFormat.QueryParams,
statsOption,
)
if err != nil {
return err
}

defer func(res result.Result) {
err = res.Close()
if err != nil {
xlog.Error(ctx, "Error closing transaction result")
}
}(res) // result must be closed

if !checkStats {
return nil
}

stats := res.Stats()
for phaseStats, ok := stats.NextPhase(); ok; phaseStats, ok = stats.NextPhase() {
for tableStats, ok := phaseStats.NextTableAccess(); ok; tableStats, ok = phaseStats.NextTableAccess() {
if tableStats.Name == "Operations" {
affectedRows = tableStats.Updates.Rows
}
}
}

if affectedRows == 0 {
xlog.Error(ctx, "upsert wasn't applied, 0 rows were affected")
return tx.Rollback(ctx)
}

_, err = tx.CommitTx(ctx)
return err
},
)
if err != nil {
xlog.Error(ctx, "Error executing query", zap.Error(err))
return err
}
return nil
}

func (d *YdbConnector) ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error {
queryFormat, err := queryBuilder.FormatQuery(ctx)
if err != nil {
return err
}

var affectedRows int

err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) (err error) {
var res result.Result

xlog.Error(ctx, queryFormat.QueryText)

_, res, err = s.Execute(
ctx,
writeTx,
queryFormat.QueryText,
Expand All @@ -218,13 +300,33 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.W
if err != nil {
return err
}
return nil

defer func(res result.Result) {
err = res.Close()
if err != nil {
xlog.Error(ctx, "Error closing transaction result")
}
}(res) // result must be closed

if !res.NextResultSet(ctx) || !res.NextRow() {
affectedRows = 0
} else {
affectedRows = res.CurrentResultSet().RowCount()
}

return res.Err()
},
)
if err != nil {
xlog.Error(ctx, "Error executing query", zap.Error(err))
return err
}

if affectedRows == 0 {
xlog.Error(ctx, "upsert wasn't applied, 0 rows were affected")
return errors.New("upsert wasn't applied, 0 rows were affected")
}

return nil
}

Expand Down Expand Up @@ -286,22 +388,22 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) (
}

func (d *YdbConnector) UpdateOperation(
ctx context.Context, operation types.Operation,
ctx context.Context, operation types.Operation, prevState types.OperationState,
) error {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
operation.SetUpdatedAt(operation.GetAudit().CompletedAt)
} else {
operation.SetUpdatedAt(timestamppb.Now())
}

return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation))
return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation, prevState), true)
}

func (d *YdbConnector) CreateOperation(
ctx context.Context, operation types.Operation,
) (string, error) {
operation.SetID(types.GenerateObjectID())
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateOperation(operation))
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateOperation(operation), false)
if err != nil {
return "", err
}
Expand All @@ -313,7 +415,7 @@ func (d *YdbConnector) CreateBackup(
) (string, error) {
id := types.GenerateObjectID()
backup.ID = id
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateBackup(backup))
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateBackup(backup), false)
if err != nil {
return "", err
}
Expand All @@ -327,5 +429,5 @@ func (d *YdbConnector) UpdateBackup(
ID: id,
Status: backupStatus,
}
return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateBackup(backup))
return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateBackup(backup), false)
}
13 changes: 11 additions & 2 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,20 @@ func (c *MockDBConnector) ActiveOperations(_ context.Context) (
}

func (c *MockDBConnector) UpdateOperation(
_ context.Context, op types.Operation,
_ context.Context, op types.Operation, prevState types.OperationState,
) error {
c.guard.Lock()
defer c.guard.Unlock()

if _, exist := c.operations[op.GetID()]; !exist {
return fmt.Errorf("update nonexistent operation %s", types.OperationToString(op))
}
c.operations[op.GetID()] = op.Copy()
if c.operations[op.GetID()].GetState() == prevState {
c.operations[op.GetID()] = op.Copy()
} else {
return fmt.Errorf("operation state was changed %s", types.OperationToString(op))
}

return nil
}

Expand Down Expand Up @@ -214,3 +219,7 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.
c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = queryBuilderMock.BackupSchedule
return nil
}

func (c *MockDBConnector) ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error {
return c.ExecuteUpsert(ctx, queryBuilder)
}
52 changes: 45 additions & 7 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,29 @@ import (

type WriteTableQuery interface {
FormatQuery(ctx context.Context) (*FormatQueryResult, error)
WithRawQuery(rawQuery string) WriteTableQuery
WithCreateBackup(backup types.Backup) WriteTableQuery
WithCreateOperation(operation types.Operation) WriteTableQuery
WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery
WithUpdateBackup(backup types.Backup) WriteTableQuery
WithUpdateOperation(operation types.Operation) WriteTableQuery
WithUpdateOperation(operation types.Operation, prevState types.OperationState) WriteTableQuery
WithUpdateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery
}

type WriteTableQueryImpl struct {
tableQueries []WriteSingleTableQueryImpl
tableQueries []WriteSingleTableQueryImpl
checkTableStats []string
}

type WriteSingleTableQueryImpl struct {
rawQuery *string
index int
tableName string
upsertFields []string
tableQueryParams []table.ParameterOption
updateParam *table.ParameterOption
readParams []table.ParameterOption
readFilters []string
}

type WriteTableQueryImplOption func(*WriteTableQueryImpl)
Expand Down Expand Up @@ -192,7 +197,7 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
return d
}

func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingleTableQueryImpl {
func BuildUpdateOperationQuery(operation types.Operation, index int, prevState types.OperationState) WriteSingleTableQueryImpl {
d := WriteSingleTableQueryImpl{
index: index,
tableName: "Operations",
Expand All @@ -217,6 +222,15 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
table_types.TimestampValueFromTime(operation.GetUpdatedAt().AsTime()),
)
}

readParam := table.ValueParam(
fmt.Sprintf("%s_%d", "$prev_status", d.index),
table_types.StringValueFromString(prevState.String()),
)

d.readParams = append(d.readParams, readParam)
d.readFilters = append(d.readFilters, fmt.Sprintf("status = %s", readParam.Name()))

return d
}

Expand Down Expand Up @@ -356,6 +370,15 @@ func NewWriteTableQuery() WriteTableQuery {
return &WriteTableQueryImpl{}
}

func (d *WriteTableQueryImpl) WithRawQuery(rawQuery string) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, WriteSingleTableQueryImpl{
index: index,
rawQuery: &rawQuery,
})
return d
}

func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index))
Expand All @@ -368,9 +391,10 @@ func (d *WriteTableQueryImpl) WithUpdateBackup(backup types.Backup) WriteTableQu
return d
}

func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) WriteTableQuery {
func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation, prevState types.OperationState) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index))
d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index, prevState))
d.checkTableStats = append(d.checkTableStats, d.tableQueries[index].tableName)
return d
}

Expand Down Expand Up @@ -422,17 +446,28 @@ func ProcessUpdateQuery(
}
paramNames := t.GetParamNames()
keyParam := fmt.Sprintf("id = %s", (*t.updateParam).Name())
t.readFilters = append(t.readFilters, keyParam)

updates := make([]string, 0)
for i := range t.upsertFields {
updates = append(updates, fmt.Sprintf("%s = %s", t.upsertFields[i], paramNames[i]))
}

reads := make([]string, 0)
for i := range t.upsertFields {
reads = append(reads, fmt.Sprintf("%s AS %s", paramNames[i], t.upsertFields[i]))
}

*queryStrings = append(
*queryStrings, fmt.Sprintf(
"UPDATE %s SET %s WHERE %s", t.tableName, strings.Join(updates, ", "), keyParam,
`$rows_to_update = SELECT id, %s FROM %s WHERE %s;
UPDATE %s ON SELECT * FROM $rows_to_update;`,
strings.Join(reads, ", "), t.tableName, strings.Join(t.readFilters, " AND "), t.tableName,
),
)
*allParams = append(*allParams, *t.updateParam)
*allParams = append(*allParams, t.tableQueryParams...)
*allParams = append(*allParams, t.readParams...)
return nil
}

Expand All @@ -441,7 +476,10 @@ func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResu
allParams := make([]table.ParameterOption, 0)
for _, t := range d.tableQueries {
var err error
if t.updateParam == nil {

if t.rawQuery != nil {
queryStrings = append(queryStrings, *t.rawQuery)
} else if t.updateParam == nil {
err = ProcessUpsertQuery(&queryStrings, &allParams, &t)
} else {
err = ProcessUpdateQuery(&queryStrings, &allParams, &t)
Expand Down
4 changes: 4 additions & 0 deletions internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func NewWriteTableQueryMock() WriteTableQuery {
return &WriteTableQueryMock{}
}

func (w *WriteTableQueryMock) WithRawQuery(_ string) WriteTableQuery {
return w
}

func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult, error) {
return &FormatQueryResult{}, nil
}
Expand Down
Loading

0 comments on commit cc352cc

Please sign in to comment.