From 7ba94b0523e8bff3f6804362fbb399d849e84016 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 15 May 2024 17:39:22 +0300 Subject: [PATCH 01/10] Fix close connection lifetime context --- .../topicreaderinternal/stream_reconnector.go | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index f43044624..8abf7f18f 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -15,7 +15,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -24,6 +23,8 @@ import ( var ( errReconnectRequestOutdated = xerrors.Wrap(errors.New("ydb: reconnect request outdated")) errReconnect = xerrors.Wrap(errors.New("ydb: reconnect to topic grpc stream")) + errStreamClosed = xerrors.Wrap(errors.New("ydb: topic reader stream closed")) + errConnectionTimeout = xerrors.Wrap(errors.New("ydb: topic reader connection timeout for stream")) ) type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error) @@ -311,7 +312,7 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err return nil, err } - connectionContext, cancel := xcontext.WithCancel(context.Background()) + connectionContext, cancel := context.WithCancelCause(context.WithoutCancel(bgContext)) type connectResult struct { stream batchedStreamReader @@ -332,14 +333,18 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err case <-connectionTimoutTimer.Chan(): // cancel connection context only if timeout 
exceed while connection // because if cancel context after connect - it will break - cancel() + cancel(xerrors.WithStackTrace(errConnectionTimeout)) res = <-result case res = <-result: // pass } if res.err == nil { - return res.stream, nil + stream := batchedStreamReaderHook{ + batchedStreamReader: res.stream, + contextCancel: cancel, + } + return stream, nil } return nil, res.err @@ -417,6 +422,17 @@ func (r *readerReconnector) handlePanic() { } } +type batchedStreamReaderHook struct { + batchedStreamReader + contextCancel context.CancelCauseFunc +} + +func (b batchedStreamReaderHook) CloseWithError(ctx context.Context, err error) error { + defer b.contextCancel(xerrors.WithStackTrace(errStreamClosed)) + + return b.batchedStreamReader.CloseWithError(ctx, err) +} + type reconnectRequest struct { oldReader batchedStreamReader reason error From 65bdfaee8a3220b9ddd2a53a4aa542fca3f58e35 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 15 May 2024 19:14:29 +0300 Subject: [PATCH 02/10] Fix leak committer goroutines --- internal/background/worker.go | 16 ++++++---- internal/background/worker_test.go | 14 ++++----- .../topic/topicreaderinternal/committer.go | 8 ++--- .../topicreaderinternal/committer_test.go | 7 +++-- .../topicreaderinternal/stream_reader_impl.go | 14 ++++++--- .../stream_reader_impl_test.go | 2 +- .../topicreaderinternal/stream_reconnector.go | 29 +++++-------------- .../stream_reconnector_test.go | 19 ++++++++---- .../writer_single_stream.go | 7 +++-- 9 files changed, 64 insertions(+), 52 deletions(-) diff --git a/internal/background/worker.go b/internal/background/worker.go index c41ad4b0a..7c1389b04 100644 --- a/internal/background/worker.go +++ b/internal/background/worker.go @@ -20,6 +20,7 @@ var ( // A Worker must not be copied after first use type Worker struct { ctx context.Context //nolint:containedctx + name string workers sync.WaitGroup closeReason error tasksCompleted empty.Chan @@ -32,8 +33,10 @@ type Worker struct { type CallbackFunc 
func(ctx context.Context) -func NewWorker(parent context.Context) *Worker { - w := Worker{} +func NewWorker(parent context.Context, name string) *Worker { + w := Worker{ + name: name, + } w.ctx, w.stop = xcontext.WithCancel(parent) return &w @@ -122,11 +125,14 @@ func (b *Worker) init() { } b.tasks = make(chan backgroundTask) b.tasksCompleted = make(empty.Chan) - go b.starterLoop() + + pprof.Do(b.ctx, pprof.Labels("worker-name", b.name), func(ctx context.Context) { + go b.starterLoop(ctx) + }) }) } -func (b *Worker) starterLoop() { +func (b *Worker) starterLoop(ctx context.Context) { defer close(b.tasksCompleted) for bgTask := range b.tasks { @@ -135,7 +141,7 @@ func (b *Worker) starterLoop() { go func(task backgroundTask) { defer b.workers.Done() - pprof.Do(b.ctx, pprof.Labels("background", task.name), task.callback) + pprof.Do(ctx, pprof.Labels("background", task.name), task.callback) }(bgTask) } } diff --git a/internal/background/worker_test.go b/internal/background/worker_test.go index 2a188fb00..c149da4fb 100644 --- a/internal/background/worker_test.go +++ b/internal/background/worker_test.go @@ -25,7 +25,7 @@ func TestWorkerContext(t *testing.T) { t.Run("Dedicated", func(t *testing.T) { type ctxkey struct{} ctx := context.WithValue(context.Background(), ctxkey{}, "2") - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) require.Equal(t, "2", w.Context().Value(ctxkey{})) }) @@ -41,7 +41,7 @@ func TestWorkerContext(t *testing.T) { func TestWorkerStart(t *testing.T) { t.Run("Started", func(t *testing.T) { - w := NewWorker(xtest.Context(t)) + w := NewWorker(xtest.Context(t), "test-worker, "+t.Name()) started := make(empty.Chan) w.Start("test", func(ctx context.Context) { close(started) @@ -50,7 +50,7 @@ func TestWorkerStart(t *testing.T) { }) t.Run("Stopped", func(t *testing.T) { ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) _ = w.Close(ctx, nil) started := make(empty.Chan) @@ -72,7 +72,7 @@ func 
TestWorkerStart(t *testing.T) { func TestWorkerClose(t *testing.T) { t.Run("StopBackground", func(t *testing.T) { ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) started := make(empty.Chan) stopped := atomic.Bool{} @@ -89,7 +89,7 @@ func TestWorkerClose(t *testing.T) { t.Run("DoubleClose", func(t *testing.T) { ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) require.NoError(t, w.Close(ctx, nil)) require.Error(t, w.Close(ctx, nil)) }) @@ -104,7 +104,7 @@ func TestWorkerConcurrentStartAndClose(t *testing.T) { var counter atomic.Int64 ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) stopNewStarts := atomic.Bool{} var wgStarters sync.WaitGroup @@ -144,7 +144,7 @@ func TestWorkerConcurrentStartAndClose(t *testing.T) { func TestWorkerStartCompletedWhileLongWait(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) allowStop := make(empty.Chan) closeStarted := make(empty.Chan) diff --git a/internal/topic/topicreaderinternal/committer.go b/internal/topic/topicreaderinternal/committer.go index f0fa1c425..67940df1d 100644 --- a/internal/topic/topicreaderinternal/committer.go +++ b/internal/topic/topicreaderinternal/committer.go @@ -3,6 +3,7 @@ package topicreaderinternal import ( "context" "errors" + "fmt" "sync/atomic" "time" @@ -52,16 +53,15 @@ type committer struct { commits CommitRanges } -func newCommitter(tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc) *committer { //nolint:lll,revive +func newCommitterStopped(tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc, readerID int64) *committer { //nolint:lll,revive res := &committer{ mode: mode, clock: clockwork.NewRealClock(), send: send, - backgroundWorker: 
*background.NewWorker(lifeContext), + backgroundWorker: *background.NewWorker(lifeContext, fmt.Sprintf("ydb-topic-reader-committer: %v", readerID)), tracer: tracer, } res.initChannels() - res.start() return res } @@ -70,7 +70,7 @@ func (c *committer) initChannels() { c.commitLoopSignal = make(empty.Chan, 1) } -func (c *committer) start() { +func (c *committer) Start() { c.backgroundWorker.Start("commit pusher", c.pushCommitsLoop) } diff --git a/internal/topic/topicreaderinternal/committer_test.go b/internal/topic/topicreaderinternal/committer_test.go index 7bf444d62..2c3b9d2af 100644 --- a/internal/topic/topicreaderinternal/committer_test.go +++ b/internal/topic/topicreaderinternal/committer_test.go @@ -377,11 +377,12 @@ func TestCommitterBuffer(t *testing.T) { } func newTestCommitter(ctx context.Context, t testing.TB) *committer { - res := newCommitter(&trace.Topic{}, ctx, CommitModeAsync, func(msg rawtopicreader.ClientMessage) error { + res := newCommitterStopped(&trace.Topic{}, ctx, CommitModeAsync, func(msg rawtopicreader.ClientMessage) error { return nil - }) + }, -1) + res.Start() t.Cleanup(func() { - if err := res.Close(ctx, errors.New("test comitter closed")); err != nil { + if err := res.Close(ctx, errors.New("test committer closed")); err != nil { require.ErrorIs(t, err, background.ErrAlreadyClosed) } }) diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index 8eb91e0ac..8e80575b9 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -123,7 +123,7 @@ func newTopicStreamReader( if err = reader.initSession(); err != nil { return nil, err } - if err = reader.startLoops(); err != nil { + if err = reader.startBackgroundWorkers(); err != nil { return nil, err } @@ -150,13 +150,17 @@ func newTopicStreamReaderStopped( stream: &syncedStream{stream: stream}, cancel: cancel, batcher: newBatcher(), - 
backgroundWorkers: *background.NewWorker(stopPump), readConnectionID: "preinitID-" + readerConnectionID.String(), readerID: readerID, rawMessagesFromBuffer: make(chan rawtopicreader.ServerMessage, 1), } - res.committer = newCommitter(cfg.Trace, labeledContext, cfg.CommitMode, res.send) + res.backgroundWorkers = *background.NewWorker(stopPump, fmt.Sprintf( + "topic-reader-stream-background: %v", + res.readerID, + )) + + res.committer = newCommitterStopped(cfg.Trace, labeledContext, cfg.CommitMode, res.send, res.readerID) res.committer.BufferTimeLagTrigger = cfg.CommitterBatchTimeLag res.committer.BufferCountTrigger = cfg.CommitterBatchCounterTrigger res.sessionController.init() @@ -413,11 +417,13 @@ func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error { return err } -func (r *topicStreamReaderImpl) startLoops() error { +func (r *topicStreamReaderImpl) startBackgroundWorkers() error { if err := r.setStarted(); err != nil { return err } + r.committer.Start() + r.backgroundWorkers.Start("readMessagesLoop", r.readMessagesLoop) r.backgroundWorkers.Start("dataRequestLoop", r.dataRequestLoop) r.backgroundWorkers.Start("updateTokenLoop", r.updateTokenLoop) diff --git a/internal/topic/topicreaderinternal/stream_reader_impl_test.go b/internal/topic/topicreaderinternal/stream_reader_impl_test.go index 22fb2997a..b7ce7b9d0 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl_test.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl_test.go @@ -1055,7 +1055,7 @@ func newTopicReaderTestEnv(t testing.TB) streamEnv { } func (e *streamEnv) Start() { - require.NoError(e.t, e.reader.startLoops()) + require.NoError(e.t, e.reader.startBackgroundWorkers()) xtest.SpinWaitCondition(e.t, nil, func() bool { return e.reader.restBufferSizeBytes.Load() == e.initialBufferSizeBytes }) diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index 8abf7f18f..403c81f93 100644 --- 
a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -23,7 +23,6 @@ import ( var ( errReconnectRequestOutdated = xerrors.Wrap(errors.New("ydb: reconnect request outdated")) errReconnect = xerrors.Wrap(errors.New("ydb: reconnect to topic grpc stream")) - errStreamClosed = xerrors.Wrap(errors.New("ydb: topic reader stream closed")) errConnectionTimeout = xerrors.Wrap(errors.New("ydb: topic reader connection timeout for stream")) ) @@ -34,6 +33,7 @@ type readerReconnector struct { clock clockwork.Clock retrySettings topic.RetrySettings streamVal batchedStreamReader + streamContextCancel context.CancelCauseFunc streamErr error closedErr error initErr error @@ -149,6 +149,7 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, err error) error if r.streamVal != nil { streamCloseErr := r.streamVal.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed)) + r.streamContextCancel(errReaderClosed) if closeErr == nil { closeErr = streamCloseErr } @@ -268,7 +269,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead _ = oldReader.CloseWithError(ctx, xerrors.WithStackTrace(errReconnect)) } - newStream, err := r.connectWithTimeout() + newStream, newStreamClose, err := r.connectWithTimeout() if r.isRetriableError(err) { go func(reason error) { @@ -282,6 +283,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead r.streamErr = err if err == nil { r.streamVal = newStream + r.streamContextCancel = newStreamClose if !r.initDone { r.initDone = true close(r.initDoneCh) @@ -305,11 +307,11 @@ func (r *readerReconnector) checkErrRetryMode(err error, retriesDuration time.Du return topic.CheckRetryMode(err, r.retrySettings, retriesDuration) } -func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err error) { +func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, _ context.CancelCauseFunc, err error) { 
bgContext := r.background.Context() if err = bgContext.Err(); err != nil { - return nil, err + return nil, nil, err } connectionContext, cancel := context.WithCancelCause(context.WithoutCancel(bgContext)) @@ -340,14 +342,10 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err } if res.err == nil { - stream := batchedStreamReaderHook{ - batchedStreamReader: res.stream, - contextCancel: cancel, - } - return stream, nil + return res.stream, cancel, nil } - return nil, res.err + return nil, nil, res.err } func (r *readerReconnector) WaitInit(ctx context.Context) error { @@ -422,17 +420,6 @@ func (r *readerReconnector) handlePanic() { } } -type batchedStreamReaderHook struct { - batchedStreamReader - contextCancel context.CancelCauseFunc -} - -func (b batchedStreamReaderHook) CloseWithError(ctx context.Context, err error) error { - defer b.contextCancel(xerrors.WithStackTrace(errStreamClosed)) - - return b.batchedStreamReader.CloseWithError(ctx, err) -} - type reconnectRequest struct { oldReader batchedStreamReader reason error diff --git a/internal/topic/topicreaderinternal/stream_reconnector_test.go b/internal/topic/topicreaderinternal/stream_reconnector_test.go index d9034066e..6583014da 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector_test.go +++ b/internal/topic/topicreaderinternal/stream_reconnector_test.go @@ -34,8 +34,9 @@ func TestTopicReaderReconnectorReadMessageBatch(t *testing.T) { baseReader.EXPECT().ReadMessageBatch(gomock.Any(), opts).Return(batch, nil) reader := &readerReconnector{ - streamVal: baseReader, - tracer: &trace.Topic{}, + streamVal: baseReader, + streamContextCancel: func(cause error) {}, + tracer: &trace.Topic{}, } reader.initChannelsAndClock() res, err := reader.ReadMessageBatch(context.Background(), opts) @@ -163,7 +164,11 @@ func TestTopicReaderReconnectorCommit(t *testing.T) { require.Equal(t, "v", ctx.Value(k{})) require.Equal(t, expectedCommitRange, offset) }) - reconnector := 
&readerReconnector{streamVal: stream, tracer: &trace.Topic{}} + reconnector := &readerReconnector{ + streamVal: stream, + streamContextCancel: func(cause error) {}, + tracer: &trace.Topic{}, + } reconnector.initChannelsAndClock() require.NoError(t, reconnector.Commit(ctx, expectedCommitRange)) }) @@ -174,7 +179,11 @@ func TestTopicReaderReconnectorCommit(t *testing.T) { require.Equal(t, "v", ctx.Value(k{})) require.Equal(t, expectedCommitRange, offset) }).Return(testErr) - reconnector := &readerReconnector{streamVal: stream, tracer: &trace.Topic{}} + reconnector := &readerReconnector{ + streamVal: stream, + streamContextCancel: func(cause error) {}, + tracer: &trace.Topic{}, + } reconnector.initChannelsAndClock() require.ErrorIs(t, reconnector.Commit(ctx, expectedCommitRange), testErr) }) @@ -209,7 +218,7 @@ func TestTopicReaderReconnectorConnectionLoop(t *testing.T) { reconnector := &readerReconnector{ connectTimeout: value.InfiniteDuration, - background: *background.NewWorker(ctx), + background: *background.NewWorker(ctx, "test-worker, "+t.Name()), tracer: &trace.Topic{}, } reconnector.initChannelsAndClock() diff --git a/internal/topic/topicwriterinternal/writer_single_stream.go b/internal/topic/topicwriterinternal/writer_single_stream.go index 4f01c56d1..4643ad3dc 100644 --- a/internal/topic/topicwriterinternal/writer_single_stream.go +++ b/internal/topic/topicwriterinternal/writer_single_stream.go @@ -82,8 +82,11 @@ func newSingleStreamWriterStopped( cfg SingleStreamWriterConfig, //nolint:gocritic ) *SingleStreamWriter { return &SingleStreamWriter{ - cfg: cfg, - background: *background.NewWorker(xcontext.ValueOnly(ctxForPProfLabelsOnly)), + cfg: cfg, + background: *background.NewWorker(xcontext.ValueOnly(ctxForPProfLabelsOnly), fmt.Sprintf( + "ydb-topic-stream-writer-background: %v", + cfg.reconnectorInstanceID, + )), closeCompleted: make(empty.Chan), } } From fb5f893339c0de4a2fbed7e504750d97968d1974 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 15 
May 2024 19:15:28 +0300 Subject: [PATCH 03/10] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd6225910..f5c5c4208 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Fixed goroutines leak within topic reader on network problems * Added type assertion checks to enhance type safety and prevent unexpected panics in critical sections of the codebase ## v3.66.3 From ee79d1e6925fd276f6816737eafbd54e03a64534 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 16 May 2024 17:28:53 +0300 Subject: [PATCH 04/10] fix data race on background log with xmany tests --- internal/xtest/manytimes.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/xtest/manytimes.go b/internal/xtest/manytimes.go index 4dc7f1ccb..9d021eb23 100644 --- a/internal/xtest/manytimes.go +++ b/internal/xtest/manytimes.go @@ -38,10 +38,11 @@ func TestManyTimes(t testing.TB, test TestFunc, opts ...TestManyTimesOption) { } start := time.Now() + var testMutex sync.Mutex for { testCounter++ // run test, then check stopAfter for guarantee run test least once - runTest(t, test) + runTest(t, test, &testMutex) if time.Since(start) > options.stopAfter || t.Failed() { return @@ -60,11 +61,12 @@ func TestManyTimesWithName(t *testing.T, name string, test TestFunc) { type TestFunc func(t testing.TB) -func runTest(t testing.TB, test TestFunc) { +func runTest(t testing.TB, test TestFunc, testMutex *sync.Mutex) { t.Helper() tw := &testWrapper{ TB: t, + m: testMutex, } defer tw.doCleanup() @@ -75,7 +77,7 @@ func runTest(t testing.TB, test TestFunc) { type testWrapper struct { testing.TB - m sync.Mutex + m *sync.Mutex logs []logRecord cleanup []func() } From 637deba536cf235a1e9784d8f25cbcad99118c2c Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 16 May 2024 18:06:57 +0300 Subject: [PATCH 05/10] Use fake clock for pauses in TestWriterImpl_Reconnect/ReconnectOnErrors test --- 
.../topic/topicwriterinternal/writer_options.go | 9 +++++++++ .../topicwriterinternal/writer_reconnector.go | 10 ++++------ .../writer_reconnector_test.go | 16 +++++++++++++++- 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/internal/topic/topicwriterinternal/writer_options.go b/internal/topic/topicwriterinternal/writer_options.go index 5c88bdbb3..215ff46d3 100644 --- a/internal/topic/topicwriterinternal/writer_options.go +++ b/internal/topic/topicwriterinternal/writer_options.go @@ -3,6 +3,8 @@ package topicwriterinternal import ( "time" + "github.com/jonboulle/clockwork" + "github.com/ydb-platform/ydb-go-sdk/v3/credentials" "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" @@ -152,3 +154,10 @@ func WithTopic(topic string) PublicWriterOption { cfg.topic = topic } } + +// WithClock is private option for tests +func WithClock(clock clockwork.Clock) PublicWriterOption { + return func(cfg *WriterReconnectorConfig) { + cfg.clock = clock + } +} diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 66d646291..01ce7433c 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -118,7 +118,6 @@ type WriterReconnector struct { queue messageQueue background background.Worker retrySettings topic.RetrySettings - clock clockwork.Clock writerInstanceID string sessionID string semaphore *semaphore.Weighted @@ -149,7 +148,6 @@ func newWriterReconnectorStopped( cfg: cfg, semaphore: semaphore.NewWeighted(int64(cfg.MaxQueueLen)), queue: newMessageQueue(), - clock: clockwork.NewRealClock(), lastSeqNo: -1, firstInitResponseProcessedChan: make(empty.Chan), encodersMap: NewEncoderMap(), @@ -189,7 +187,7 @@ func (w *WriterReconnector) fillFields(messages []messageWithDataContent) error if w.cfg.AutoSetCreatedTime { if 
msg.CreatedAt.IsZero() { if now.IsZero() { - now = w.clock.Now() + now = w.cfg.clock.Now() } msg.CreatedAt = now } else { @@ -391,17 +389,17 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) { now := time.Now() if startOfRetries.IsZero() || topic.CheckResetReconnectionCounters(prevAttemptTime, now, w.cfg.connectTimeout) { attempt = 0 - startOfRetries = w.clock.Now() + startOfRetries = w.cfg.clock.Now() } else { attempt++ } prevAttemptTime = now if reconnectReason != nil { - retryDuration := w.clock.Since(startOfRetries) + retryDuration := w.cfg.clock.Since(startOfRetries) if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, retryDuration); retry { delay := backoff.Delay(attempt) - delayTimer := w.clock.NewTimer(delay) + delayTimer := w.cfg.clock.NewTimer(delay) select { case <-doneCtx: delayTimer.Stop() diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 70630d74d..9df08f469 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -7,12 +7,14 @@ import ( "errors" "fmt" "io" + "math" "sort" "sync" "sync/atomic" "testing" "time" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -426,7 +428,19 @@ func TestWriterImpl_Reconnect(t *testing.T) { xtest.TestManyTimesWithName(t, "ReconnectOnErrors", func(t testing.TB) { ctx := xtest.Context(t) - w := newTestWriterStopped() + clock := clockwork.NewFakeClock() + + go func() { + for { + if ctx.Err() != nil { + return + } + clock.Advance(time.Second) + time.Sleep(time.Microsecond) + } + }() + + w := newTestWriterStopped(WithClock(clock), WithTokenUpdateInterval(time.Duration(math.MaxInt64))) mc := gomock.NewController(t) From b17d68f87de0270cc70f5da0e5e43061756a3d5d Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 16 May 2024 18:18:30 +0300 Subject: 
[PATCH 06/10] Extract fast clock to xtest --- .../writer_reconnector_test.go | 15 +------- internal/xtest/clock.go | 37 +++++++++++++++++++ 2 files changed, 38 insertions(+), 14 deletions(-) create mode 100644 internal/xtest/clock.go diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 9df08f469..1675fedbb 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -14,7 +14,6 @@ import ( "testing" "time" - "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -428,19 +427,7 @@ func TestWriterImpl_Reconnect(t *testing.T) { xtest.TestManyTimesWithName(t, "ReconnectOnErrors", func(t testing.TB) { ctx := xtest.Context(t) - clock := clockwork.NewFakeClock() - - go func() { - for { - if ctx.Err() != nil { - return - } - clock.Advance(time.Second) - time.Sleep(time.Microsecond) - } - }() - - w := newTestWriterStopped(WithClock(clock), WithTokenUpdateInterval(time.Duration(math.MaxInt64))) + w := newTestWriterStopped(WithClock(xtest.FastClock(t)), WithTokenUpdateInterval(time.Duration(math.MaxInt64))) mc := gomock.NewController(t) diff --git a/internal/xtest/clock.go b/internal/xtest/clock.go new file mode 100644 index 000000000..2cc2b9a6d --- /dev/null +++ b/internal/xtest/clock.go @@ -0,0 +1,37 @@ +package xtest + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/jonboulle/clockwork" +) + +// FastClock returns fake clock with very fast time speed advanced until end of test +// the clock stops advance at end of test +func FastClock(t testing.TB) clockwork.FakeClock { + clock := clockwork.NewFakeClock() + var needStop atomic.Bool + clockStopped := make(chan struct{}) + + go func() { + defer close(clockStopped) + + for { + if needStop.Load() { + return + } + + clock.Advance(time.Second) + time.Sleep(time.Microsecond) + } + }() + + 
t.Cleanup(func() { + needStop.Store(true) + <-clockStopped + }) + + return clock +} From e20088b1d3928ede3c090689cd9a9870006e15e7 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 16 May 2024 19:12:12 +0300 Subject: [PATCH 07/10] Fixed one of races, improve logs --- internal/background/worker.go | 3 ++- .../topic/topicwriterinternal/writer_reconnector_test.go | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/background/worker.go b/internal/background/worker.go index 7c1389b04..2620657b1 100644 --- a/internal/background/worker.go +++ b/internal/background/worker.go @@ -3,6 +3,7 @@ package background import ( "context" "errors" + "fmt" "runtime/pprof" "sync" @@ -75,7 +76,7 @@ func (b *Worker) Close(ctx context.Context, err error) error { var resErr error b.m.WithLock(func() { if b.closed { - resErr = xerrors.WithStackTrace(ErrAlreadyClosed) + resErr = xerrors.WithStackTrace(fmt.Errorf("%w with reason: %+v", ErrAlreadyClosed, b.closeReason)) return } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 1675fedbb..101f73733 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -555,6 +555,8 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) { }, }, Codec: rawtopiccommon.CodecRaw, + }).Do(func(_ *rawtopicwriter.WriteRequest) { + close(writeCompleted) }).Return(nil) flushCompleted := make(empty.Chan) @@ -564,7 +566,6 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) { CreatedAt: messageTime, Data: bytes.NewReader(messageData), }}) - close(writeCompleted) require.NoError(t, err) }() @@ -611,11 +612,11 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) { { name: "flush", flush: func(ctx context.Context, writer *WriterReconnector) error { - return writer.Close(ctx) + return writer.Flush(ctx) }, }, { - name: "flush and close", + name: 
"flush_and_close", flush: func(ctx context.Context, writer *WriterReconnector) error { err := writer.Flush(ctx) if err != nil { @@ -631,7 +632,7 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) { t.Run(test.name, func(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { f(t, test.flush) - }) + }, xtest.StopAfter(time.Minute)) }) } } From 812081d51e26a760aea8a823e92316a03047ae12 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 16 May 2024 19:47:58 +0300 Subject: [PATCH 08/10] Fixed data race on close topic writer --- .../topic/topicwriterinternal/writer_reconnector.go | 11 ++++++----- .../topicwriterinternal/writer_reconnector_test.go | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 01ce7433c..88738c83b 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -347,16 +347,17 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err onDone(resErr) }() - closeErr := w.queue.Close(reason) - if resErr == nil && closeErr != nil { - resErr = closeErr - } - + // stop background work and single stream writer bgErr := w.background.Close(ctx, reason) if resErr == nil && bgErr != nil { resErr = bgErr } + closeErr := w.queue.Close(reason) + if resErr == nil && closeErr != nil { + resErr = closeErr + } + return resErr } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 101f73733..d09e60dd3 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -632,7 +632,7 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) { t.Run(test.name, func(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { f(t, test.flush) - }, 
xtest.StopAfter(time.Minute)) + }) }) } } From 78d3a6572b94730fc53431e9412fea1a77de1c3c Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 16 May 2024 19:50:23 +0300 Subject: [PATCH 09/10] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51f05d457..9e7e218d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Fixed race of stop internal processes on close topic writer * Fixed goroutines leak within topic reader on network problems ## v3.67.0 From 799c0b420210c4e8e5e2507e66c517b5702be48c Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 16 May 2024 20:01:35 +0300 Subject: [PATCH 10/10] fix linter --- internal/background/worker.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/background/worker.go b/internal/background/worker.go index 2620657b1..55d5b0363 100644 --- a/internal/background/worker.go +++ b/internal/background/worker.go @@ -76,6 +76,8 @@ func (b *Worker) Close(ctx context.Context, err error) error { var resErr error b.m.WithLock(func() { if b.closed { + // Calling Close a second time is the error; the first close reason is attached only to aid debugging + //nolint:errorlint resErr = xerrors.WithStackTrace(fmt.Errorf("%w with reason: %+v", ErrAlreadyClosed, b.closeReason)) return