Feature: SASL Auth (PLAIN and SCRAM) (#3)
* Test and fix issues with SASL plain and scram authentication
* Return error on SASL SCRAM failure
* Reformat JS code
* Fix docstrings/comments in scripts
* Remove minBytes and maxBytes from reader
* Fix linter errors
* Add a slightly better implementation of credentials for SASL
* Update README

---
To test this feature, I created test environments with SASL PLAIN and SASL SCRAM
enabled, using Confluent's kafka-docker-playground environments:
<https://github.com/vdesabou/kafka-docker-playground/tree/master/environment/sasl-plain>
<https://github.com/vdesabou/kafka-docker-playground/tree/master/environment/sasl-scram>

```
$ git clone https://github.com/vdesabou/kafka-docker-playground
$ cd kafka-docker-playground/environment/sasl-plain
$ ./start.sh
```

I compiled xk6-kafka and copied the resulting k6 binary into the broker container,
along with the `test_sasl_auth.js` test script. Then I opened a shell inside the
container and ran k6 with the script:

```
$ xk6 build --with github.com/mostafa/xk6-kafka=.
$ docker cp k6 broker:/
$ docker cp test_sasl_auth.js broker:/
$ docker exec -it broker bash
[appuser@broker ~]$ cd /
[appuser@broker ~]$ ./k6 run --vus 50 --duration 60s test_sasl_auth.js

          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: test_sasl_auth.js
     output: -

  scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
           * default: 50 looping VUs for 1m0s (gracefulStop: 30s)

running (1m00.4s), 00/50 VUs, 3711 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  1m0s

     ✓ is sent
     ✓ 10 messages returned

     █ teardown

     checks.........................: 100.00% ✓ 374811 ✗ 0
     data_received..................: 0 B     0 B/s
     data_sent......................: 0 B     0 B/s
     iteration_duration.............: avg=811.56ms min=7.13ms med=734.71ms max=2.34s p(90)=1.1s p(95)=1.34s
     iterations.....................: 3711    61.420675/s
     kafka.reader.dial.count........: 50      0.827549/s
     kafka.reader.error.count.......: 0       0/s
     kafka.reader.fetches.count.....: 50      0.827549/s
     kafka.reader.message.bytes.....: 7.3 MB  120 kB/s
     kafka.reader.message.count.....: 37160   615.034296/s
     kafka.reader.rebalance.count...: 0       0/s
     kafka.reader.timeouts.count....: 0       0/s
     kafka.writer.dial.count........: 50      0.827549/s
     kafka.writer.error.count.......: 0       0/s
     kafka.writer.message.bytes.....: 146 MB  2.4 MB/s
     kafka.writer.message.count.....: 742200  12284.134941/s
     kafka.writer.rebalance.count...: 250     4.137744/s
     kafka.writer.write.count.......: 742200  12284.134941/s
     vus............................: 50      min=50   max=50
     vus_max........................: 50      min=50   max=50
```
mostafa authored Jun 18, 2021
1 parent 6e72384 commit c35da2d

Showing 7 changed files with 257 additions and 85 deletions.
2 changes: 2 additions & 0 deletions README.md
````
@@ -54,6 +54,8 @@ $ docker exec -it lensesio bash
 (inside container)$ kafka-topics --create --topic xk6_kafka_json_topic --bootstrap-server localhost:9092
 ```
 
+If you want to test SASL authentication, have a look at [this commit message](https://github.com/mostafa/xk6-kafka/pull/3/commits/216ee0cd4f69864cb259445819541ef34fe2f2dd), where I describe how to run a test environment.
+
 ### k6 Test
 
 The following k6 test script is used to test this extension and, in turn, Apache Kafka. The script is available as `test_<format>.js` with more code and commented sections. The scripts have 4 parts:
````
64 changes: 64 additions & 0 deletions auth.go
```
@@ -0,0 +1,64 @@
package kafka

import (
	"encoding/json"
	"time"

	kafkago "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
	"github.com/segmentio/kafka-go/sasl/scram"
)

const (
	Plain  = "plain"
	SHA256 = "sha256"
	SHA512 = "sha512"
)

type Credentials struct {
	Username  string `json:"username"`
	Password  string `json:"password"`
	Algorithm string `json:"algorithm"`
}

func unmarshalCredentials(auth string) (creds *Credentials, err error) {
	creds = &Credentials{
		Algorithm: Plain,
	}

	err = json.Unmarshal([]byte(auth), &creds)

	return
}

func getDialer(creds *Credentials) (dialer *kafkago.Dialer) {
	dialer = &kafkago.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
	}

	if creds.Algorithm == Plain {
		mechanism := plain.Mechanism{
			Username: creds.Username,
			Password: creds.Password,
		}
		dialer.SASLMechanism = mechanism
		return
	} else {
		hashes := make(map[string]scram.Algorithm)
		hashes["sha256"] = scram.SHA256
		hashes["sha512"] = scram.SHA512

		mechanism, err := scram.Mechanism(
			hashes[creds.Algorithm],
			creds.Username,
			creds.Password,
		)
		if err != nil {
			ReportError(err, "authentication failed")
			return nil
		}
		dialer.SASLMechanism = mechanism
		return
	}
}
```
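The credential handling above can be exercised on its own. The following standalone sketch (standard library only) replicates the parsing logic: `Credentials` mirrors the struct in `auth.go`, and the sample username/password values are made up for illustration. It shows how the `auth` JSON string is decoded and how the algorithm falls back to `plain` when omitted:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Credentials mirrors the struct in auth.go.
type Credentials struct {
	Username  string `json:"username"`
	Password  string `json:"password"`
	Algorithm string `json:"algorithm"`
}

// unmarshalCredentials defaults Algorithm to "plain" when the JSON
// document does not specify one, matching the behavior in auth.go.
func unmarshalCredentials(auth string) (*Credentials, error) {
	creds := &Credentials{Algorithm: "plain"}
	err := json.Unmarshal([]byte(auth), creds)
	return creds, err
}

func main() {
	// No algorithm given: falls back to SASL PLAIN.
	creds, err := unmarshalCredentials(`{"username": "admin", "password": "admin-secret"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(creds.Algorithm) // prints "plain"

	// SCRAM-SHA-512 requested explicitly.
	creds, err = unmarshalCredentials(`{"username": "admin", "password": "admin-secret", "algorithm": "sha512"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(creds.Algorithm) // prints "sha512"
}
```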
25 changes: 17 additions & 8 deletions consumer.go
```
@@ -6,10 +6,10 @@ import (
 	"io"
 	"time"
 
+	kafkago "github.com/segmentio/kafka-go"
 	"go.k6.io/k6/js/modules"
 	"go.k6.io/k6/lib"
 	"go.k6.io/k6/stats"
-	kafkago "github.com/segmentio/kafka-go"
 )
 
 func init() {
```
```
@@ -19,22 +19,31 @@ func init() {
 type Kafka struct{}
 
 func (*Kafka) Reader(
-	brokers []string, topic string, partition int,
-	minBytes int, maxBytes int, offset int64) *kafkago.Reader {
+	brokers []string, topic string, partition int, offset int64, auth string) *kafkago.Reader {
+	var dialer *kafkago.Dialer
 
-	if maxBytes == 0 {
-		maxBytes = 10e6 // 10MB
+	if auth != "" {
+		creds, err := unmarshalCredentials(auth)
+		if err != nil {
+			ReportError(err, "Unable to unmarshal credentials")
+			return nil
+		}
+
+		dialer = getDialer(creds)
+		if dialer == nil {
+			ReportError(nil, "Dialer cannot authenticate")
+			return nil
+		}
 	}
 
 	reader := kafkago.NewReader(kafkago.ReaderConfig{
 		Brokers:          brokers,
 		Topic:            topic,
 		Partition:        partition,
-		MinBytes:         minBytes,
-		MaxBytes:         maxBytes,
 		MaxWait:          time.Millisecond * 200,
 		RebalanceTimeout: time.Second * 5,
 		QueueCapacity:    1,
+		Dialer:           dialer,
 	})
 
 	if offset > 0 {
```
```
@@ -102,7 +111,7 @@ func (*Kafka) Consume(
 
 func ReportReaderStats(ctx context.Context, currentStats kafkago.ReaderStats) error {
 	state := lib.GetState(ctx)
-	err := errors.New("State is nil")
+	err := errors.New("state is nil")
 
 	if state == nil {
 		ReportError(err, "Cannot determine state")
```
24 changes: 21 additions & 3 deletions producer.go
```
@@ -10,20 +10,38 @@ import (
 	"go.k6.io/k6/stats"
 )
 
-func (*Kafka) Writer(brokers []string, topic string) *kafkago.Writer {
+func (*Kafka) Writer(brokers []string, topic string, auth string) *kafkago.Writer {
+	var dialer *kafkago.Dialer
+
+	if auth != "" {
+		creds, err := unmarshalCredentials(auth)
+		if err != nil {
+			ReportError(err, "Unable to unmarshal credentials")
+			return nil
+		}
+
+		dialer = getDialer(creds)
+		if dialer == nil {
+			ReportError(nil, "Dialer cannot authenticate")
+			return nil
+		}
+	}
+
 	return kafkago.NewWriter(kafkago.WriterConfig{
 		Brokers:   brokers,
 		Topic:     topic,
 		Balancer:  &kafkago.LeastBytes{},
 		BatchSize: 1,
+		Dialer:    dialer,
 		Async:     false,
 	})
 }
 
 func (*Kafka) Produce(
 	ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
 	keySchema string, valueSchema string) error {
 	state := lib.GetState(ctx)
-	err := errors.New("State is nil")
+	err := errors.New("state is nil")
 
 	if state == nil {
```
```
@@ -66,7 +84,7 @@ func (*Kafka) Produce(
 
 func ReportWriterStats(ctx context.Context, currentStats kafkago.WriterStats) error {
 	state := lib.GetState(ctx)
-	err := errors.New("State is nil")
+	err := errors.New("state is nil")
 
 	if state == nil {
 		ReportError(err, "Cannot determine state")
```
98 changes: 49 additions & 49 deletions test_avro.js
```
@@ -1,7 +1,7 @@
 /*
 This is a k6 test script that imports the xk6-kafka and
-tests Kafka with a 100 Avro messages per iteration.
+tests Kafka with a 200 Avro messages per iteration.
 */
```

This hunk belongs to the "Reformat JS code" item in the commit message: it only re-indents the value schema and the message construction, so the resulting code is shown once:

```
@@ -36,60 +36,60 @@ const valueSchema = JSON.stringify({
    "name": "Value",
    "namespace": "dev.mostafa.xk6.kafka",
    "fields": [{
            "name": "name",
            "type": "string"
        },
        {
            "name": "version",
            "type": "string"
        },
        {
            "name": "author",
            "type": "string"
        },
        {
            "name": "description",
            "type": "string"
        },
        {
            "name": "url",
            "type": "string"
        },
        {
            "name": "index",
            "type": "int"
        }
    ]
});

export default function () {
    for (let index = 0; index < 100; index++) {
        let messages = [{
                key: JSON.stringify({
                    "correlationId": "test-id-abc-" + index
                }),
                value: JSON.stringify({
                    "name": "xk6-kafka",
                    "version": "0.2.1",
                    "author": "Mostafa Moradian",
                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
                    "url": "https://mostafa.dev",
                    "index": index
                })
            },
            {
                key: JSON.stringify({
                    "correlationId": "test-id-def-" + index
                }),
                value: JSON.stringify({
                    "name": "xk6-kafka",
                    "version": "0.2.1",
                    "author": "Mostafa Moradian",
                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
                    "url": "https://mostafa.dev",
                    "index": index
                })
            }
        ]
        let error = produce(producer, messages, keySchema, valueSchema);
        check(error, {
```
50 changes: 25 additions & 25 deletions test_json.js
```
@@ -1,7 +1,7 @@
 /*
 This is a k6 test script that imports the xk6-kafka and
-tests Kafka with a 100 JSON messages per iteration.
+tests Kafka with a 200 JSON messages per iteration.
 */
```

```
@@ -13,7 +13,7 @@ import {
     produce,
     reader,
     consume
-} from 'k6/x/kafka'; // import kafka plugin
+} from 'k6/x/kafka'; // import kafka extension
 
 const bootstrapServers = ["localhost:9092"];
 const kafkaTopic = "xk6_kafka_json_topic";
```
This hunk is likewise indentation-only (part of "Reformat JS code"); the resulting code:

```
@@ -24,29 +24,29 @@ const consumer = reader(bootstrapServers, kafkaTopic);
export default function () {
    for (let index = 0; index < 100; index++) {
        let messages = [{
                key: JSON.stringify({
                    "correlationId": "test-id-abc-" + index
                }),
                value: JSON.stringify({
                    "name": "xk6-kafka",
                    "version": "0.2.1",
                    "author": "Mostafa Moradian",
                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
                    "index": index
                })
            },
            {
                key: JSON.stringify({
                    "correlationId": "test-id-def-" + index
                }),
                value: JSON.stringify({
                    "name": "xk6-kafka",
                    "version": "0.2.1",
                    "author": "Mostafa Moradian",
                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
                    "index": index
                })
            }
        ]

        let error = produce(producer, messages);
```