Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable TBWR operations #96

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions cmd/integration/make_backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func main() {
}
}(conn)
client := pb.NewBackupServiceClient(conn)
opClient := pb.NewOperationServiceClient(conn)
backups, err := client.ListBackups(
context.Background(), &pb.ListBackupsRequest{
ContainerId: containerID,
Expand Down Expand Up @@ -61,7 +62,7 @@ func main() {
log.Panicf("unexpected error code: %v", err)
}

backupOperation, err := client.MakeBackup(
tbwr, err := client.MakeBackup(
context.Background(), &pb.MakeBackupRequest{
ContainerId: containerID,
DatabaseName: databaseName,
Expand All @@ -73,6 +74,16 @@ func main() {
if err != nil {
log.Panicf("failed to make backup: %v", err)
}
op, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
Id: tbwr.Id,
})
if err != nil {
log.Panicf("failed to get operation: %v", err)
}
if op.GetType() != types.OperationTypeTBWR.String() {
log.Panicf("unexpected operation type: %v", op.GetType())
}
time.Sleep(time.Second * 11) // to wait for operation handler
backups, err = client.ListBackups(
context.Background(), &pb.ListBackupsRequest{
ContainerId: containerID,
Expand All @@ -86,6 +97,18 @@ func main() {
log.Panicf("Did not list freshly made backup")
}
backupPb := backups.Backups[0]
ops, err := opClient.ListOperations(context.Background(), &pb.ListOperationsRequest{
ContainerId: containerID,
DatabaseNameMask: databaseName,
OperationTypes: []string{types.OperationTypeTB.String()},
})
if err != nil {
log.Panicf("failed to list operations: %v", err)
}
if len(ops.Operations) != 1 {
log.Panicf("expected one TB operation, got %d", len(ops.Operations))
}
backupOperation := ops.Operations[0]
if backupPb.Id != backupOperation.BackupId {
log.Panicf(
"backupOperation backupID %s does not match listed backup id %s", backupOperation.BackupId, backupPb.Id,
Expand Down Expand Up @@ -121,7 +144,6 @@ func main() {
if err != nil {
log.Panicf("failed to make restore: %v", err)
}
opClient := pb.NewOperationServiceClient(conn)
done = false
for range 30 {
op, err := opClient.GetOperation(
Expand Down Expand Up @@ -249,7 +271,7 @@ func main() {
}

newScheduleName := "schedule-2.0"
newSourcePath := databaseName + "/kv_test"
newSourcePath := "/kv_test"
newSchedule, err := scheduleClient.UpdateBackupSchedule(
context.Background(), &pb.UpdateBackupScheduleRequest{
Id: schedule.Id,
Expand Down
16 changes: 15 additions & 1 deletion cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,25 @@ func main() {
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeTBWR,
handlers.NewTBWROperationHandler(
dbConnector,
clientConnector,
configInstance.S3,
configInstance.ClientConnection,
queries.NewWriteTableQuery,
clockwork.NewRealClock()),
); err != nil {
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry, metrics)
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

backupScheduleHandler := handlers.NewBackupScheduleHandler(
clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery, clockwork.NewRealClock(),
queries.NewWriteTableQuery, clockwork.NewRealClock(),
)
schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler)
xlog.Info(ctx, "YDBCP started")
Expand Down
102 changes: 63 additions & 39 deletions internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package backup_operations
import (
"context"
"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-sdk/v3"
"path"
"regexp"
"strings"
"time"
"ydbcp/internal/connectors/db"

"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/types"
Expand All @@ -33,18 +32,19 @@ type MakeBackupInternalRequest struct {
SourcePathsToExclude []string
ScheduleID *string
Ttl *time.Duration
ParentOperationID *string
}

func FromGRPCRequest(request *pb.MakeBackupRequest, scheduleID *string) MakeBackupInternalRequest {
func FromBackupSchedule(schedule *types.BackupSchedule) MakeBackupInternalRequest {
res := MakeBackupInternalRequest{
ContainerID: request.ContainerId,
DatabaseEndpoint: request.DatabaseEndpoint,
DatabaseName: request.DatabaseName,
SourcePaths: request.SourcePaths,
SourcePathsToExclude: request.SourcePathsToExclude,
ScheduleID: scheduleID,
ContainerID: schedule.ContainerID,
DatabaseEndpoint: schedule.DatabaseEndpoint,
DatabaseName: schedule.DatabaseName,
SourcePaths: schedule.SourcePaths,
SourcePathsToExclude: schedule.SourcePathsToExclude,
ScheduleID: &schedule.ID,
}
if ttl := request.Ttl.AsDuration(); request.Ttl != nil {
if ttl := schedule.ScheduleSettings.Ttl.AsDuration(); schedule.ScheduleSettings.Ttl != nil {
res.Ttl = &ttl
}
return res
Expand All @@ -59,6 +59,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter
SourcePathsToExclude: tbwr.SourcePathsToExclude,
ScheduleID: tbwr.ScheduleID,
Ttl: tbwr.Ttl,
ParentOperationID: &tbwr.ID,
}
}

Expand Down Expand Up @@ -94,6 +95,53 @@ func IsAllowedEndpoint(e string, allowedEndpointDomains []string, allowInsecureE
return false
}

func OpenConnAndValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector) ([]string, error) {
clientConnectionParams := types.YdbConnectionParams{
Endpoint: req.DatabaseEndpoint,
DatabaseName: req.DatabaseName,
}
dsn := types.MakeYdbConnectionString(clientConnectionParams)
ctx = xlog.With(ctx, zap.String("ClientDSN", dsn))
client, err := clientConn.Open(ctx, dsn)
if err != nil {
xlog.Error(ctx, "can't open client connection", zap.Error(err))
return nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn)
}
defer func() {
if err := clientConn.Close(ctx, client); err != nil {
xlog.Error(ctx, "can't close client connection", zap.Error(err))
}
}()
return ValidateSourcePaths(ctx, req, clientConn, client, dsn)
}

func ValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector, client *ydb.Driver, dsn string) ([]string, error) {
if req.ScheduleID != nil {
ctx = xlog.With(ctx, zap.String("ScheduleID", *req.ScheduleID))
}
sourcePaths := make([]string, 0, len(req.SourcePaths))
for _, p := range req.SourcePaths {
fullPath, ok := SafePathJoin(req.DatabaseName, p)
if !ok {
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
return nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
}
sourcePaths = append(sourcePaths, fullPath)
}

pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude)
if err != nil {
xlog.Error(ctx, "error preparing paths for export", zap.Error(err))
return nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn)
}

if len(pathsForExport) == 0 {
xlog.Error(ctx, "empty list of paths for export")
return nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
}
return pathsForExport, nil
}

func MakeBackup(
ctx context.Context,
clientConn client.ClientConnector,
Expand Down Expand Up @@ -157,35 +205,10 @@ func MakeBackup(
)
ctx = xlog.With(ctx, zap.String("S3DestinationPrefix", destinationPrefix))

sourcePaths := make([]string, 0, len(req.SourcePaths))
for _, p := range req.SourcePaths {
fullPath, ok := SafePathJoin(req.DatabaseName, p)
if !ok {
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
return nil, nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
}
sourcePaths = append(sourcePaths, fullPath)
}
pathsForExport, err := ValidateSourcePaths(ctx, req, clientConn, client, dsn)

pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude)
if err != nil {
xlog.Error(
ctx,
"error preparing paths for export",
zap.Strings("sourcePaths", req.SourcePaths),
zap.String("scheduleID", db.StringOrEmpty(req.ScheduleID)),
zap.Error(err),
)
return nil, nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn)
}

if len(pathsForExport) == 0 {
xlog.Error(
ctx,
"empty list of paths for export",
zap.String("scheduleID", db.StringOrEmpty(req.ScheduleID)),
)
return nil, nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
return nil, nil, err
}

s3Settings := types.ExportSettings{
Expand Down Expand Up @@ -250,8 +273,9 @@ func MakeBackup(
CreatedAt: now,
Creator: subject,
},
YdbOperationId: clientOperationID,
UpdatedAt: now,
YdbOperationId: clientOperationID,
UpdatedAt: now,
ParentOperationID: req.ParentOperationID,
}

return backup, op, nil
Expand Down
12 changes: 9 additions & 3 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,14 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.
defer c.guard.Unlock()

queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock)
c.operations[queryBuilderMock.Operation.GetID()] = queryBuilderMock.Operation
c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup
c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = queryBuilderMock.BackupSchedule
if queryBuilderMock.Operation != nil {
c.operations[(*queryBuilderMock.Operation).GetID()] = *queryBuilderMock.Operation
}
if queryBuilderMock.Backup != nil {
c.backups[queryBuilderMock.Backup.ID] = *queryBuilderMock.Backup
}
if queryBuilderMock.BackupSchedule != nil {
c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = *queryBuilderMock.BackupSchedule
}
return nil
}
2 changes: 0 additions & 2 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package db

import (
"fmt"
"log"
"strings"
"time"

Expand Down Expand Up @@ -197,7 +196,6 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
}

if updatedAt != nil {
log.Print("updated at read from db")
updatedTs = timestamppb.New(*updatedAt)
}

Expand Down
18 changes: 9 additions & 9 deletions internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

type WriteTableQueryMock struct {
Operation types.Operation
Backup types.Backup
BackupSchedule types.BackupSchedule
Operation *types.Operation
Backup *types.Backup
BackupSchedule *types.BackupSchedule
}

type WriteTableQueryMockOption func(*WriteTableQueryMock)
Expand All @@ -23,31 +23,31 @@ func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult
}

func (w *WriteTableQueryMock) WithCreateBackup(backup types.Backup) WriteTableQuery {
w.Backup = backup
w.Backup = &backup
return w
}

func (w *WriteTableQueryMock) WithCreateOperation(operation types.Operation) WriteTableQuery {
w.Operation = operation
w.Operation = &operation
return w
}

func (w *WriteTableQueryMock) WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery {
w.BackupSchedule = schedule
w.BackupSchedule = &schedule
return w
}

func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQuery {
w.Backup = backup
w.Backup = &backup
return w
}

func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery {
w.Operation = operation
w.Operation = &operation
return w
}

func (w *WriteTableQueryMock) WithUpdateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery {
w.BackupSchedule = schedule
w.BackupSchedule = &schedule
return w
}
Loading
Loading