Skip to content

Commit

Permalink
fix(ydbcp): make conditional operation update
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Sep 26, 2024
1 parent fe60c5d commit d72915c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
52 changes: 52 additions & 0 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type DBConnector interface {
CreateBackup(context.Context, types.Backup) (string, error)
UpdateBackup(context context.Context, id string, backupState string) error
ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error
ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error
Close(context.Context)
}

Expand Down Expand Up @@ -228,6 +229,57 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.W
return nil
}

func (d *YdbConnector) ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error {
queryFormat, err := queryBuilder.FormatQuery(ctx)
if err != nil {
return err
}

err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) (err error) {
var res result.Result

_, res, err = s.Execute(
ctx,
writeTx,
queryFormat.QueryText,
queryFormat.QueryParams,
)
if err != nil {
return err
}

defer func(res result.Result) {
err = res.Close()
if err != nil {
xlog.Error(ctx, "Error closing transaction result")
}
}(res) // result must be closed

if res.ResultSetCount() != 1 {
return errors.New("expected 1 result set")
}

resultSet := res.CurrentResultSet()
if resultSet == nil {
return errors.New("empty result set")
}

if resultSet.RowCount() == 0 {
return errors.New("upsert wasn't applied")
}

return res.Err()
},
)
if err != nil {
xlog.Error(ctx, "Error executing query", zap.Error(err))
return err
}

return nil
}

func (d *YdbConnector) SelectBackups(
ctx context.Context, queryBuilder queries.ReadTableQuery,
) ([]*types.Backup, error) {
Expand Down
16 changes: 15 additions & 1 deletion internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

type WriteTableQuery interface {
FormatQuery(ctx context.Context) (*FormatQueryResult, error)
WithRawQuery(rawQuery string) WriteTableQuery
WithCreateBackup(backup types.Backup) WriteTableQuery
WithCreateOperation(operation types.Operation) WriteTableQuery
WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery
Expand All @@ -30,6 +31,7 @@ type WriteTableQueryImpl struct {
}

type WriteSingleTableQueryImpl struct {
rawQuery *string
index int
tableName string
upsertFields []string
Expand Down Expand Up @@ -356,6 +358,15 @@ func NewWriteTableQuery() WriteTableQuery {
return &WriteTableQueryImpl{}
}

func (d *WriteTableQueryImpl) WithRawQuery(rawQuery string) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, WriteSingleTableQueryImpl{
index: index,
rawQuery: &rawQuery,
})
return d
}

func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index))
Expand Down Expand Up @@ -441,7 +452,10 @@ func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResu
allParams := make([]table.ParameterOption, 0)
for _, t := range d.tableQueries {
var err error
if t.updateParam == nil {

if t.rawQuery != nil {
queryStrings = append(queryStrings, *t.rawQuery)
} else if t.updateParam == nil {
err = ProcessUpsertQuery(&queryStrings, &allParams, &t)
} else {
err = ProcessUpdateQuery(&queryStrings, &allParams, &t)
Expand Down
4 changes: 4 additions & 0 deletions internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func NewWriteTableQueryMock() WriteTableQuery {
return &WriteTableQueryMock{}
}

func (w *WriteTableQueryMock) WithRawQuery(_ string) WriteTableQuery {
return w
}

func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult, error) {
return &FormatQueryResult{}, nil
}
Expand Down

0 comments on commit d72915c

Please sign in to comment.