Skip to content

Commit

Permalink
[Disk Manager] better YDB client errors metric (#1853)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rattysed authored Sep 2, 2024
1 parent 408dddb commit 45a1854
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 17 deletions.
62 changes: 54 additions & 8 deletions cloud/tasks/persistence/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,48 @@ func (s *Session) execute(
return tx, Result{res: ydbRes}, nil
}

func (s *Session) CreateOrAlterTable(
ctx context.Context,
fullPath string,
description CreateTableDescription,
dropUnusedColumns bool,
) (err error) {

ctx, cancel := context.WithTimeout(ctx, s.callTimeout)
defer cancel()

defer s.metrics.StatCall(
ctx,
"session/CreateOrAlterTable",
fmt.Sprintf("At path: %q", fullPath),
)(&err)

return createOrAlterTable(
ctx,
s.session,
fullPath,
description,
dropUnusedColumns,
)
}

func (s *Session) DropTable(
ctx context.Context,
fullPath string,
) (err error) {

ctx, cancel := context.WithTimeout(ctx, s.callTimeout)
defer cancel()

defer s.metrics.StatCall(
ctx,
"session/DropTable",
fmt.Sprintf("At path: %q", fullPath),
)(&err)

return dropTable(ctx, s.session, fullPath)
}

////////////////////////////////////////////////////////////////////////////////

type YDBClient struct {
Expand Down Expand Up @@ -515,15 +557,13 @@ func (c *YDBClient) CreateOrAlterTable(

return c.Execute(
ctx,
func(ctx context.Context, s *Session) error {
err := c.makeDirs(ctx, folderFullPath)
func(ctx context.Context, s *Session) (err error) {
err = c.makeDirs(ctx, folderFullPath)
if err != nil {
return err
}

return createOrAlterTable(
ctx,
s.session,
return s.CreateOrAlterTable(ctx,
fullPath,
description,
dropUnusedColumns,
Expand All @@ -544,7 +584,7 @@ func (c *YDBClient) DropTable(
return c.Execute(
ctx,
func(ctx context.Context, s *Session) error {
return dropTable(ctx, s.session, fullPath)
return s.DropTable(ctx, fullPath)
},
)
}
Expand Down Expand Up @@ -607,7 +647,13 @@ func (c *YDBClient) ExecuteRW(

////////////////////////////////////////////////////////////////////////////////

func (c *YDBClient) makeDirs(ctx context.Context, absolutePath string) error {
func (c *YDBClient) makeDirs(ctx context.Context, absolutePath string) (err error) {
defer c.metrics.StatCall(
ctx,
"client/makeDirs",
fmt.Sprintf("At path: %q", absolutePath),
)(&err)

if !strings.HasPrefix(absolutePath, c.database) {
return errors.NewNonRetriableErrorf(
"'%v' is expected to be rooted at '%v'",
Expand All @@ -626,7 +672,7 @@ func (c *YDBClient) makeDirs(ctx context.Context, absolutePath string) error {
}

dirPath = path.Join(dirPath, part)
err := c.db.Scheme().MakeDirectory(ctx, dirPath)
err = c.db.Scheme().MakeDirectory(ctx, dirPath)
if err != nil {
return errors.NewNonRetriableErrorf(
"cannot make directory %v: %w",
Expand Down
26 changes: 25 additions & 1 deletion cloud/tasks/persistence/ydb_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func (m *ydbMetrics) StatCall(

// Should initialize all counters before using them, to avoid 'no data'.
hangingCounter := subRegistry.Counter("hanging")
errorCounter := subRegistry.Counter("errors")
timeoutCounter := subRegistry.Counter("timeout")
successCounter := subRegistry.Counter("success")

Expand All @@ -45,6 +44,31 @@ func (m *ydbMetrics) StatCall(
}

if *err != nil {
var errorType string

switch {
case ydb.IsOperationErrorTransactionLocksInvalidated(*err):
errorType = "TLI"
case ydb.IsOperationErrorSchemeError(*err):
errorType = "scheme"
case ydb.IsTransportError(*err):
errorType = "transport"
case ydb.IsOperationErrorOverloaded(*err):
errorType = "overloaded"
case ydb.IsOperationErrorUnavailable(*err):
errorType = "unavailable"
case ydb.IsRatelimiterAcquireError(*err):
errorType = "ratelimiterAcquire"
default:
errorType = "unknown"
}

errorRegistry := subRegistry.WithTags(map[string]string{
"type": errorType,
})

// Should initialize all counters before using them, to avoid 'no data'.
errorCounter := errorRegistry.Counter("errors")
errorCounter.Inc()

if errors.Is(*err, context.DeadlineExceeded) {
Expand Down
56 changes: 48 additions & 8 deletions cloud/tasks/persistence/ydb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"context"
"fmt"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
"github.com/ydb-platform/nbs/cloud/tasks/metrics"
"github.com/ydb-platform/nbs/cloud/tasks/metrics/empty"
"github.com/ydb-platform/nbs/cloud/tasks/metrics/mocks"
persistence_config "github.com/ydb-platform/nbs/cloud/tasks/persistence/config"
"github.com/ydb-platform/ydb-go-sdk/v3"
)

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -22,7 +26,11 @@ func newContext() context.Context {
)
}

func newYDB(ctx context.Context) (*YDBClient, error) {
func newYDB(
ctx context.Context,
metricsRegistry metrics.Registry,
) (*YDBClient, error) {

endpoint := os.Getenv("YDB_ENDPOINT")
database := os.Getenv("YDB_DATABASE")
rootPath := "disk_manager"
Expand All @@ -34,7 +42,7 @@ func newYDB(ctx context.Context) (*YDBClient, error) {
Database: &database,
RootPath: &rootPath,
},
empty.NewRegistry(),
metricsRegistry,
)
}

Expand Down Expand Up @@ -231,7 +239,7 @@ func TestYDBCreateTableV1(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

db, err := newYDB(ctx)
db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

Expand Down Expand Up @@ -265,7 +273,7 @@ func TestYDBCreateTableV1Twice(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

db, err := newYDB(ctx)
db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

Expand Down Expand Up @@ -316,7 +324,7 @@ func TestYDBMigrateTableV1ToTableV2(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

db, err := newYDB(ctx)
db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

Expand Down Expand Up @@ -374,7 +382,7 @@ func TestYDBUseV1InV2(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

db, err := newYDB(ctx)
db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

Expand Down Expand Up @@ -432,7 +440,7 @@ func TestYDBFailMigrationChangingType(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

db, err := newYDB(ctx)
db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

Expand Down Expand Up @@ -466,7 +474,7 @@ func TestYDBFailMigrationChangingPrimaryKey(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

db, err := newYDB(ctx)
db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

Expand Down Expand Up @@ -495,3 +503,35 @@ func TestYDBFailMigrationChangingPrimaryKey(t *testing.T) {
)
assert.Error(t, err)
}

func TestYDBShouldSendSchemeErrorMetric(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

metricsRegistry := mocks.NewRegistryMock()

db, err := newYDB(ctx, metricsRegistry)
require.NoError(t, err)
defer db.Close(ctx)

metricsRegistry.GetCounter(
"errors",
map[string]string{"call": "client/makeDirs", "type": "scheme"},
).On("Inc").Once()

// YDB has limited length of object name. Current limit is 255.
extraLargeString := strings.Repeat("x", 100500)
folder := fmt.Sprintf("ydb_test/%s/%s", extraLargeString, t.Name())
table := "table"

err = db.CreateOrAlterTable(
ctx,
folder,
table,
tableV1TableDescription(),
false, // dropUnusedColumns
)
require.True(t, ydb.IsOperationErrorSchemeError(err))

metricsRegistry.AssertAllExpectations(t)
}

0 comments on commit 45a1854

Please sign in to comment.