[Fixes #6565] Implement Luhn Loki processor (#6574)
* implement luhn loki processor

* Apply suggestions from code review

Co-authored-by: Clayton Cornell <[email protected]>

* use paypal generated credit card in docs

* finish implementing luhn filter, remove dead code

* fix lint

---------

Co-authored-by: Clayton Cornell <[email protected]>
Co-authored-by: mattdurham <[email protected]>
3 people authored Mar 25, 2024
1 parent eb56b0f commit 19b05d2
Showing 6 changed files with 253 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -280,6 +280,8 @@ v0.40.0 (2024-02-27)

- Python profiling using eBPF is now aggregated by kernel space. [PR](https://github.com/grafana/pyroscope/pull/2996) (@korniltsev)

- Add Luhn filter to `loki.process` to filter PCI data from log data

### Bugfixes

- Fix an issue in `remote.s3` where the exported content of an object would be an empty string if `remote.s3` failed to fully retrieve
43 changes: 43 additions & 0 deletions docs/sources/flow/reference/components/loki.process.md
@@ -65,6 +65,7 @@ The following blocks are supported inside the definition of `loki.process`:
| stage.labels | [stage.labels][] | Configures a `labels` processing stage. | no |
| stage.limit | [stage.limit][] | Configures a `limit` processing stage. | no |
| stage.logfmt | [stage.logfmt][] | Configures a `logfmt` processing stage. | no |
| stage.luhn | [stage.luhn][] | Configures a `luhn` processing stage. | no |
| stage.match | [stage.match][] | Configures a `match` processing stage. | no |
| stage.metrics | [stage.metrics][] | Configures a `metrics` stage. | no |
| stage.multiline | [stage.multiline][] | Configures a `multiline` processing stage. | no |
@@ -95,6 +96,7 @@ file.
[stage.labels]: #stagelabels-block
[stage.limit]: #stagelimit-block
[stage.logfmt]: #stagelogfmt-block
[stage.luhn]: #stageluhn-block
[stage.match]: #stagematch-block
[stage.metrics]: #stagemetrics-block
[stage.multiline]: #stagemultiline-block
@@ -566,6 +568,47 @@ set of extracted data, with the value of `user=foo`.
The second stage parses the contents of `extra` and appends the `username: foo`
key-value pair to the set of extracted data.

### stage.luhn block

The `stage.luhn` inner block configures a processing stage that reads incoming
log lines and redacts digit sequences that pass the Luhn algorithm check.

The [Luhn algorithm][] is a simple checksum formula used to validate various
identification numbers, such as credit card numbers, IMEI numbers, National
Provider Identifier numbers in the US, and Canadian Social Insurance Numbers.
Many Payment Card Industry environments require these numbers to be redacted.

[Luhn algorithm]: https://en.wikipedia.org/wiki/Luhn_algorithm
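
The check itself can be sketched in a few lines of Go. This is an illustration under stated assumptions, not the code the stage runs: `luhnValid` is a hypothetical helper that works on a string of ASCII digits. Starting from the rightmost digit, every second digit is doubled (subtracting 9 when the result exceeds 9), and the number is valid when the total is a multiple of 10.

```go
package main

import "fmt"

// luhnValid reports whether a string of ASCII digits passes the Luhn check.
// Hypothetical helper for illustration; it is not part of loki.process.
func luhnValid(digits string) bool {
	if len(digits) == 0 {
		return false
	}
	sum := 0
	double := false // the rightmost digit (the check digit) is not doubled
	for i := len(digits) - 1; i >= 0; i-- {
		c := digits[i]
		if c < '0' || c > '9' {
			return false // reject anything that is not an ASCII digit
		}
		d := int(c - '0')
		if double {
			d *= 2
			if d > 9 {
				d -= 9 // same as adding the two digits of the product
			}
		}
		sum += d
		double = !double
	}
	return sum%10 == 0
}

func main() {
	fmt.Println(luhnValid("4032032513548443")) // true: the digit sum is 60
	fmt.Println(luhnValid("4032032513548444")) // false: the digit sum is 61
}
```

Working on the string rather than an integer means the length of the digit sequence is not limited by the size of a machine integer.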

The following arguments are supported:

| Name          | Type     | Description                                              | Default          | Required |
| ------------- | -------- | -------------------------------------------------------- | ---------------- | -------- |
| `replacement` | `string` | String to substitute for the matched numbers.            | `"**REDACTED**"` | no       |
| `source`      | `string` | Source of the data to parse.                             | `""`             | no       |
| `min_length`  | `int`    | Minimum number of digits a sequence needs to be checked. | `13`             | no       |

The `source` field defines the source of data to search. When `source` is
missing or empty, the stage parses the log line itself, but it can also be used
to parse a previously extracted value.
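
The selection between the log line and an extracted value can be pictured with the following sketch. The helper `pickInput` and its signature are hypothetical, and the sketch assumes that extracted data is a map from string keys to arbitrary values and that only string values are usable.

```go
package main

import "fmt"

// pickInput returns the text to scan and whether there is anything to scan.
// Hypothetical helper for illustration only.
func pickInput(source *string, extracted map[string]interface{}, line string) (string, bool) {
	if source == nil {
		// No source configured: scan the log line itself.
		return line, true
	}
	value, ok := extracted[*source]
	if !ok {
		// The named key was never extracted: nothing to scan.
		return "", false
	}
	// Only string values can be scanned.
	s, ok := value.(string)
	return s, ok
}

func main() {
	extracted := map[string]interface{}{
		"message": "credit card approved 4032032513548443",
	}
	src := "message"

	in, ok := pickInput(&src, extracted, "raw log line")
	fmt.Println(in, ok)

	in, ok = pickInput(nil, extracted, "raw log line")
	fmt.Println(in, ok)
}
```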

The following example log line contains an approved credit card number.

```
time=2012-11-01T22:08:41+00:00 app=loki level=WARN duration=125 message="credit card approved 4032032513548443" extra="user=foo"
```

The following stage redacts the card number with a custom replacement string.

```
stage.luhn {
    replacement = "**DELETED**"
}
```

The stage parses the log line, redacts the credit card number, and produces the following updated log line:

```
time=2012-11-01T22:08:41+00:00 app=loki level=WARN duration=125 message="credit card approved **DELETED**" extra="user=foo"
```

### stage.match block

The `stage.match` inner block configures a filtering stage that can conditionally
147 changes: 147 additions & 0 deletions internal/component/loki/process/stages/luhn.go
@@ -0,0 +1,147 @@
package stages

import (
	"strconv"
	"strings"
	"time"
	"unicode"

	"github.com/prometheus/common/model"
)

// LuhnFilterConfig configures a processing stage that filters out Luhn-valid numbers.
type LuhnFilterConfig struct {
	Replacement string  `river:"replacement,attr,optional"`
	Source      *string `river:"source,attr,optional"`
	MinLength   int     `river:"min_length,attr,optional"`
}

// validateLuhnFilterConfig validates the LuhnFilterConfig and fills in
// defaults. It takes a pointer so that the defaults reach the caller's config.
func validateLuhnFilterConfig(c *LuhnFilterConfig) error {
	if c.Replacement == "" {
		c.Replacement = "**REDACTED**"
	}
	if c.MinLength < 1 {
		c.MinLength = 13
	}
	if c.Source != nil && *c.Source == "" {
		return ErrEmptyRegexStageSource
	}
	return nil
}

// newLuhnFilterStage creates a new LuhnFilterStage.
func newLuhnFilterStage(config LuhnFilterConfig) (Stage, error) {
	if err := validateLuhnFilterConfig(&config); err != nil {
		return nil, err
	}
	return toStage(&luhnFilterStage{
		config: &config,
	}), nil
}

// luhnFilterStage applies Luhn algorithm filtering to log entries.
type luhnFilterStage struct {
	config *LuhnFilterConfig
}

// Process implements Stage.
func (r *luhnFilterStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
	// Scan the log line by default; use the extracted value when a source is set.
	input := entry
	if r.config.Source != nil {
		value, ok := extracted[*r.config.Source]
		if !ok {
			return
		}
		strVal, ok := value.(string)
		if !ok {
			return
		}
		input = &strVal
	}

	if input == nil {
		return
	}

	// Replace Luhn-valid numbers in the input. The result is always written
	// to the log line, including when the input came from an extracted value.
	updatedEntry := replaceLuhnValidNumbers(*input, r.config.Replacement, r.config.MinLength)
	*entry = updatedEntry
}

// replaceLuhnValidNumbers scans the input for runs of digits that are at least
// minLength long and replaces those that are Luhn-valid. Runs too long to fit
// in an int fail to parse and are written unchanged.
func replaceLuhnValidNumbers(input, replacement string, minLength int) string {
	var sb strings.Builder
	var currentNumber strings.Builder

	flushNumber := func() {
		// If the number is at least minLength, check if it's a Luhn-valid number.
		if currentNumber.Len() >= minLength {
			numberStr := currentNumber.String()
			number, err := strconv.Atoi(numberStr)
			if err == nil && isLuhn(number) {
				// If the number is Luhn-valid, replace it.
				sb.WriteString(replacement)
			} else {
				// If the number is not Luhn-valid, write it as is.
				sb.WriteString(numberStr)
			}
		} else if currentNumber.Len() > 0 {
			// If the number is less than minLength but not empty, write it as is.
			sb.WriteString(currentNumber.String())
		}
		// Reset the current number.
		currentNumber.Reset()
	}

	// Iterate over the input, replacing Luhn-valid numbers.
	for _, char := range input {
		// If the character is a digit, add it to the current number.
		if unicode.IsDigit(char) {
			currentNumber.WriteRune(char)
		} else {
			// If the character is not a digit, flush the current number and write the character.
			flushNumber()
			sb.WriteRune(char)
		}
	}
	flushNumber() // Ensure any trailing number is processed

	return sb.String()
}

// isLuhn reports whether number is valid according to the Luhn algorithm.
func isLuhn(number int) bool {
	// Luhn algorithm is a simple checksum formula used to validate a
	// variety of identification numbers, such as credit card numbers, IMEI
	// numbers, National Provider Identifier numbers in the US, and
	// Canadian Social Insurance Numbers. This is a simple implementation
	// of the Luhn algorithm.
	// https://en.wikipedia.org/wiki/Luhn_algorithm
	return (number%10+checksum(number/10))%10 == 0
}

// checksum returns the Luhn sum, modulo 10, of number, which is the payload
// without its check digit. Digits are read from the right, and every digit at
// an even index is doubled.
func checksum(number int) int {
	var luhn int

	for i := 0; number > 0; i++ {
		cur := number % 10

		if i%2 == 0 { // even
			cur *= 2
			if cur > 9 {
				cur = cur%10 + cur/10
			}
		}

		luhn += cur
		number /= 10
	}
	return luhn % 10
}

// Name implements Stage.
func (r *luhnFilterStage) Name() string {
	return StageTypeLuhn
}
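
A note on `validateLuhnFilterConfig`: defaults such as the `**REDACTED**` replacement and the minimum length of 13 take effect only if the function that assigns them modifies the configuration the stage actually keeps. Go passes structs by value, so a function that receives a struct changes only its own copy, while a function that receives a pointer changes the caller's value. The sketch below shows the difference; `config`, `applyDefaultsByValue`, and `applyDefaultsByPointer` are hypothetical names used only for illustration.

```go
package main

import "fmt"

// config is a hypothetical stand-in for a stage configuration.
type config struct {
	Replacement string
	MinLength   int
}

// applyDefaultsByValue receives a copy, so the caller never sees the defaults.
func applyDefaultsByValue(c config) {
	if c.Replacement == "" {
		c.Replacement = "**REDACTED**"
	}
	if c.MinLength < 1 {
		c.MinLength = 13
	}
}

// applyDefaultsByPointer receives a pointer, so the caller's struct is updated.
func applyDefaultsByPointer(c *config) {
	if c.Replacement == "" {
		c.Replacement = "**REDACTED**"
	}
	if c.MinLength < 1 {
		c.MinLength = 13
	}
}

func main() {
	var a, b config

	applyDefaultsByValue(a)
	applyDefaultsByPointer(&b)

	fmt.Println(a.MinLength, b.MinLength) // prints "0 13"
}
```

With a zero minimum length and an empty replacement, every Luhn-valid digit run, including a lone `0`, would be replaced by an empty string, which is why the pointer form matters here.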
54 changes: 54 additions & 0 deletions internal/component/loki/process/stages/luhn_test.go
@@ -0,0 +1,54 @@
package stages

import (
	"testing"
)

// TestIsLuhnValid tests the Luhn algorithm validation.
func TestIsLuhnValid(t *testing.T) {
	cases := []struct {
		input int
		want  bool
	}{
		{4539_1488_0343_6467, true}, // Valid Luhn number
		{1234_5678_1234_5670, true}, // Another valid Luhn number
		{499_2739_8112_1717, false}, // Invalid Luhn number
		{1234567812345678, false},   // Another invalid Luhn number
		{3782_822463_10005, true},   // Short, valid Luhn number
		{123, false},                // Short, invalid Luhn number
	}

	for _, c := range cases {
		got := isLuhn(c.input)
		if got != c.want {
			t.Errorf("isLuhn(%d) == %t, want %t", c.input, got, c.want)
		}
	}
}

// TestReplaceLuhnValidNumbers tests the replaceLuhnValidNumbers function.
func TestReplaceLuhnValidNumbers(t *testing.T) {
	cases := []struct {
		input       string
		replacement string
		want        string
	}{
		// Test case with a single Luhn-valid number
		{"My credit card number is 3530111333300000.", "**REDACTED**", "My credit card number is **REDACTED**."},
		// Test case with multiple Luhn-valid numbers
		{"Cards 4532015112830366 and 6011111111111117 are valid.", "**REDACTED**", "Cards **REDACTED** and **REDACTED** are valid."},
		// Test case with no Luhn-valid numbers
		{"No valid numbers here.", "**REDACTED**", "No valid numbers here."},
		// Test case with mixed content
		{"Valid: 4556737586899855, invalid: 1234.", "**REDACTED**", "Valid: **REDACTED**, invalid: 1234."},
		// Test case with edge cases
		{"Edge cases: 0, 00, 000, 1.", "**REDACTED**", "Edge cases: 0, 00, 000, 1."},
	}

	for _, c := range cases {
		got := replaceLuhnValidNumbers(c.input, c.replacement, 13)
		if got != c.want {
			t.Errorf("replaceLuhnValidNumbers(%q, %q) == %q, want %q", c.input, c.replacement, got, c.want)
		}
	}
}
1 change: 1 addition & 0 deletions internal/component/loki/process/stages/pipeline.go
@@ -28,6 +28,7 @@ type StageConfig struct {
	LabelsConfig     *LabelsConfig     `river:"labels,block,optional"`
	LimitConfig      *LimitConfig      `river:"limit,block,optional"`
	LogfmtConfig     *LogfmtConfig     `river:"logfmt,block,optional"`
	LuhnFilterConfig *LuhnFilterConfig `river:"luhn,block,optional"`
	MatchConfig      *MatchConfig      `river:"match,block,optional"`
	MetricsConfig    *MetricsConfig    `river:"metrics,block,optional"`
	MultilineConfig  *MultilineConfig  `river:"multiline,block,optional"`
6 changes: 6 additions & 0 deletions internal/component/loki/process/stages/stage.go
@@ -28,6 +28,7 @@ const (
	StageTypeLabelDrop = "labeldrop"
	StageTypeLimit     = "limit"
	StageTypeLogfmt    = "logfmt"
	StageTypeLuhn      = "luhn"
	StageTypeMatch     = "match"
	StageTypeMetric    = "metrics"
	StageTypeMultiline = "multiline"
@@ -136,6 +137,11 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
		if err != nil {
			return nil, err
		}
	case cfg.LuhnFilterConfig != nil:
		s, err = newLuhnFilterStage(*cfg.LuhnFilterConfig)
		if err != nil {
			return nil, err
		}
	case cfg.MetricsConfig != nil:
		s, err = newMetricStage(logger, *cfg.MetricsConfig, registerer)
		if err != nil {
