Skip to content

Commit

Permalink
feat: optimise kafka writing
Browse files Browse the repository at this point in the history
  • Loading branch information
ZakShearman committed Sep 7, 2024
1 parent 6d38c1a commit 9e0dfbc
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions internal/kafka/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"message-handler/internal/config"
"time"
)

const writeTopic = "message-handler"
Expand All @@ -24,12 +25,13 @@ type kafkaNotifier struct {

func NewKafkaNotifier(cfg *config.KafkaConfig, logger *zap.SugaredLogger) Notifier {
w := &kafka.Writer{
Addr: kafka.TCP(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)),
Topic: writeTopic,
Balancer: &kafka.LeastBytes{},
Async: true,
BatchSize: 1,
ErrorLogger: kafka.LoggerFunc(logger.Errorw),
Addr: kafka.TCP(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)),
Topic: writeTopic,
Balancer: &kafka.LeastBytes{},
Async: true,
BatchSize: 100,
BatchTimeout: 100 * time.Millisecond,
ErrorLogger: kafka.LoggerFunc(logger.Errorw),
}

return &kafkaNotifier{w: w}
Expand Down

0 comments on commit 9e0dfbc

Please sign in to comment.