Skip to content

Commit

Permalink
publisher: refactor to a generic sink interface
Browse files Browse the repository at this point in the history
  • Loading branch information
turtleDev committed Jun 8, 2024
1 parent 8fe2652 commit 8a3827f
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 70 deletions.
32 changes: 15 additions & 17 deletions publisher/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,41 @@ import (
pb "github.com/raystack/raccoon/proto"
)

// KafkaProducer produces data to Kafka synchronously.
//
// NOTE(review): this is the pre-refactor interface removed by this commit;
// it is superseded by the generic worker.Producer sink interface.
type KafkaProducer interface {
	// ProduceBulk sends messages to Kafka. Blocks until all messages are
	// sent. Returns an error aggregating per-message failures; the ordering
	// of those errors is not guaranteed.
	ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error
}

// NewKafka constructs a Kafka publisher from the application configuration.
// It returns a zero-value *Kafka together with the error if the underlying
// kafka client cannot be created.
func NewKafka() (*Kafka, error) {
	kp, err := newKafkaClient(config.PublisherKafka.ToKafkaConfigMap())
	if err != nil {
		return &Kafka{}, err
	}
	return &Kafka{
		kp:                  kp,
		flushInterval:       config.PublisherKafka.FlushInterval,
		topicFormat:         config.EventDistribution.PublisherPattern,
		deliveryChannelSize: config.Worker.DeliveryChannelSize,
	}, nil
}

func NewKafkaFromClient(client Client, flushInterval int, topicFormat string) *Kafka {
func NewKafkaFromClient(client Client, flushInterval int, topicFormat string, deliveryChannelSize int) *Kafka {
return &Kafka{
kp: client,
flushInterval: flushInterval,
topicFormat: topicFormat,
kp: client,
flushInterval: flushInterval,
topicFormat: topicFormat,
deliveryChannelSize: deliveryChannelSize,
}
}

// Kafka publishes events to a Kafka cluster.
type Kafka struct {
	kp                  Client // underlying kafka producer client
	flushInterval       int    // timeout passed to the client's Flush on Close — presumably milliseconds, confirm against the client docs
	topicFormat         string // fmt pattern applied to the event type to derive the destination topic
	deliveryChannelSize int    // buffer size of the delivery channel created per ProduceBulk call
}

// ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed.
// DeliveryChannel needs to be exclusive. DeliveryChannel is exposed for recyclability purpose.
func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error {
func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string) error {
errors := make([]error, len(events))
totalProcessed := 0
deliveryChannel := make(chan kafka.Event, pr.deliveryChannelSize)
for order, event := range events {
topic := fmt.Sprintf(pr.topicFormat, event.Type)
message := &kafka.Message{
Expand Down
18 changes: 9 additions & 9 deletions publisher/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestProducer_Close(suite *testing.T) {
client := &mockClient{}
client.On("Flush", 10).Return(0)
client.On("Close").Return()
kp := NewKafkaFromClient(client, 10, "%s")
kp := NewKafkaFromClient(client, 10, "%s", 1)
kp.Close()
client.AssertExpectations(t)
})
Expand All @@ -55,9 +55,9 @@ func TestKafka_ProduceBulk(suite *testing.T) {
}
}()
})
kp := NewKafkaFromClient(client, 10, "%s")
kp := NewKafkaFromClient(client, 10, "%s", 1)

err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1)
assert.NoError(t, err)
})
})
Expand All @@ -79,9 +79,9 @@ func TestKafka_ProduceBulk(suite *testing.T) {
}()
}).Once()
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("buffer full")).Once()
kp := NewKafkaFromClient(client, 10, "%s")
kp := NewKafkaFromClient(client, 10, "%s", 1)

err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1)
assert.Len(t, err.(BulkError).Errors, 3)
assert.Error(t, err.(BulkError).Errors[0])
assert.Empty(t, err.(BulkError).Errors[1])
Expand All @@ -91,9 +91,9 @@ func TestKafka_ProduceBulk(suite *testing.T) {
t.Run("Should return topic name when unknown topic is returned", func(t *testing.T) {
client := &mockClient{}
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Unknown topic")).Once()
kp := NewKafkaFromClient(client, 10, "%s")
kp := NewKafkaFromClient(client, 10, "%s", 1)

err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1")
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic)
})
})
Expand All @@ -115,9 +115,9 @@ func TestKafka_ProduceBulk(suite *testing.T) {
}
}()
}).Once()
kp := NewKafkaFromClient(client, 10, "%s")
kp := NewKafkaFromClient(client, 10, "%s", 1)

err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1")
assert.NotEmpty(t, err)
assert.Len(t, err.(BulkError).Errors, 2)
assert.Equal(t, "buffer full", err.(BulkError).Errors[0].Error())
Expand Down
5 changes: 2 additions & 3 deletions worker/mocks.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package worker

import (
kafka "github.com/confluentinc/confluent-kafka-go/kafka"
pb "github.com/raystack/raccoon/proto"
mock "github.com/stretchr/testify/mock"
)
Expand All @@ -12,8 +11,8 @@ type mockKafkaPublisher struct {
}

// ProduceBulk provides a mock function with given fields: events, deliveryChannel
func (m *mockKafkaPublisher) ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error {
mock := m.Called(events, connGroup, deliveryChannel)
func (m *mockKafkaPublisher) ProduceBulk(events []*pb.Event, connGroup string) error {
mock := m.Called(events, connGroup)
return mock.Error(0)
}

Expand Down
103 changes: 64 additions & 39 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,75 +5,100 @@ import (
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
pb "github.com/raystack/raccoon/proto"
"github.com/raystack/raccoon/publisher"
)

// Producer publishes batches of events to a downstream sink.
type Producer interface {
	// ProduceBulk sends messages to a sink. Blocks until all messages are
	// sent. Returns a single error value; the worker downcasts it to
	// publisher.BulkError to inspect per-event failures, so implementations
	// are expected to return that type on failure — confirm with each sink.
	ProduceBulk(events []*pb.Event, connGroup string) error
}

// Pool spawns as many worker goroutines as Size, each listening on
// EventsChannel. On Close, it waits for all data in EventsChannel to be
// processed.
type Pool struct {
	Size                int                              // number of worker goroutines to start
	deliveryChannelSize int                              // NOTE(review): no longer read by the worker after this refactor (delivery channel moved into the publisher) — confirm before removing
	EventsChannel       <-chan collection.CollectRequest // incoming event batches to publish
	producer            Producer                         // sink the workers publish to
	wg                  sync.WaitGroup                   // tracks running workers
}

// CreateWorkerPool create new Pool struct given size and EventsChannel worker.
func CreateWorkerPool(size int, eventsChannel <-chan collection.CollectRequest, deliveryChannelSize int, kafkaProducer publisher.KafkaProducer) *Pool {
func CreateWorkerPool(size int, eventsChannel <-chan collection.CollectRequest, deliveryChannelSize int, producer Producer) *Pool {
return &Pool{
Size: size,
deliveryChannelSize: deliveryChannelSize,
EventsChannel: eventsChannel,
kafkaProducer: kafkaProducer,
producer: producer,
wg: sync.WaitGroup{},
}
}

// StartWorkers initialize worker pool as much as Pool.Size
func (w *Pool) StartWorkers() {
w.wg.Add(w.Size)
for i := 0; i < w.Size; i++ {
go func(workerName string) {
logger.Info("Running worker: " + workerName)
deliveryChan := make(chan kafka.Event, w.deliveryChannelSize)
for request := range w.EventsChannel {
metrics.Histogram("batch_idle_in_channel_milliseconds", (time.Now().Sub(request.TimePushed)).Milliseconds(), map[string]string{"worker": workerName})
batchReadTime := time.Now()
//@TODO - Should add integration tests to prove that the worker receives the same message that it produced, on the delivery channel it created
func (w *Pool) newWorker(name string) {

err := w.kafkaProducer.ProduceBulk(request.GetEvents(), request.ConnectionIdentifier.Group, deliveryChan)
logger.Info("Running worker: " + name)
for request := range w.EventsChannel {

produceTime := time.Since(batchReadTime)
metrics.Histogram("kafka_producebulk_tt_ms", produceTime.Milliseconds(), map[string]string{})
metrics.Histogram(
"batch_idle_in_channel_milliseconds",
time.Since(request.TimePushed).Milliseconds(),
map[string]string{"worker": name})

if request.AckFunc != nil {
request.AckFunc(err)
}
batchReadTime := time.Now()
//@TODO - Should add integration tests to prove that the worker receives the same message that it produced, on the delivery channel it created

err := w.producer.ProduceBulk(request.GetEvents(), request.ConnectionIdentifier.Group)

// TODO(turtledev): instrument this for individual sinks
// produceTime := time.Since(batchReadTime)
// metrics.Histogram("kafka_producebulk_tt_ms", produceTime.Milliseconds(), map[string]string{})

totalErr := 0
if request.AckFunc != nil {
request.AckFunc(err)
}

totalErr := 0
if err != nil {
// WARN(turtledev): this can panic if returned error is not of
// type publisher.BulkError
for _, err := range err.(publisher.BulkError).Errors {
if err != nil {
for _, err := range err.(publisher.BulkError).Errors {
if err != nil {
logger.Errorf("[worker] Fail to publish message to kafka %v", err)
totalErr++
}
}
}
lenBatch := int64(len(request.GetEvents()))
logger.Debug(fmt.Sprintf("Success sending messages, %v", lenBatch-int64(totalErr)))
if lenBatch > 0 {
eventTimingMs := time.Since(request.GetSentTime().AsTime()).Milliseconds() / lenBatch
metrics.Histogram("event_processing_duration_milliseconds", eventTimingMs, map[string]string{"conn_group": request.ConnectionIdentifier.Group})
now := time.Now()
metrics.Histogram("worker_processing_duration_milliseconds", (now.Sub(batchReadTime).Milliseconds())/lenBatch, map[string]string{"worker": workerName})
metrics.Histogram("server_processing_latency_milliseconds", (now.Sub(request.TimeConsumed)).Milliseconds()/lenBatch, map[string]string{"conn_group": request.ConnectionIdentifier.Group})
logger.Errorf("[worker] Fail to publish message to kafka %v", err)
totalErr++
}
}
w.wg.Done()
}(fmt.Sprintf("worker-%d", i))
}
lenBatch := int64(len(request.GetEvents()))
logger.Debug(fmt.Sprintf("Success sending messages, %v", lenBatch-int64(totalErr)))
if lenBatch > 0 {
eventTimingMs := time.Since(request.GetSentTime().AsTime()).Milliseconds() / lenBatch
metrics.Histogram(
"event_processing_duration_milliseconds",
eventTimingMs,
map[string]string{"conn_group": request.ConnectionIdentifier.Group})
now := time.Now()
metrics.Histogram(
"worker_processing_duration_milliseconds",
(now.Sub(batchReadTime).Milliseconds())/lenBatch,
map[string]string{"worker": name})
metrics.Histogram(
"server_processing_latency_milliseconds",
(now.Sub(request.TimeConsumed)).Milliseconds()/lenBatch,
map[string]string{"conn_group": request.ConnectionIdentifier.Group})
}
}
w.wg.Done()
}

// StartWorkers launches Pool.Size worker goroutines that consume from
// EventsChannel.
func (w *Pool) StartWorkers() {
	w.wg.Add(w.Size)
	for i := 0; i < w.Size; i++ {
		// Each worker must run in its own goroutine: newWorker ranges over
		// EventsChannel until it is closed, so a synchronous call would
		// block here indefinitely and only one worker would ever run.
		go w.newWorker(fmt.Sprintf("worker-%d", i))
	}
}

Expand Down
4 changes: 2 additions & 2 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestWorker(t *testing.T) {
Size: 1,
deliveryChannelSize: 0,
EventsChannel: bc,
kafkaProducer: &kp,
producer: &kp,
wg: sync.WaitGroup{},
}
worker.StartWorkers()
Expand All @@ -63,7 +63,7 @@ func TestWorker(t *testing.T) {
Size: 1,
deliveryChannelSize: 100,
EventsChannel: bc,
kafkaProducer: &kp,
producer: &kp,
wg: sync.WaitGroup{},
}
worker.StartWorkers()
Expand Down

0 comments on commit 8a3827f

Please sign in to comment.