feat: Querier: Split gRPC client into two. (#12726)
**What this PR does / why we need it**:
Split the gRPC client used by the querier into two: one for communication with the query-scheduler, the other for communication with the query-frontend.
- This change is backward compatible: you don't have to change anything to keep the existing behavior.
- To configure the scheduler gRPC client, use the new `query_scheduler_grpc_client` config section or the new CLI flag prefix `querier.scheduler-grpc-client`.
- If you'd like to configure the frontend gRPC client under a better-named section, use the new `query_frontend_grpc_client` instead of the old `grpc_client_config`. Just make sure you don't use both at the same time; doing so results in an error.

This work is necessary to configure different behavior for `querier<->scheduler` vs. `querier<->frontend` communication. One use case is configuring mTLS when queriers, schedulers, and frontends each present a different certificate: the current setup allows only a single `server_name`, which makes that impossible.
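With the split clients, each connection can verify a differently named certificate. A minimal sketch, assuming the standard `grpc_client` TLS options; the server names are illustrative, not part of this change:

```yaml
frontend_worker:
  query_frontend_grpc_client:
    tls_enabled: true
    tls_server_name: query-frontend.example.svc   # name on the query-frontend certificate
  query_scheduler_grpc_client:
    tls_enabled: true
    tls_server_name: query-scheduler.example.svc  # name on the query-scheduler certificate
```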
DylanGuedes authored May 6, 2024
1 parent 6904a65 commit 7b6f057
Showing 8 changed files with 229 additions and 27 deletions.
27 changes: 21 additions & 6 deletions docs/sources/shared/configuration.md
@@ -543,19 +543,19 @@ The `alibabacloud_storage_config` block configures the connection to Alibaba Cloud OSS object storage.

```yaml
# Name of OSS bucket.
# CLI flag: -common.storage.oss.bucketname
# CLI flag: -<prefix>.storage.oss.bucketname
[bucket: <string> | default = ""]
# oss Endpoint to connect to.
# CLI flag: -common.storage.oss.endpoint
# CLI flag: -<prefix>.storage.oss.endpoint
[endpoint: <string> | default = ""]
# alibabacloud Access Key ID
# CLI flag: -common.storage.oss.access-key-id
# CLI flag: -<prefix>.storage.oss.access-key-id
[access_key_id: <string> | default = ""]
# alibabacloud Secret Access Key
# CLI flag: -common.storage.oss.secret-access-key
# CLI flag: -<prefix>.storage.oss.secret-access-key
[secret_access_key: <string> | default = ""]
```

@@ -2236,10 +2236,23 @@ The `frontend_worker` configures the worker - running within the Loki querier - picking up and executing queries enqueued by the query-frontend.
# CLI flag: -querier.id
[id: <string> | default = ""]
# The grpc_client block configures the gRPC client used to communicate between a
# client and server component in Loki.
# Configures the querier gRPC client used to communicate with the
# query-frontend. Shouldn't be used in conjunction with 'grpc_client_config'.
# The CLI flags prefix for this block configuration is:
# querier.frontend-grpc-client
[query_frontend_grpc_client: <grpc_client>]
# Configures the querier gRPC client used to communicate with the query-frontend
# and with the query-scheduler if 'query_scheduler_grpc_client' isn't defined.
# This shouldn't be used if 'query_frontend_grpc_client' is defined.
# The CLI flags prefix for this block configuration is: querier.frontend-client
[grpc_client_config: <grpc_client>]
# Configures the querier gRPC client used to communicate with the
# query-scheduler. If not defined, 'grpc_client_config' is used instead.
# The CLI flags prefix for this block configuration is:
# querier.scheduler-grpc-client
[query_scheduler_grpc_client: <grpc_client>]
```
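Note that setting the legacy `grpc_client_config` together with the new `query_frontend_grpc_client` fails validation. A sketch of a rejected config (server names illustrative), mirroring the mixed case exercised by the tests in this commit:

```yaml
# Invalid: legacy and new frontend client blocks set at the same time.
frontend_worker:
  grpc_client_config:
    tls_server_name: query-frontend-old
  query_frontend_grpc_client:
    tls_server_name: query-frontend-new
```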

### gcs_storage_config
@@ -2297,6 +2310,8 @@ The `grpc_client` block configures the gRPC client used to communicate between a client and server component in Loki. The supported CLI flags `<prefix>` used to reference this configuration block are:
- `ingester.client`
- `pattern-ingester.client`
- `querier.frontend-client`
- `querier.frontend-grpc-client`
- `querier.scheduler-grpc-client`
- `query-scheduler.grpc-client-config`
- `ruler.client`
- `tsdb.shipper.index-gateway-client.grpc`
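On the command line, the same options are reachable through these prefixes. The flags below are the ones exercised by the tests in this commit; the values are illustrative:

```
-querier.frontend-grpc-client.tls-server-name=query-frontend
-querier.scheduler-grpc-client.tls-server-name=query-scheduler
-querier.frontend-client.tls-server-name=query-frontend   # legacy prefix, still accepted
```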
23 changes: 23 additions & 0 deletions pkg/loki/config_wrapper.go
@@ -125,6 +125,9 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
applyIngesterFinalSleep(r)
applyIngesterReplicationFactor(r)
applyChunkRetain(r, &defaults)
if err := applyCommonQuerierWorkerGRPCConfig(r, &defaults); err != nil {
return err
}

return nil
}
@@ -684,3 +687,23 @@ func applyChunkRetain(cfg, defaults *ConfigWrapper) {
}
}
}

func applyCommonQuerierWorkerGRPCConfig(cfg, defaults *ConfigWrapper) error {
if !reflect.DeepEqual(cfg.Worker.OldQueryFrontendGRPCClientConfig, defaults.Worker.OldQueryFrontendGRPCClientConfig) {
// User is using the old grpc configuration.

if reflect.DeepEqual(cfg.Worker.NewQueryFrontendGRPCClientConfig, defaults.Worker.NewQueryFrontendGRPCClientConfig) {
// User is using the old grpc configuration only, we can just copy it to the new grpc client struct.
cfg.Worker.NewQueryFrontendGRPCClientConfig = cfg.Worker.OldQueryFrontendGRPCClientConfig
} else {
// User is using both, old and new way of configuring the grpc client, so we throw an error.
return fmt.Errorf("both `grpc_client_config` and `query_frontend_grpc_client` are set at the same time. Please use only one of them")
}

if reflect.DeepEqual(cfg.Worker.QuerySchedulerGRPCClientConfig, defaults.Worker.QuerySchedulerGRPCClientConfig) {
// Since the scheduler grpc client is not set, we can just copy the old query frontend grpc client to the scheduler grpc client.
cfg.Worker.QuerySchedulerGRPCClientConfig = cfg.Worker.OldQueryFrontendGRPCClientConfig
}
}
return nil
}
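In effect, a legacy-only configuration is copied into both new fields before use, while an explicitly set `query_scheduler_grpc_client` is left untouched. A sketch of the mapping (server name illustrative):

```yaml
# Input: legacy block only.
frontend_worker:
  grpc_client_config:
    tls_server_name: query-frontend

# Interpreted as if both new blocks were set:
frontend_worker:
  query_frontend_grpc_client:
    tls_server_name: query-frontend
  query_scheduler_grpc_client:
    tls_server_name: query-frontend
```

The tests added below exercise each of these cases.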
103 changes: 103 additions & 0 deletions pkg/loki/config_wrapper_test.go
@@ -799,6 +799,109 @@ query_range:
config, _ := testContext(configFileString, nil)
assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled)
})

t.Run("querier worker grpc client behavior", func(t *testing.T) {
newConfigBothClientsSet := `---
frontend_worker:
  query_frontend_grpc_client:
    tls_server_name: query-frontend
  query_scheduler_grpc_client:
    tls_server_name: query-scheduler
`

oldConfig := `---
frontend_worker:
  grpc_client_config:
    tls_server_name: query-frontend
`

mixedConfig := `---
frontend_worker:
  grpc_client_config:
    tls_server_name: query-frontend-old
  query_frontend_grpc_client:
    tls_server_name: query-frontend-new
  query_scheduler_grpc_client:
    tls_server_name: query-scheduler
`
t.Run("new configs are used", func(t *testing.T) {
asserts := func(config ConfigWrapper) {
require.EqualValues(t, "query-frontend", config.Worker.NewQueryFrontendGRPCClientConfig.TLS.ServerName)
require.EqualValues(t, "query-scheduler", config.Worker.QuerySchedulerGRPCClientConfig.TLS.ServerName)
// we never want to use zero values by default.
require.NotEqualValues(t, 0, config.Worker.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize)
require.NotEqualValues(t, 0, config.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize)
}

yamlConfig, _, err := configWrapperFromYAML(t, newConfigBothClientsSet, nil)
require.NoError(t, err)
asserts(yamlConfig)

// repeat the test using only cli flags.
cliFlags := []string{
"-querier.frontend-grpc-client.tls-server-name=query-frontend",
"-querier.scheduler-grpc-client.tls-server-name=query-scheduler",
}
cliConfig, _, err := configWrapperFromYAML(t, emptyConfigString, cliFlags)
require.NoError(t, err)
asserts(cliConfig)
})

t.Run("old config works the same way", func(t *testing.T) {
asserts := func(config ConfigWrapper) {
require.EqualValues(t, "query-frontend", config.Worker.NewQueryFrontendGRPCClientConfig.TLS.ServerName)
require.EqualValues(t, "query-frontend", config.Worker.QuerySchedulerGRPCClientConfig.TLS.ServerName)

// we never want to use zero values by default.
require.NotEqualValues(t, 0, config.Worker.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize)
require.NotEqualValues(t, 0, config.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize)
}

yamlConfig, _, err := configWrapperFromYAML(t, oldConfig, nil)
require.NoError(t, err)
asserts(yamlConfig)

// repeat the test using only cli flags.
cliFlags := []string{
"-querier.frontend-client.tls-server-name=query-frontend",
}
cliConfig, _, err := configWrapperFromYAML(t, emptyConfigString, cliFlags)
require.NoError(t, err)
asserts(cliConfig)
})

t.Run("mixed frontend clients throws an error", func(t *testing.T) {
_, _, err := configWrapperFromYAML(t, mixedConfig, nil)
require.Error(t, err)

// repeat the test using only cli flags.
_, _, err = configWrapperFromYAML(t, emptyConfigString, []string{
"-querier.frontend-client.tls-server-name=query-frontend",
"-querier.frontend-grpc-client.tls-server-name=query-frontend",
})
require.Error(t, err)

// repeat the test mixing the YAML with cli flags.
_, _, err = configWrapperFromYAML(t, newConfigBothClientsSet, []string{
"-querier.frontend-client.tls-server-name=query-frontend",
})
require.Error(t, err)
})

t.Run("mix correct cli flags with YAML configs", func(t *testing.T) {
config, _, err := configWrapperFromYAML(t, newConfigBothClientsSet, []string{
"-querier.scheduler-grpc-client.tls-enabled=true",
})
require.NoError(t, err)

require.EqualValues(t, "query-frontend", config.Worker.NewQueryFrontendGRPCClientConfig.TLS.ServerName)
require.EqualValues(t, "query-scheduler", config.Worker.QuerySchedulerGRPCClientConfig.TLS.ServerName)
// we never want to use zero values by default.
require.NotEqualValues(t, 0, config.Worker.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize)
require.NotEqualValues(t, 0, config.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize)
require.True(t, config.Worker.QuerySchedulerGRPCClientConfig.TLSEnabled)
})
})
}

const defaultResulsCacheString = `---
2 changes: 1 addition & 1 deletion pkg/querier/worker/frontend_processor.go
@@ -30,7 +30,7 @@ func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger, co
log: log,
handler: handler,
codec: codec,
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
maxMessageSize: cfg.NewQueryFrontendGRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
}
}
4 changes: 2 additions & 2 deletions pkg/querier/worker/scheduler_processor.go
@@ -38,9 +38,9 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, m
log: log,
handler: handler,
codec: codec,
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
maxMessageSize: cfg.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize,
querierID: cfg.QuerierID,
grpcConfig: cfg.GRPCClientConfig,
grpcConfig: cfg.NewQueryFrontendGRPCClientConfig,
schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
return schedulerpb.NewSchedulerForQuerierClient(conn)
},
55 changes: 38 additions & 17 deletions pkg/querier/worker/worker.go
@@ -30,7 +30,10 @@ type Config struct {

QuerierID string `yaml:"id"`

GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
NewQueryFrontendGRPCClientConfig grpcclient.Config `yaml:"query_frontend_grpc_client" doc:"description=Configures the querier gRPC client used to communicate with the query-frontend. Shouldn't be used in conjunction with 'grpc_client_config'."`
OldQueryFrontendGRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the querier gRPC client used to communicate with the query-frontend and with the query-scheduler if 'query_scheduler_grpc_client' isn't defined. This shouldn't be used if 'query_frontend_grpc_client' is defined."`

QuerySchedulerGRPCClientConfig grpcclient.Config `yaml:"query_scheduler_grpc_client" doc:"description=Configures the querier gRPC client used to communicate with the query-scheduler. If not defined, 'grpc_client_config' is used instead."`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -39,14 +42,25 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.DNSLookupPeriod, "querier.dns-lookup-period", 3*time.Second, "How often to query DNS for query-frontend or query-scheduler address. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.")
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
// Register old client as the frontend-client flag for retro-compatibility.
cfg.OldQueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)

cfg.NewQueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-grpc-client", f)
cfg.QuerySchedulerGRPCClientConfig.RegisterFlagsWithPrefix("querier.scheduler-grpc-client", f)
}

func (cfg *Config) Validate() error {
if cfg.FrontendAddress != "" && cfg.SchedulerAddress != "" {
return errors.New("frontend address and scheduler address are mutually exclusive, please use only one")
}
return cfg.GRPCClientConfig.Validate()
if err := cfg.NewQueryFrontendGRPCClientConfig.Validate(); err != nil {
return err
}
if err := cfg.OldQueryFrontendGRPCClientConfig.Validate(); err != nil {
return err
}

return cfg.QuerySchedulerGRPCClientConfig.Validate()
}

// Handler for HTTP requests wrapped in protobuf messages.
@@ -80,7 +94,6 @@ type processor interface {
type querierWorker struct {
*services.BasicService

cfg Config
logger log.Logger

processor processor
@@ -92,6 +105,9 @@ type querierWorker struct {
managers map[string]*processorManager

metrics *Metrics

grpcClientConfig grpcclient.Config
maxConcurrentRequests int
}

func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, logger log.Logger, reg prometheus.Registerer, codec RequestCodec) (services.Service, error) {
@@ -105,43 +121,48 @@ func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, log

metrics := NewMetrics(cfg, reg)
var processor processor
var grpcCfg grpcclient.Config
var servs []services.Service
var address string

switch {
case rng != nil:
level.Info(logger).Log("msg", "Starting querier worker using query-scheduler and scheduler ring for addresses")
grpcCfg = cfg.QuerySchedulerGRPCClientConfig
processor, servs = newSchedulerProcessor(cfg, handler, logger, metrics, codec)
case cfg.SchedulerAddress != "":
level.Info(logger).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress)

grpcCfg = cfg.QuerySchedulerGRPCClientConfig
address = cfg.SchedulerAddress
processor, servs = newSchedulerProcessor(cfg, handler, logger, metrics, codec)

case cfg.FrontendAddress != "":
level.Info(logger).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress)

address = cfg.FrontendAddress
grpcCfg = cfg.NewQueryFrontendGRPCClientConfig
processor = newFrontendProcessor(cfg, handler, logger, codec)
default:
return nil, errors.New("unable to start the querier worker, need to configure one of frontend_address, scheduler_address, or a ring config in the query_scheduler config block")
}

return newQuerierWorkerWithProcessor(cfg, metrics, logger, processor, address, rng, servs)
return newQuerierWorkerWithProcessor(grpcCfg, cfg.MaxConcurrent, cfg.DNSLookupPeriod, metrics, logger, processor, address, rng, servs)
}

func newQuerierWorkerWithProcessor(cfg Config, metrics *Metrics, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) {
func newQuerierWorkerWithProcessor(grpcCfg grpcclient.Config, maxConcReq int, dnsLookupPeriod time.Duration, metrics *Metrics, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) {
f := &querierWorker{
cfg: cfg,
logger: logger,
managers: map[string]*processorManager{},
processor: processor,
metrics: metrics,
maxConcurrentRequests: maxConcReq,
grpcClientConfig: grpcCfg,
logger: logger,
managers: map[string]*processorManager{},
processor: processor,
metrics: metrics,
}

// Empty address is only used in tests, where individual targets are added manually.
if address != "" {
w, err := util.NewDNSWatcher(address, cfg.DNSLookupPeriod, f)
w, err := util.NewDNSWatcher(address, dnsLookupPeriod, f)
if err != nil {
return nil, err
}
@@ -150,7 +171,7 @@ func newQuerierWorkerWithProcessor(cfg Config, metrics *Metrics, logger log.Logg
}

if ring != nil {
w, err := util.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, cfg.DNSLookupPeriod, f)
w, err := util.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, dnsLookupPeriod, f)
if err != nil {
return nil, err
}
@@ -245,17 +266,17 @@ func (w *querierWorker) resetConcurrency() {
}()

for _, m := range w.managers {
concurrency := w.cfg.MaxConcurrent / len(w.managers)
concurrency := w.maxConcurrentRequests / len(w.managers)

// If max concurrency does not evenly divide into our frontends a subset will be chosen
// to receive an extra connection. Frontend addresses were shuffled above so this will be a
// random selection of frontends.
if index < w.cfg.MaxConcurrent%len(w.managers) {
if index < w.maxConcurrentRequests%len(w.managers) {
level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address)
concurrency++
}

// If concurrency is 0 then MaxConcurrentRequests is less than the total number of
// If concurrency is 0 then maxConcurrentRequests is less than the total number of
// frontends/schedulers. In order to prevent accidentally starving a frontend or scheduler we are just going to
// always connect once to every target. This is dangerous b/c we may start exceeding LogQL
// max concurrency.
@@ -271,7 +292,7 @@

func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) {
// Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics.
opts, err := w.cfg.GRPCClientConfig.DialOption(nil, nil)
opts, err := w.grpcClientConfig.DialOption(nil, nil)
if err != nil {
return nil, err
}