Skip to content

Commit

Permalink
Merge pull request #64 from ydb-platform/fix-query-builder
Browse files Browse the repository at this point in the history
QueryBuilder reusage has been fixed
  • Loading branch information
ulya-sidorina authored Sep 13, 2024
2 parents 29737aa + 5a5c9fd commit 02af8fd
Show file tree
Hide file tree
Showing 35 changed files with 104 additions and 89 deletions.
1 change: 1 addition & 0 deletions cmd/integration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log"
"time"

"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

Expand Down
12 changes: 6 additions & 6 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@ import (
"os/signal"
"sync"
"syscall"
"ydbcp/internal/connectors/s3"
"ydbcp/internal/server/services/backup_schedule"

"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"

"ydbcp/internal/auth"
"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/connectors/s3"
"ydbcp/internal/handlers"
"ydbcp/internal/processor"
"ydbcp/internal/server"
"ydbcp/internal/server/services/backup"
"ydbcp/internal/server/services/backup_schedule"
"ydbcp/internal/server/services/operation"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
ap "ydbcp/pkg/plugins/auth"

"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
)

func main() {
Expand Down Expand Up @@ -136,7 +136,7 @@ func main() {

if err := handlersRegistry.Add(
types.OperationTypeDB,
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery(ctx)),
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery),
); err != nil {
xlog.Error(ctx, "failed to register DB handler", zap.Error(err))
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions internal/auth/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"

"ydbcp/internal/util/xlog"
"ydbcp/pkg/plugins/auth"

Expand Down
1 change: 1 addition & 0 deletions internal/auth/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"

"ydbcp/internal/util/xlog"
"ydbcp/pkg/plugins/auth"

Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"regexp"
"strings"

"ydbcp/internal/util/xlog"

"go.uber.org/zap"
Expand Down
7 changes: 3 additions & 4 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"regexp"
"strings"
"time"

"ydbcp/internal/config"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
Expand All @@ -15,10 +16,6 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"

"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Export_V1"
"github.com/ydb-platform/ydb-go-genproto/Ydb_Import_V1"
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
Expand All @@ -28,6 +25,8 @@ import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/durationpb"
)
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"fmt"
"path"
"ydbcp/internal/types"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"ydbcp/internal/types"

"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
"github.com/ydb-platform/ydb-go-sdk/v3"
)
Expand Down
19 changes: 8 additions & 11 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"time"

"ydbcp/internal/config"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"

table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"ydbcp/internal/util/xlog"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"

"ydbcp/internal/util/xlog"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
Expand All @@ -36,8 +35,6 @@ var (
)
)

var ErrUnimplemented = errors.New("unimplemented")

type DBConnector interface {
GetTableClient() table.Client
SelectBackups(ctx context.Context, queryBuilder queries.ReadTableQuery) (
Expand Down Expand Up @@ -319,14 +316,14 @@ func (d *YdbConnector) UpdateOperation(
operation.SetUpdatedAt(timestamppb.Now())
}

return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithUpdateOperation(operation))
return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation))
}

func (d *YdbConnector) CreateOperation(
ctx context.Context, operation types.Operation,
) (string, error) {
operation.SetID(types.GenerateObjectID())
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithCreateOperation(operation))
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateOperation(operation))
if err != nil {
return "", err
}
Expand All @@ -338,7 +335,7 @@ func (d *YdbConnector) CreateBackup(
) (string, error) {
id := types.GenerateObjectID()
backup.ID = id
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithCreateBackup(backup))
err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateBackup(backup))
if err != nil {
return "", err
}
Expand All @@ -352,5 +349,5 @@ func (d *YdbConnector) UpdateBackup(
ID: id,
Status: backupStatus,
}
return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithUpdateBackup(backup))
return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateBackup(backup))
}
1 change: 1 addition & 0 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"

"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"

Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"fmt"
"strings"
"time"

"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"google.golang.org/protobuf/types/known/durationpb"

"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down
1 change: 1 addition & 0 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"

"ydbcp/internal/util/xlog"

"github.com/ydb-platform/ydb-go-sdk/v3/table"
Expand Down
33 changes: 17 additions & 16 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"log"
"strings"

"ydbcp/internal/types"
"ydbcp/internal/util/xlog"

Expand All @@ -24,7 +26,6 @@ type WriteTableQuery interface {
}

type WriteTableQueryImpl struct {
ctx context.Context
tableQueries []WriteSingleTableQueryImpl
}

Expand All @@ -36,6 +37,9 @@ type WriteSingleTableQueryImpl struct {
updateParam *table.ParameterOption
}

type WriteTableQueryImplOption func(*WriteTableQueryImpl)
type WriteQueryBulderFactory func() WriteTableQuery

func (d *WriteSingleTableQueryImpl) AddValueParam(name string, value table_types.Value) {
d.upsertFields = append(d.upsertFields, name[1:])
d.tableQueryParams = append(d.tableQueryParams, table.ValueParam(fmt.Sprintf("%s_%d", name, d.index), value))
Expand All @@ -55,7 +59,7 @@ func (d *WriteSingleTableQueryImpl) GetParamNames() []string {
return res
}

func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, index int) WriteSingleTableQueryImpl {
func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingleTableQueryImpl {
d := WriteSingleTableQueryImpl{
index: index,
tableName: "Operations",
Expand Down Expand Up @@ -90,8 +94,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
if operation.GetType() == types.OperationTypeTB {
tb, ok := operation.(*types.TakeBackupOperation)
if !ok {
xlog.Error(ctx, "error cast operation to TakeBackupOperation", zap.String("operation_id", operation.GetID()))
return d
log.Fatalf("error cast operation to TakeBackupOperation operation_id %s", operation.GetID())
}

d.AddValueParam(
Expand Down Expand Up @@ -125,8 +128,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
} else if operation.GetType() == types.OperationTypeRB {
rb, ok := operation.(*types.RestoreBackupOperation)
if !ok {
xlog.Error(ctx, "error cast operation to RestoreBackupOperation", zap.String("operation_id", operation.GetID()))
return d
log.Fatalf("error cast operation to RestoreBackupOperation operation_id %s", operation.GetID())
}

d.AddValueParam(
Expand Down Expand Up @@ -155,8 +157,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
} else if operation.GetType() == types.OperationTypeDB {
db, ok := operation.(*types.DeleteBackupOperation)
if !ok {
xlog.Error(ctx, "error cast operation to DeleteBackupOperation", zap.String("operation_id", operation.GetID()))
return d
log.Fatalf("error cast operation to DeleteBackupOperation operation_id %s", operation.GetID())
}

d.AddValueParam(
Expand All @@ -181,7 +182,11 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix))

} else {
xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType())))
log.Fatalf(
"unknown operation type write to db operation_id %s, operation_type %s",
operation.GetID(),
operation.GetType().String(),
)
}

return d
Expand Down Expand Up @@ -357,12 +362,8 @@ func BuildUpdateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr
return d
}

type WriteTableQueryImplOption func(*WriteTableQueryImpl)

type WriteTableQueryMockOption func(*WriteTableQueryMock)

func NewWriteTableQuery(ctx context.Context) WriteTableQuery {
return &WriteTableQueryImpl{ctx: ctx}
func NewWriteTableQuery() WriteTableQuery {
return &WriteTableQueryImpl{}
}

func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery {
Expand All @@ -385,7 +386,7 @@ func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) Wri

func (d *WriteTableQueryImpl) WithCreateOperation(operation types.Operation) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(d.ctx, operation, index))
d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index))
return d
}

Expand Down
5 changes: 4 additions & 1 deletion internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queries

import (
"context"

"ydbcp/internal/types"
)

Expand All @@ -11,7 +12,9 @@ type WriteTableQueryMock struct {
BackupSchedule types.BackupSchedule
}

func NewWriteTableQueryMock(_ context.Context) WriteTableQuery {
type WriteTableQueryMockOption func(*WriteTableQueryMock)

func NewWriteTableQueryMock() WriteTableQuery {
return &WriteTableQueryMock{}
}

Expand Down
Loading

0 comments on commit 02af8fd

Please sign in to comment.