Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add .NET client for dynamic pubsub subscriptions #1346

Closed
wants to merge 46 commits into from

Conversation

WhitWaldo
Copy link
Contributor

@WhitWaldo WhitWaldo commented Sep 4, 2024

Description

Implements streaming subscription support as a discrete Dapr.Messaging package.

This was based on the work done by @philliphoff here. The requirements call for a mechanism to return an unbounded stream of message from the subscription and a follow-up call to the sidecar for each message to convey an action that should be taken on it (e.g. drop, retry or mark as completed).

While I think his use of a delegate to force the developer to process each message and immediately notate the action that should be taken on the message, I instead wanted to expose a method returning an IAsyncEnumerable as I think there's some interesting opportunities for combining this with some Rx goodness.

My approach then also requires that in addition to setting up the initial subscription, the developer call AcknowledgeMessageAsync with the message ID and the action to take, but also bakes in a policy so that a default configurable action is taken after a timeout window in which the developer fails to otherwise indicate success or failure.

Because a separate connection will be created with the sidecar for every combination of pubsub component and topic, the actual implementation is an internal class called PublishSubscribeReceiver that maintains a single streaming connection to the sidecar for each instance in the ConnectionManager. In order to facilitate some future backpressure support and otherwise decouple the receipt of messages from the sidecar from the subscription, I write inbound messages to a Channel<TopicMessage> and read from it from the subscription method implementing IAsyncEnumerable<TopicMessage>. As a side effect of each message read out, it also registers the message identifier with a TaskCompletionSource and a CancellationTokenSource bound to the provided cancellation token so that if the developer fails to acknowledge the message within the configured timeout window, it the default message handler action will be taken automatically.

Update: After hammering out a concept of the above, I ultimately agreed with @philliphoff that the approach isn't as smooth as his original approach, so I've modified it accordingly and detailed it more in this comment.

Finally, I opted to implement the extensions property on the TopicMessage with a Dictionary<string, Value> where the Value is the Protobuf Value struct (meaning that a developer is going to have to query the type of the struct and retrieve the appropriate property with the matching value). While I considered just strongly converting all values to a string, there are some Values types this can represent that would make that cumbersome (e.g. List or other structs). Ultimately, if the developer is opting to use this, they can figure out how to retrieve the Value they intend to.

Additional notes

This should ideally be merged after #1331 as it takes a dependency on a package introduced (Dapr.Common which introduces a generic client builder class and contains shared exceptions) in that PR. I created a stub for it here, but it'd be nice to avoid the conflict later on.

This implementation also creates a separate Dapr.Protos package that each of the existing libraries can take advantage of once this is merged in that ensures that there's only one place that Protos have to be updated at instead of each library having its own copy.

I think there's an opportunity to improve on how we do logging across all these distinct projects using a static logging type. As the internal GrpcClient isn't DI-injected, it eliminates several of the simpler approaches to just use an ILogger<> or ILoggerFactory. I removed the use of the configuration options as a carrier of the logging implementation as it just didn't feel like a great fit there, but I think it's worth exploring a more concrete and centrally-defined approach to more uniform logging going forward for the .NET client. But perhaps in another issue.

Issue reference

We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.

Please reference the issue this PR will close: #1324

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

@WhitWaldo WhitWaldo mentioned this pull request Sep 10, 2024
38 tasks
…ded flag to ensure double-subscription doesn't happen to cancel stream (with multiple initial requests), updated message draining during disposal and updated unit tests.

Signed-off-by: Whit Waldo <[email protected]>
Copy link

codecov bot commented Sep 25, 2024

Codecov Report

Attention: Patch coverage is 5.58659% with 169 lines in your changes missing coverage. Please review.

Project coverage is 65.91%. Comparing base (1b7c9f4) to head (dd22e83).
Report is 19 commits behind head on master.

Files with missing lines Patch % Lines
...aging/PublishSubscribe/PublishSubscribeReceiver.cs 10.86% 82 Missing ⚠️
src/Dapr.Common/DaprGenericClientBuilder.cs 0.00% 50 Missing ⚠️
...ons/PublishSubscribeServiceCollectionExtensions.cs 0.00% 16 Missing ⚠️
src/Dapr.Common/DaprException.cs 0.00% 6 Missing ⚠️
...PublishSubscribe/DaprPublishSubscribeGrpcClient.cs 0.00% 4 Missing ⚠️
...rc/Dapr.Messaging/PublishSubscribe/TopicMessage.cs 0.00% 4 Missing ⚠️
...lishSubscribe/DaprPublishSubscribeClientBuilder.cs 0.00% 3 Missing ⚠️
...saging/PublishSubscribe/DaprSubscriptionOptions.cs 0.00% 3 Missing ⚠️
...essaging/PublishSubscribe/MessageHandlingPolicy.cs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1346      +/-   ##
==========================================
- Coverage   67.28%   65.91%   -1.37%     
==========================================
  Files         174      184      +10     
  Lines        6025     6241     +216     
  Branches      671      698      +27     
==========================================
+ Hits         4054     4114      +60     
- Misses       1802     1957     +155     
- Partials      169      170       +1     
Flag Coverage Δ
net6 65.89% <5.58%> (-1.37%) ⬇️
net7 65.89% <5.58%> (-1.37%) ⬇️
net8 65.90% <5.58%> (-1.37%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

…le try/catch block instead of having more than one

Signed-off-by: Whit Waldo <[email protected]>
…cancellation token throws. Minor perf improvments as spotted. Added/fixed comments.

Signed-off-by: Whit Waldo <[email protected]>
…be constrained by a configurable timespan in how long it waits. Added some try/catch blocks to handle messages being written when the writer has been completed in case a Disposal happens mid-processing.

Signed-off-by: Whit Waldo <[email protected]>
… the GrpcClient creates the PublishSubscribeReceiver, then calls SubscribeAsync internally. Renamed Register to `SubscribeAsync` and updated return type + example

Signed-off-by: Whit Waldo <[email protected]>
…tantiation. Added support to let developer specify a maximum number of messages that can be queued for processing (blocking new Dapr from submitting more to the replica) and tweaked how messages are written in the subscription loop to accommodate this.

Signed-off-by: Whit Waldo <[email protected]>
@WhitWaldo WhitWaldo added this to the v1.15 milestone Oct 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add Streaming Subscription Support
3 participants