Fix logs cluster (#1716)
* Fix clustering for logs.

* Simplify code.

* Add changelog.
mattdurham authored and ptodev committed Oct 4, 2024
1 parent 6c71f5f commit e3b425b
Showing 7 changed files with 104 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -28,6 +28,8 @@ v1.4.2
- Fixed an issue with `loki.process` where configuration could be reloaded even if there
were no changes. (@ptodev, @thampiotr)

- Fixed an issue where `loki.source.kubernetes` took all labels into account when sharding targets across the cluster, instead of only the relevant log labels, resulting in duplicate log collection. (@mattdurham)

v1.4.1
-----------------

14 changes: 14 additions & 0 deletions docs/sources/reference/components/loki/loki.source.kubernetes.md
@@ -145,6 +145,20 @@ If {{< param "PRODUCT_NAME" >}} is _not_ running in clustered mode, then the blo
`loki.source.kubernetes` collects logs from every target it receives in its
arguments.

Clustering looks only at the following labels when determining the shard key (illustrated in the Go sketch after this list):
* `__pod_namespace__`
* `__pod_name__`
* `__pod_container_name__`
* `__pod_uid__`
* `__meta_kubernetes_namespace`
* `__meta_kubernetes_pod_name`
* `__meta_kubernetes_pod_container_name`
* `__meta_kubernetes_pod_uid`
* `container`
* `pod`
* `job`
* `namespace`
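To make this concrete, here is a minimal Go sketch (the target values are invented for illustration; it relies on the `Target.SpecificLabels` helper and `kubetail.ClusteringLabels` added later in this commit) showing that two targets which differ only in `__address__` produce the same shard key once only these labels are considered:

```go
package main

import (
	"fmt"

	"github.com/grafana/alloy/internal/component/discovery"
	"github.com/grafana/alloy/internal/component/loki/source/kubernetes/kubetail"
)

func main() {
	// Two targets for the same pod and container, discovered on different ports.
	a := discovery.Target{
		"__address__": "10.0.0.1:8080",
		"namespace":   "default",
		"pod":         "alloy-0",
		"container":   "alloy",
		"job":         "integrations/alloy",
	}
	b := discovery.Target{
		"__address__": "10.0.0.1:9090",
		"namespace":   "default",
		"pod":         "alloy-0",
		"container":   "alloy",
		"job":         "integrations/alloy",
	}

	// Restricted to the clustering labels, both targets hash to the same key,
	// so their logs are only tailed once.
	fmt.Println(a.SpecificLabels(kubetail.ClusteringLabels).Hash() ==
		b.SpecificLabels(kubetail.ClusteringLabels).Hash()) // true
}
```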

[using clustering]: ../../../../get-started/clustering/

## Exported fields
12 changes: 12 additions & 0 deletions internal/component/discovery/discovery.go
@@ -2,6 +2,7 @@ package discovery

import (
"context"
"slices"
"sort"
"strings"
"sync"
@@ -41,6 +42,17 @@ func (t Target) NonMetaLabels() labels.Labels {
return lset
}

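// SpecificLabels returns the target's labels restricted to the given label names, sorted by name.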
func (t Target) SpecificLabels(lbls []string) labels.Labels {
var lset labels.Labels
for k, v := range t {
if slices.Contains(lbls, k) {
lset = append(lset, labels.Label{Name: k, Value: v})
}
}
sort.Sort(lset)
return lset
}

// Exports holds values which are exported by all discovery components.
type Exports struct {
Targets []Target `alloy:"targets,attr"`
29 changes: 28 additions & 1 deletion internal/component/discovery/distributed_targets.go
@@ -19,6 +19,13 @@ type DistributedTargets struct {
// NewDistributedTargets creates the abstraction that allows components to
// dynamically shard targets between components.
func NewDistributedTargets(clusteringEnabled bool, cluster cluster.Cluster, allTargets []Target) *DistributedTargets {
return NewDistributedTargetsWithCustomLabels(clusteringEnabled, cluster, allTargets, nil)
}

// NewDistributedTargetsWithCustomLabels creates the abstraction that allows components to
// dynamically shard targets between components. Passing a list of labels limits sharding to
// using only those labels when computing the hash key; passing nil or an empty slice means
// all non-meta labels are used.
func NewDistributedTargetsWithCustomLabels(clusteringEnabled bool, cluster cluster.Cluster, allTargets []Target, labels []string) *DistributedTargets {
if !clusteringEnabled || cluster == nil {
cluster = disabledCluster{}
}
@@ -32,8 +39,20 @@ func NewDistributedTargets(clusteringEnabled bool, cluster cluster.Cluster, allT
localTargetKeys := make([]shard.Key, 0, localCap)
remoteTargetKeys := make(map[shard.Key]struct{}, len(allTargets)-localCap)

// Deduplicate by shard key: restricting the key to specific labels can make otherwise
// distinct targets collide.
singular := make(map[shard.Key]struct{})
for _, tgt := range allTargets {
targetKey := keyFor(tgt)
var targetKey shard.Key
// If no custom labels are given, use all non-meta labels.
if len(labels) == 0 {
targetKey = keyFor(tgt)
} else {
targetKey = keyForLabels(tgt, labels)
}
if _, ok := singular[targetKey]; ok {
continue
}
singular[targetKey] = struct{}{}
peers, err := cluster.Lookup(targetKey, 1, shard.OpReadWrite)
belongsToLocal := err != nil || len(peers) == 0 || peers[0].Self

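For reference, a minimal sketch of how a caller can use the new constructor with a custom label set (the function name and label list here are hypothetical; the real caller is the one-line change in kubernetes.go further down):

```go
package example

import (
	"github.com/grafana/alloy/internal/component/discovery"
	"github.com/grafana/alloy/internal/service/cluster"
)

// shardByPod distributes targets across the cluster using only a custom label
// set, so targets that differ only in other labels (such as __address__) share
// a shard key and are deduplicated.
func shardByPod(c cluster.Cluster, targets []discovery.Target) []discovery.Target {
	dt := discovery.NewDistributedTargetsWithCustomLabels(
		true, // clustering enabled
		c,
		targets,
		[]string{"namespace", "pod", "container"}, // hypothetical label set
	)
	return dt.LocalTargets()
}
```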
@@ -57,6 +76,10 @@ func (dt *DistributedTargets) LocalTargets() []Target {
return dt.localTargets
}

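// TargetCount returns the number of unique targets, local and remote.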
func (dt *DistributedTargets) TargetCount() int {
return len(dt.localTargetKeys) + len(dt.remoteTargetKeys)
}

// MovedToRemoteInstance returns the set of local targets from prev
// that are no longer local in dt, indicating an active target has moved.
// Only targets which exist in both prev and dt are returned. If prev
@@ -79,6 +102,10 @@ func keyFor(tgt Target) shard.Key {
return shard.Key(tgt.NonMetaLabels().Hash())
}

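// keyForLabels returns a deterministic shard key for the target, computed from only the given labels.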
func keyForLabels(tgt Target, lbls []string) shard.Key {
return shard.Key(tgt.SpecificLabels(lbls).Hash())
}

type disabledCluster struct{}

var _ cluster.Cluster = disabledCluster{}
2 changes: 1 addition & 1 deletion internal/component/loki/source/kubernetes/kubernetes.go
@@ -187,7 +187,7 @@ func (c *Component) Update(args component.Arguments) error {
}

func (c *Component) resyncTargets(targets []discovery.Target) {
distTargets := discovery.NewDistributedTargets(c.args.Clustering.Enabled, c.cluster, targets)
distTargets := discovery.NewDistributedTargetsWithCustomLabels(c.args.Clustering.Enabled, c.cluster, targets, kubetail.ClusteringLabels)
targets = distTargets.LocalTargets()

tailTargets := make([]*kubetail.Target, 0, len(targets))
32 changes: 32 additions & 0 deletions internal/component/loki/source/kubernetes/kubernetes_test.go
@@ -1,6 +1,9 @@
package kubernetes

import (
"github.com/grafana/alloy/internal/component/discovery"
"github.com/grafana/alloy/internal/component/loki/source/kubernetes/kubetail"
"github.com/grafana/alloy/internal/service/cluster"
"testing"

"github.com/grafana/alloy/syntax"
@@ -43,3 +46,32 @@ func TestBadAlloyConfig(t *testing.T) {
err := syntax.Unmarshal([]byte(exampleAlloyConfig), &args)
require.ErrorContains(t, err, "at most one of basic_auth, authorization, oauth2, bearer_token & bearer_token_file must be configured")
}

func TestClusteringDuplicateAddress(t *testing.T) {
// loki.source.kubernetes looks logs up by pod, so if we don't use the special
// NewDistributedTargetsWithCustomLabels, two targets for the same pod that differ only in
// address (for example, different ports) would each pull the same logs. That is fine for
// metrics scraping, since those are different endpoints, but they are the same logs.
distTargets := discovery.NewDistributedTargetsWithCustomLabels(
true,
cluster.Mock(),
[]discovery.Target{
{
"__address__": "localhost:9090",
"container": "alloy",
"pod": "grafana-k8s-monitoring-alloy-0",
"job": "integrations/alloy",
"namespace": "default",
},
{
"__address__": "localhost:8080",
"container": "alloy",
"pod": "grafana-k8s-monitoring-alloy-0",
"job": "integrations/alloy",
"namespace": "default",
},
},
kubetail.ClusteringLabels,
)
require.Equal(t, 1, distTargets.TargetCount())
}
15 changes: 15 additions & 0 deletions internal/component/loki/source/kubernetes/kubetail/target.go
@@ -25,6 +25,21 @@ const (
kubePodUID = "__meta_kubernetes_pod_uid"
)

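// ClusteringLabels is the set of labels used to compute the shard key when distributing
// loki.source.kubernetes targets across the cluster.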
var ClusteringLabels = []string{
LabelPodNamespace,
LabelPodName,
LabelPodContainerName,
LabelPodUID,
kubePodNamespace,
kubePodName,
kubePodContainerName,
kubePodUID,
"container",
"pod",
"job",
"namespace",
}

// Target represents an individual container being tailed for logs.
type Target struct {
origLabels labels.Labels // Original discovery labels
