Skip to content

Commit

Permalink
TBWR handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksei Pleshakov authored and qrort committed Nov 5, 2024
1 parent 9412981 commit c9e554e
Show file tree
Hide file tree
Showing 29 changed files with 1,651 additions and 173 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,11 @@ jobs:
- name: docker compose down
run: |
docker compose down
- name: docker compose up
run: |
docker compose up -d
- name: run orm test
run: docker exec local-ydbcp sh -c './orm'
- name: docker compose down
run: |
docker compose down
22 changes: 22 additions & 0 deletions cmd/integration/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package common

import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"time"
)

func CreateGRPCClient(endpoint string) *grpc.ClientConn {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
for range 5 {
conn, err := grpc.NewClient(endpoint, opts...)
if err == nil {
return conn
}
time.Sleep(time.Second) // Wait before retrying
}
log.Panicln("failed to dial")
return nil
}
9 changes: 2 additions & 7 deletions cmd/integration/list_entities/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"strconv"
"time"
"ydbcp/cmd/integration/common"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
Expand All @@ -16,7 +17,6 @@ import (
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
Expand Down Expand Up @@ -214,12 +214,7 @@ func SchedulesToInsert() []types.BackupSchedule {

func main() {
ctx := context.Background()
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(ydbcpEndpoint, opts...)
if err != nil {
log.Panicln("failed to dial")
}
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
Expand Down
9 changes: 2 additions & 7 deletions cmd/integration/make_backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"log"
"strings"
"time"
"ydbcp/cmd/integration/common"

"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
Expand All @@ -23,12 +23,7 @@ const (
)

func main() {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(ydbcpEndpoint, opts...)
if err != nil {
log.Panicln("failed to dial")
}
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
Expand Down
155 changes: 155 additions & 0 deletions cmd/integration/orm/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package main

import (
"context"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"log"
"reflect"
"time"
"ydbcp/cmd/integration/common"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

const (
containerID = "abcde"
databaseName = "/local"
ydbcpEndpoint = "localhost:50051"
connectionString = "grpcs://local-ydb:2135/local"
)

var (
t1 = time.Date(2024, 01, 01, 00, 00, 00, 0, time.UTC)
t2 = time.Date(2024, 01, 02, 00, 00, 00, 0, time.UTC)
)

func OperationsToInsert() []types.TakeBackupWithRetryOperation {
schedule := "schedule"
ttl := time.Hour * 24
return []types.TakeBackupWithRetryOperation{
{
TakeBackupOperation: types.TakeBackupOperation{
ID: "1",
ContainerID: containerID,
State: "xz",
Message: "nz",
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: ydbcpEndpoint,
DatabaseName: databaseName,
},
SourcePaths: []string{"path"},
SourcePathsToExclude: []string{"exclude"},
Audit: &pb.AuditInfo{
Creator: "ydbcp",
CreatedAt: timestamppb.New(t1),
CompletedAt: timestamppb.New(t2),
},
UpdatedAt: timestamppb.New(t2),
},
ScheduleID: &schedule,
Ttl: &ttl,
RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_Count{Count: 5}},
},
{
TakeBackupOperation: types.TakeBackupOperation{
ID: "1",
ContainerID: containerID,
State: "xz",
Message: "nz",
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: ydbcpEndpoint,
DatabaseName: databaseName,
},
SourcePaths: []string{"path"},
SourcePathsToExclude: []string{"exclude"},
Audit: &pb.AuditInfo{
Creator: "ydbcp",
CreatedAt: timestamppb.New(t1),
CompletedAt: timestamppb.New(t2),
},
UpdatedAt: timestamppb.New(t2),
},
ScheduleID: &schedule,
Ttl: &ttl,
RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(ttl)}},
},
}
}

func ReadTBWROperation(ctx context.Context, ydbConn *db.YdbConnector, id string) *types.TakeBackupWithRetryOperation {
operations, err := ydbConn.SelectOperations(ctx, queries.NewReadTableQuery(
queries.WithTableName("Operations"),
queries.WithQueryFilters(queries.QueryFilter{
Field: "id",
Values: []table_types.Value{table_types.StringValueFromString(id)},
}),
))
if err != nil {
log.Panicf("failed to select operations: %v", err)
}
if len(operations) != 1 {
log.Panicf("expected 1 operation, got %d", len(operations))
}
return operations[0].(*types.TakeBackupWithRetryOperation)
}

func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operation types.TakeBackupWithRetryOperation) {
err := ydbConn.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateOperation(&operation))
if err != nil {
log.Panicf("failed to insert operation: %v", err)
}
tbwr := ReadTBWROperation(ctx, ydbConn, operation.ID)
if !reflect.DeepEqual(operation, *tbwr) {
log.Panicf("operation %v corrupted after read and write\ngot %v", operation, *tbwr)
}

err = ydbConn.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(&operation))
if err != nil {
log.Panicf("failed to insert operation: %v", err)
}
tbwr = ReadTBWROperation(ctx, ydbConn, operation.ID)
if !reflect.DeepEqual(operation, *tbwr) {
log.Panicf("operation %v corrupted after update and read\ngot %v", operation, *tbwr)
}
operation.Message = "xxx"
err = ydbConn.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(&operation))
if err != nil {
log.Panicf("failed to insert operation: %v", err)
}
tbwr = ReadTBWROperation(ctx, ydbConn, operation.ID)
if "xxx" != tbwr.Message {
log.Panicf("operation %v did not change after update", *tbwr)
}
}

func main() {
ctx := context.Background()
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
log.Panicln("failed to close connection")
}
}(conn)
ydbConn, err := db.NewYdbConnector(
ctx,
config.YDBConnectionConfig{
ConnectionString: connectionString,
Insecure: true,
Discovery: false,
DialTimeoutSeconds: 10,
},
)
if err != nil {
log.Panicf("failed to create ydb connector: %v", err)
}
for _, op := range OperationsToInsert() {
TestTBWROperationORM(ctx, ydbConn, op)
}
}
6 changes: 3 additions & 3 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func main() {
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

backupScheduleHandler := handlers.NewBackupScheduleHandler(
clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery,
clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery, clockwork.NewRealClock(),
)
schedule_watcher.NewScheduleWatcher(ctx, &wg, clockwork.NewRealClock(), dbConnector, backupScheduleHandler)

schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler)
xlog.Info(ctx, "YDBCP started")
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
3 changes: 1 addition & 2 deletions dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ RUN go build -o . ./cmd/ydbcp/main.go

# Build integration test app
RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go

# Build integration test app
RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go
RUN go build -o ./orm ./cmd/integration/orm/main.go

# Command to run the executable
CMD ["./main", "--config=local_config.yaml"]
Loading

0 comments on commit c9e554e

Please sign in to comment.