diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 2837647cff97..830aff30e9f5 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -1986,102 +1986,6 @@ ring: The `bloom_gateway` block configures the Loki bloom gateway server, responsible for serving queries for filtering chunks based on filter expressions. ```yaml -# Defines the ring to be used by the bloom gateway servers and clients. In case -# this isn't configured, this block supports inheriting configuration from the -# common ring section. -ring: - kvstore: - # Backend storage to use for the ring. Supported values are: consul, etcd, - # inmemory, memberlist, multi. - # CLI flag: -bloom-gateway.ring.store - [store: | default = "consul"] - - # The prefix for the keys in the store. Should end with a /. - # CLI flag: -bloom-gateway.ring.prefix - [prefix: | default = "collectors/"] - - # Configuration for a Consul client. Only applies if the selected kvstore is - # consul. - # The CLI flags prefix for this block configuration is: bloom-gateway.ring - [consul: ] - - # Configuration for an ETCD v3 client. Only applies if the selected kvstore - # is etcd. - # The CLI flags prefix for this block configuration is: bloom-gateway.ring - [etcd: ] - - multi: - # Primary backend storage used by multi-client. - # CLI flag: -bloom-gateway.ring.multi.primary - [primary: | default = ""] - - # Secondary backend storage used by multi-client. - # CLI flag: -bloom-gateway.ring.multi.secondary - [secondary: | default = ""] - - # Mirror writes to secondary store. - # CLI flag: -bloom-gateway.ring.multi.mirror-enabled - [mirror_enabled: | default = false] - - # Timeout for storing value to secondary store. - # CLI flag: -bloom-gateway.ring.multi.mirror-timeout - [mirror_timeout: | default = 2s] - - # Period at which to heartbeat to the ring. 0 = disabled. - # CLI flag: -bloom-gateway.ring.heartbeat-period - [heartbeat_period: | default = 15s] - - # The heartbeat timeout after which compactors are considered unhealthy within - # the ring. 0 = never (timeout disabled). - # CLI flag: -bloom-gateway.ring.heartbeat-timeout - [heartbeat_timeout: | default = 1m] - - # File path where tokens are stored. If empty, tokens are not stored at - # shutdown and restored at startup. - # CLI flag: -bloom-gateway.ring.tokens-file-path - [tokens_file_path: | default = ""] - - # True to enable zone-awareness and replicate blocks across different - # availability zones. - # CLI flag: -bloom-gateway.ring.zone-awareness-enabled - [zone_awareness_enabled: | default = false] - - # Number of tokens to use in the ring. The bigger the number of tokens, the - # more fingerprint ranges the compactor will own, but the smaller these ranges - # will be. Bigger number of tokens means that more but smaller requests will - # be handled by each gateway. - # CLI flag: -bloom-gateway.ring.tokens - [num_tokens: | default = 16] - - # Factor for data replication. - # CLI flag: -bloom-gateway.ring.replication-factor - [replication_factor: | default = 3] - - # Instance ID to register in the ring. - # CLI flag: -bloom-gateway.ring.instance-id - [instance_id: | default = ""] - - # Name of network interface to read address from. - # CLI flag: -bloom-gateway.ring.instance-interface-names - [instance_interface_names: | default = []] - - # Port to advertise in the ring (defaults to server.grpc-listen-port). - # CLI flag: -bloom-gateway.ring.instance-port - [instance_port: | default = 0] - - # IP address to advertise in the ring. - # CLI flag: -bloom-gateway.ring.instance-addr - [instance_addr: | default = ""] - - # The availability zone where this instance is running. Required if - # zone-awareness is enabled. - # CLI flag: -bloom-gateway.ring.instance-availability-zone - [instance_availability_zone: | default = ""] - - # Enable using a IPv6 instance address. - # CLI flag: -bloom-gateway.ring.instance-enable-ipv6 - [instance_enable_ipv6: | default = false] - # Flag to enable or disable the bloom gateway component globally. # CLI flag: -bloom-gateway.enabled [enabled: | default = false] @@ -4149,7 +4053,6 @@ ring: Configuration for a Consul client. Only applies if the selected kvstore is `consul`. The supported CLI flags `` used to reference this configuration block are: - `bloom-compactor.ring` -- `bloom-gateway.ring` - `common.storage.ring` - `compactor.ring` - `distributor.ring` @@ -4196,7 +4099,6 @@ Configuration for a Consul client. Only applies if the selected kvstore is `cons Configuration for an ETCD v3 client. Only applies if the selected kvstore is `etcd`. The supported CLI flags `` used to reference this configuration block are: - `bloom-compactor.ring` -- `bloom-gateway.ring` - `common.storage.ring` - `compactor.ring` - `distributor.ring` diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index edaa2ea7f0c8..547652473ab9 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -10,9 +10,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" - "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/pkg/errors" @@ -29,7 +26,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" - lokiring "github.com/grafana/loki/v3/pkg/util/ring" "github.com/grafana/loki/v3/pkg/validation" ) @@ -99,20 +95,8 @@ func TestBloomGateway_StartStopService(t *testing.T) { reg := prometheus.NewRegistry() t.Run("start and stop bloom gateway", func(t *testing.T) { - kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, reg) - t.Cleanup(func() { - closer.Close() - }) - cfg := Config{ - Enabled: true, - Ring: lokiring.RingConfig{ - KVStore: kv.Config{ - Mock: kvStore, - }, - ReplicationFactor: 1, - NumTokens: 16, - }, + Enabled: true, WorkerConcurrency: 4, MaxOutstandingPerTenant: 1024, } @@ -137,22 +121,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { tenantID := "test" logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - - kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, reg) - t.Cleanup(func() { - closer.Close() - }) - cfg := Config{ - Enabled: true, - Ring: lokiring.RingConfig{ - KVStore: kv.Config{ - Mock: kvStore, - }, - ReplicationFactor: 1, - NumTokens: 16, - }, + Enabled: true, WorkerConcurrency: 2, BlockQueryConcurrency: 2, MaxOutstandingPerTenant: 1024, diff --git a/pkg/bloomgateway/config.go b/pkg/bloomgateway/config.go index 9eaa6771e674..397d8681ae49 100644 --- a/pkg/bloomgateway/config.go +++ b/pkg/bloomgateway/config.go @@ -2,16 +2,10 @@ package bloomgateway import ( "flag" - - "github.com/grafana/loki/v3/pkg/util/ring" ) // Config configures the Bloom Gateway component. type Config struct { - // Ring configures the ring store used to save and retrieve the different Bloom Gateway instances. - // In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration - // section and the ingester configuration by default). - Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom gateway servers and clients. In case this isn't configured, this block supports inheriting configuration from the common ring section."` // Enabled is the global switch to configures whether Bloom Gateways should be used to filter chunks. Enabled bool `yaml:"enabled"` // Client configures the Bloom Gateway client @@ -38,13 +32,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // TODO(chaudum): Figure out what the better place is for registering flags // -bloom-gateway.client.* or -bloom-gateway-client.* cfg.Client.RegisterFlags(f) - - // Ring - skipFlags := []string{ - prefix + "ring.tokens", - } - cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f, skipFlags...) - f.IntVar(&cfg.Ring.NumTokens, prefix+"ring.tokens", 16, "Number of tokens to use in the ring. The bigger the number of tokens, the more fingerprint ranges the compactor will own, but the smaller these ranges will be. Bigger number of tokens means that more but smaller requests will be handled by each gateway.") } type Limits interface { diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 7353f797bdd1..e74d055e5e40 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -182,7 +182,6 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { func applyCommonReplicationFactor(r, defaults *ConfigWrapper) { if !reflect.DeepEqual(r.Common.ReplicationFactor, defaults.Common.ReplicationFactor) { r.IndexGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor - r.BloomGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor } } @@ -332,20 +331,6 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc lokiring.RingConfig, merg r.BloomCompactor.Ring.KVStore = rc.KVStore r.BloomCompactor.Ring.NumTokens = rc.NumTokens } - - // BloomGateway - if mergeWithExisting || reflect.DeepEqual(r.BloomGateway.Ring, defaults.BloomGateway.Ring) { - r.BloomGateway.Ring.HeartbeatTimeout = rc.HeartbeatTimeout - r.BloomGateway.Ring.HeartbeatPeriod = rc.HeartbeatPeriod - r.BloomGateway.Ring.InstancePort = rc.InstancePort - r.BloomGateway.Ring.InstanceAddr = rc.InstanceAddr - r.BloomGateway.Ring.InstanceID = rc.InstanceID - r.BloomGateway.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames - r.BloomGateway.Ring.InstanceZone = rc.InstanceZone - r.BloomGateway.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.BloomGateway.Ring.KVStore = rc.KVStore - r.BloomGateway.Ring.NumTokens = rc.NumTokens - } } func applyTokensFilePath(cfg *ConfigWrapper) error { @@ -384,13 +369,6 @@ func applyTokensFilePath(cfg *ConfigWrapper) error { } cfg.BloomCompactor.Ring.TokensFilePath = f - // Bloom-Gateway - f, err = tokensFile(cfg, "bloomgateway.tokens") - if err != nil { - return err - } - cfg.BloomGateway.Ring.TokensFilePath = f - // Pattern f, err = tokensFile(cfg, "pattern.tokens") if err != nil { @@ -487,10 +465,6 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { if reflect.DeepEqual(cfg.BloomCompactor.Ring.InstanceInterfaceNames, defaults.BloomCompactor.Ring.InstanceInterfaceNames) { cfg.BloomCompactor.Ring.InstanceInterfaceNames = append(cfg.BloomCompactor.Ring.InstanceInterfaceNames, loopbackIface) } - - if reflect.DeepEqual(cfg.BloomGateway.Ring.InstanceInterfaceNames, defaults.BloomGateway.Ring.InstanceInterfaceNames) { - cfg.BloomGateway.Ring.InstanceInterfaceNames = append(cfg.BloomGateway.Ring.InstanceInterfaceNames, loopbackIface) - } } // applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist. @@ -506,7 +480,6 @@ func applyMemberlistConfig(r *ConfigWrapper) { r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr r.IndexGateway.Ring.KVStore.Store = memberlistStr r.BloomCompactor.Ring.KVStore.Store = memberlistStr - r.BloomGateway.Ring.KVStore.Store = memberlistStr } var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend") diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index c3ee72a7fc57..5e759b647012 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -681,7 +681,6 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule) mm.RegisterModule(IndexGatewayInterceptors, t.initIndexGatewayInterceptors, modules.UserInvisibleModule) mm.RegisterModule(BloomGateway, t.initBloomGateway) - mm.RegisterModule(BloomGatewayRing, t.initBloomGatewayRing, modules.UserInvisibleModule) mm.RegisterModule(QueryScheduler, t.initQueryScheduler) mm.RegisterModule(QuerySchedulerRing, t.initQuerySchedulerRing, modules.UserInvisibleModule) mm.RegisterModule(Analytics, t.initAnalytics) @@ -713,14 +712,13 @@ func (t *Loki) setupModuleManager() error { TableManager: {Server, Analytics}, Compactor: {Server, Overrides, MemberlistKV, Analytics}, IndexGateway: {Server, Store, IndexGatewayRing, IndexGatewayInterceptors, Analytics}, - BloomGateway: {Server, BloomStore, BloomGatewayRing, Analytics}, + BloomGateway: {Server, BloomStore, Analytics}, BloomCompactor: {Server, BloomStore, BloomCompactorRing, Analytics, Store}, PatternIngester: {Server, MemberlistKV, Analytics}, PatternRingClient: {Server, MemberlistKV, Analytics}, IngesterQuerier: {Ring}, QuerySchedulerRing: {Overrides, MemberlistKV}, IndexGatewayRing: {Overrides, MemberlistKV}, - BloomGatewayRing: {Overrides, MemberlistKV}, BloomCompactorRing: {Overrides, MemberlistKV}, MemberlistKV: {Server}, @@ -777,11 +775,6 @@ func (t *Loki) setupModuleManager() error { deps[Server] = append(deps[Server], IngesterGRPCInterceptors) } - // Add bloom gateway ring in client mode to IndexGateway service dependencies if bloom filtering is enabled. - if t.Cfg.BloomGateway.Enabled { - deps[IndexGateway] = append(deps[IndexGateway], BloomGatewayRing) - } - if t.Cfg.LegacyReadTarget { deps[Read] = append(deps[Read], deps[Backend]...) } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5d64b947437c..0efa6b430bfb 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -114,7 +114,6 @@ const ( MemberlistKV string = "memberlist-kv" Compactor string = "compactor" BloomGateway string = "bloom-gateway" - BloomGatewayRing string = "bloom-gateway-ring" IndexGateway string = "index-gateway" IndexGatewayRing string = "index-gateway-ring" IndexGatewayInterceptors string = "index-gateway-interceptors" @@ -278,7 +277,6 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) { t.Cfg.BloomCompactor.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.Cfg.IndexGateway.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - t.Cfg.BloomGateway.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.Cfg.QueryScheduler.SchedulerRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.Cfg.Ruler.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) @@ -1293,7 +1291,6 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - t.Cfg.BloomGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.BloomCompactor.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Pattern.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Server.HTTP.Handle("/memberlist", t.MemberlistKV) @@ -1386,35 +1383,6 @@ func (t *Loki) initBloomGateway() (services.Service, error) { return gateway, nil } -func (t *Loki) initBloomGatewayRing() (services.Service, error) { - if !t.Cfg.BloomGateway.Enabled { - return nil, nil - } - // Inherit ring listen port from gRPC config - t.Cfg.BloomGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort - - // TODO(chaudum): Do we want to integration the bloom gateway component into the backend target? - mode := lokiring.ClientMode - legacyReadMode := t.Cfg.LegacyReadTarget && t.isModuleActive(Read) - if t.Cfg.isModuleEnabled(BloomGateway) || t.Cfg.isModuleEnabled(Backend) || legacyReadMode { - mode = lokiring.ServerMode - } - manager, err := lokiring.NewRingManager(bloomGatewayRingKey, mode, t.Cfg.BloomGateway.Ring, t.Cfg.BloomGateway.Ring.ReplicationFactor, t.Cfg.BloomGateway.Ring.NumTokens, util_log.Logger, prometheus.DefaultRegisterer) - if err != nil { - return nil, gerrors.Wrap(err, "error initializing bloom gateway ring manager") - } - - t.bloomGatewayRingManager = manager - - t.Server.HTTP.Path("/bloomgateway/ring").Methods("GET", "POST").Handler(t.bloomGatewayRingManager) - - if t.Cfg.InternalServer.Enable { - t.InternalServer.HTTP.Path("/bloomgateway/ring").Methods("GET", "POST").Handler(t.bloomGatewayRingManager) - } - - return t.bloomGatewayRingManager, nil -} - func (t *Loki) initIndexGateway() (services.Service, error) { shardingStrategy := indexgateway.GetShardingStrategy(t.Cfg.IndexGateway, t.indexGatewayRingManager, t.Overrides) diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index 90c0b887dd02..eb196896ee15 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -410,7 +410,6 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f cfg.IndexGateway.Mode = indexgateway.SimpleMode cfg.IndexGateway.Ring.InstanceAddr = localhost cfg.BloomCompactor.Ring.InstanceAddr = localhost - cfg.BloomGateway.Ring.InstanceAddr = localhost cfg.CompactorConfig.CompactorRing.InstanceAddr = localhost cfg.CompactorConfig.WorkingDirectory = filepath.Join(dir, "compactor")