Skip to content

Commit

Permalink
fix bindings
Browse files Browse the repository at this point in the history
Signed-off-by: Shivam Kumar <[email protected]>
  • Loading branch information
shivamkm07 committed Jul 25, 2023
1 parent d35088b commit 35e7dd1
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
12 changes: 4 additions & 8 deletions bindings/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package eventhubs
import (
"context"
"reflect"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"

Expand Down Expand Up @@ -81,14 +82,9 @@ func (a *AzureEventHubs) Invoke(ctx context.Context, req *bindings.InvokeRequest
func (a *AzureEventHubs) Read(ctx context.Context, handler bindings.Handler) error {
// Start the subscription
// This is non-blocking
return a.AzureEventHubs.Subscribe(ctx, a.AzureEventHubs.EventHubName(), false, func(ctx context.Context, data []byte, metadata map[string]string) error {
res := bindings.ReadResponse{
Data: data,
Metadata: metadata,
}
_, hErr := handler(ctx, &res)
return hErr
})
topic := a.AzureEventHubs.EventHubName()
bindingsHandler := a.AzureEventHubs.GetBindingsHandlerFunc(topic, false, handler, 1*time.Minute)
return a.AzureEventHubs.Subscribe(ctx, topic, 1, impl.DefaultMaxBulkSubAwaitDurationMs, bindingsHandler)
}

func (a *AzureEventHubs) Close() error {
Expand Down
52 changes: 52 additions & 0 deletions internal/component/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"golang.org/x/exp/maps"

"github.com/dapr/components-contrib/bindings"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/components-contrib/internal/component/azure/blobstorage"
"github.com/dapr/components-contrib/pubsub"
Expand Down Expand Up @@ -152,6 +153,57 @@ func (aeh *AzureEventHubs) Publish(ctx context.Context, topic string, messages [
return nil
}

// GetPubSubHandlerFunc returns the handler function for pubsub messages
func (aeh *AzureEventHubs) GetBindingsHandlerFunc(topic string, getAllProperties bool, handler bindings.Handler, timeout time.Duration) HandlerFn {
return func(ctx context.Context, messages []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
if len(messages) != 1 {
return nil, fmt.Errorf("expected 1 message, got %d", len(messages))
}

bindingsMsg, err := NewBindingsReadResponseFromEventData(messages[0], topic, getAllProperties)
if err != nil {
return nil, fmt.Errorf("failed to get bindings read response from azure eventhubs message: %+v", err)
}

// This component has built-in retries because Event Hubs doesn't support N/ACK for messages
retryHandler := func(ctx context.Context, msg *bindings.ReadResponse) error {
b := aeh.backOffConfig.NewBackOffWithContext(ctx)

mID := msg.Metadata[sysPropMessageID]
if mID == "" {
mID = "(nil)"
}
// This method is synchronous so no risk of race conditions if using side effects
var attempts int
retryerr := retry.NotifyRecover(func() error {
attempts++
aeh.logger.Debugf("Processing EventHubs event %s/%s (attempt: %d)", topic, mID, attempts)

if attempts > 1 {
// Adding number of attempts in `dapr-attempt` key in metadata
msg.Metadata["dapr-attempt"] = strconv.Itoa(attempts)
}

_, rErr := handler(ctx, msg)
return rErr
}, b, func(_ error, _ time.Duration) {
aeh.logger.Warnf("Error processing EventHubs event: %s/%s. Retrying...", topic, mID)
}, func() {
aeh.logger.Warnf("Successfully processed EventHubs event after it previously failed: %s/%s", topic, mID)
})
if retryerr != nil {
aeh.logger.Errorf("Too many failed attempts at processing Eventhubs event: %s/%s. Error: %v", topic, mID, retryerr)
}
return retryerr
}

handlerCtx, handlerCancel := context.WithTimeout(ctx, timeout)
defer handlerCancel()
aeh.logger.Debugf("Calling app's handler for message %s on topic %s", messages[0].SequenceNumber, topic)
return nil, retryHandler(handlerCtx, bindingsMsg)
}
}

// GetPubSubHandlerFunc returns the handler function for pubsub messages
func (aeh *AzureEventHubs) GetPubSubHandlerFunc(topic string, getAllProperties bool, handler pubsub.Handler, timeout time.Duration) HandlerFn {
return func(ctx context.Context, messages []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
Expand Down
11 changes: 11 additions & 0 deletions internal/component/azure/eventhubs/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/dapr/components-contrib/bindings"

Check failure on line 24 in internal/component/azure/eventhubs/events.go

View workflow job for this annotation

GitHub Actions / Build linux_amd64 binaries

File is not `goimports`-ed with -local github.com/dapr/ (goimports)
"github.com/dapr/components-contrib/pubsub"
"github.com/google/uuid"
"github.com/spf13/cast"
Expand Down Expand Up @@ -85,6 +86,16 @@ func getMetadataFromEventData(e *azeventhubs.ReceivedEventData, getAllProperties
return md
}

// Returns bindings read response message from azure eventhub message
func NewBindingsReadResponseFromEventData(e *azeventhubs.ReceivedEventData, topic string, getAllProperties bool) (*bindings.ReadResponse, error) {
meta := getMetadataFromEventData(e, getAllProperties)
msg := &bindings.ReadResponse{
Data: e.Body,
Metadata: meta,
}
return msg, nil
}

// Returns a new pubsub message from azure eventhub message
func NewPubsubMessageFromEventData(e *azeventhubs.ReceivedEventData, topic string, getAllProperties bool) (*pubsub.NewMessage, error) {
meta := getMetadataFromEventData(e, getAllProperties)
Expand Down

0 comments on commit 35e7dd1

Please sign in to comment.