Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace logrus with log/slog for logging #257

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"time"

log "github.com/sirupsen/logrus"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/pb"
"github.com/tsuna/gohbase/region"
Expand Down Expand Up @@ -48,9 +48,6 @@ func NewAdminClient(zkquorum string, options ...Option) AdminClient {
}

func newAdminClient(zkquorum string, options ...Option) AdminClient {
log.WithFields(log.Fields{
"Host": zkquorum,
}).Debug("Creating new admin client.")
c := &client{
clientType: region.MasterClient,
rpcQueueSize: defaultRPCQueueSize,
Expand All @@ -63,11 +60,13 @@ func newAdminClient(zkquorum string, options ...Option) AdminClient {
regionLookupTimeout: region.DefaultLookupTimeout,
regionReadTimeout: region.DefaultReadTimeout,
newRegionClientFn: region.NewClient,
logger: slog.Default(),
}
for _, option := range options {
option(c)
}
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer)
c.logger.Debug("Creating new admin client.", "Host", slog.StringValue(zkquorum))
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer, c.logger)
return c
}

Expand Down
49 changes: 20 additions & 29 deletions caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ import (
"bytes"
"fmt"
"io"
"log/slog"
"sync"

log "github.com/sirupsen/logrus"
"github.com/tsuna/gohbase/hrpc"
"modernc.org/b/v2"
)

// clientRegionCache is client -> region cache. Used to quickly
// look up all the regioninfos that map to a specific client
type clientRegionCache struct {
m sync.RWMutex
m sync.RWMutex
logger *slog.Logger

regions map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}
}
Expand All @@ -42,19 +43,17 @@ func (rcc *clientRegionCache) put(addr string, r hrpc.RegionInfo,
}
rcc.m.Unlock()

log.WithFields(log.Fields{
"client": existingClient,
}).Debug("region client is already in client's cache")
rcc.logger.Debug("region client is already in client's cache", "client", existingClient)
return existingClient
}
}

// no such client yet
c := newClient()
rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: struct{}{}}
rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: {}}
rcc.m.Unlock()

log.WithField("client", c).Info("added new region client")
rcc.logger.Info("added new region client", "client", c)
return c
}

Expand Down Expand Up @@ -88,7 +87,7 @@ func (rcc *clientRegionCache) clientDown(c hrpc.RegionClient) map[hrpc.RegionInf
rcc.m.Unlock()

if ok {
log.WithField("client", c).Info("removed region client")
rcc.logger.Info("removed region client", "client", c)
}
return downregions
}
Expand Down Expand Up @@ -125,7 +124,8 @@ func (rcc *clientRegionCache) debugInfo(

// key -> region cache.
type keyRegionCache struct {
m sync.RWMutex
m sync.RWMutex
logger *slog.Logger

// Maps a []byte of a region start key to a hrpc.RegionInfo
regions *b.Tree[[]byte, hrpc.RegionInfo]
Expand All @@ -137,8 +137,7 @@ func (krc *keyRegionCache) get(key []byte) ([]byte, hrpc.RegionInfo) {
enum, ok := krc.regions.Seek(key)
if ok {
krc.m.RUnlock()
log.Fatalf("WTF: got exact match for region search key %q", key)
return nil, nil
panic(fmt.Errorf("WTF: got exact match for region search key %q", key))
}
k, v, err := enum.Prev()
enum.Close()
Expand Down Expand Up @@ -204,7 +203,7 @@ func (krc *keyRegionCache) getOverlaps(reg hrpc.RegionInfo) []hrpc.RegionInfo {
key := createRegionSearchKey(fullyQualifiedTable(reg), reg.StartKey())
enum, ok := krc.regions.Seek(key)
if ok {
log.Fatalf("WTF: found a region with exact name as the search key %q", key)
panic(fmt.Errorf("WTF: found a region with exact name as the search key %q", key))
}

// case 1: landed before the first region in cache
Expand All @@ -229,15 +228,15 @@ func (krc *keyRegionCache) getOverlaps(reg hrpc.RegionInfo) []hrpc.RegionInfo {
enum.Close()
enum, err = krc.regions.SeekFirst()
if err != nil {
log.Fatalf(
"error seeking first region when getting overlaps for region %v: %v", reg, err)
panic(fmt.Errorf(
"error seeking first region when getting overlaps for region %v: %v", reg, err))
}
}

_, v, err = enum.Next()
if err != nil {
log.Fatalf(
"error accessing first region when getting overlaps for region %v: %v", reg, err)
panic(fmt.Errorf(
"error accessing first region when getting overlaps for region %v: %v", reg, err))
}
if isRegionOverlap(v, reg) {
overlaps = append(overlaps, v)
Expand Down Expand Up @@ -294,11 +293,8 @@ func (krc *keyRegionCache) put(reg hrpc.RegionInfo) (overlaps []hrpc.RegionInfo,
return reg, true
})
if !replaced {
log.WithFields(log.Fields{
"region": reg,
"overlaps": overlaps,
"replaced": replaced,
}).Debug("region is already in cache")
krc.logger.Debug("region is already in cache",
"region", reg, "overlaps", overlaps, "replaced", replaced)
return
}
// delete overlapping regions
Expand All @@ -310,11 +306,8 @@ func (krc *keyRegionCache) put(reg hrpc.RegionInfo) (overlaps []hrpc.RegionInfo,
o.MarkDead()
}

log.WithFields(log.Fields{
"region": reg,
"overlaps": overlaps,
"replaced": replaced,
}).Info("added new region")
krc.logger.Info("added new region",
"region", reg, "overlaps", overlaps, "replaced", replaced)
return
}

Expand All @@ -328,8 +321,6 @@ func (krc *keyRegionCache) del(reg hrpc.RegionInfo) bool {
if success {
cachedRegionTotal.Dec()
}
log.WithFields(log.Fields{
"region": reg,
}).Debug("removed region")
krc.logger.Debug("removed region", "region", reg)
return success
}
45 changes: 31 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"log/slog"
"net"
"sync"
"time"

log "github.com/sirupsen/logrus"
"github.com/tsuna/gohbase/compression"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/pb"
Expand Down Expand Up @@ -100,7 +100,8 @@ type client struct {

newRegionClientFn func(string, region.ClientType, int, time.Duration,
string, time.Duration, compression.Codec,
func(ctx context.Context, network, addr string) (net.Conn, error)) hrpc.RegionClient
func(ctx context.Context, network, addr string) (net.Conn, error),
*slog.Logger) hrpc.RegionClient

compressionCodec compression.Codec

Expand All @@ -109,6 +110,8 @@ type client struct {
// regionDialer is passed into the region client to connect to hbase in a custom way,
// such as SOCKS proxy.
regionDialer func(ctx context.Context, network, addr string) (net.Conn, error)
// logger that could be defined by user
logger *slog.Logger
}

// NewClient creates a new HBase client.
Expand All @@ -117,15 +120,8 @@ func NewClient(zkquorum string, options ...Option) Client {
}

func newClient(zkquorum string, options ...Option) *client {
log.WithFields(log.Fields{
"Host": zkquorum,
}).Debug("Creating new client.")
c := &client{
clientType: region.RegionClient,
regions: keyRegionCache{regions: b.TreeNew[[]byte, hrpc.RegionInfo](region.Compare)},
clients: clientRegionCache{
regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}),
},
clientType: region.RegionClient,
rpcQueueSize: defaultRPCQueueSize,
flushInterval: defaultFlushInterval,
metaRegionInfo: region.NewInfo(
Expand All @@ -142,24 +138,38 @@ func newClient(zkquorum string, options ...Option) *client {
regionReadTimeout: region.DefaultReadTimeout,
done: make(chan struct{}),
newRegionClientFn: region.NewClient,
logger: slog.Default(),
}
for _, option := range options {
option(c)
}
c.logger.Debug("Creating new client.", "Host", slog.StringValue(zkquorum))

//Have to create the zkClient after the Options have been set
//since the zkTimeout could be changed as an option
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer)
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer, c.logger)
c.regions = keyRegionCache{
logger: c.logger,
regions: b.TreeNew[[]byte, hrpc.RegionInfo](region.Compare),
}
c.clients = clientRegionCache{
logger: c.logger,
regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}),
}

return c
}

// DebugState information about the clients keyRegionCache, and clientRegionCache
func DebugState(client Client) ([]byte, error) {
func DebugState(c Client) ([]byte, error) {

debugInfoJson, err := json.Marshal(client)
debugInfoJson, err := json.Marshal(c)
if err != nil {
log.Errorf("Cannot turn client into JSON bytes array: %v", err)
if cclient, ok := c.(*client); ok {
cclient.logger.Error("Cannot turn client into JSON bytes array", "error", err)
} else {
slog.Error("Cannot turn client into JSON bytes array", "error", err)
}
}
return debugInfoJson, err
}
Expand Down Expand Up @@ -296,6 +306,13 @@ func RegionDialer(dialer func(
}
}

// Logger will return an option to set *slog.Logger instance
func Logger(logger *slog.Logger) Option {
return func(c *client) {
c.logger = logger
}
}

// Close closes connections to hbase master and regionservers
func (c *client) Close() {
c.closeOnce.Do(func() {
Expand Down
2 changes: 2 additions & 0 deletions debug_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gohbase

import (
"encoding/json"
"log/slog"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -27,6 +28,7 @@ func TestDebugStateSanity(t *testing.T) {
region.DefaultReadTimeout,
client.compressionCodec,
nil,
slog.Default(),
)
newClientFn := func() hrpc.RegionClient {
return regClient
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/go-zookeeper/zk v1.0.3
github.com/golang/snappy v0.0.4
github.com/prometheus/client_golang v1.19.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6O
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand Down
4 changes: 2 additions & 2 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"flag"
"fmt"
"io"
"log/slog"
"math"
"os"
"os/exec"
Expand All @@ -26,7 +27,6 @@ import (
"testing"
"time"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/tsuna/gohbase"
"github.com/tsuna/gohbase/filter"
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestMain(m *testing.M) {
panic("Host is not set!")
}

log.SetLevel(log.DebugLevel)
slog.SetLogLoggerLevel(slog.LevelDebug)

ac := gohbase.NewAdminClient(*host)

Expand Down
4 changes: 3 additions & 1 deletion mockrc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"context"
"fmt"
"log/slog"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -179,7 +180,8 @@ func init() {
func newMockRegionClient(addr string, ctype region.ClientType, queueSize int,
flushInterval time.Duration, effectiveUser string,
readTimeout time.Duration, codec compression.Codec,
dialer func(ctx context.Context, network, addr string) (net.Conn, error)) hrpc.RegionClient {
dialer func(ctx context.Context, network, addr string) (net.Conn, error),
log *slog.Logger) hrpc.RegionClient {
m.Lock()
clients[addr]++
m.Unlock()
Expand Down
Loading
Loading