Skip to content

Commit

Permalink
added checkpoint frequency
Browse files Browse the repository at this point in the history
Signed-off-by: Shivam Kumar <[email protected]>
  • Loading branch information
shivamkm07 committed Aug 7, 2023
1 parent 4a38816 commit c57a126
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 26 deletions.
63 changes: 40 additions & 23 deletions internal/component/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ import (
)

const (
// DefaultMaxBulkSubCount is the fallback batch size used by bulk subscriptions
// when the request does not supply a usable maximum message count.
DefaultMaxBulkSubCount = 100
// DefaultMaxBulkSubAwaitDurationMs is the fallback wait time, in milliseconds,
// for filling a batch; Subscribe also applies it when the configured value is below 1.
DefaultMaxBulkSubAwaitDurationMs = 10000
DefaultMaxBulkSubCount = 100
DefaultMaxBulkSubAwaitDurationMs = 10000
// DefaultCheckpointFrequencyPerPartition is the checkpoint frequency applied when
// the "checkPointFrequencyPerPartition" subscription metadata is empty or not an integer.
DefaultCheckpointFrequencyPerPartition = 1
)

// AzureEventHubs allows sending/receiving Azure Event Hubs events.
Expand Down Expand Up @@ -68,6 +69,14 @@ type HandlerResponseItem struct {

// HandlerFn is the callback invoked with a batch of received Event Hubs events.
// It returns one HandlerResponseItem slice for the batch plus an error; Subscribe
// wraps the handler so that a returned error triggers a retry.
type HandlerFn = func(context.Context, []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error)

// SubscribeConfig bundles the parameters accepted by AzureEventHubs.Subscribe.
type SubscribeConfig struct {
// Topic is the Event Hubs topic whose processor is used for the subscription.
Topic string
// MaxBulkSubCount is the batch size requested on each receive call.
// Subscribe resets values below 1 to 1.
MaxBulkSubCount int
// MaxBulkSubAwaitDurationMs is the maximum time, in milliseconds, to wait for a
// batch to fill. Subscribe resets values below 1 to DefaultMaxBulkSubAwaitDurationMs.
MaxBulkSubAwaitDurationMs int
// CheckPointFrequencyPerPartition controls how often, per partition, the checkpoint
// is updated: once every N non-empty batches received.
// NOTE(review): it is used as a modulus in processEvents, so 0 would panic; no
// validation of this field is visible in Subscribe — confirm callers always set it >= 1.
CheckPointFrequencyPerPartition int
// Handler is the callback invoked with each received batch of events.
Handler HandlerFn
}

// NewAzureEventHubs returns a new Azure Event hubs instance.
func NewAzureEventHubs(logger logger.Logger, isBinding bool) *AzureEventHubs {
return &AzureEventHubs{
Expand Down Expand Up @@ -222,18 +231,19 @@ func (aeh *AzureEventHubs) GetBulkPubSubHandlerFunc(topic string, getAllProperti
}

// Subscribe receives data from Azure Event Hubs in background.
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, topic string, maxBulkSubCount int, maxBulkSubAwaitDurationMs int, handler HandlerFn) error {
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config SubscribeConfig) error {
if aeh.metadata.ConsumerGroup == "" {
return errors.New("property consumerID is required to subscribe to an Event Hub topic")
}
if maxBulkSubCount < 1 {
if config.MaxBulkSubCount < 1 {
aeh.logger.Warnf("maxBulkSubCount must be greater than 0, setting it to 1")
maxBulkSubCount = 1
config.MaxBulkSubCount = 1
}
if maxBulkSubAwaitDurationMs < 1 {
if config.MaxBulkSubAwaitDurationMs < 1 {
aeh.logger.Warnf("maxBulkSubAwaitDurationMs must be greater than 0, setting it to %d", DefaultMaxBulkSubAwaitDurationMs)
maxBulkSubAwaitDurationMs = DefaultMaxBulkSubAwaitDurationMs
config.MaxBulkSubAwaitDurationMs = DefaultMaxBulkSubAwaitDurationMs
}
topic := config.Topic

// Get the processor client
processor, err := aeh.getProcessorForTopic(subscribeCtx, topic)
Expand Down Expand Up @@ -282,7 +292,7 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, topic string,
retryErr := retry.NotifyRecover(func() error {
attempts++
aeh.logger.Debugf("Processing EventHubs events for topic %s (attempt: %d)", topic, attempts)
resp, err = handler(ctx, events)
resp, err = config.Handler(ctx, events)
return err
}, b, func(err error, _ time.Duration) {
aeh.logger.Warnf("Error processing EventHubs events for topic %s. Error: %v. Retrying...", topic)
Expand All @@ -294,6 +304,7 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, topic string,
}
return resp, retryErr
}
config.Handler = retryHandler

// Process all partition clients as they come in
go func() {
Expand All @@ -308,7 +319,7 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, topic string,

// Once we get a partition client, process the events in a separate goroutine
go func() {
processErr := aeh.processEvents(subscribeCtx, topic, partitionClient, maxBulkSubCount, maxBulkSubAwaitDurationMs, retryHandler)
processErr := aeh.processEvents(subscribeCtx, partitionClient, config)
// Do not log context.Canceled which happens at shutdown
if processErr != nil && !errors.Is(processErr, context.Canceled) {
aeh.logger.Errorf("Error processing events from partition client: %v", processErr)
Expand Down Expand Up @@ -347,7 +358,7 @@ func (aeh *AzureEventHubs) handleAsync(ctx context.Context, topic string, messag
return err
}

func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic string, partitionClient *azeventhubs.ProcessorPartitionClient, maxBulkSubCount int, maxBulkSubAwaitDurationMs int, handler HandlerFn) error {
func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient, config SubscribeConfig) error {
// At the end of the method we need to do some cleanup and close the partition client
defer func() {
closeCtx, closeCancel := context.WithTimeout(context.Background(), resourceGetTimeout)
Expand All @@ -365,11 +376,12 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic str
events []*azeventhubs.ReceivedEventData
err error
)
counter := 0
for {
// Maximum duration to wait till bulk message is sent to app is `maxBulkSubAwaitDurationMs`
ctx, cancel = context.WithTimeout(subscribeCtx, time.Duration(maxBulkSubAwaitDurationMs)*time.Millisecond)
ctx, cancel = context.WithTimeout(subscribeCtx, time.Duration(config.MaxBulkSubAwaitDurationMs)*time.Millisecond)
// Receive events with batchsize of `maxBulkSubCount`
events, err = partitionClient.ReceiveEvents(ctx, maxBulkSubCount, nil)
events, err = partitionClient.ReceiveEvents(ctx, config.MaxBulkSubCount, nil)
cancel()

// A DeadlineExceeded error means that the context timed out before we received the full batch of messages, and that's fine
Expand All @@ -378,27 +390,32 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, topic str
// We'll just stop this subscription and return
eventHubError := (*azeventhubs.Error)(nil)
if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost {
aeh.logger.Debugf("Client lost ownership of partition %s for topic %s", partitionClient.PartitionID(), topic)
aeh.logger.Debugf("Client lost ownership of partition %s for topic %s", partitionClient.PartitionID(), config.Topic)
return nil
}

return fmt.Errorf("error receiving events: %w", err)
}

aeh.logger.Debugf("Received batch with %d events on topic %s, partition %s", len(events), topic, partitionClient.PartitionID())
aeh.logger.Debugf("Received batch with %d events on topic %s, partition %s", len(events), config.Topic, partitionClient.PartitionID())

if len(events) != 0 {
// Handle received message
go aeh.handleAsync(ctx, topic, events, handler)

// Update the checkpoint with the last event received. If we lose ownership of this partition or have to restart the next owner will start from this point.
// This context inherits from the background one in case subscriptionCtx gets canceled
ctx, cancel = context.WithTimeout(context.Background(), resourceCreationTimeout)
err = partitionClient.UpdateCheckpoint(ctx, events[len(events)-1], nil)
cancel()
if err != nil {
return fmt.Errorf("failed to update checkpoint: %w", err)
go aeh.handleAsync(ctx, config.Topic, events, config.Handler)

// Update checkpoint with frequency of `checkpointFrequencyPerPartition` for a given partition
if counter%config.CheckPointFrequencyPerPartition == 0 {
// Update the checkpoint with the last event received. If we lose ownership of this partition or have to restart the next owner will start from this point.
// This context inherits from the background one in case subscriptionCtx gets canceled
ctx, cancel = context.WithTimeout(context.Background(), resourceCreationTimeout)
err = partitionClient.UpdateCheckpoint(ctx, events[len(events)-1], nil)
cancel()
if err != nil {
return fmt.Errorf("failed to update checkpoint: %w", err)
}
}
// Update counter
counter = (counter + 1) % config.CheckPointFrequencyPerPartition
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ func GetIntValOrDefault(val int, defaultValue int) int {
return defaultValue
}

// GetIntValFromString parses val as a base-10 integer and returns the result.
// If val is empty or is not a valid integer, defaultValue is returned instead.
func GetIntValFromString(val string, defaultValue int) int {
	if val == "" {
		return defaultValue
	}

	parsed, parseErr := strconv.Atoi(val)
	if parseErr != nil {
		return defaultValue
	}
	return parsed
}

// Unquote parses a request data that may be quoted due to JSON encoding, and removes the quotes.
func Unquote(data []byte) (res string) {
var dataObj any
Expand Down
23 changes: 20 additions & 3 deletions pubsub/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,20 @@ func (aeh *AzureEventHubs) Subscribe(ctx context.Context, req pubsub.SubscribeRe

// Check if requireAllProperties is set and is truthy
getAllProperties := utils.IsTruthy(req.Metadata["requireAllProperties"])
checkPointFrequencyPerPartition := utils.GetIntValFromString(req.Metadata["checkPointFrequencyPerPartition"], impl.DefaultCheckpointFrequencyPerPartition)

pubsubHandler := aeh.GetPubSubHandlerFunc(topic, getAllProperties, handler)

subscribeConfig := impl.SubscribeConfig{
Topic: topic,
MaxBulkSubCount: 1,
MaxBulkSubAwaitDurationMs: impl.DefaultMaxBulkSubAwaitDurationMs,
CheckPointFrequencyPerPartition: checkPointFrequencyPerPartition,
Handler: pubsubHandler,
}
// Start the subscription
// This is non-blocking
return aeh.AzureEventHubs.Subscribe(ctx, topic, 1, impl.DefaultMaxBulkSubAwaitDurationMs, pubsubHandler)
return aeh.AzureEventHubs.Subscribe(ctx, subscribeConfig)
}

// BulkSubscribe receives bulk messages from Azure Event Hubs.
Expand All @@ -145,15 +154,23 @@ func (aeh *AzureEventHubs) BulkSubscribe(ctx context.Context, req pubsub.Subscri

// Check if requireAllProperties is set and is truthy
getAllProperties := utils.IsTruthy(req.Metadata["requireAllProperties"])

checkPointFrequencyPerPartition := utils.GetIntValFromString(req.Metadata["checkPointFrequencyPerPartition"], impl.DefaultCheckpointFrequencyPerPartition)
maxBulkSubCount := utils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxMessagesCount, impl.DefaultMaxBulkSubCount)
maxBulkSubAwaitDurationMs := utils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxAwaitDurationMs, impl.DefaultMaxBulkSubAwaitDurationMs)

bulkPubsubHandler := aeh.GetBulkPubSubHandlerFunc(topic, getAllProperties, handler)

subscribeConfig := impl.SubscribeConfig{
Topic: topic,
MaxBulkSubCount: maxBulkSubCount,
MaxBulkSubAwaitDurationMs: maxBulkSubAwaitDurationMs,
CheckPointFrequencyPerPartition: checkPointFrequencyPerPartition,
Handler: bulkPubsubHandler,
}

// Start the subscription
// This is non-blocking
return aeh.AzureEventHubs.Subscribe(ctx, topic, maxBulkSubCount, maxBulkSubAwaitDurationMs, bulkPubsubHandler)
return aeh.AzureEventHubs.Subscribe(ctx, subscribeConfig)
}

func (aeh *AzureEventHubs) Close() (err error) {
Expand Down

0 comments on commit c57a126

Please sign in to comment.