diff --git a/cmd/integration/main.go b/cmd/integration/main.go index 724b3c71..b9c9da37 100644 --- a/cmd/integration/main.go +++ b/cmd/integration/main.go @@ -4,6 +4,7 @@ import ( "context" "log" "time" + "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 4e850ccb..9f476e06 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -8,25 +8,25 @@ import ( "os/signal" "sync" "syscall" - "ydbcp/internal/connectors/s3" - "ydbcp/internal/server/services/backup_schedule" - - "go.uber.org/automaxprocs/maxprocs" - "go.uber.org/zap" "ydbcp/internal/auth" "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/connectors/s3" "ydbcp/internal/handlers" "ydbcp/internal/processor" "ydbcp/internal/server" "ydbcp/internal/server/services/backup" + "ydbcp/internal/server/services/backup_schedule" "ydbcp/internal/server/services/operation" "ydbcp/internal/types" "ydbcp/internal/util/xlog" ap "ydbcp/pkg/plugins/auth" + + "go.uber.org/automaxprocs/maxprocs" + "go.uber.org/zap" ) func main() { @@ -136,7 +136,7 @@ func main() { if err := handlersRegistry.Add( types.OperationTypeDB, - handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery(ctx)), + handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery), ); err != nil { xlog.Error(ctx, "failed to register DB handler", zap.Error(err)) os.Exit(1) diff --git a/internal/auth/dummy.go b/internal/auth/dummy.go index 359fee9f..d43fb8ad 100644 --- a/internal/auth/dummy.go +++ b/internal/auth/dummy.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "ydbcp/internal/util/xlog" "ydbcp/pkg/plugins/auth" diff --git a/internal/auth/mock.go b/internal/auth/mock.go index fec8b590..2ade1e03 100644 --- a/internal/auth/mock.go +++ b/internal/auth/mock.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "ydbcp/internal/util/xlog" "ydbcp/pkg/plugins/auth" diff --git a/internal/config/config.go b/internal/config/config.go index 16add49a..a94806e1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,6 +7,7 @@ import ( "os" "regexp" "strings" + "ydbcp/internal/util/xlog" "go.uber.org/zap" diff --git a/internal/connectors/client/connector.go b/internal/connectors/client/connector.go index 27905ad0..0c95055c 100644 --- a/internal/connectors/client/connector.go +++ b/internal/connectors/client/connector.go @@ -7,6 +7,7 @@ import ( "regexp" "strings" "time" + "ydbcp/internal/config" "ydbcp/internal/types" "ydbcp/internal/util/xlog" @@ -15,10 +16,6 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" - - "github.com/ydb-platform/ydb-go-sdk/v3/retry" - "github.com/ydb-platform/ydb-go-sdk/v3/scheme" - "github.com/ydb-platform/ydb-go-genproto/Ydb_Export_V1" "github.com/ydb-platform/ydb-go-genproto/Ydb_Import_V1" "github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1" @@ -28,6 +25,8 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/balancers" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/scheme" "go.uber.org/zap" "google.golang.org/protobuf/types/known/durationpb" ) diff --git a/internal/connectors/client/mock.go b/internal/connectors/client/mock.go index e97393e1..a45c65d3 100644 --- a/internal/connectors/client/mock.go +++ b/internal/connectors/client/mock.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "path" - "ydbcp/internal/types" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" + "ydbcp/internal/types" "github.com/google/uuid" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" "github.com/ydb-platform/ydb-go-sdk/v3" ) diff --git a/internal/connectors/db/connector.go b/internal/connectors/db/connector.go index b55a4301..c1db6ab9 100644 --- a/internal/connectors/db/connector.go +++ b/internal/connectors/db/connector.go @@ -4,21 +4,20 @@ import ( "context" "errors" "fmt" - "google.golang.org/protobuf/types/known/timestamppb" "time" + "ydbcp/internal/config" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" - - table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "ydbcp/internal/util/xlog" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/balancers" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/result" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "go.uber.org/zap" - - "ydbcp/internal/util/xlog" + "google.golang.org/protobuf/types/known/timestamppb" ) var ( @@ -36,8 +35,6 @@ var ( ) ) -var ErrUnimplemented = errors.New("unimplemented") - type DBConnector interface { GetTableClient() table.Client SelectBackups(ctx context.Context, queryBuilder queries.ReadTableQuery) ( @@ -319,14 +316,14 @@ func (d *YdbConnector) UpdateOperation( operation.SetUpdatedAt(timestamppb.Now()) } - return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithUpdateOperation(operation)) + return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation)) } func (d *YdbConnector) CreateOperation( ctx context.Context, operation types.Operation, ) (string, error) { operation.SetID(types.GenerateObjectID()) - err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithCreateOperation(operation)) + err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateOperation(operation)) if err != nil { return "", err } @@ -338,7 +335,7 @@ func (d *YdbConnector) CreateBackup( ) (string, error) { id := types.GenerateObjectID() backup.ID = id - err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithCreateBackup(backup)) + err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateBackup(backup)) if err != nil { return "", err } @@ -352,5 +349,5 @@ func (d *YdbConnector) UpdateBackup( ID: id, Status: backupStatus, } - return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithUpdateBackup(backup)) + return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateBackup(backup)) } diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index 7b2cd140..3a848f38 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index dbebc501..bdcccb6f 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -4,13 +4,13 @@ import ( "fmt" "strings" "time" + "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" - "google.golang.org/protobuf/types/known/durationpb" - "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) diff --git a/internal/connectors/db/yql/queries/read.go b/internal/connectors/db/yql/queries/read.go index 82f40058..cef2695c 100644 --- a/internal/connectors/db/yql/queries/read.go +++ b/internal/connectors/db/yql/queries/read.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "ydbcp/internal/util/xlog" "github.com/ydb-platform/ydb-go-sdk/v3/table" diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 12607df4..6b43cbf6 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "log" "strings" + "ydbcp/internal/types" "ydbcp/internal/util/xlog" @@ -24,7 +26,6 @@ type WriteTableQuery interface { } type WriteTableQueryImpl struct { - ctx context.Context tableQueries []WriteSingleTableQueryImpl } @@ -36,6 +37,9 @@ type WriteSingleTableQueryImpl struct { updateParam *table.ParameterOption } +type WriteTableQueryImplOption func(*WriteTableQueryImpl) +type WriteQueryBulderFactory func() WriteTableQuery + func (d *WriteSingleTableQueryImpl) AddValueParam(name string, value table_types.Value) { d.upsertFields = append(d.upsertFields, name[1:]) d.tableQueryParams = append(d.tableQueryParams, table.ValueParam(fmt.Sprintf("%s_%d", name, d.index), value)) @@ -55,7 +59,7 @@ func (d *WriteSingleTableQueryImpl) GetParamNames() []string { return res } -func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, index int) WriteSingleTableQueryImpl { +func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingleTableQueryImpl { d := WriteSingleTableQueryImpl{ index: index, tableName: "Operations", @@ -90,8 +94,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i if operation.GetType() == types.OperationTypeTB { tb, ok := operation.(*types.TakeBackupOperation) if !ok { - xlog.Error(ctx, "error cast operation to TakeBackupOperation", zap.String("operation_id", operation.GetID())) - return d + log.Fatalf("error cast operation to TakeBackupOperation operation_id %s", operation.GetID()) } d.AddValueParam( @@ -125,8 +128,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i } else if operation.GetType() == types.OperationTypeRB { rb, ok := operation.(*types.RestoreBackupOperation) if !ok { - xlog.Error(ctx, "error cast operation to RestoreBackupOperation", zap.String("operation_id", operation.GetID())) - return d + log.Fatalf("error cast operation to RestoreBackupOperation operation_id %s", operation.GetID()) } d.AddValueParam( @@ -155,8 +157,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i } else if operation.GetType() == types.OperationTypeDB { db, ok := operation.(*types.DeleteBackupOperation) if !ok { - xlog.Error(ctx, "error cast operation to DeleteBackupOperation", zap.String("operation_id", operation.GetID())) - return d + log.Fatalf("error cast operation to DeleteBackupOperation operation_id %s", operation.GetID()) } d.AddValueParam( @@ -181,7 +182,11 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix)) } else { - xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType()))) + log.Fatalf( + "unknown operation type write to db operation_id %s, operation_type %s", + operation.GetID(), + operation.GetType().String(), + ) } return d @@ -357,12 +362,8 @@ func BuildUpdateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr return d } -type WriteTableQueryImplOption func(*WriteTableQueryImpl) - -type WriteTableQueryMockOption func(*WriteTableQueryMock) - -func NewWriteTableQuery(ctx context.Context) WriteTableQuery { - return &WriteTableQueryImpl{ctx: ctx} +func NewWriteTableQuery() WriteTableQuery { + return &WriteTableQueryImpl{} } func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery { @@ -385,7 +386,7 @@ func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) Wri func (d *WriteTableQueryImpl) WithCreateOperation(operation types.Operation) WriteTableQuery { index := len(d.tableQueries) - d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(d.ctx, operation, index)) + d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index)) return d } diff --git a/internal/connectors/db/yql/queries/write_mock.go b/internal/connectors/db/yql/queries/write_mock.go index ff033680..ae1eff79 100644 --- a/internal/connectors/db/yql/queries/write_mock.go +++ b/internal/connectors/db/yql/queries/write_mock.go @@ -2,6 +2,7 @@ package queries import ( "context" + "ydbcp/internal/types" ) @@ -11,7 +12,9 @@ type WriteTableQueryMock struct { BackupSchedule types.BackupSchedule } -func NewWriteTableQueryMock(_ context.Context) WriteTableQuery { +type WriteTableQueryMockOption func(*WriteTableQueryMock) + +func NewWriteTableQueryMock() WriteTableQuery { return &WriteTableQueryMock{} } diff --git a/internal/connectors/db/yql/queries/write_test.go b/internal/connectors/db/yql/queries/write_test.go index f4bbff69..175736ac 100644 --- a/internal/connectors/db/yql/queries/write_test.go +++ b/internal/connectors/db/yql/queries/write_test.go @@ -5,16 +5,15 @@ import ( "strings" "testing" "time" + "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" - "google.golang.org/protobuf/types/known/durationpb" - - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/stretchr/testify/assert" "github.com/ydb-platform/ydb-go-sdk/v3/table" table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestQueryBuilder_UpdateUpdate(t *testing.T) { @@ -34,7 +33,7 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1` Status: "Available", Message: "Message", } - builder := NewWriteTableQuery(context.Background()). + builder := NewWriteTableQuery(). WithUpdateBackup(backup). WithUpdateOperation(&op) var ( @@ -97,7 +96,7 @@ UPSERT INTO Operations (id, type, status, message, initiated, created_at, contai CreatedAt: timestamppb.Now(), }, } - builder := NewWriteTableQuery(context.Background()). + builder := NewWriteTableQuery(). WithCreateBackup(backup). WithCreateOperation(&tbOp) var ( @@ -168,7 +167,6 @@ func TestQueryBuilder_UpdateCreate(t *testing.T) { queryString = `UPDATE Backups SET status = $status_0, message = $message_0 WHERE id = $id_0; UPSERT INTO Operations (id, type, status, message, initiated, created_at, container_id, database, endpoint, backup_id, operation_id, paths, paths_to_exclude) VALUES ($id_1, $type_1, $status_1, $message_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1, $paths_1, $paths_to_exclude_1)` ) - ctx := context.Background() opId := types.GenerateObjectID() backupId := types.GenerateObjectID() tbOp := types.TakeBackupOperation{ @@ -193,7 +191,7 @@ UPSERT INTO Operations (id, type, status, message, initiated, created_at, contai Status: "Available", Message: "Success", } - builder := NewWriteTableQuery(ctx). + builder := NewWriteTableQuery(). WithUpdateBackup(backup). WithCreateOperation(&tbOp) var ( @@ -260,7 +258,6 @@ func TestQueryBuilder_CreateBackupSchedule(t *testing.T) { const ( queryString = `UPSERT INTO BackupSchedules (id, container_id, database, endpoint, name, active, crontab, ttl, paths, initiated, created_at, recovery_point_objective, last_backup_id, next_launch) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $name_0, $active_0, $crontab_0, $ttl_0, $paths_0, $initiated_0, $created_at_0, $recovery_point_objective_0, $last_backup_id_0, $next_launch_0)` ) - ctx := context.Background() scID := types.GenerateObjectID() bID := types.GenerateObjectID() now := time.Now() @@ -284,7 +281,7 @@ func TestQueryBuilder_CreateBackupSchedule(t *testing.T) { LastSuccessfulBackupID: nil, RecoveryPoint: nil, } - builder := NewWriteTableQuery(ctx).WithCreateBackupSchedule(schedule) + builder := NewWriteTableQuery().WithCreateBackupSchedule(schedule) var ( queryParams = table.NewQueryParameters( table.ValueParam("$id_0", table_types.StringValueFromString(scID)), diff --git a/internal/connectors/s3/connector.go b/internal/connectors/s3/connector.go index df3f4469..e3c5c87f 100644 --- a/internal/connectors/s3/connector.go +++ b/internal/connectors/s3/connector.go @@ -3,6 +3,7 @@ package s3 import ( "fmt" "strings" + "ydbcp/internal/config" "github.com/aws/aws-sdk-go/aws" diff --git a/internal/handlers/delete_backup.go b/internal/handlers/delete_backup.go index e97fb923..a7184eed 100644 --- a/internal/handlers/delete_backup.go +++ b/internal/handlers/delete_backup.go @@ -3,6 +3,7 @@ package handlers import ( "context" "fmt" + "ydbcp/internal/config" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" @@ -19,10 +20,10 @@ func NewDBOperationHandler( db db.DBConnector, s3 s3.S3Connector, config config.Config, - queryBuilder queries.WriteTableQuery, + queryBulderFactory queries.WriteQueryBulderFactory, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return DBOperationHandler(ctx, op, db, s3, config, queryBuilder) + return DBOperationHandler(ctx, op, db, s3, config, queryBulderFactory) } } @@ -32,7 +33,7 @@ func DBOperationHandler( db db.DBConnector, s3 s3.S3Connector, config config.Config, - queryBuilder queries.WriteTableQuery, + queryBulderFactory queries.WriteQueryBulderFactory, ) error { xlog.Info(ctx, "DBOperationHandler", zap.String("OperationMessage", operation.GetMessage())) @@ -59,7 +60,7 @@ func DBOperationHandler( operation.SetMessage("Operation deadline exceeded") operation.GetAudit().CompletedAt = timestamppb.Now() return db.ExecuteUpsert( - ctx, queryBuilder.WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } @@ -110,7 +111,7 @@ func DBOperationHandler( backupToWrite.Status = types.BackupStateDeleting operation.SetState(types.OperationStateRunning) err := db.ExecuteUpsert( - ctx, queryBuilder.WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) if err != nil { return fmt.Errorf("can't update operation: %v", err) @@ -133,6 +134,6 @@ func DBOperationHandler( } return db.ExecuteUpsert( - ctx, queryBuilder.WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } diff --git a/internal/handlers/delete_backup_test.go b/internal/handlers/delete_backup_test.go index 38e2013d..418784ec 100644 --- a/internal/handlers/delete_backup_test.go +++ b/internal/handlers/delete_backup_test.go @@ -3,6 +3,7 @@ package handlers import ( "context" "testing" + "ydbcp/internal/config" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" @@ -50,7 +51,7 @@ func TestDBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) { handler := NewDBOperationHandler( dbConnector, s3Connector, config.Config{ OperationTtlSeconds: 0, - }, queries.NewWriteTableQueryMock(ctx), + }, queries.NewWriteTableQueryMock, ) err := handler(ctx, &dbOp) @@ -120,7 +121,7 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { handler := NewDBOperationHandler( dbConnector, s3Connector, config.Config{ OperationTtlSeconds: 1000, - }, queries.NewWriteTableQueryMock(ctx), + }, queries.NewWriteTableQueryMock, ) err := handler(ctx, &dbOp) @@ -195,7 +196,7 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { handler := NewDBOperationHandler( dbConnector, s3Connector, config.Config{ OperationTtlSeconds: 1000, - }, queries.NewWriteTableQueryMock(ctx), + }, queries.NewWriteTableQueryMock, ) err := handler(ctx, &dbOp) diff --git a/internal/handlers/restore_backup.go b/internal/handlers/restore_backup.go index f5eb4e64..9a8e10c5 100644 --- a/internal/handlers/restore_backup.go +++ b/internal/handlers/restore_backup.go @@ -3,16 +3,16 @@ package handlers import ( "context" "fmt" + "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/types" "ydbcp/internal/util/xlog" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" ) func NewRBOperationHandler( diff --git a/internal/handlers/restore_backup_test.go b/internal/handlers/restore_backup_test.go index 2a1dd97d..51b9dd8d 100644 --- a/internal/handlers/restore_backup_test.go +++ b/internal/handlers/restore_backup_test.go @@ -3,17 +3,17 @@ package handlers import ( "context" "testing" + "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/stretchr/testify/assert" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestRBOperationHandlerInvalidOperationResponse(t *testing.T) { diff --git a/internal/handlers/take_backup.go b/internal/handlers/take_backup.go index 9ba8c677..af597a68 100644 --- a/internal/handlers/take_backup.go +++ b/internal/handlers/take_backup.go @@ -3,6 +3,7 @@ package handlers import ( "context" "fmt" + "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" @@ -12,20 +13,18 @@ import ( "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "go.uber.org/zap" - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" ) func NewTBOperationHandler( db db.DBConnector, client client.ClientConnector, s3 s3.S3Connector, config config.Config, - getQueryBuilder func(ctx context.Context) queries.WriteTableQuery, + queryBulderFactory queries.WriteQueryBulderFactory, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return TBOperationHandler(ctx, op, db, client, s3, config, getQueryBuilder) + return TBOperationHandler(ctx, op, db, client, s3, config, queryBulderFactory) } } @@ -36,7 +35,7 @@ func TBOperationHandler( client client.ClientConnector, s3 s3.S3Connector, config config.Config, - getQueryBuilder func(ctx context.Context) queries.WriteTableQuery, + queryBulderFactory queries.WriteQueryBulderFactory, ) error { xlog.Info(ctx, "TBOperationHandler", zap.String("OperationMessage", operation.GetMessage())) @@ -77,7 +76,7 @@ func TBOperationHandler( backupToWrite.Message = operation.GetMessage() backupToWrite.AuditInfo.CompletedAt = now return db.ExecuteUpsert( - ctx, getQueryBuilder(ctx).WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } if ydbOpResponse.opResponse == nil { @@ -159,7 +158,7 @@ func TBOperationHandler( backupToWrite.Message = operation.GetMessage() backupToWrite.AuditInfo.CompletedAt = operation.GetAudit().CompletedAt return db.ExecuteUpsert( - ctx, getQueryBuilder(ctx).WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } case types.OperationStateCancelling: @@ -173,7 +172,7 @@ func TBOperationHandler( operation.GetAudit().CompletedAt = now backupToWrite.Message = operation.GetMessage() return db.ExecuteUpsert( - ctx, getQueryBuilder(ctx).WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } @@ -224,6 +223,6 @@ func TBOperationHandler( backupToWrite.AuditInfo.CompletedAt = now operation.GetAudit().CompletedAt = now return db.ExecuteUpsert( - ctx, getQueryBuilder(ctx).WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } diff --git a/internal/handlers/take_backup_test.go b/internal/handlers/take_backup_test.go index 2ed4a558..8da9e051 100644 --- a/internal/handlers/take_backup_test.go +++ b/internal/handlers/take_backup_test.go @@ -4,23 +4,21 @@ import ( "context" "fmt" "testing" + "ydbcp/internal/config" + "ydbcp/internal/connectors/client" + "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" s3Client "ydbcp/internal/connectors/s3" + "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "github.com/aws/aws-sdk-go/aws" - - "google.golang.org/protobuf/types/known/timestamppb" - - "ydbcp/internal/connectors/client" - "ydbcp/internal/connectors/db" - "ydbcp/internal/types" - "github.com/aws/aws-sdk-go/service/s3" "github.com/stretchr/testify/assert" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestTBOperationHandlerInvalidOperationResponse(t *testing.T) { diff --git a/internal/handlers/utils.go b/internal/handlers/utils.go index c028bfe9..01742101 100644 --- a/internal/handlers/utils.go +++ b/internal/handlers/utils.go @@ -4,17 +4,17 @@ import ( "context" "fmt" "time" + "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/types" "ydbcp/internal/util/xlog" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" "github.com/ydb-platform/ydb-go-sdk/v3" "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" ) func deadlineExceeded(createdAt *timestamppb.Timestamp, config config.Config) bool { diff --git a/internal/processor/processor_test.go b/internal/processor/processor_test.go index 8f96144e..9b9201b1 100644 --- a/internal/processor/processor_test.go +++ b/internal/processor/processor_test.go @@ -5,6 +5,7 @@ import ( "sync" "testing" "time" + "ydbcp/internal/connectors/db" "ydbcp/internal/types" "ydbcp/internal/util/ticker" diff --git a/internal/processor/registry.go b/internal/processor/registry.go index 36d41438..530e253a 100644 --- a/internal/processor/registry.go +++ b/internal/processor/registry.go @@ -3,6 +3,7 @@ package processor import ( "context" "fmt" + "ydbcp/internal/types" ) diff --git a/internal/processor/registry_test.go b/internal/processor/registry_test.go index bdb7dc3c..d9b916af 100644 --- a/internal/processor/registry_test.go +++ b/internal/processor/registry_test.go @@ -3,6 +3,7 @@ package processor import ( "context" "testing" + "ydbcp/internal/types" "github.com/stretchr/testify/assert" diff --git a/internal/server/grpcinfo/grpcinfo.go b/internal/server/grpcinfo/grpcinfo.go index 157fe933..861ed094 100644 --- a/internal/server/grpcinfo/grpcinfo.go +++ b/internal/server/grpcinfo/grpcinfo.go @@ -2,6 +2,7 @@ package grpcinfo import ( "context" + "ydbcp/internal/util/xlog" "github.com/google/uuid" diff --git a/internal/server/server.go b/internal/server/server.go index 53267a23..d3225f51 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -6,13 +6,13 @@ import ( "net" "sync" + "ydbcp/internal/config" + "ydbcp/internal/util/xlog" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/reflection" - - "ydbcp/internal/config" - "ydbcp/internal/util/xlog" ) type Server interface { diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go index 98441ada..264d257b 100644 --- a/internal/server/services/backup/backup_service.go +++ b/internal/server/services/backup/backup_service.go @@ -6,6 +6,7 @@ import ( "regexp" "strings" "time" + "ydbcp/internal/auth" "ydbcp/internal/config" "ydbcp/internal/connectors/client" diff --git a/internal/server/services/backup/backup_service_test.go b/internal/server/services/backup/backup_service_test.go index 56bc5116..3955f489 100644 --- a/internal/server/services/backup/backup_service_test.go +++ b/internal/server/services/backup/backup_service_test.go @@ -2,6 +2,7 @@ package backup import ( "testing" + "ydbcp/internal/auth" "ydbcp/internal/config" "ydbcp/internal/connectors/client" diff --git a/internal/server/services/backup_schedule/backup_schedule_service.go b/internal/server/services/backup_schedule/backup_schedule_service.go index e044573e..a7767cee 100644 --- a/internal/server/services/backup_schedule/backup_schedule_service.go +++ b/internal/server/services/backup_schedule/backup_schedule_service.go @@ -3,6 +3,7 @@ package backup_schedule import ( "context" "time" + "ydbcp/internal/auth" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" @@ -62,7 +63,7 @@ func (s *BackupScheduleService) CreateBackupSchedule( NextLaunch: &now, } - err = s.driver.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithCreateBackupSchedule(schedule)) + err = s.driver.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateBackupSchedule(schedule)) if err != nil { xlog.Error( ctx, "can't create backup schedule", zap.String("backup schedule", schedule.Proto().String()), diff --git a/internal/server/services/operation/operation_service.go b/internal/server/services/operation/operation_service.go index 25b99129..3b1f2063 100644 --- a/internal/server/services/operation/operation_service.go +++ b/internal/server/services/operation/operation_service.go @@ -2,6 +2,7 @@ package operation import ( "context" + "ydbcp/internal/auth" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" diff --git a/internal/types/backup.go b/internal/types/backup.go index 09d9ab13..4a2232af 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -2,6 +2,7 @@ package types import ( "fmt" + pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "github.com/google/uuid" diff --git a/internal/types/backup_schedule.go b/internal/types/backup_schedule.go index 6f4fdac0..78908890 100644 --- a/internal/types/backup_schedule.go +++ b/internal/types/backup_schedule.go @@ -2,6 +2,7 @@ package types import ( "time" + pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "google.golang.org/protobuf/types/known/durationpb" diff --git a/internal/types/operation.go b/internal/types/operation.go index 571ad8ad..f35bbefe 100644 --- a/internal/types/operation.go +++ b/internal/types/operation.go @@ -3,14 +3,15 @@ package types import ( "context" "fmt" - "google.golang.org/protobuf/types/known/timestamppb" "log" "strings" + "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" ) type OperationType string diff --git a/plugins/auth_nebius/auth_nebius.go b/plugins/auth_nebius/auth_nebius.go index f354bd90..2f3bbf7f 100644 --- a/plugins/auth_nebius/auth_nebius.go +++ b/plugins/auth_nebius/auth_nebius.go @@ -8,6 +8,7 @@ import ( "fmt" "os" "strings" + "ydbcp/internal/util/xlog" "ydbcp/pkg/plugins/auth" pb "ydbcp/plugins/auth_nebius/proto/iam/v1"