From d72915c784acacaba8a243113993e8072d2b4c0d Mon Sep 17 00:00:00 2001 From: ulya-sidorina Date: Thu, 26 Sep 2024 15:02:59 +0200 Subject: [PATCH] fix(ydbcp): make conditional operation update --- internal/connectors/db/connector.go | 52 +++++++++++++++++++ internal/connectors/db/yql/queries/write.go | 16 +++++- .../connectors/db/yql/queries/write_mock.go | 4 ++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/internal/connectors/db/connector.go b/internal/connectors/db/connector.go index 303abd09..e5c1cc9e 100644 --- a/internal/connectors/db/connector.go +++ b/internal/connectors/db/connector.go @@ -52,6 +52,7 @@ type DBConnector interface { CreateBackup(context.Context, types.Backup) (string, error) UpdateBackup(context context.Context, id string, backupState string) error ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error + ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error Close(context.Context) } @@ -228,6 +229,57 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.W return nil } +func (d *YdbConnector) ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error { + queryFormat, err := queryBuilder.FormatQuery(ctx) + if err != nil { + return err + } + + err = d.GetTableClient().Do( + ctx, func(ctx context.Context, s table.Session) (err error) { + var res result.Result + + _, res, err = s.Execute( + ctx, + writeTx, + queryFormat.QueryText, + queryFormat.QueryParams, + ) + if err != nil { + return err + } + + defer func(res result.Result) { + err = res.Close() + if err != nil { + xlog.Error(ctx, "Error closing transaction result") + } + }(res) // result must be closed + + if res.ResultSetCount() != 1 { + return errors.New("expected 1 result set") + } + + resultSet := res.CurrentResultSet() + if resultSet == nil { + return errors.New("empty result set") + } + + if resultSet.RowCount() == 0 { + return errors.New("upsert wasn't applied") + } + + return res.Err() + }, + ) + if err != nil { + xlog.Error(ctx, "Error executing query", zap.Error(err)) + return err + } + + return nil +} + func (d *YdbConnector) SelectBackups( ctx context.Context, queryBuilder queries.ReadTableQuery, ) ([]*types.Backup, error) { diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index cf2a7182..98bdfaf2 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -17,6 +17,7 @@ import ( type WriteTableQuery interface { FormatQuery(ctx context.Context) (*FormatQueryResult, error) + WithRawQuery(rawQuery string) WriteTableQuery WithCreateBackup(backup types.Backup) WriteTableQuery WithCreateOperation(operation types.Operation) WriteTableQuery WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery @@ -30,6 +31,7 @@ type WriteTableQueryImpl struct { } type WriteSingleTableQueryImpl struct { + rawQuery *string index int tableName string upsertFields []string @@ -356,6 +358,15 @@ func NewWriteTableQuery() WriteTableQuery { return &WriteTableQueryImpl{} } +func (d *WriteTableQueryImpl) WithRawQuery(rawQuery string) WriteTableQuery { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, WriteSingleTableQueryImpl{ + index: index, + rawQuery: &rawQuery, + }) + return d +} + func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery { index := len(d.tableQueries) d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index)) @@ -441,7 +452,10 @@ func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResu allParams := make([]table.ParameterOption, 0) for _, t := range d.tableQueries { var err error - if t.updateParam == nil { + + if t.rawQuery != nil { + queryStrings = append(queryStrings, *t.rawQuery) + } else if t.updateParam == nil { err = ProcessUpsertQuery(&queryStrings, &allParams, &t) } else { err = ProcessUpdateQuery(&queryStrings, &allParams, &t) diff --git a/internal/connectors/db/yql/queries/write_mock.go b/internal/connectors/db/yql/queries/write_mock.go index ae1eff79..67ce7ae2 100644 --- a/internal/connectors/db/yql/queries/write_mock.go +++ b/internal/connectors/db/yql/queries/write_mock.go @@ -18,6 +18,10 @@ func NewWriteTableQueryMock() WriteTableQuery { return &WriteTableQueryMock{} } +func (w *WriteTableQueryMock) WithRawQuery(_ string) WriteTableQuery { + return w +} + func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult, error) { return &FormatQueryResult{}, nil }