From 721bb1ac0e093e274a26893babb0f94b9f441302 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 11 Dec 2023 14:31:47 +0800
Subject: [PATCH] plugin: fix bug that watch loop will refresh frequently when channel closed (#49275) (#49286)

close pingcap/tidb#49273
---
 plugin/plugin.go      | 27 ++++++++++++++++++++++++---
 plugin/plugin_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/plugin/plugin.go b/plugin/plugin.go
index 572b676aa3450..7ad54f748982f 100644
--- a/plugin/plugin.go
+++ b/plugin/plugin.go
@@ -20,6 +20,7 @@ import (
 	gplugin "plugin"
 	"strconv"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/pingcap/errors"
@@ -245,12 +246,32 @@ type flushWatcher struct {
 }
 
 func (w *flushWatcher) watchLoop() {
-	watchChan := w.etcd.Watch(w.ctx, w.path)
+	const reWatchInterval = time.Second * 5
+	logutil.BgLogger().Info("plugin flushWatcher loop started", zap.String("plugin", w.manifest.Name))
+	for w.ctx.Err() == nil {
+		ch := w.etcd.Watch(w.ctx, w.path)
+		if exit := w.watchLoopWithChan(ch); exit {
+			break
+		}
+
+		logutil.BgLogger().Info(
+			"plugin flushWatcher old chan closed, restart loop later",
+			zap.String("plugin", w.manifest.Name),
+			zap.Duration("after", reWatchInterval))
+		time.Sleep(reWatchInterval)
+	}
+}
+
+func (w *flushWatcher) watchLoopWithChan(ch clientv3.WatchChan) (exit bool) {
 	for {
 		select {
 		case <-w.ctx.Done():
-			return
-		case <-watchChan:
+			return true
+		case _, ok := <-ch:
+			if !ok {
+				return false
+			}
+			logutil.BgLogger().Info("plugin flushWatcher detected event to reload plugin config", zap.String("plugin", w.manifest.Name))
 			disabled, err := w.getPluginDisabledFlag()
 			if err != nil {
 				logutil.BgLogger().Error("get plugin disabled flag failure", zap.String("plugin", w.manifest.Name), zap.Error(err))
diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go
index cfcc85ef310f3..e539a8fd40899 100644
--- a/plugin/plugin_test.go
+++ b/plugin/plugin_test.go
@@ -18,10 +18,13 @@ import (
 	"context"
 	"io"
 	"strconv"
+	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/stretchr/testify/require"
+	"go.etcd.io/etcd/clientv3"
 )
 
 func TestLoadPluginSuccess(t *testing.T) {
@@ -242,3 +245,43 @@ func TestPluginsClone(t *testing.T) {
 	require.Equal(t, uint16(1), cps.versions["whitelist"])
 	require.Len(t, cps.dyingPlugins, 1)
 }
+
+func TestPluginWatcherLoop(t *testing.T) {
+	// exit when ctx done
+	ctx, cancel := context.WithCancel(context.Background())
+	watcher := &flushWatcher{
+		ctx: ctx,
+		manifest: &Manifest{
+			Name: "test",
+		},
+	}
+	ch := make(chan clientv3.WatchResponse)
+	var cancelled int32
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		atomic.StoreInt32(&cancelled, 1)
+		cancel()
+	}()
+	exit := watcher.watchLoopWithChan(ch)
+	require.True(t, exit)
+	require.Equal(t, int32(1), atomic.LoadInt32(&cancelled))
+
+	// exit when ch closed
+	watcher = &flushWatcher{
+		ctx: context.Background(),
+		manifest: &Manifest{
+			Name: "test",
+		},
+	}
+
+	var closed int32
+	ch = make(chan clientv3.WatchResponse)
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		atomic.StoreInt32(&closed, 1)
+		close(ch)
+	}()
+	exit = watcher.watchLoopWithChan(ch)
+	require.False(t, exit)
+	require.Equal(t, int32(1), atomic.LoadInt32(&closed))
+}