Skip to content

Commit

Permalink
Merge branch 'master' into skip-rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov authored May 16, 2024
2 parents 80be127 + 51e23f0 commit a7bd1b8
Show file tree
Hide file tree
Showing 15 changed files with 140 additions and 58 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
* Skipped explicit `Rollback` of transaction on errors (server-side automatically rolled back transactions on errors)
* Fixed race of stop internal processes on close topic writer
* Fixed goroutines leak within topic reader on network problems

## v3.67.0
* Added `ydb.WithNodeAddressMutator` experimental option for mutate node addresses from `discovery.ListEndpoints` response
Expand Down
21 changes: 15 additions & 6 deletions internal/background/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package background
import (
"context"
"errors"
"fmt"
"runtime/pprof"
"sync"

Expand All @@ -20,6 +21,7 @@ var (
// A Worker must not be copied after first use
type Worker struct {
ctx context.Context //nolint:containedctx
name string
workers sync.WaitGroup
closeReason error
tasksCompleted empty.Chan
Expand All @@ -32,8 +34,10 @@ type Worker struct {

type CallbackFunc func(ctx context.Context)

func NewWorker(parent context.Context) *Worker {
w := Worker{}
func NewWorker(parent context.Context, name string) *Worker {
w := Worker{
name: name,
}
w.ctx, w.stop = xcontext.WithCancel(parent)

return &w
Expand Down Expand Up @@ -72,7 +76,9 @@ func (b *Worker) Close(ctx context.Context, err error) error {
var resErr error
b.m.WithLock(func() {
if b.closed {
resErr = xerrors.WithStackTrace(ErrAlreadyClosed)
// The error of Close is second close, close reason added for describe previous close only, for better debug
//nolint:errorlint
resErr = xerrors.WithStackTrace(fmt.Errorf("%w with reason: %+v", ErrAlreadyClosed, b.closeReason))

return
}
Expand Down Expand Up @@ -122,11 +128,14 @@ func (b *Worker) init() {
}
b.tasks = make(chan backgroundTask)
b.tasksCompleted = make(empty.Chan)
go b.starterLoop()

pprof.Do(b.ctx, pprof.Labels("worker-name", b.name), func(ctx context.Context) {
go b.starterLoop(ctx)
})
})
}

func (b *Worker) starterLoop() {
func (b *Worker) starterLoop(ctx context.Context) {
defer close(b.tasksCompleted)

for bgTask := range b.tasks {
Expand All @@ -135,7 +144,7 @@ func (b *Worker) starterLoop() {
go func(task backgroundTask) {
defer b.workers.Done()

pprof.Do(b.ctx, pprof.Labels("background", task.name), task.callback)
pprof.Do(ctx, pprof.Labels("background", task.name), task.callback)
}(bgTask)
}
}
Expand Down
14 changes: 7 additions & 7 deletions internal/background/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestWorkerContext(t *testing.T) {
t.Run("Dedicated", func(t *testing.T) {
type ctxkey struct{}
ctx := context.WithValue(context.Background(), ctxkey{}, "2")
w := NewWorker(ctx)
w := NewWorker(ctx, "test-worker, "+t.Name())
require.Equal(t, "2", w.Context().Value(ctxkey{}))
})

Expand All @@ -41,7 +41,7 @@ func TestWorkerContext(t *testing.T) {

func TestWorkerStart(t *testing.T) {
t.Run("Started", func(t *testing.T) {
w := NewWorker(xtest.Context(t))
w := NewWorker(xtest.Context(t), "test-worker, "+t.Name())
started := make(empty.Chan)
w.Start("test", func(ctx context.Context) {
close(started)
Expand All @@ -50,7 +50,7 @@ func TestWorkerStart(t *testing.T) {
})
t.Run("Stopped", func(t *testing.T) {
ctx := xtest.Context(t)
w := NewWorker(ctx)
w := NewWorker(ctx, "test-worker, "+t.Name())
_ = w.Close(ctx, nil)

started := make(empty.Chan)
Expand All @@ -72,7 +72,7 @@ func TestWorkerStart(t *testing.T) {
func TestWorkerClose(t *testing.T) {
t.Run("StopBackground", func(t *testing.T) {
ctx := xtest.Context(t)
w := NewWorker(ctx)
w := NewWorker(ctx, "test-worker, "+t.Name())

started := make(empty.Chan)
stopped := atomic.Bool{}
Expand All @@ -89,7 +89,7 @@ func TestWorkerClose(t *testing.T) {

t.Run("DoubleClose", func(t *testing.T) {
ctx := xtest.Context(t)
w := NewWorker(ctx)
w := NewWorker(ctx, "test-worker, "+t.Name())
require.NoError(t, w.Close(ctx, nil))
require.Error(t, w.Close(ctx, nil))
})
Expand All @@ -104,7 +104,7 @@ func TestWorkerConcurrentStartAndClose(t *testing.T) {
var counter atomic.Int64

ctx := xtest.Context(t)
w := NewWorker(ctx)
w := NewWorker(ctx, "test-worker, "+t.Name())

stopNewStarts := atomic.Bool{}
var wgStarters sync.WaitGroup
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestWorkerConcurrentStartAndClose(t *testing.T) {
func TestWorkerStartCompletedWhileLongWait(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
ctx := xtest.Context(t)
w := NewWorker(ctx)
w := NewWorker(ctx, "test-worker, "+t.Name())

allowStop := make(empty.Chan)
closeStarted := make(empty.Chan)
Expand Down
8 changes: 4 additions & 4 deletions internal/topic/topicreaderinternal/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package topicreaderinternal
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -52,16 +53,15 @@ type committer struct {
commits CommitRanges
}

func newCommitter(tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc) *committer { //nolint:lll,revive
func newCommitterStopped(tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc, readerID int64) *committer { //nolint:lll,revive
res := &committer{
mode: mode,
clock: clockwork.NewRealClock(),
send: send,
backgroundWorker: *background.NewWorker(lifeContext),
backgroundWorker: *background.NewWorker(lifeContext, fmt.Sprintf("ydb-topic-reader-committer: %v", readerID)),
tracer: tracer,
}
res.initChannels()
res.start()

return res
}
Expand All @@ -70,7 +70,7 @@ func (c *committer) initChannels() {
c.commitLoopSignal = make(empty.Chan, 1)
}

func (c *committer) start() {
func (c *committer) Start() {
c.backgroundWorker.Start("commit pusher", c.pushCommitsLoop)
}

Expand Down
7 changes: 4 additions & 3 deletions internal/topic/topicreaderinternal/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,12 @@ func TestCommitterBuffer(t *testing.T) {
}

func newTestCommitter(ctx context.Context, t testing.TB) *committer {
res := newCommitter(&trace.Topic{}, ctx, CommitModeAsync, func(msg rawtopicreader.ClientMessage) error {
res := newCommitterStopped(&trace.Topic{}, ctx, CommitModeAsync, func(msg rawtopicreader.ClientMessage) error {
return nil
})
}, -1)
res.Start()
t.Cleanup(func() {
if err := res.Close(ctx, errors.New("test comitter closed")); err != nil {
if err := res.Close(ctx, errors.New("test committer closed")); err != nil {
require.ErrorIs(t, err, background.ErrAlreadyClosed)
}
})
Expand Down
14 changes: 10 additions & 4 deletions internal/topic/topicreaderinternal/stream_reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func newTopicStreamReader(
if err = reader.initSession(); err != nil {
return nil, err
}
if err = reader.startLoops(); err != nil {
if err = reader.startBackgroundWorkers(); err != nil {
return nil, err
}

Expand All @@ -150,13 +150,17 @@ func newTopicStreamReaderStopped(
stream: &syncedStream{stream: stream},
cancel: cancel,
batcher: newBatcher(),
backgroundWorkers: *background.NewWorker(stopPump),
readConnectionID: "preinitID-" + readerConnectionID.String(),
readerID: readerID,
rawMessagesFromBuffer: make(chan rawtopicreader.ServerMessage, 1),
}

res.committer = newCommitter(cfg.Trace, labeledContext, cfg.CommitMode, res.send)
res.backgroundWorkers = *background.NewWorker(stopPump, fmt.Sprintf(
"topic-reader-stream-background: %v",
res.readerID,
))

res.committer = newCommitterStopped(cfg.Trace, labeledContext, cfg.CommitMode, res.send, res.readerID)
res.committer.BufferTimeLagTrigger = cfg.CommitterBatchTimeLag
res.committer.BufferCountTrigger = cfg.CommitterBatchCounterTrigger
res.sessionController.init()
Expand Down Expand Up @@ -413,11 +417,13 @@ func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error {
return err
}

func (r *topicStreamReaderImpl) startLoops() error {
func (r *topicStreamReaderImpl) startBackgroundWorkers() error {
if err := r.setStarted(); err != nil {
return err
}

r.committer.Start()

r.backgroundWorkers.Start("readMessagesLoop", r.readMessagesLoop)
r.backgroundWorkers.Start("dataRequestLoop", r.dataRequestLoop)
r.backgroundWorkers.Start("updateTokenLoop", r.updateTokenLoop)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ func newTopicReaderTestEnv(t testing.TB) streamEnv {
}

func (e *streamEnv) Start() {
require.NoError(e.t, e.reader.startLoops())
require.NoError(e.t, e.reader.startBackgroundWorkers())
xtest.SpinWaitCondition(e.t, nil, func() bool {
return e.reader.restBufferSizeBytes.Load() == e.initialBufferSizeBytes
})
Expand Down
19 changes: 11 additions & 8 deletions internal/topic/topicreaderinternal/stream_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
Expand All @@ -24,6 +23,7 @@ import (
var (
errReconnectRequestOutdated = xerrors.Wrap(errors.New("ydb: reconnect request outdated"))
errReconnect = xerrors.Wrap(errors.New("ydb: reconnect to topic grpc stream"))
errConnectionTimeout = xerrors.Wrap(errors.New("ydb: topic reader connection timeout for stream"))
)

type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error)
Expand All @@ -33,6 +33,7 @@ type readerReconnector struct {
clock clockwork.Clock
retrySettings topic.RetrySettings
streamVal batchedStreamReader
streamContextCancel context.CancelCauseFunc
streamErr error
closedErr error
initErr error
Expand Down Expand Up @@ -148,6 +149,7 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, err error) error

if r.streamVal != nil {
streamCloseErr := r.streamVal.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed))
r.streamContextCancel(errReaderClosed)
if closeErr == nil {
closeErr = streamCloseErr
}
Expand Down Expand Up @@ -267,7 +269,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
_ = oldReader.CloseWithError(ctx, xerrors.WithStackTrace(errReconnect))
}

newStream, err := r.connectWithTimeout()
newStream, newStreamClose, err := r.connectWithTimeout()

if r.isRetriableError(err) {
go func(reason error) {
Expand All @@ -281,6 +283,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
r.streamErr = err
if err == nil {
r.streamVal = newStream
r.streamContextCancel = newStreamClose
if !r.initDone {
r.initDone = true
close(r.initDoneCh)
Expand All @@ -304,14 +307,14 @@ func (r *readerReconnector) checkErrRetryMode(err error, retriesDuration time.Du
return topic.CheckRetryMode(err, r.retrySettings, retriesDuration)
}

func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err error) {
func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, _ context.CancelCauseFunc, err error) {
bgContext := r.background.Context()

if err = bgContext.Err(); err != nil {
return nil, err
return nil, nil, err
}

connectionContext, cancel := xcontext.WithCancel(context.Background())
connectionContext, cancel := context.WithCancelCause(context.WithoutCancel(bgContext))

type connectResult struct {
stream batchedStreamReader
Expand All @@ -332,17 +335,17 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
case <-connectionTimoutTimer.Chan():
// cancel connection context only if timeout exceed while connection
// because if cancel context after connect - it will break
cancel()
cancel(xerrors.WithStackTrace(errConnectionTimeout))
res = <-result
case res = <-result:
// pass
}

if res.err == nil {
return res.stream, nil
return res.stream, cancel, nil
}

return nil, res.err
return nil, nil, res.err
}

func (r *readerReconnector) WaitInit(ctx context.Context) error {
Expand Down
19 changes: 14 additions & 5 deletions internal/topic/topicreaderinternal/stream_reconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func TestTopicReaderReconnectorReadMessageBatch(t *testing.T) {
baseReader.EXPECT().ReadMessageBatch(gomock.Any(), opts).Return(batch, nil)

reader := &readerReconnector{
streamVal: baseReader,
tracer: &trace.Topic{},
streamVal: baseReader,
streamContextCancel: func(cause error) {},
tracer: &trace.Topic{},
}
reader.initChannelsAndClock()
res, err := reader.ReadMessageBatch(context.Background(), opts)
Expand Down Expand Up @@ -163,7 +164,11 @@ func TestTopicReaderReconnectorCommit(t *testing.T) {
require.Equal(t, "v", ctx.Value(k{}))
require.Equal(t, expectedCommitRange, offset)
})
reconnector := &readerReconnector{streamVal: stream, tracer: &trace.Topic{}}
reconnector := &readerReconnector{
streamVal: stream,
streamContextCancel: func(cause error) {},
tracer: &trace.Topic{},
}
reconnector.initChannelsAndClock()
require.NoError(t, reconnector.Commit(ctx, expectedCommitRange))
})
Expand All @@ -174,7 +179,11 @@ func TestTopicReaderReconnectorCommit(t *testing.T) {
require.Equal(t, "v", ctx.Value(k{}))
require.Equal(t, expectedCommitRange, offset)
}).Return(testErr)
reconnector := &readerReconnector{streamVal: stream, tracer: &trace.Topic{}}
reconnector := &readerReconnector{
streamVal: stream,
streamContextCancel: func(cause error) {},
tracer: &trace.Topic{},
}
reconnector.initChannelsAndClock()
require.ErrorIs(t, reconnector.Commit(ctx, expectedCommitRange), testErr)
})
Expand Down Expand Up @@ -209,7 +218,7 @@ func TestTopicReaderReconnectorConnectionLoop(t *testing.T) {

reconnector := &readerReconnector{
connectTimeout: value.InfiniteDuration,
background: *background.NewWorker(ctx),
background: *background.NewWorker(ctx, "test-worker, "+t.Name()),
tracer: &trace.Topic{},
}
reconnector.initChannelsAndClock()
Expand Down
9 changes: 9 additions & 0 deletions internal/topic/topicwriterinternal/writer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package topicwriterinternal
import (
"time"

"github.com/jonboulle/clockwork"

"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
Expand Down Expand Up @@ -152,3 +154,10 @@ func WithTopic(topic string) PublicWriterOption {
cfg.topic = topic
}
}

// WithClock is private option for tests
func WithClock(clock clockwork.Clock) PublicWriterOption {
return func(cfg *WriterReconnectorConfig) {
cfg.clock = clock
}
}
Loading

0 comments on commit a7bd1b8

Please sign in to comment.