From 6934fc8765cae55e2228fc9ea95fe64c550a2010 Mon Sep 17 00:00:00 2001 From: Danica Xiao Date: Wed, 5 Jun 2024 11:23:53 -0700 Subject: [PATCH] Replace logrus with log/slog for logging --- admin_client.go | 9 ++--- caches.go | 47 +++++++++-------------- client.go | 41 +++++++++++++++----- debug_state_test.go | 2 + go.mod | 1 - go.sum | 5 --- integration_test.go | 7 +--- mockrc_test.go | 4 +- region/client.go | 21 +++------- region/new.go | 5 ++- rpc.go | 94 +++++++++++++++------------------------------ rpc_test.go | 13 +++++-- zk/client.go | 19 +++++---- 13 files changed, 124 insertions(+), 144 deletions(-) diff --git a/admin_client.go b/admin_client.go index 5ff1db3e..8922d9d4 100644 --- a/admin_client.go +++ b/admin_client.go @@ -9,9 +9,9 @@ import ( "context" "errors" "fmt" + "log/slog" "time" - log "github.com/sirupsen/logrus" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" "github.com/tsuna/gohbase/region" @@ -48,9 +48,6 @@ func NewAdminClient(zkquorum string, options ...Option) AdminClient { } func newAdminClient(zkquorum string, options ...Option) AdminClient { - log.WithFields(log.Fields{ - "Host": zkquorum, - }).Debug("Creating new admin client.") c := &client{ clientType: region.MasterClient, rpcQueueSize: defaultRPCQueueSize, @@ -63,11 +60,13 @@ func newAdminClient(zkquorum string, options ...Option) AdminClient { regionLookupTimeout: region.DefaultLookupTimeout, regionReadTimeout: region.DefaultReadTimeout, newRegionClientFn: region.NewClient, + logger: slog.Default(), } for _, option := range options { option(c) } - c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer) + c.logger.Info("Creating new admin client.", "Host", slog.StringValue(zkquorum)) + c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer, c.logger) return c } diff --git a/caches.go b/caches.go index 2b45adc0..81f7f2fe 100644 --- a/caches.go +++ b/caches.go @@ -9,9 +9,9 @@ import ( "bytes" "fmt" "io" + "log/slog" "sync" - log "github.com/sirupsen/logrus" "github.com/tsuna/gohbase/hrpc" "modernc.org/b/v2" ) @@ -20,6 +20,7 @@ import ( // look up all the regioninfos that map to a specific client type clientRegionCache struct { m sync.RWMutex + logger *slog.Logger regions map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{} } @@ -42,19 +43,17 @@ func (rcc *clientRegionCache) put(addr string, r hrpc.RegionInfo, } rcc.m.Unlock() - log.WithFields(log.Fields{ - "client": existingClient, - }).Debug("region client is already in client's cache") + rcc.logger.Debug("region client is already in client's cache", "client", existingClient) return existingClient } } // no such client yet c := newClient() - rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: struct{}{}} + rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: {}} rcc.m.Unlock() - log.WithField("client", c).Info("added new region client") + rcc.logger.Info("added new region client", "client", c) return c } @@ -88,7 +87,7 @@ func (rcc *clientRegionCache) clientDown(c hrpc.RegionClient) map[hrpc.RegionInf rcc.m.Unlock() if ok { - log.WithField("client", c).Info("removed region client") + rcc.logger.Info("removed region client", "client", c) } return downregions } @@ -125,7 +124,8 @@ func (rcc *clientRegionCache) debugInfo( // key -> region cache. type keyRegionCache struct { - m sync.RWMutex + m sync.RWMutex + logger *slog.Logger // Maps a []byte of a region start key to a hrpc.RegionInfo regions *b.Tree[[]byte, hrpc.RegionInfo] @@ -137,8 +137,7 @@ func (krc *keyRegionCache) get(key []byte) ([]byte, hrpc.RegionInfo) { enum, ok := krc.regions.Seek(key) if ok { krc.m.RUnlock() - log.Fatalf("WTF: got exact match for region search key %q", key) - return nil, nil + panic(fmt.Errorf("WTF: got exact match for region search key %q", key)) } k, v, err := enum.Prev() enum.Close() @@ -204,7 +203,7 @@ func (krc *keyRegionCache) getOverlaps(reg hrpc.RegionInfo) []hrpc.RegionInfo { key := createRegionSearchKey(fullyQualifiedTable(reg), reg.StartKey()) enum, ok := krc.regions.Seek(key) if ok { - log.Fatalf("WTF: found a region with exact name as the search key %q", key) + panic(fmt.Errorf("WTF: found a region with exact name as the search key %q", key)) } // case 1: landed before the first region in cache @@ -229,15 +228,15 @@ func (krc *keyRegionCache) getOverlaps(reg hrpc.RegionInfo) []hrpc.RegionInfo { enum.Close() enum, err = krc.regions.SeekFirst() if err != nil { - log.Fatalf( - "error seeking first region when getting overlaps for region %v: %v", reg, err) + panic(fmt.Errorf( + "error seeking first region when getting overlaps for region %v: %v", reg, err)) } } _, v, err = enum.Next() if err != nil { - log.Fatalf( - "error accessing first region when getting overlaps for region %v: %v", reg, err) + panic(fmt.Errorf( + "error accessing first region when getting overlaps for region %v: %v", reg, err)) } if isRegionOverlap(v, reg) { overlaps = append(overlaps, v) @@ -294,11 +293,8 @@ func (krc *keyRegionCache) put(reg hrpc.RegionInfo) (overlaps []hrpc.RegionInfo, return reg, true }) if !replaced { - log.WithFields(log.Fields{ - "region": reg, - "overlaps": overlaps, - "replaced": replaced, - }).Debug("region is already in cache") + krc.logger.Debug("region is already in cache", + "region", reg, "overlaps", overlaps, "replaced", replaced) return } // delete overlapping regions @@ -310,11 +306,8 @@ func (krc *keyRegionCache) put(reg hrpc.RegionInfo) (overlaps []hrpc.RegionInfo, o.MarkDead() } - log.WithFields(log.Fields{ - "region": reg, - "overlaps": overlaps, - "replaced": replaced, - }).Info("added new region") + krc.logger.Info("added new region", + "region", reg, "overlaps", overlaps, "replaced", replaced) return } @@ -328,8 +321,6 @@ func (krc *keyRegionCache) del(reg hrpc.RegionInfo) bool { if success { cachedRegionTotal.Dec() } - log.WithFields(log.Fields{ - "region": reg, - }).Debug("removed region") + krc.logger.Debug("removed region", "region", reg) return success } diff --git a/client.go b/client.go index 00bd3e85..683a099b 100644 --- a/client.go +++ b/client.go @@ -10,11 +10,12 @@ import ( "encoding/binary" "encoding/json" "fmt" + "log/slog" "net" + "reflect" "sync" "time" - log "github.com/sirupsen/logrus" "github.com/tsuna/gohbase/compression" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" @@ -100,7 +101,8 @@ type client struct { newRegionClientFn func(string, region.ClientType, int, time.Duration, string, time.Duration, compression.Codec, - func(ctx context.Context, network, addr string) (net.Conn, error)) hrpc.RegionClient + func(ctx context.Context, network, addr string) (net.Conn, error), + *slog.Logger) hrpc.RegionClient compressionCodec compression.Codec @@ -109,6 +111,8 @@ type client struct { // regionDialer is passed into the region client to connect to hbase in a custom way, // such as SOCKS proxy. regionDialer func(ctx context.Context, network, addr string) (net.Conn, error) + // logger that could be defined by user + logger *slog.Logger } // NewClient creates a new HBase client. @@ -117,13 +121,14 @@ func NewClient(zkquorum string, options ...Option) Client { } func newClient(zkquorum string, options ...Option) *client { - log.WithFields(log.Fields{ - "Host": zkquorum, - }).Debug("Creating new client.") c := &client{ clientType: region.RegionClient, - regions: keyRegionCache{regions: b.TreeNew[[]byte, hrpc.RegionInfo](region.Compare)}, + regions: keyRegionCache{ + logger: slog.Default(), + regions: b.TreeNew[[]byte, hrpc.RegionInfo](region.Compare), + }, clients: clientRegionCache{ + logger: slog.Default(), regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}), }, rpcQueueSize: defaultRPCQueueSize, @@ -142,24 +147,31 @@ func newClient(zkquorum string, options ...Option) *client { regionReadTimeout: region.DefaultReadTimeout, done: make(chan struct{}), newRegionClientFn: region.NewClient, + logger: slog.Default(), } for _, option := range options { option(c) } + c.logger.Info("Creating new admin client.", "Host", slog.StringValue(zkquorum)) //Have to create the zkClient after the Options have been set //since the zkTimeout could be changed as an option - c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer) + c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer, c.logger) return c } // DebugState information about the clients keyRegionCache, and clientRegionCache -func DebugState(client Client) ([]byte, error) { +func DebugState(c Client) ([]byte, error) { - debugInfoJson, err := json.Marshal(client) + debugInfoJson, err := json.Marshal(c) if err != nil { - log.Errorf("Cannot turn client into JSON bytes array: %v", err) + if cclient, ok := c.(*client); ok { + cclient.logger.Error("Cannot turn client into JSON bytes array", "error", err) + } else { + slog.Error("unknown client type", "type", reflect.TypeOf(c)) + slog.Error("Cannot turn client into JSON bytes array", "error", err) + } } return debugInfoJson, err } @@ -296,6 +308,15 @@ func RegionDialer(dialer func( } } +// Logger will return an option to set *slog.Logger instance +func Logger(logger *slog.Logger) Option { + return func(c *client) { + c.regions.logger = logger + c.clients.logger = logger + c.logger = logger + } +} + // Close closes connections to hbase master and regionservers func (c *client) Close() { c.closeOnce.Do(func() { diff --git a/debug_state_test.go b/debug_state_test.go index 9a9bb16e..6462b1dc 100644 --- a/debug_state_test.go +++ b/debug_state_test.go @@ -2,6 +2,7 @@ package gohbase import ( "encoding/json" + "log/slog" "testing" "github.com/stretchr/testify/assert" @@ -27,6 +28,7 @@ func TestDebugStateSanity(t *testing.T) { region.DefaultReadTimeout, client.compressionCodec, nil, + slog.Default(), ) newClientFn := func() hrpc.RegionClient { return regClient diff --git a/go.mod b/go.mod index eb3a1aa8..d67d1649 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/go-zookeeper/zk v1.0.3 github.com/golang/snappy v0.0.4 github.com/prometheus/client_golang v1.13.0 - github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.0 go.opentelemetry.io/otel v1.11.1 go.opentelemetry.io/otel/trace v1.11.1 diff --git a/go.sum b/go.sum index a4a1ba15..e4ce0cd1 100644 --- a/go.sum +++ b/go.sum @@ -198,11 +198,6 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= -github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= -github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= diff --git a/integration_test.go b/integration_test.go index 7918d1dc..f55412f9 100644 --- a/integration_test.go +++ b/integration_test.go @@ -3,9 +3,6 @@ // Use of this source code is governed by the Apache License 2.0 // that can be found in the COPYING file. -//go:build integration -// +build integration - package gohbase_test import ( @@ -16,6 +13,7 @@ import ( "flag" "fmt" "io" + "log/slog" "os" "os/exec" "reflect" @@ -27,7 +25,6 @@ import ( "math" - log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/tsuna/gohbase" "github.com/tsuna/gohbase/filter" @@ -101,7 +98,7 @@ func TestMain(m *testing.M) { panic("Host is not set!") } - log.SetLevel(log.DebugLevel) + slog.SetLogLoggerLevel(slog.LevelDebug) ac := gohbase.NewAdminClient(*host) diff --git a/mockrc_test.go b/mockrc_test.go index a8516e69..b2c37e89 100644 --- a/mockrc_test.go +++ b/mockrc_test.go @@ -9,6 +9,7 @@ import ( "bytes" "context" "fmt" + "log/slog" "net" "sync" "sync/atomic" @@ -179,7 +180,8 @@ func init() { func newMockRegionClient(addr string, ctype region.ClientType, queueSize int, flushInterval time.Duration, effectiveUser string, readTimeout time.Duration, codec compression.Codec, - dialer func(ctx context.Context, network, addr string) (net.Conn, error)) hrpc.RegionClient { + dialer func(ctx context.Context, network, addr string) (net.Conn, error), + log *slog.Logger) hrpc.RegionClient { m.Lock() clients[addr]++ m.Unlock() diff --git a/region/client.go b/region/client.go index 7336b66d..a477d086 100644 --- a/region/client.go +++ b/region/client.go @@ -13,6 +13,7 @@ import ( "errors" "fmt" "io" + "log/slog" "net" "strings" "sync" @@ -20,7 +21,6 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" @@ -196,6 +196,8 @@ type client struct { // dialer is used to connect to region servers in non-standard ways dialer func(ctx context.Context, network, addr string) (net.Conn, error) + + logger *slog.Logger } // QueueRPC will add an rpc call to the queue for processing by the writer goroutine @@ -283,10 +285,7 @@ func (c *client) inFlightDown() error { func (c *client) fail(err error) { c.failOnce.Do(func() { if err != ErrClientClosed { - log.WithFields(log.Fields{ - "client": c, - "err": err, - }).Error("error occured, closing region client") + c.logger.Error("error occured, closing region client", "client", c, "err", err) } // we don't close c.rpcs channel to make it block in select of QueueRPC @@ -314,10 +313,7 @@ func (c *client) failSentRPCs() { c.sent = make(map[uint32]hrpc.Call) c.sentM.Unlock() - log.WithFields(log.Fields{ - "client": c, - "count": len(sent), - }).Debug("failing awaiting RPCs") + c.logger.Debug("failing awaiting RPCs", "client", c, "count", len(sent)) // send error to awaiting rpcs for _, rpc := range sent { @@ -350,12 +346,7 @@ func (c *client) processRPCs() { }() flush := func(reason string) { - if log.GetLevel() == log.DebugLevel { - log.WithFields(log.Fields{ - "len": m.len(), - "addr": c.Addr(), - }).Debug("flushing MultiRequest") - } + c.logger.Debug("flushing MultiRequest", "len", m.len(), "addr", c.Addr()) flushReasonCount.With(prometheus.Labels{ "reason": reason, diff --git a/region/new.go b/region/new.go index 27fca777..6a8e0eae 100644 --- a/region/new.go +++ b/region/new.go @@ -11,6 +11,7 @@ package region import ( "context" "fmt" + "log/slog" "net" "time" @@ -21,7 +22,8 @@ import ( // NewClient creates a new RegionClient. func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time.Duration, effectiveUser string, readTimeout time.Duration, codec compression.Codec, - dialer func(ctx context.Context, network, addr string) (net.Conn, error)) hrpc.RegionClient { + dialer func(ctx context.Context, network, addr string) (net.Conn, error), + slogger *slog.Logger) hrpc.RegionClient { c := &client{ addr: addr, ctype: ctype, @@ -32,6 +34,7 @@ func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time. rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), + logger: slogger, } if codec != nil { diff --git a/rpc.go b/rpc.go index 0b20ce27..86f61385 100644 --- a/rpc.go +++ b/rpc.go @@ -15,7 +15,6 @@ import ( "strconv" "time" - log "github.com/sirupsen/logrus" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/internal/observability" "github.com/tsuna/gohbase/region" @@ -502,31 +501,27 @@ func (c *client) lookupRegion(ctx context.Context, // If it takes longer than regionLookupTimeout, fail so that we can sleep lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout) if c.clientType == region.MasterClient { - log.WithField("resource", zk.Master).Debug("looking up master") + c.logger.Debug("looking up master", "resource", zk.Master) addr, err = c.zkLookup(lookupCtx, zk.Master) cancel() reg = c.adminRegionInfo } else if bytes.Equal(table, metaTableName) { - log.WithField("resource", zk.Meta).Debug("looking up region server of hbase:meta") + c.logger.Debug("looking up region server of hbase:meta", "resource", zk.Meta) addr, err = c.zkLookup(lookupCtx, zk.Meta) cancel() reg = c.metaRegionInfo } else { - log.WithFields(log.Fields{ - "table": strconv.Quote(string(table)), - "key": strconv.Quote(string(key)), - }).Debug("looking up region") + c.logger.Debug("looking up region", + "table", strconv.Quote(string(table)), "key", strconv.Quote(string(key))) reg, addr, err = c.metaLookup(lookupCtx, table, key) cancel() if err == TableNotFound { - log.WithFields(log.Fields{ - "table": strconv.Quote(string(table)), - "key": strconv.Quote(string(key)), - "err": err, - }).Debug("hbase:meta does not know about this table/key") + c.logger.Debug("hbase:meta does not know about this table/key", + "table", strconv.Quote(string(table)), + "key", strconv.Quote(string(key)), "err", err) return nil, "", err } else if err == ErrClientClosed { @@ -534,22 +529,14 @@ func (c *client) lookupRegion(ctx context.Context, } } if err == nil { - log.WithFields(log.Fields{ - "table": strconv.Quote(string(table)), - "key": strconv.Quote(string(key)), - "region": reg, - "addr": addr, - }).Debug("looked up a region") + c.logger.Debug("looked up a region", "table", strconv.Quote(string(table)), + "key", strconv.Quote(string(key)), "region", reg, "addr", addr) return reg, addr, nil } - log.WithFields(log.Fields{ - "table": strconv.Quote(string(table)), - "key": strconv.Quote(string(key)), - "backoff": backoff, - "err": err, - }).Error("failed looking up region") + c.logger.Error("failed looking up region", "table", strconv.Quote(string(table)), + "key", strconv.Quote(string(key)), "backoff", backoff, "err", err) // This will be hit if there was an error locating the region backoff, err = sleepAndIncreaseBackoff(ctx, backoff) @@ -567,17 +554,13 @@ func (c *client) lookupAllRegions(ctx context.Context, for { // If it takes longer than regionLookupTimeout, fail so that we can sleep lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout) - log.WithFields(log.Fields{ - "table": strconv.Quote(string(table)), - }).Debug("looking up regions") + c.logger.Debug("looking up regions", "table", strconv.Quote(string(table))) regs, err = c.metaLookupForTable(lookupCtx, table) cancel() if err == TableNotFound { - log.WithFields(log.Fields{ - "table": strconv.Quote(string(table)), - "err": err, - }).Debug("hbase:meta does not know about this table") + c.logger.Debug("hbase:meta does not know about this table", + "table", strconv.Quote(string(table)), "err", err) return nil, err } else if err == ErrClientClosed { @@ -585,19 +568,14 @@ func (c *client) lookupAllRegions(ctx context.Context, } if err == nil { - log.WithFields(log.Fields{ - "table": strconv.Quote(string(table)), - "regionsAndAddr": regs, - }).Debug("looked up all regions") + c.logger.Debug("looked up all regions", + "table", strconv.Quote(string(table)), "regionsAndAddr", regs) return regs, nil } - log.WithFields(log.Fields{ - "table": strconv.Quote(string(table)), - "backoff": backoff, - "err": err, - }).Error("failed looking up regions") + c.logger.Error("failed looking up regions", "table", strconv.Quote(string(table)), + "backoff", backoff, "err", err) // This will be hit if there was an error locating the region backoff, err = sleepAndIncreaseBackoff(ctx, backoff) @@ -829,7 +807,7 @@ func (c *client) reestablishRegion(reg hrpc.RegionInfo) { default: } - log.WithField("region", reg).Debug("reestablishing region") + c.logger.Debug("reestablishing region", "region", reg) c.establishRegion(reg, "") } @@ -900,33 +878,25 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) { c.clients.del(originalReg) originalReg.MarkAvailable() - log.WithFields(log.Fields{ - "region": originalReg.String(), - "err": err, - "backoff": backoff, - }).Info("region does not exist anymore") + c.logger.Info("region does not exist anymore", + "region", originalReg.String(), "err", err, "backoff", backoff) return } else if originalReg.Context().Err() != nil { // region is dead originalReg.MarkAvailable() - log.WithFields(log.Fields{ - "region": originalReg.String(), - "err": err, - "backoff": backoff, - }).Info("region became dead while establishing client for it") + c.logger.Info("region became dead while establishing client for it", + "region", originalReg.String(), "err", err, "backoff", backoff) return } else if err == ErrClientClosed { // client has been closed return } else if err != nil { - log.WithFields(log.Fields{ - "region": originalReg.String(), - "err": err, - "backoff": backoff, - }).Fatal("unknown error occured when looking up region") + c.logger.Error("unknown error occured when looking up region", + "region", originalReg.String(), "err", err, "backoff", backoff) + panic(fmt.Errorf("unknown error occured when looking up region")) } if !bytes.Equal(reg.Name(), originalReg.Name()) { // put new region and remove overlapping ones. @@ -958,11 +928,12 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) { // master that we don't add to the cache // TODO: consider combining this case with the regular regionserver path client = c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval, - c.effectiveUser, c.regionReadTimeout, nil, c.regionDialer) + c.effectiveUser, c.regionReadTimeout, nil, c.regionDialer, c.logger) } else { client = c.clients.put(addr, reg, func() hrpc.RegionClient { return c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval, - c.effectiveUser, c.regionReadTimeout, c.compressionCodec, c.regionDialer) + c.effectiveUser, c.regionReadTimeout, c.compressionCodec, + c.regionDialer, c.logger) }) } @@ -1001,11 +972,8 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) { c.clientDown(client, reg) } - log.WithFields(log.Fields{ - "region": reg, - "backoff": backoff, - "err": err, - }).Debug("region was not established, retrying") + c.logger.Debug("region was not established, retrying", + "region", reg, "backoff", backoff, "err", err) // reset address because we weren't able to connect to it // or regionserver says it's still offline, should look up again addr = "" diff --git a/rpc_test.go b/rpc_test.go index ddd0a3de..e2363851 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -10,6 +10,7 @@ import ( "context" "errors" "fmt" + "log/slog" "math/rand" "net" "reflect" @@ -38,15 +39,19 @@ import ( func newRegionClientFn(addr string) func() hrpc.RegionClient { return func() hrpc.RegionClient { return newMockRegionClient(addr, region.RegionClient, - 0, 0, "root", region.DefaultReadTimeout, nil, nil) + 0, 0, "root", region.DefaultReadTimeout, nil, nil, slog.Default()) } } func newMockClient(zkClient zk.Client) *client { return &client{ clientType: region.RegionClient, - regions: keyRegionCache{regions: b.TreeNew[[]byte, hrpc.RegionInfo](region.Compare)}, + regions: keyRegionCache{ + logger: slog.Default(), + regions: b.TreeNew[[]byte, hrpc.RegionInfo](region.Compare), + }, clients: clientRegionCache{ + logger: slog.Default(), regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}), }, rpcQueueSize: defaultRPCQueueSize, @@ -58,6 +63,7 @@ func newMockClient(zkClient zk.Client) *client { regionLookupTimeout: region.DefaultLookupTimeout, regionReadTimeout: region.DefaultReadTimeout, newRegionClientFn: newMockRegionClient, + logger: slog.Default(), } } @@ -303,7 +309,8 @@ func TestEstablishRegionDialFail(t *testing.T) { newRegionClientFnCallCount := 0 c.newRegionClientFn = func(_ string, _ region.ClientType, _ int, _ time.Duration, _ string, _ time.Duration, _ compression.Codec, - _ func(ctx context.Context, network, addr string) (net.Conn, error)) hrpc.RegionClient { + _ func(ctx context.Context, network, addr string) (net.Conn, error), + _ *slog.Logger) hrpc.RegionClient { var rc hrpc.RegionClient if newRegionClientFnCallCount == 0 { rc = rcFailDial diff --git a/zk/client.go b/zk/client.go index 6fbf9036..064e2ffd 100644 --- a/zk/client.go +++ b/zk/client.go @@ -10,26 +10,27 @@ import ( "context" "encoding/binary" "fmt" + "log/slog" "net" "path" "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/go-zookeeper/zk" "github.com/tsuna/gohbase/pb" "google.golang.org/protobuf/proto" ) -type logger struct{} +type logger struct{ + slogger *slog.Logger +} func (l *logger) Printf(format string, args ...interface{}) { - log.Debugf(format, args...) + l.slogger.Debug(fmt.Sprintf(format, args...)) } func init() { - zk.DefaultLogger = &logger{} + zk.DefaultLogger = &logger{slogger: slog.Default()} } // ResourceName is a type alias that is used to represent different resources @@ -60,15 +61,19 @@ type client struct { zks []string sessionTimeout time.Duration dialer func(ctx context.Context, network, addr string) (net.Conn, error) + logger *slog.Logger } // NewClient establishes connection to zookeeper and returns the client func NewClient(zkquorum string, st time.Duration, - dialer func(ctx context.Context, network, addr string) (net.Conn, error)) Client { + dialer func(ctx context.Context, network, addr string) (net.Conn, error), + slogger *slog.Logger) Client { + return &client{ zks: strings.Split(zkquorum, ","), sessionTimeout: st, dialer: dialer, + logger: slogger, } } @@ -91,7 +96,7 @@ func (c *client) LocateResource(resource ResourceName) (string, error) { return "", fmt.Errorf("failed to read the %s znode: %s", resource, err) } if len(buf) == 0 { - log.Fatalf("%s was empty!", resource) + panic(fmt.Errorf("%s was empty", resource)) } else if buf[0] != 0xFF { return "", fmt.Errorf("the first byte of %s was 0x%x, not 0xFF", resource, buf[0]) }