Skip to content

Commit

Permalink
Replace logrus with log/slog for logging
Browse files Browse the repository at this point in the history
  • Loading branch information
xdanica committed Jun 5, 2024
1 parent 67d205c commit 6934fc8
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 144 deletions.
9 changes: 4 additions & 5 deletions admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"time"

log "github.com/sirupsen/logrus"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/pb"
"github.com/tsuna/gohbase/region"
Expand Down Expand Up @@ -48,9 +48,6 @@ func NewAdminClient(zkquorum string, options ...Option) AdminClient {
}

func newAdminClient(zkquorum string, options ...Option) AdminClient {
log.WithFields(log.Fields{
"Host": zkquorum,
}).Debug("Creating new admin client.")
c := &client{
clientType: region.MasterClient,
rpcQueueSize: defaultRPCQueueSize,
Expand All @@ -63,11 +60,13 @@ func newAdminClient(zkquorum string, options ...Option) AdminClient {
regionLookupTimeout: region.DefaultLookupTimeout,
regionReadTimeout: region.DefaultReadTimeout,
newRegionClientFn: region.NewClient,
logger: slog.Default(),
}
for _, option := range options {
option(c)
}
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer)
c.logger.Info("Creating new admin client.", "Host", slog.StringValue(zkquorum))
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer, c.logger)
return c
}

Expand Down
47 changes: 19 additions & 28 deletions caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"bytes"
"fmt"
"io"
"log/slog"
"sync"

log "github.com/sirupsen/logrus"
"github.com/tsuna/gohbase/hrpc"
"modernc.org/b/v2"
)
Expand All @@ -20,6 +20,7 @@ import (
// look up all the regioninfos that map to a specific client
type clientRegionCache struct {
m sync.RWMutex
logger *slog.Logger

regions map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}
}
Expand All @@ -42,19 +43,17 @@ func (rcc *clientRegionCache) put(addr string, r hrpc.RegionInfo,
}
rcc.m.Unlock()

log.WithFields(log.Fields{
"client": existingClient,
}).Debug("region client is already in client's cache")
rcc.logger.Debug("region client is already in client's cache", "client", existingClient)
return existingClient
}
}

// no such client yet
c := newClient()
rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: struct{}{}}
rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: {}}
rcc.m.Unlock()

log.WithField("client", c).Info("added new region client")
rcc.logger.Info("added new region client", "client", c)
return c
}

Expand Down Expand Up @@ -88,7 +87,7 @@ func (rcc *clientRegionCache) clientDown(c hrpc.RegionClient) map[hrpc.RegionInf
rcc.m.Unlock()

if ok {
log.WithField("client", c).Info("removed region client")
rcc.logger.Info("removed region client", "client", c)
}
return downregions
}
Expand Down Expand Up @@ -125,7 +124,8 @@ func (rcc *clientRegionCache) debugInfo(

// key -> region cache.
type keyRegionCache struct {
m sync.RWMutex
m sync.RWMutex
logger *slog.Logger

// Maps a []byte of a region start key to a hrpc.RegionInfo
regions *b.Tree[[]byte, hrpc.RegionInfo]
Expand All @@ -137,8 +137,7 @@ func (krc *keyRegionCache) get(key []byte) ([]byte, hrpc.RegionInfo) {
enum, ok := krc.regions.Seek(key)
if ok {
krc.m.RUnlock()
log.Fatalf("WTF: got exact match for region search key %q", key)
return nil, nil
panic(fmt.Errorf("WTF: got exact match for region search key %q", key))
}
k, v, err := enum.Prev()
enum.Close()
Expand Down Expand Up @@ -204,7 +203,7 @@ func (krc *keyRegionCache) getOverlaps(reg hrpc.RegionInfo) []hrpc.RegionInfo {
key := createRegionSearchKey(fullyQualifiedTable(reg), reg.StartKey())
enum, ok := krc.regions.Seek(key)
if ok {
log.Fatalf("WTF: found a region with exact name as the search key %q", key)
panic(fmt.Errorf("WTF: found a region with exact name as the search key %q", key))
}

// case 1: landed before the first region in cache
Expand All @@ -229,15 +228,15 @@ func (krc *keyRegionCache) getOverlaps(reg hrpc.RegionInfo) []hrpc.RegionInfo {
enum.Close()
enum, err = krc.regions.SeekFirst()
if err != nil {
log.Fatalf(
"error seeking first region when getting overlaps for region %v: %v", reg, err)
panic(fmt.Errorf(
"error seeking first region when getting overlaps for region %v: %v", reg, err))
}
}

_, v, err = enum.Next()
if err != nil {
log.Fatalf(
"error accessing first region when getting overlaps for region %v: %v", reg, err)
panic(fmt.Errorf(
"error accessing first region when getting overlaps for region %v: %v", reg, err))
}
if isRegionOverlap(v, reg) {
overlaps = append(overlaps, v)
Expand Down Expand Up @@ -294,11 +293,8 @@ func (krc *keyRegionCache) put(reg hrpc.RegionInfo) (overlaps []hrpc.RegionInfo,
return reg, true
})
if !replaced {
log.WithFields(log.Fields{
"region": reg,
"overlaps": overlaps,
"replaced": replaced,
}).Debug("region is already in cache")
krc.logger.Debug("region is already in cache",
"region", reg, "overlaps", overlaps, "replaced", replaced)
return
}
// delete overlapping regions
Expand All @@ -310,11 +306,8 @@ func (krc *keyRegionCache) put(reg hrpc.RegionInfo) (overlaps []hrpc.RegionInfo,
o.MarkDead()
}

log.WithFields(log.Fields{
"region": reg,
"overlaps": overlaps,
"replaced": replaced,
}).Info("added new region")
krc.logger.Info("added new region",
"region", reg, "overlaps", overlaps, "replaced", replaced)
return
}

Expand All @@ -328,8 +321,6 @@ func (krc *keyRegionCache) del(reg hrpc.RegionInfo) bool {
if success {
cachedRegionTotal.Dec()
}
log.WithFields(log.Fields{
"region": reg,
}).Debug("removed region")
krc.logger.Debug("removed region", "region", reg)
return success
}
41 changes: 31 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"log/slog"
"net"
"reflect"
"sync"
"time"

log "github.com/sirupsen/logrus"
"github.com/tsuna/gohbase/compression"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/pb"
Expand Down Expand Up @@ -100,7 +101,8 @@ type client struct {

newRegionClientFn func(string, region.ClientType, int, time.Duration,
string, time.Duration, compression.Codec,
func(ctx context.Context, network, addr string) (net.Conn, error)) hrpc.RegionClient
func(ctx context.Context, network, addr string) (net.Conn, error),
*slog.Logger) hrpc.RegionClient

compressionCodec compression.Codec

Expand All @@ -109,6 +111,8 @@ type client struct {
// regionDialer is passed into the region client to connect to hbase in a custom way,
// such as SOCKS proxy.
regionDialer func(ctx context.Context, network, addr string) (net.Conn, error)
// logger that could be defined by user
logger *slog.Logger
}

// NewClient creates a new HBase client.
Expand All @@ -117,13 +121,14 @@ func NewClient(zkquorum string, options ...Option) Client {
}

func newClient(zkquorum string, options ...Option) *client {
log.WithFields(log.Fields{
"Host": zkquorum,
}).Debug("Creating new client.")
c := &client{
clientType: region.RegionClient,
regions: keyRegionCache{regions: b.TreeNew[[]byte, hrpc.RegionInfo](region.Compare)},
regions: keyRegionCache{
logger: slog.Default(),
regions: b.TreeNew[[]byte, hrpc.RegionInfo](region.Compare),
},
clients: clientRegionCache{
logger: slog.Default(),
regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}),
},
rpcQueueSize: defaultRPCQueueSize,
Expand All @@ -142,24 +147,31 @@ func newClient(zkquorum string, options ...Option) *client {
regionReadTimeout: region.DefaultReadTimeout,
done: make(chan struct{}),
newRegionClientFn: region.NewClient,
logger: slog.Default(),
}
for _, option := range options {
option(c)
}
c.logger.Info("Creating new admin client.", "Host", slog.StringValue(zkquorum))

//Have to create the zkClient after the Options have been set
//since the zkTimeout could be changed as an option
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer)
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer, c.logger)

return c
}

// DebugState information about the clients keyRegionCache, and clientRegionCache
func DebugState(client Client) ([]byte, error) {
func DebugState(c Client) ([]byte, error) {

debugInfoJson, err := json.Marshal(client)
debugInfoJson, err := json.Marshal(c)
if err != nil {
log.Errorf("Cannot turn client into JSON bytes array: %v", err)
if cclient, ok := c.(*client); ok {
cclient.logger.Error("Cannot turn client into JSON bytes array", "error", err)
} else {
slog.Error("unknown client type", "type", reflect.TypeOf(c))
slog.Error("Cannot turn client into JSON bytes array", "error", err)
}
}
return debugInfoJson, err
}
Expand Down Expand Up @@ -296,6 +308,15 @@ func RegionDialer(dialer func(
}
}

// Logger will return an option to set *slog.Logger instance
func Logger(logger *slog.Logger) Option {
return func(c *client) {
c.regions.logger = logger
c.clients.logger = logger
c.logger = logger
}
}

// Close closes connections to hbase master and regionservers
func (c *client) Close() {
c.closeOnce.Do(func() {
Expand Down
2 changes: 2 additions & 0 deletions debug_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gohbase

import (
"encoding/json"
"log/slog"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -27,6 +28,7 @@ func TestDebugStateSanity(t *testing.T) {
region.DefaultReadTimeout,
client.compressionCodec,
nil,
slog.Default(),
)
newClientFn := func() hrpc.RegionClient {
return regClient
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/go-zookeeper/zk v1.0.3
github.com/golang/snappy v0.0.4
github.com/prometheus/client_golang v1.13.0
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/trace v1.11.1
Expand Down
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,6 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down
7 changes: 2 additions & 5 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.

//go:build integration
// +build integration

package gohbase_test

import (
Expand All @@ -16,6 +13,7 @@ import (
"flag"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"reflect"
Expand All @@ -27,7 +25,6 @@ import (

"math"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/tsuna/gohbase"
"github.com/tsuna/gohbase/filter"
Expand Down Expand Up @@ -101,7 +98,7 @@ func TestMain(m *testing.M) {
panic("Host is not set!")
}

log.SetLevel(log.DebugLevel)
slog.SetLogLoggerLevel(slog.LevelDebug)

ac := gohbase.NewAdminClient(*host)

Expand Down
4 changes: 3 additions & 1 deletion mockrc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"context"
"fmt"
"log/slog"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -179,7 +180,8 @@ func init() {
func newMockRegionClient(addr string, ctype region.ClientType, queueSize int,
flushInterval time.Duration, effectiveUser string,
readTimeout time.Duration, codec compression.Codec,
dialer func(ctx context.Context, network, addr string) (net.Conn, error)) hrpc.RegionClient {
dialer func(ctx context.Context, network, addr string) (net.Conn, error),
log *slog.Logger) hrpc.RegionClient {
m.Lock()
clients[addr]++
m.Unlock()
Expand Down
Loading

0 comments on commit 6934fc8

Please sign in to comment.