Skip to content

Commit

Permalink
Enable TBWR operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksei Pleshakov authored and qrort committed Nov 11, 2024
1 parent 840957d commit b064fbf
Show file tree
Hide file tree
Showing 15 changed files with 278 additions and 185 deletions.
28 changes: 25 additions & 3 deletions cmd/integration/make_backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func main() {
}
}(conn)
client := pb.NewBackupServiceClient(conn)
opClient := pb.NewOperationServiceClient(conn)
backups, err := client.ListBackups(
context.Background(), &pb.ListBackupsRequest{
ContainerId: containerID,
Expand Down Expand Up @@ -61,7 +62,7 @@ func main() {
log.Panicf("unexpected error code: %v", err)
}

backupOperation, err := client.MakeBackup(
tbwr, err := client.MakeBackup(
context.Background(), &pb.MakeBackupRequest{
ContainerId: containerID,
DatabaseName: databaseName,
Expand All @@ -73,6 +74,16 @@ func main() {
if err != nil {
log.Panicf("failed to make backup: %v", err)
}
op, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
Id: tbwr.Id,
})
if err != nil {
log.Panicf("failed to get operation: %v", err)
}
if op.GetType() != types.OperationTypeTBWR.String() {
log.Panicf("unexpected operation type: %v", op.GetType())
}
time.Sleep(time.Second * 11) // to wait for operation handler
backups, err = client.ListBackups(
context.Background(), &pb.ListBackupsRequest{
ContainerId: containerID,
Expand All @@ -86,6 +97,18 @@ func main() {
log.Panicf("Did not list freshly made backup")
}
backupPb := backups.Backups[0]
ops, err := opClient.ListOperations(context.Background(), &pb.ListOperationsRequest{
ContainerId: containerID,
DatabaseNameMask: databaseName,
OperationTypes: []string{types.OperationTypeTB.String()},
})
if err != nil {
log.Panicf("failed to list operations: %v", err)
}
if len(ops.Operations) != 1 {
log.Panicf("expected one TB operation, got %d", len(ops.Operations))
}
backupOperation := ops.Operations[0]
if backupPb.Id != backupOperation.BackupId {
log.Panicf(
"backupOperation backupID %s does not match listed backup id %s", backupOperation.BackupId, backupPb.Id,
Expand Down Expand Up @@ -121,7 +144,6 @@ func main() {
if err != nil {
log.Panicf("failed to make restore: %v", err)
}
opClient := pb.NewOperationServiceClient(conn)
done = false
for range 30 {
op, err := opClient.GetOperation(
Expand Down Expand Up @@ -249,7 +271,7 @@ func main() {
}

newScheduleName := "schedule-2.0"
newSourcePath := databaseName + "/kv_test"
newSourcePath := "/kv_test"
newSchedule, err := scheduleClient.UpdateBackupSchedule(
context.Background(), &pb.UpdateBackupScheduleRequest{
Id: schedule.Id,
Expand Down
16 changes: 15 additions & 1 deletion cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,25 @@ func main() {
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeTBWR,
handlers.NewTBWROperationHandler(
dbConnector,
clientConnector,
configInstance.S3,
configInstance.ClientConnection,
queries.NewWriteTableQuery,
clockwork.NewRealClock()),
); err != nil {
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry, metrics)
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

backupScheduleHandler := handlers.NewBackupScheduleHandler(
clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery, clockwork.NewRealClock(),
queries.NewWriteTableQuery, clockwork.NewRealClock(),
)
schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler)
xlog.Info(ctx, "YDBCP started")
Expand Down
102 changes: 63 additions & 39 deletions internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package backup_operations
import (
"context"
"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-sdk/v3"
"path"
"regexp"
"strings"
"time"
"ydbcp/internal/connectors/db"

"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/types"
Expand All @@ -33,18 +32,19 @@ type MakeBackupInternalRequest struct {
SourcePathsToExclude []string
ScheduleID *string
Ttl *time.Duration
ParentOperationID *string
}

func FromGRPCRequest(request *pb.MakeBackupRequest, scheduleID *string) MakeBackupInternalRequest {
func FromBackupSchedule(schedule *types.BackupSchedule) MakeBackupInternalRequest {
res := MakeBackupInternalRequest{
ContainerID: request.ContainerId,
DatabaseEndpoint: request.DatabaseEndpoint,
DatabaseName: request.DatabaseName,
SourcePaths: request.SourcePaths,
SourcePathsToExclude: request.SourcePathsToExclude,
ScheduleID: scheduleID,
ContainerID: schedule.ContainerID,
DatabaseEndpoint: schedule.DatabaseEndpoint,
DatabaseName: schedule.DatabaseName,
SourcePaths: schedule.SourcePaths,
SourcePathsToExclude: schedule.SourcePathsToExclude,
ScheduleID: &schedule.ID,
}
if ttl := request.Ttl.AsDuration(); request.Ttl != nil {
if ttl := schedule.ScheduleSettings.Ttl.AsDuration(); schedule.ScheduleSettings.Ttl != nil {
res.Ttl = &ttl
}
return res
Expand All @@ -59,6 +59,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter
SourcePathsToExclude: tbwr.SourcePathsToExclude,
ScheduleID: tbwr.ScheduleID,
Ttl: tbwr.Ttl,
ParentOperationID: &tbwr.ID,
}
}

Expand Down Expand Up @@ -94,6 +95,53 @@ func IsAllowedEndpoint(e string, allowedEndpointDomains []string, allowInsecureE
return false
}

func OpenConnAndValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector) ([]string, error) {
clientConnectionParams := types.YdbConnectionParams{
Endpoint: req.DatabaseEndpoint,
DatabaseName: req.DatabaseName,
}
dsn := types.MakeYdbConnectionString(clientConnectionParams)
ctx = xlog.With(ctx, zap.String("ClientDSN", dsn))
client, err := clientConn.Open(ctx, dsn)
if err != nil {
xlog.Error(ctx, "can't open client connection", zap.Error(err))
return nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn)
}
defer func() {
if err := clientConn.Close(ctx, client); err != nil {
xlog.Error(ctx, "can't close client connection", zap.Error(err))
}
}()
return ValidateSourcePaths(ctx, req, clientConn, client, dsn)
}

func ValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector, client *ydb.Driver, dsn string) ([]string, error) {
if req.ScheduleID != nil {
ctx = xlog.With(ctx, zap.String("ScheduleID", *req.ScheduleID))
}
sourcePaths := make([]string, 0, len(req.SourcePaths))
for _, p := range req.SourcePaths {
fullPath, ok := SafePathJoin(req.DatabaseName, p)
if !ok {
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
return nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
}
sourcePaths = append(sourcePaths, fullPath)
}

pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude)
if err != nil {
xlog.Error(ctx, "error preparing paths for export", zap.Error(err))
return nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn)
}

if len(pathsForExport) == 0 {
xlog.Error(ctx, "empty list of paths for export")
return nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
}
return pathsForExport, nil
}

func MakeBackup(
ctx context.Context,
clientConn client.ClientConnector,
Expand Down Expand Up @@ -157,35 +205,10 @@ func MakeBackup(
)
ctx = xlog.With(ctx, zap.String("S3DestinationPrefix", destinationPrefix))

sourcePaths := make([]string, 0, len(req.SourcePaths))
for _, p := range req.SourcePaths {
fullPath, ok := SafePathJoin(req.DatabaseName, p)
if !ok {
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
return nil, nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
}
sourcePaths = append(sourcePaths, fullPath)
}
pathsForExport, err := ValidateSourcePaths(ctx, req, clientConn, client, dsn)

pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude)
if err != nil {
xlog.Error(
ctx,
"error preparing paths for export",
zap.Strings("sourcePaths", req.SourcePaths),
zap.String("scheduleID", db.StringOrEmpty(req.ScheduleID)),
zap.Error(err),
)
return nil, nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn)
}

if len(pathsForExport) == 0 {
xlog.Error(
ctx,
"empty list of paths for export",
zap.String("scheduleID", db.StringOrEmpty(req.ScheduleID)),
)
return nil, nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
return nil, nil, err
}

s3Settings := types.ExportSettings{
Expand Down Expand Up @@ -250,8 +273,9 @@ func MakeBackup(
CreatedAt: now,
Creator: subject,
},
YdbOperationId: clientOperationID,
UpdatedAt: now,
YdbOperationId: clientOperationID,
UpdatedAt: now,
ParentOperationID: req.ParentOperationID,
}

return backup, op, nil
Expand Down
12 changes: 9 additions & 3 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,14 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.
defer c.guard.Unlock()

queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock)
c.operations[queryBuilderMock.Operation.GetID()] = queryBuilderMock.Operation
c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup
c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = queryBuilderMock.BackupSchedule
if queryBuilderMock.Operation != nil {
c.operations[(*queryBuilderMock.Operation).GetID()] = *queryBuilderMock.Operation
}
if queryBuilderMock.Backup != nil {
c.backups[queryBuilderMock.Backup.ID] = *queryBuilderMock.Backup
}
if queryBuilderMock.BackupSchedule != nil {
c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = *queryBuilderMock.BackupSchedule
}
return nil
}
2 changes: 0 additions & 2 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package db

import (
"fmt"
"log"
"strings"
"time"

Expand Down Expand Up @@ -197,7 +196,6 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
}

if updatedAt != nil {
log.Print("updated at read from db")
updatedTs = timestamppb.New(*updatedAt)
}

Expand Down
18 changes: 9 additions & 9 deletions internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

type WriteTableQueryMock struct {
Operation types.Operation
Backup types.Backup
BackupSchedule types.BackupSchedule
Operation *types.Operation
Backup *types.Backup
BackupSchedule *types.BackupSchedule
}

type WriteTableQueryMockOption func(*WriteTableQueryMock)
Expand All @@ -23,31 +23,31 @@ func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult
}

func (w *WriteTableQueryMock) WithCreateBackup(backup types.Backup) WriteTableQuery {
w.Backup = backup
w.Backup = &backup
return w
}

func (w *WriteTableQueryMock) WithCreateOperation(operation types.Operation) WriteTableQuery {
w.Operation = operation
w.Operation = &operation
return w
}

func (w *WriteTableQueryMock) WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery {
w.BackupSchedule = schedule
w.BackupSchedule = &schedule
return w
}

func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQuery {
w.Backup = backup
w.Backup = &backup
return w
}

func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery {
w.Operation = operation
w.Operation = &operation
return w
}

func (w *WriteTableQueryMock) WithUpdateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery {
w.BackupSchedule = schedule
w.BackupSchedule = &schedule
return w
}
Loading

0 comments on commit b064fbf

Please sign in to comment.