Skip to content

Commit

Permalink
WIP: TBWR handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksei Pleshakov committed Oct 28, 2024
1 parent fa9a2d6 commit f4ff843
Show file tree
Hide file tree
Showing 13 changed files with 626 additions and 110 deletions.
128 changes: 128 additions & 0 deletions internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,131 @@ func MakeBackup(

return backup, op, nil
}

func RetryBackup(
ctx context.Context,
clientConn client.ClientConnector,
s3 config.S3Config,
allowedEndpointDomains []string,
allowInsecureEndpoint bool,
tbwr types.TakeBackupWithRetryOperation,
subject string,
) (*types.Backup, *types.TakeBackupOperation, error) {
if !IsAllowedEndpoint(tbwr.YdbConnectionParams.Endpoint, allowedEndpointDomains, allowInsecureEndpoint) {
xlog.Error(
ctx,
"endpoint of database is invalid or not allowed",
zap.String("DatabaseEndpoint", tbwr.YdbConnectionParams.Endpoint),
)
return nil, nil, status.Errorf(
codes.InvalidArgument,
"endpoint of database is invalid or not allowed, endpoint %s", tbwr.YdbConnectionParams.Endpoint,
)
}

dsn := types.MakeYdbConnectionString(tbwr.YdbConnectionParams)
ctx = xlog.With(ctx, zap.String("ClientDSN", dsn))
client, err := clientConn.Open(ctx, dsn)
if err != nil {
xlog.Error(ctx, "can't open client connection", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn)
}
defer func() {
if err := clientConn.Close(ctx, client); err != nil {
xlog.Error(ctx, "can't close client connection", zap.Error(err))
}
}()

accessKey, err := s3.AccessKey()
if err != nil {
xlog.Error(ctx, "can't get S3AccessKey", zap.Error(err))
return nil, nil, status.Error(codes.Internal, "can't get S3AccessKey")
}
secretKey, err := s3.SecretKey()
if err != nil {
xlog.Error(ctx, "can't get S3SecretKey", zap.Error(err))
return nil, nil, status.Error(codes.Internal, "can't get S3SecretKey")
}

dbNamePath := strings.Replace(tbwr.YdbConnectionParams.DatabaseName, "/", "_", -1) // TODO: checking user input
dbNamePath = strings.Trim(dbNamePath, "_")

destinationPrefix := path.Join(
s3.PathPrefix,
dbNamePath,
time.Now().Format(types.BackupTimestampFormat),
)
ctx = xlog.With(ctx, zap.String("S3DestinationPrefix", destinationPrefix))

sourcePaths := make([]string, 0, len(tbwr.SourcePaths))
for _, p := range tbwr.SourcePaths {
fullPath, ok := SafePathJoin(tbwr.YdbConnectionParams.DatabaseName, p)
if !ok {
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
return nil, nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
}
sourcePaths = append(sourcePaths, fullPath)
}

pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, tbwr.SourcePathsToExclude)
if err != nil {
xlog.Error(ctx, "error preparing paths for export", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn)
}

if len(pathsForExport) == 0 {
xlog.Error(ctx, "empty list of paths for export")
return nil, nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
}

s3Settings := types.ExportSettings{
Endpoint: s3.Endpoint,
Region: s3.Region,
Bucket: s3.Bucket,
AccessKey: accessKey,
SecretKey: secretKey,
Description: "ydbcp backup", // TODO: the description shoud be better
NumberOfRetries: 10, // TODO: get it from configuration
SourcePaths: pathsForExport,
DestinationPrefix: destinationPrefix,
S3ForcePathStyle: s3.S3ForcePathStyle,
}

clientOperationID, err := clientConn.ExportToS3(ctx, client, s3Settings)
if err != nil {
xlog.Error(ctx, "can't start export operation", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "can't start export operation, dsn %s", dsn)
}
ctx = xlog.With(ctx, zap.String("ClientOperationID", clientOperationID))
xlog.Info(ctx, "Export operation started")

var expireAt *time.Time
if tbwr.Ttl != nil {
expireAt = new(time.Time)
*expireAt = time.Now().Add(*tbwr.Ttl)
}

now := timestamppb.Now()
backup := &types.Backup{
ID: types.GenerateObjectID(),
ContainerID: tbwr.ContainerID,
DatabaseName: tbwr.YdbConnectionParams.DatabaseName,
DatabaseEndpoint: tbwr.YdbConnectionParams.Endpoint,
S3Endpoint: s3.Endpoint,
S3Region: s3.Region,
S3Bucket: s3.Bucket,
S3PathPrefix: destinationPrefix,
Status: types.BackupStateRunning,
AuditInfo: &pb.AuditInfo{
CreatedAt: now,
Creator: subject,
},
ScheduleID: tbwr.ScheduleID,
ExpireAt: expireAt,
SourcePaths: pathsForExport,
}

op := tbwr.SpawnNewTBOperation(backup.ID, subject, clientOperationID)

return backup, &op, nil
}
44 changes: 44 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
updatedAt *time.Time
updatedTs *timestamppb.Timestamp
parentOperationID *string
scheduleID *string
ttl *time.Duration
retriesCount *uint32
maxBackoff *time.Duration
)
err := res.ScanNamed(
named.Required("id", &operationId),
Expand All @@ -170,6 +174,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
named.Optional("initiated", &creator),
named.Optional("updated_at", &updatedAt),
named.Optional("parent_operation_id", &parentOperationID),
named.Optional("schedule_id", &scheduleID),
named.Optional("ttl", &ttl),
named.Optional("retries_count", &retriesCount),
named.Optional("retries_max_backoff", &maxBackoff),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -255,6 +263,42 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
PathPrefix: pathPrefix,
UpdatedAt: updatedTs,
}, nil
} else if operationType == string(types.OperationTypeTBWR) {
if backupId == nil {
return nil, fmt.Errorf("failed to read backup_id for TBWR operation: %s", operationId)
}
var retryConfig *pb.RetryConfig = nil
if maxBackoff != nil {
retryConfig = &pb.RetryConfig{
Retries: &pb.RetryConfig_MaxBackoff{
MaxBackoff: durationpb.New(*maxBackoff),
},
}
}
if retriesCount != nil {
retryConfig = &pb.RetryConfig{
Retries: &pb.RetryConfig_Count{Count: *retriesCount},
}
}
return &types.TakeBackupWithRetryOperation{
TakeBackupOperation: types.TakeBackupOperation{
ID: operationId,
ContainerID: containerId,
State: operationState,
Message: StringOrEmpty(message),
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: databaseEndpoint,
DatabaseName: databaseName,
},
SourcePaths: sourcePathsSlice,
SourcePathsToExclude: sourcePathsToExcludeSlice,
Audit: auditFromDb(creator, createdAt, completedAt),
UpdatedAt: updatedTs,
},
ScheduleID: scheduleID,
Ttl: ttl,
RetryConfig: retryConfig,
}, nil
}

return &types.GenericOperation{ID: operationId}, nil
Expand Down
30 changes: 30 additions & 0 deletions internal/connectors/db/yql/queries/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package queries

import (
"fmt"
"ydbcp/internal/types"
)

var (
ListSchedulesQuery = fmt.Sprintf(
`$last_successful_backup_id = SELECT schedule_id, MAX(b.completed_at) AS recovery_point, MAX_BY(b.id, b.completed_at) AS last_successful_backup_id FROM Backups AS b WHERE b.status = '%s' GROUP BY schedule_id;
$last_backup_id = SELECT schedule_id AS schedule_id_2, MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b GROUP BY schedule_id;
SELECT * FROM BackupSchedules AS schedules
LEFT JOIN $last_successful_backup_id AS b1 ON schedules.id = b1.schedule_id
LEFT JOIN $last_backup_id AS b2 ON schedules.id = b2.schedule_id_2
`, types.BackupStateAvailable,
)
GetScheduleQuery = fmt.Sprintf(
`$rpo_info = SELECT
<|
recovery_point: MAX(b.completed_at),
last_successful_backup_id: MAX_BY(b.id, b.completed_at)
|> FROM Backups AS b WHERE b.status = '%s' AND b.schedule_id = $schedule_id;
$last_backup_id = SELECT MAX_BY(b.id, b.completed_at) AS last_backup_id FROM Backups AS b WHERE b.schedule_id = $schedule_id;
SELECT s.*, $last_backup_id AS last_backup_id, $rpo_info.recovery_point AS recovery_point, $rpo_info.last_successful_backup_id AS last_successful_backup_id FROM BackupSchedules AS s WHERE s.id = $schedule_id
`, types.BackupStateAvailable,
)
)
17 changes: 15 additions & 2 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type ReadTableQueryImpl struct {
filters [][]table_types.Value
filterFields []string
isLikeFilter map[string]bool
index *string
orderBy *OrderSpec
pageSpec *PageSpec
tableQueryParams []table.ParameterOption
Expand Down Expand Up @@ -165,6 +166,12 @@ func WithPageSpec(spec PageSpec) ReadTableQueryOption {
}
}

func WithIndex(index string) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
d.index = &index
}
}

func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue table_types.Value) string {
paramName := fmt.Sprintf("$param%d", len(d.tableQueryParams))
d.tableQueryParams = append(
Expand Down Expand Up @@ -219,17 +226,23 @@ func (d *ReadTableQueryImpl) FormatPage() *string {
return &page
}

func (d *ReadTableQueryImpl) FormatTable() string {
if d.index == nil {
return d.tableName
}
return fmt.Sprintf("%s VIEW %s", d.tableName, *d.index)
}

func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) {
var res string
filter := d.MakeFilterString()

if d.rawQuery == nil {
if len(d.tableName) == 0 {
return nil, errors.New("no table")
}
res = fmt.Sprintf(
"SELECT * FROM %s%s",
d.tableName,
d.FormatTable(),
filter,
)
} else {
Expand Down
16 changes: 16 additions & 0 deletions internal/connectors/db/yql/queries/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,19 @@ func TestOrderSpec(t *testing.T) {
"bad query format",
)
}

func TestIndex(t *testing.T) {
const (
query = `SELECT * FROM table1 VIEW index`
)
builder := NewReadTableQuery(
WithTableName("table1"),
WithIndex("index"),
)
fq, err := builder.FormatQuery(context.Background())
assert.Empty(t, err)
assert.Equal(
t, query, fq.QueryText,
"bad query format",
)
}
49 changes: 43 additions & 6 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"context"
"errors"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"log"
"strings"

"ydbcp/internal/types"
"ydbcp/internal/util/xlog"

"github.com/ydb-platform/ydb-go-sdk/v3/table"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

type WriteTableQuery interface {
Expand Down Expand Up @@ -128,6 +127,45 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
if tb.ParentOperationID != nil {
d.AddValueParam("$parent_operation_id", table_types.StringValueFromString(*tb.ParentOperationID))
}
} else if operation.GetType() == types.OperationTypeTBWR {
tbwr, ok := operation.(*types.TakeBackupWithRetryOperation)
if !ok {
log.Fatalf("error cast operation to TakeBackupOperation operation_id %s", operation.GetID())
}

d.AddValueParam(
"$container_id", table_types.StringValueFromString(tbwr.ContainerID),
)
d.AddValueParam(
"$database",
table_types.StringValueFromString(tbwr.YdbConnectionParams.DatabaseName),
)
d.AddValueParam(
"$endpoint",
table_types.StringValueFromString(tbwr.YdbConnectionParams.Endpoint),
)
if len(tbwr.SourcePaths) > 0 {
d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(tbwr.SourcePaths, ",")))
}
if len(tbwr.SourcePathsToExclude) > 0 {
d.AddValueParam(
"$paths_to_exclude",
table_types.StringValueFromString(strings.Join(tbwr.SourcePathsToExclude, ",")),
)
}
if tbwr.RetryConfig != nil {
switch r := tbwr.RetryConfig.Retries.(type) {
case *pb.RetryConfig_Count:
d.AddValueParam("$retries_count", table_types.Uint32Value(r.Count))
case *pb.RetryConfig_MaxBackoff:
d.AddValueParam("$retries_max_backoff", table_types.IntervalValueFromDuration(r.MaxBackoff.AsDuration()))
default:
log.Fatalf("bad developers did not account for new oneof value")
}
}
if tbwr.ScheduleID != nil {
d.AddValueParam("$schedule_id", table_types.StringValueFromString(*tbwr.ScheduleID))
}
} else if operation.GetType() == types.OperationTypeRB {
rb, ok := operation.(*types.RestoreBackupOperation)
if !ok {
Expand Down Expand Up @@ -183,7 +221,6 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
)

d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix))

} else {
log.Fatalf(
"unknown operation type write to db operation_id %s, operation_type %s",
Expand Down
5 changes: 5 additions & 0 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ CREATE TABLE Operations (
paths_to_exclude String,
operation_id String,
parent_operation_id String,
--used only in TBWR
schedule_id String,
ttl Interval,
retries_count Uint32,
retries_max_backoff Interval,

INDEX idx_cc GLOBAL ON (container_id, created_at, id),
INDEX idx_cbc GLOBAL ON (container_id, backup_id, created_at, id),
Expand Down
Loading

0 comments on commit f4ff843

Please sign in to comment.