Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Remove ring client pool from JumpHashClientPool #14367

Merged
merged 5 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1298,18 +1298,9 @@ Experimental: The `bloom_gateway` block configures the Loki bloom gateway server
client:
# Configures the behavior of the connection pool.
pool_config:
# How frequently to clean up clients for servers that have gone away or are
# unhealthy.
# How frequently to update the list of servers.
# CLI flag: -bloom-gateway-client.pool.check-interval
[check_interval: <duration> | default = 10s]

# Run a health check on each server during periodic cleanup.
# CLI flag: -bloom-gateway-client.pool.enable-health-check
[enable_health_check: <boolean> | default = true]

# Timeout for the health check if health check is enabled.
# CLI flag: -bloom-gateway-client.pool.health-check-timeout
[health_check_timeout: <duration> | default = 1s]
[check_interval: <duration> | default = 15s]

# The grpc_client block configures the gRPC client used to communicate between
# a client and server component in Loki.
Expand Down
17 changes: 5 additions & 12 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func NewClient(
}
}

poolFactory := func(addr string) (ringclient.PoolClient, error) {
clientFactory := func(addr string) (ringclient.PoolClient, error) {
pool, err := NewBloomGatewayGRPCPool(addr, dialOpts)
if err != nil {
return nil, errors.Wrap(err, "new bloom gateway grpc pool")
Expand All @@ -185,17 +185,10 @@ func NewClient(
// Make an attempt to do one DNS lookup so we can start with addresses
dnsProvider.RunOnce()

clientPool := ringclient.NewPool(
"bloom-gateway",
ringclient.PoolConfig(cfg.PoolConfig),
func() ([]string, error) { return dnsProvider.Addresses(), nil },
ringclient.PoolAddrFunc(poolFactory),
metrics.clients,
logger,
)

pool := NewJumpHashClientPool(clientPool, dnsProvider, cfg.PoolConfig.CheckInterval, logger)
pool.Start()
pool, err := NewJumpHashClientPool(clientFactory, dnsProvider, cfg.PoolConfig.CheckInterval, logger)
if err != nil {
return nil, err
}

return &GatewayClient{
cfg: cfg,
Expand Down
111 changes: 65 additions & 46 deletions pkg/bloomgateway/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package bloomgateway
import (
"context"
"flag"
"sort"
"sync"
"time"

"github.com/go-kit/log"
Expand All @@ -15,53 +15,65 @@ import (
)

// PoolConfig is config for creating a Pool.
// It has the same fields as "github.com/grafana/dskit/ring/client.PoolConfig" so it can be cast.
type PoolConfig struct {
CheckInterval time.Duration `yaml:"check_interval"`
HealthCheckEnabled bool `yaml:"enable_health_check"`
HealthCheckTimeout time.Duration `yaml:"health_check_timeout"`
MaxConcurrentHealthChecks int `yaml:"-"`
CheckInterval time.Duration `yaml:"check_interval"`
}

// RegisterFlagsWithPrefix adds the flags required to configure this PoolConfig to the given FlagSet, prepending prefix to each flag name.
func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.CheckInterval, prefix+"check-interval", 10*time.Second, "How frequently to clean up clients for servers that have gone away or are unhealthy.")
f.BoolVar(&cfg.HealthCheckEnabled, prefix+"enable-health-check", true, "Run a health check on each server during periodic cleanup.")
f.DurationVar(&cfg.HealthCheckTimeout, prefix+"health-check-timeout", 1*time.Second, "Timeout for the health check if health check is enabled.")
f.DurationVar(&cfg.CheckInterval, prefix+"check-interval", 15*time.Second, "How frequently to update the list of servers.")
}

// Validate checks the pool configuration. It currently performs no checks
// and always returns nil.
func (*PoolConfig) Validate() error {
	return nil
}

// Compile-time check that *JumpHashClientPool satisfies the clientPool
// interface. A typed nil pointer is used so that no JumpHashClientPool value
// has to be constructed just for the assertion.
var _ clientPool = (*JumpHashClientPool)(nil)

// ClientFactory is a function that creates a pool client for the server at
// the given address.
type ClientFactory func(addr string) (client.PoolClient, error)

// New calls the factory function with addr and returns its result unchanged.
func (fn ClientFactory) New(addr string) (client.PoolClient, error) {
	return fn(addr)
}

type JumpHashClientPool struct {
*client.Pool
services.Service
*jumphash.Selector
sync.RWMutex

provider AddressProvider
logger log.Logger

done chan struct{}
logger log.Logger
clients map[string]client.PoolClient
clientFactory ClientFactory
}

// AddressProvider is the source of server addresses for the client pool.
type AddressProvider interface {
	// Addresses returns the currently known server addresses.
	Addresses() []string
}

func NewJumpHashClientPool(pool *client.Pool, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) *JumpHashClientPool {
func NewJumpHashClientPool(clientFactory ClientFactory, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) (*JumpHashClientPool, error) {
selector := jumphash.DefaultSelector()
err := selector.SetServers(dnsProvider.Addresses()...)
if err != nil {
level.Warn(logger).Log("msg", "error updating servers", "err", err)
}

p := &JumpHashClientPool{
Pool: pool,
Selector: selector,
done: make(chan struct{}),
logger: logger,
Selector: selector,
clientFactory: clientFactory,
provider: dnsProvider,
logger: logger,
clients: make(map[string]client.PoolClient, len(dnsProvider.Addresses())),
}
go p.updateLoop(dnsProvider, updateInterval)

return p
p.Service = services.NewTimerService(updateInterval, nil, p.updateLoop, nil)
return p, services.StartAndAwaitRunning(context.Background(), p.Service)
}

// Stop stops the pool's embedded service via services.StopAndAwaitTerminated.
//
// Stop has no error return, so a failure reported while stopping is logged as
// a warning instead of being silently discarded.
func (p *JumpHashClientPool) Stop() {
	if err := services.StopAndAwaitTerminated(context.Background(), p.Service); err != nil {
		level.Warn(p.logger).Log("msg", "error stopping client pool", "err", err)
	}
}

func (p *JumpHashClientPool) AddrForFingerprint(fp uint64) (string, error) {
Expand All @@ -80,35 +92,42 @@ func (p *JumpHashClientPool) Addr(key string) (string, error) {
return addr.String(), nil
}

func (p *JumpHashClientPool) Start() {
ctx := context.Background()
_ = services.StartAndAwaitRunning(ctx, p.Pool)
// updateLoop replaces the selector's server list with the addresses currently
// reported by the pool's address provider. The context argument is unused.
//
// A failed update is only logged as a warning; the function always returns nil.
// NOTE(review): presumably nil is returned so that one failed refresh does not
// stop the periodic service that invokes this function — confirm against the
// timer service semantics.
func (p *JumpHashClientPool) updateLoop(_ context.Context) error {
	if err := p.SetServers(p.provider.Addresses()...); err != nil {
		level.Warn(p.logger).Log("msg", "error updating servers", "err", err)
	}
	return nil
}

// Stop stops the embedded client pool via services.StopAndAwaitTerminated,
// discarding any error it returns, and then closes p.done.
// NOTE(review): p.done is closed unconditionally, so calling Stop a second
// time would panic on the double close.
func (p *JumpHashClientPool) Stop() {
ctx := context.Background()
_ = services.StopAndAwaitTerminated(ctx, p.Pool)
close(p.done)
}
// GetClientFor implements clientPool.
func (p *JumpHashClientPool) GetClientFor(addr string) (client.PoolClient, error) {
client, ok := p.fromCache(addr)
if ok {
return client, nil
}

// No client in cache so create one
p.Lock()
defer p.Unlock()

func (p *JumpHashClientPool) updateLoop(provider AddressProvider, updateInterval time.Duration) {
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()

for {
select {
case <-p.done:
return
case <-ticker.C:
servers := provider.Addresses()
// ServerList deterministically maps keys to _index_ of the server list.
// Since DNS returns records in different order each time, we sort to
// guarantee best possible match between nodes.
sort.Strings(servers)
err := p.SetServers(servers...)
if err != nil {
level.Warn(p.logger).Log("msg", "error updating servers", "err", err)
}
}
// Check if a client has been created just after checking the cache and before acquiring the lock.
client, ok = p.clients[addr]
if ok {
return client, nil
}

client, err := p.clientFactory.New(addr)
if err != nil {
return nil, err
}
p.clients[addr] = client
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it OK that nothing ever gets removed from p.clients?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. Worst thing is that we keep a couple of unused client objects around that are not actively used.

return client, nil
}

// fromCache looks up the client stored for addr in p.clients while holding
// the read lock. The boolean result reports whether an entry was present.
func (p *JumpHashClientPool) fromCache(addr string) (client.PoolClient, bool) {
	p.RLock()
	cached, found := p.clients[addr]
	p.RUnlock()
	return cached, found
}
3 changes: 2 additions & 1 deletion pkg/bloomgateway/client_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestJumpHashClientPool_UpdateLoop(t *testing.T) {

provider := &provider{}
provider.UpdateAddresses([]string{"localhost:9095"})
pool := NewJumpHashClientPool(nil, provider, interval, log.NewNopLogger())
pool, err := NewJumpHashClientPool(nil, provider, interval, log.NewNopLogger())
require.NoError(t, err)
require.Len(t, pool.Addrs(), 1)
require.Equal(t, "127.0.0.1:9095", pool.Addrs()[0].String())

Expand Down
Loading