Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulk subscribe support in azure eventhubs #3011

Merged
merged 22 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ var eventHubPubsubName = 'eventhubs-pubsub-topic'
var eventHubPubsubPolicyName = '${eventHubPubsubName}-policy'
var eventHubPubsubConsumerGroupName = '${eventHubPubsubName}-cg'

// Names for the bulk pub/sub Event Hub and its shared-access policy.
var eventHubBulkPubsubName = 'eventhubs-pubsub-topic-bulk'
var eventHubBulkPubsubPolicyName = '${eventHubBulkPubsubName}-policy'

var certificationEventHubPubsubTopicActiveName = 'certification-pubsub-topic-active'
var certificationEventHubPubsubTopicActivePolicyName = '${certificationEventHubPubsubTopicActiveName}-policy'

Expand Down Expand Up @@ -96,6 +99,24 @@ resource eventHubsNamespace 'Microsoft.EventHub/namespaces@2017-04-01' = {
name: eventHubPubsubConsumerGroupName
}
}
// Event Hub used for the bulk pub/sub topic.
resource eventHubBulkPubsub 'eventhubs' = {
name: eventHubBulkPubsubName
properties: {
// Minimum retention; events here do not need to persist longer.
messageRetentionInDays: 1
}
// Shared-access policy granting both Send and Listen on this hub.
resource eventHubBulkPubsubPolicy 'authorizationRules' = {
name: eventHubBulkPubsubPolicyName
properties: {
rights: [
'Send'
'Listen'
]
}
}
// Consumer group; reuses the same name as the non-bulk topic's group
// (consumer-group names are scoped per Event Hub, so there is no collision).
resource eventHubPubsubConsumerGroup 'consumergroups' = {
name: eventHubPubsubConsumerGroupName
}
}
resource certificationEventHubPubsubTopicActive 'eventhubs' = {
name: certificationEventHubPubsubTopicActiveName
properties: {
Expand Down Expand Up @@ -175,6 +196,9 @@ output eventHubPubsubName string = eventHubsNamespace::eventHubPubsub.name
output eventHubPubsubPolicyName string = eventHubsNamespace::eventHubPubsub::eventHubPubsubPolicy.name
output eventHubPubsubConsumerGroupName string = eventHubsNamespace::eventHubPubsub::eventHubPubsubConsumerGroup.name

output eventHubBulkPubsubName string = eventHubsNamespace::eventHubBulkPubsub.name
output eventHubBulkPubsubPolicyName string = eventHubsNamespace::eventHubBulkPubsub::eventHubBulkPubsubPolicy.name

output eventHubsNamespacePolicyName string = eventHubsNamespace::eventHubPubsubNamespacePolicy.name
output certificationEventHubPubsubTopicActiveName string = eventHubsNamespace::certificationEventHubPubsubTopicActive.name
output certificationEventHubPubsubTopicActivePolicyName string = eventHubsNamespace::certificationEventHubPubsubTopicActive::certificationEventHubPubsubTopicActivePolicy.name
Expand Down
17 changes: 10 additions & 7 deletions bindings/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,16 @@ func (a *AzureEventHubs) Invoke(ctx context.Context, req *bindings.InvokeRequest
func (a *AzureEventHubs) Read(ctx context.Context, handler bindings.Handler) error {
// Start the subscription
// This is non-blocking
return a.AzureEventHubs.Subscribe(ctx, a.AzureEventHubs.EventHubName(), false, func(ctx context.Context, data []byte, metadata map[string]string) error {
res := bindings.ReadResponse{
Data: data,
Metadata: metadata,
}
_, hErr := handler(ctx, &res)
return hErr
topic := a.AzureEventHubs.EventHubName()
bindingsHandler := a.AzureEventHubs.GetBindingsHandlerFunc(topic, false, handler)
// Setting `maxBulkSubCount` to 1 as bindings are not supported for bulk subscriptions
// Setting `CheckPointFrequencyPerPartition` to default value of 1
return a.AzureEventHubs.Subscribe(ctx, impl.SubscribeConfig{
Topic: topic,
MaxBulkSubCount: 1,
MaxBulkSubAwaitDurationMs: impl.DefaultMaxBulkSubAwaitDurationMs,
CheckPointFrequencyPerPartition: impl.DefaultCheckpointFrequencyPerPartition,
Handler: bindingsHandler,
})
}

Expand Down
215 changes: 167 additions & 48 deletions internal/component/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand All @@ -29,12 +28,20 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"golang.org/x/exp/maps"

"github.com/dapr/components-contrib/bindings"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/components-contrib/internal/component/azure/blobstorage"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/retry"
)

const (
DefaultMaxBulkSubCount = 100
DefaultMaxBulkSubAwaitDurationMs = 10000
DefaultCheckpointFrequencyPerPartition = 1
)

// AzureEventHubs allows sending/receiving Azure Event Hubs events.
// This is an abstract class used by both the pubsub and binding components.
type AzureEventHubs struct {
Expand All @@ -54,6 +61,22 @@ type AzureEventHubs struct {
isFailed atomic.Bool
}

// HandlerResponseItem represents a response from the handler for each message.
type HandlerResponseItem struct {
// EntryID identifies the bulk-message entry this response refers to.
EntryID string
// Error is non-nil when processing of the entry failed.
Error error
}

// HandlerFn processes a batch of received events. Bulk handlers return one
// HandlerResponseItem per entry; non-bulk handlers return a nil/empty slice.
type HandlerFn = func(context.Context, []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error)

// SubscribeConfig collects the options accepted by Subscribe.
type SubscribeConfig struct {
// Topic is the Event Hub name to subscribe to.
Topic string
// MaxBulkSubCount is the maximum number of events delivered per batch.
MaxBulkSubCount int
// MaxBulkSubAwaitDurationMs caps how long to wait while filling a batch.
MaxBulkSubAwaitDurationMs int
// CheckPointFrequencyPerPartition checkpoints once every N batches per
// partition; 0 disables checkpointing.
CheckPointFrequencyPerPartition int
// Handler is invoked for each received batch of events.
Handler HandlerFn
}

// NewAzureEventHubs returns a new Azure Event hubs instance.
func NewAzureEventHubs(logger logger.Logger, isBinding bool) *AzureEventHubs {
return &AzureEventHubs{
Expand Down Expand Up @@ -138,11 +161,89 @@ func (aeh *AzureEventHubs) Publish(ctx context.Context, topic string, messages [
return nil
}

// GetBindingsHandlerFunc returns the handler function for bindings messages
func (aeh *AzureEventHubs) GetBindingsHandlerFunc(topic string, getAllProperties bool, handler bindings.Handler) HandlerFn {
return func(ctx context.Context, messages []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
if len(messages) != 1 {
ItalyPaleAle marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("expected 1 message, got %d", len(messages))
}

bindingsMsg, err := NewBindingsReadResponseFromEventData(messages[0], topic, getAllProperties)
if err != nil {
return nil, fmt.Errorf("failed to get bindings read response from azure eventhubs message: %w", err)
}

aeh.logger.Debugf("Calling app's handler for message %s on topic %s", messages[0].SequenceNumber, topic)
_, err = handler(ctx, bindingsMsg)
return nil, err
}
}

// GetPubSubHandlerFunc returns the handler function for (non-bulk) pubsub
// messages. The returned HandlerFn expects exactly one message per batch.
func (aeh *AzureEventHubs) GetPubSubHandlerFunc(topic string, getAllProperties bool, handler pubsub.Handler) HandlerFn {
	return func(ctx context.Context, messages []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
		if len(messages) != 1 {
			return nil, fmt.Errorf("expected 1 message, got %d", len(messages))
		}

		pubsubMsg, err := NewPubsubMessageFromEventData(messages[0], topic, getAllProperties)
		if err != nil {
			return nil, fmt.Errorf("failed to get pubsub message from azure eventhubs message: %w", err)
		}

		// SequenceNumber is numeric, so use %d (the previous %s verb would have
		// rendered as "%!s(...)").
		aeh.logger.Debugf("Calling app's handler for message %d on topic %s", messages[0].SequenceNumber, topic)
		return nil, handler(ctx, pubsubMsg)
	}
}

// GetPubSubHandlerFunc returns the handler function for bulk pubsub messages.
func (aeh *AzureEventHubs) GetBulkPubSubHandlerFunc(topic string, getAllProperties bool, handler pubsub.BulkHandler) HandlerFn {
return func(ctx context.Context, messages []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
pubsubMsgs := make([]pubsub.BulkMessageEntry, len(messages))
for i, msg := range messages {
pubsubMsg, err := NewBulkMessageEntryFromEventData(msg, topic, getAllProperties)
if err != nil {
return nil, fmt.Errorf("failed to get pubsub message from eventhub message: %w", err)
}
pubsubMsgs[i] = pubsubMsg
}

// Note, no metadata is currently supported here.
// In the future, we could add propagate metadata to the handler if required.
bulkMessage := &pubsub.BulkMessage{
ItalyPaleAle marked this conversation as resolved.
Show resolved Hide resolved
Entries: pubsubMsgs,
Topic: topic,
Metadata: map[string]string{},
}

aeh.logger.Debugf("Calling app's handler for %d messages on topic %s", len(messages), topic)
resps, err := handler(ctx, bulkMessage)

handlerResps := make([]HandlerResponseItem, len(resps))
for i, resp := range resps {
handlerResps[i] = HandlerResponseItem{
EntryID: resp.EntryId,
Error: resp.Error,
}
}
return handlerResps, err
}
}

// Subscribe receives data from Azure Event Hubs in background.
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, topic string, getAllProperties bool, handler SubscribeHandler) (err error) {
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config SubscribeConfig) error {
if aeh.metadata.ConsumerGroup == "" {
return errors.New("property consumerID is required to subscribe to an Event Hub topic")
}
if config.MaxBulkSubCount < 1 {
aeh.logger.Warnf("maxBulkSubCount must be greater than 0, setting it to 1")
config.MaxBulkSubCount = 1
}
if config.MaxBulkSubAwaitDurationMs < 1 {
aeh.logger.Warnf("maxBulkSubAwaitDurationMs must be greater than 0, setting it to %d", DefaultMaxBulkSubAwaitDurationMs)
config.MaxBulkSubAwaitDurationMs = DefaultMaxBulkSubAwaitDurationMs
}
topic := config.Topic

// Get the processor client
processor, err := aeh.getProcessorForTopic(subscribeCtx, topic)
Expand Down Expand Up @@ -182,37 +283,31 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, topic string,
}

// This component has built-in retries because Event Hubs doesn't support N/ACK for messages
retryHandler := func(ctx context.Context, data []byte, metadata map[string]string) error {
b := aeh.backOffConfig.NewBackOffWithContext(subscribeCtx)

mID := metadata[sysPropMessageID]
if mID == "" {
mID = "(nil)"
}
// This method is synchronous so no risk of race conditions if using side effects
var attempts int
retryerr := retry.NotifyRecover(func() error {
attempts++
aeh.logger.Debugf("Processing EventHubs event %s/%s (attempt: %d)", topic, mID, attempts)

if attempts > 1 {
metadata["dapr-attempt"] = strconv.Itoa(attempts)
}

return handler(ctx, data, metadata)
}, b, func(_ error, _ time.Duration) {
aeh.logger.Warnf("Error processing EventHubs event: %s/%s. Retrying...", topic, mID)
retryHandler := func(ctx context.Context, events []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
b := aeh.backOffConfig.NewBackOffWithContext(ctx)

var attempts atomic.Int32
resp, retryErr := retry.NotifyRecoverWithData(func() (rResp []HandlerResponseItem, rErr error) {
aeh.logger.Debugf("Processing EventHubs events for topic %s (attempt: %d)", topic, attempts.Add(1))
return config.Handler(ctx, events)
}, b, func(err error, _ time.Duration) {
aeh.logger.Warnf("Error processing EventHubs events for topic %s. Error: %v. Retrying...", topic)
}, func() {
aeh.logger.Warnf("Successfully processed EventHubs event after it previously failed: %s/%s", topic, mID)
aeh.logger.Warnf("Successfully processed EventHubs events after it previously failed for topic %s", topic)
})
if retryerr != nil {
aeh.logger.Errorf("Too many failed attempts at processing Eventhubs event: %s/%s. Error: %v", topic, mID, err)
if retryErr != nil {
aeh.logger.Errorf("Too many failed attempts at processing Eventhubs events for topic %s. Error: %v", topic, retryErr)
}
return retryerr
return resp, retryErr
}

// Get the subscribe handler
eventHandler := subscribeHandler(subscribeCtx, getAllProperties, retryHandler)
retryConfig := SubscribeConfig{
Topic: config.Topic,
MaxBulkSubCount: config.MaxBulkSubCount,
MaxBulkSubAwaitDurationMs: config.MaxBulkSubAwaitDurationMs,
CheckPointFrequencyPerPartition: config.CheckPointFrequencyPerPartition,
Handler: retryHandler,
}

// Process all partition clients as they come in
go func() {
Expand All @@ -227,7 +322,7 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, topic string,

// Once we get a partition client, process the events in a separate goroutine
go func() {
processErr := aeh.processEvents(subscribeCtx, topic, partitionClient, eventHandler)
processErr := aeh.processEvents(subscribeCtx, partitionClient, retryConfig)
// Do not log context.Canceled which happens at shutdown
if processErr != nil && !errors.Is(processErr, context.Canceled) {
aeh.logger.Errorf("Error processing events from partition client: %v", processErr)
Expand All @@ -249,7 +344,24 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, topic string,
return nil
}

func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic string, partitionClient *azeventhubs.ProcessorPartitionClient, eventHandler func(e *azeventhubs.ReceivedEventData) error) error {
// handleAsync invokes handler for the received batch and logs any failure:
// as a single message error when the handler returned no response items
// (i.e. a non-bulk handler), or per entry for bulk handlers. The handler's
// error is returned unchanged.
func (aeh *AzureEventHubs) handleAsync(ctx context.Context, topic string, messages []*azeventhubs.ReceivedEventData, handler HandlerFn) error {
	items, hErr := handler(ctx, messages)
	if hErr == nil {
		return nil
	}

	// A nil or empty response means the handler was a non-bulk one.
	if len(items) == 0 {
		aeh.logger.Errorf("Failed to process Eventhubs message %s for topic %s: Error: %v", messages[0].MessageID, topic, hErr)
	}
	for i := range items {
		if items[i].Error == nil {
			continue
		}
		aeh.logger.Errorf("Failed to process Eventhubs bulk message entry. EntryID: %s. Error: %v ", items[i].EntryID, items[i].Error)
	}
	return hErr
}

func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient, config SubscribeConfig) error {
// At the end of the method we need to do some cleanup and close the partition client
defer func() {
closeCtx, closeCancel := context.WithTimeout(context.Background(), resourceGetTimeout)
Expand All @@ -267,11 +379,12 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic str
events []*azeventhubs.ReceivedEventData
err error
)
counter := 0
for {
// TODO: Support setting a batch size
const batchSize = 1
ctx, cancel = context.WithTimeout(subscribeCtx, time.Minute)
events, err = partitionClient.ReceiveEvents(ctx, batchSize, nil)
// Maximum duration to wait till bulk message is sent to app is `maxBulkSubAwaitDurationMs`
ctx, cancel = context.WithTimeout(subscribeCtx, time.Duration(config.MaxBulkSubAwaitDurationMs)*time.Millisecond)
// Receive events with batchsize of `maxBulkSubCount`
events, err = partitionClient.ReceiveEvents(ctx, config.MaxBulkSubCount, nil)
cancel()

// A DeadlineExceeded error means that the context timed out before we received the full batch of messages, and that's fine
Expand All @@ -280,28 +393,34 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic str
// We'll just stop this subscription and return
eventHubError := (*azeventhubs.Error)(nil)
if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost {
aeh.logger.Debugf("Client lost ownership of partition %s for topic %s", partitionClient.PartitionID(), topic)
aeh.logger.Debugf("Client lost ownership of partition %s for topic %s", partitionClient.PartitionID(), config.Topic)
return nil
}

return fmt.Errorf("error receiving events: %w", err)
}

aeh.logger.Debugf("Received batch with %d events on topic %s, partition %s", len(events), topic, partitionClient.PartitionID())
aeh.logger.Debugf("Received batch with %d events on topic %s, partition %s", len(events), config.Topic, partitionClient.PartitionID())

if len(events) != 0 {
for _, event := range events {
// Process the event in its own goroutine
go eventHandler(event)
}

// Update the checkpoint with the last event received. If we lose ownership of this partition or have to restart the next owner will start from this point.
// This context inherits from the background one in case subscriptionCtx gets canceled
ctx, cancel = context.WithTimeout(context.Background(), resourceCreationTimeout)
err = partitionClient.UpdateCheckpoint(ctx, events[len(events)-1], nil)
cancel()
if err != nil {
return fmt.Errorf("failed to update checkpoint: %w", err)
// Handle received message
go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)

// Checkpointing disabled for CheckPointFrequencyPerPartition == 0
if config.CheckPointFrequencyPerPartition > 0 {
// Update checkpoint with frequency of `checkpointFrequencyPerPartition` for a given partition
if counter%config.CheckPointFrequencyPerPartition == 0 {
// Update the checkpoint with the last event received. If we lose ownership of this partition or have to restart the next owner will start from this point.
// This context inherits from the background one in case subscriptionCtx gets canceled
ctx, cancel = context.WithTimeout(context.Background(), resourceCreationTimeout)
err = partitionClient.UpdateCheckpoint(ctx, events[len(events)-1], nil)
cancel()
if err != nil {
return fmt.Errorf("failed to update checkpoint: %w", err)
}
}
// Update counter
counter = (counter + 1) % config.CheckPointFrequencyPerPartition
}
}
}
Expand Down
Loading
Loading