Add Streaming Subscription Support #1324
/assign |
Some initial thoughts on the shape of a possible implementation:
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Dapr.PublishSubscribe;

public sealed record DaprSubscriptionOptions
{
    public IReadOnlyDictionary<string, string> Metadata { get; init; } = new Dictionary<string, string>();
    public string? DeadLetterTopic { get; init; }
}

public sealed record TopicRequest
{
    public required string Id { get; init; }
    public required string Source { get; init; }
    public required string Type { get; init; }
    public required string SpecVersion { get; init; }
    public required string DataContentType { get; init; }
    public required string Topic { get; init; }
    public required string PubSubName { get; init; }
    public string? Path { get; init; }
    public object? Extensions { get; init; } // TODO: Determine what this should look like.
}

public enum TopicResponse
{
    Success,
    Error,
    Drop
}

// Named to match the handler parameter type used by SubscribeAsync below.
public delegate Task<TopicResponse> TopicRequestHandler(TopicRequest request);

// Shape only: members are marked abstract so that the sketch compiles.
public abstract class DaprPublishSubscribeClient
{
    public abstract DaprPublishSubscribeClient CreateClient();
    public abstract Task SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);
} |
Looking at the protos, the extension is implemented as a Struct, so that might be most easily implemented as a This might be worth a more substantive discussion elsewhere, but I was tentatively thinking of the following component-based naming scheme:
This leaves the door open for other messaging building blocks down the road like an event bus or distributed event sender/processor under the same messaging subset. |
@WhitWaldo I also prefer that, for well-known values, the APIs expose them in an appropriately typed way.
@WhitWaldo I feel like, from a packaging perspective, this might be too fine-grained. That said, I think it'd be fine to have a single component-category package (e.g. |
@WhitWaldo There's a start of my thinking in this branch: https://github.com/dapr/dotnet-sdk/tree/philliphoff-streaming-subs (I had started thinking about this a bit before you self-assigned the issue; feel free to borrow, build-upon, or completely ignore.) |
@philliphoff I use a mono-repo for most of my own professional stuff and just leave it to the CI/CD to deal with the individual packages, so I'll typically lean towards more specific packages rather than jamming everything into one package. Pair that with a documentation perspective and I like the idea of saying: here are the docs for this building block, and here are the docs explaining this specialty block within it, with a package purpose-built for this sub-concept. Namespaces are nice within the package itself, but just as I'd prefer to just install
I meandered through your approach and think it frankly looks great. I'd build a sample or two off it just to play around with the experience, but generate the client using the shared project from my Jobs proposal (so it's bundling any available api-token) and I think it's pretty much there.
Now, the big question would be whether, especially come 1.15, you'd be open to shuffling out all the pubsub bits to this package or if that's worth some sort of lower-level facelift first with a fresh start in the new package (like what I'm slowly wanting to do with state)? |
@WhitWaldo For me it's finding a balance between individual purpose and repo/package maintainability. I'm not sure I'm convinced yet that having separate packages for, say, key/value store clients and cache clients is in that sweet spot of balance. (We always reserve the right to further split packages as necessary, too. Yes, it's breaking, but it's a fairly minimal break.) I've updated my branch switching to a
I'm running into some curious behavior in testing, though, where the Dapr sidecar seems to ignore success/drop responses and keeps re-delivering messages. From the logs, it appears the responses are getting back to the sidecar, so I'm not sure what's happening.
I've also done a little testing with raw payloads; that seems to result in none of the normal topic request fields being populated and the extensions field containing the actual data; I'm not sure yet how best to expose that. I'm also not sure how useful it is to manage deserialization of the data on the user's behalf vs. just having the user do it (which is fairly trivial since the content type is provided anyway, and which gets the Dapr SDK out of the serialization options business). |
@philliphoff I get that - I'll see if I can think of more evidence to support one approach or the other. That re-delivery behavior is less than ideal - happy to dig in and see if I can help out once I've got Jobs wrapped up. Let me know if you come to a conclusion on the serialization. Something I'm torn on with the Jobs mapping is whether it should similarly get out of the serialization game. On one hand, it'd be nice to blindly pass types to a generic endpoint and have it all Just Work. On the other hand, if it instead accepted only an array of bytes for the job payload and left serialization entirely to the developer, that would simplify the SDK mapping significantly, because we wouldn't risk the job being triggered, deserialized into the wrong payload type, and throwing. It'd be nice to ensure that any runtime errors occur because the developer failed to deserialize and handle their own types rather than being an inevitable "Dapr" problem. |
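For illustration only, here is a minimal sketch of what "leave serialization to the developer" could look like if a handler received raw bytes plus the content type. `OrderPlaced` and `PayloadReader` are hypothetical names invented for this example and are not part of any Dapr package; only `System.Text.Json` is a real dependency.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Simulated raw payload as it might arrive alongside a DataContentType value.
byte[] raw = Encoding.UTF8.GetBytes("{\"OrderId\":\"A-17\",\"Quantity\":3}");
OrderPlaced? order = PayloadReader.ReadJson<OrderPlaced>(raw, "application/json");
Console.WriteLine(order);

// Hypothetical payload type owned by the application, not by the SDK.
public sealed record OrderPlaced(string OrderId, int Quantity);

public static class PayloadReader
{
    // Deserializes the bytes only when the advertised content type is JSON.
    // Returns null for any other content type so the caller can decide what to do.
    // Malformed JSON surfaces as a JsonException in the developer's own code.
    public static T? ReadJson<T>(ReadOnlyMemory<byte> data, string dataContentType) where T : class
    {
        if (!dataContentType.StartsWith("application/json", StringComparison.OrdinalIgnoreCase))
        {
            return null;
        }

        return JsonSerializer.Deserialize<T>(data.Span);
    }
}
```

With this split, a deserialization failure is raised in application code where the expected type is known, which is the trade-off being weighed above.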
@philliphoff I am reading the code written for streaming support and think that an
public abstract class DaprPublishSubscribeClient
{
    public abstract DaprPublishSubscribeClient CreateClient();
    public abstract Task SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);
    public abstract IAsyncEnumerator<request> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);
}
What do you think? |
@WhitWaldo do you have bandwidth to implement this for 1.15? @JoshVanL can help on anything related to the Dapr runtime as the implementor of streaming subscriptions. Otherwise @philliphoff if you've made progress and can put this in 1.15 that'd be awesome. |
@yaron2 As I recall, @philliphoff was seeing some stability issues for his own implementation which seemed pretty nearly baked, so I'm not sure that there's much I can contribute here outside of some additional stability testing. |
@yaron2 I have a few more thoughts about this. The more I think about it, the more I really like the idea of putting this in Dapr.Messaging and having multiple clients (opens the door for potentially versioning in the clients themselves), so I'd definitely take that from what @philliphoff was working on. I think it makes sense to make an
In my Scheduler PR, I introduce a Dapr.Common library in which I intend to centralize exceptions across the .NET clients and introduce a generic client builder, and I'd ideally use that here (to avoid yet another implementation of it in the clients). Otherwise, I think I'd make a few changes to what's already been put together here, but could probably have something hammered out pretty quickly. I don't see any reason it can't otherwise make it into 1.15. When's the target date for that release? |
We're aiming for mid November |
I'm afraid that I don't have bandwidth for feature work in the 1.15 timeframe, but if others do then I have no issue making this part of the 1.15 release. (I believe we had already targeted this for a post-1.14 release.) |
I should be able to tackle this in that timeframe - again, it'd be great if we can get the Scheduler PR through first so I can use some of those shared dependencies here and prevent some duplicate classes, but I'll work on having a draft done by the end of next week, if not sooner. |
We discussed the Jobs API PR this morning and expediting the review for it. |
Joining in late here. On review of the proposal, and given the ack-based nature of the Go API, I concur that IAsyncEnumerable is the most modern way to do this in .NET. I think the return type would be IAsyncEnumerable, NOT IAsyncEnumerator. However, I don't really see the need for the overload that takes a callback and returns a Task. That feels antiquated in .NET now.
public abstract class DaprPublishSubscribeClient
{
    public abstract DaprPublishSubscribeClient CreateClient();
    public abstract IAsyncEnumerable<request> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);
} |
That's right - I mistyped. It should be IAsyncEnumerable. I was planning to expose it as an overload much like the Service Bus SDK does in their own receiver implementation. It opens the door to developers using it in a more modern workflow, but still leaves the API accessible to those users more comfortable with a |
Task<IReadOnlyList> this is a task that completes with a list of (all) the messages - is the plan then to specify how many messages to consume (as a batch) like Service Bus? In reality the message stream is expected to never end, or to be very long, so this would usually be called in a loop to keep fetching more. But would it really be any different from just embracing LINQ operators?
IAsyncEnumerable<msg> msgs = daprClient.SubscribeAsync(...);
var batch = msgs.Take(20);
As I wrote the above, I started to ask myself whether this will be a cold or hot IAsyncEnumerable. (They are generally cold, to align with cold observable behaviour.) Then I wonder what will happen when there are two enumerators at play on the same IAsyncEnumerable? |
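The cold-versus-hot question can be made concrete with a small stand-alone sketch. It does not use any Dapr API: `FakeFeed` is a hypothetical in-memory producer written as a C# async iterator, which is cold by construction, so each enumeration re-runs the producer from the start. A real subscription backed by a single gRPC stream may well behave differently, which is exactly the design question raised above. The batch is taken with a manual counter to avoid depending on a LINQ-for-`IAsyncEnumerable` package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var stream = FakeFeed.Messages();

// First consumer: take a "batch" of three messages, then stop early.
var batch = new List<string>();
await foreach (var message in stream)
{
    batch.Add(message);
    if (batch.Count == 3)
    {
        break;
    }
}

// Second enumeration of the same IAsyncEnumerable instance.
string? firstOfSecondPass = null;
await foreach (var message in stream)
{
    firstOfSecondPass = message;
    break;
}

Console.WriteLine($"batch: {string.Join(",", batch)}");
Console.WriteLine($"second pass starts at: {firstOfSecondPass}; producer runs: {FakeFeed.Starts}");

// Hypothetical producer; an async iterator body runs once per enumeration.
public static class FakeFeed
{
    public static int Starts { get; private set; }

    public static async IAsyncEnumerable<string> Messages()
    {
        Starts++;
        for (var i = 0; ; i++)
        {
            await Task.Yield(); // Simulates waiting for the next message.
            yield return $"msg-{i}";
        }
    }
}
```

Because the iterator is cold, the second pass begins again at the first message rather than continuing after the batch. An SDK implementation would need to decide whether a second enumerator opens a second subscription, shares the first one, or is rejected.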
Hello, We have used Dapr in previous projects and are now starting a new startup where we plan to continue leveraging Dapr. We are particularly interested in utilizing this feature. Could you provide an estimate on when it will be available? Thank you. |
@mustafaaozcan Aiming for a release alongside 1.15 in November |
Thanks for the reply, WhitWaldo. Could it be released any earlier? The timing could affect our production date. Thanks for your support. |
@alicihad Not likely, as all the .NET feature bits tend to be released with the minor releases. That said, the core functionality is already baked into the Dapr sidecar and was released with 1.14 - what's discussed here is just implementing the .NET SDK to integrate with it. So there's little that stops you from either waiting for me to finish the PR/review or just writing and using your own implementation until the official bits come out in the 1.15 release. I do that all the time with the pieces I've written to validate that they work as they should. |
Dapr v1.14 adds support for streaming subscriptions, which allow for dynamically subscribing to pubsub topics over a bi-directional gRPC stream. This API is exposed via the SubscribeTopicEventsAlpha1 gRPC API.
On first call, the client sends an InitialRequest containing the pubsub, topic name, etc. From then on, topic event messages are sent from daprd to the client over the stream. The client reports the processing status of each message back to daprd. A message will not be considered processed until the client has responded with this status. There can be multiple in-flight topic messages. Daprd unsubscribes from the topic when the stream is closed.
Here is a reference implementation for the go-sdk.
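The message flow described above can be sketched as an in-memory simulation. This is not the Dapr gRPC API: the two `Channel` instances stand in for the two directions of the stream, and `FakeSidecar`, `InitialRequest`, `EventStatus` and `TopicEvent` are hypothetical types made up for the example (only the `TopicResponse` values mirror the enum proposed in this thread). Unlike daprd, the fake sidecar ends the stream once every event is acknowledged so that the demo terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var toSidecar = Channel.CreateUnbounded<ClientMessage>();
var toClient = Channel.CreateUnbounded<TopicEvent>();
var sidecar = FakeSidecar.RunAsync(toSidecar.Reader, toClient.Writer, eventCount: 3);

// 1. The first message on the stream names the pubsub component and topic.
await toSidecar.Writer.WriteAsync(new InitialRequest("pubsub", "orders"));

// 2. Events arrive over the stream; each one is answered with a processing status.
await foreach (var evt in toClient.Reader.ReadAllAsync())
{
    var status = evt.Data.Length > 0 ? TopicResponse.Success : TopicResponse.Drop;
    await toSidecar.Writer.WriteAsync(new EventStatus(evt.Id, status));
}

// 3. Closing the client side of the stream ends the subscription.
toSidecar.Writer.Complete();
var acknowledged = await sidecar;
Console.WriteLine($"acknowledged: {string.Join(", ", acknowledged)}");

public enum TopicResponse { Success, Error, Drop }
public abstract record ClientMessage { }
public sealed record InitialRequest(string PubSubName, string Topic) : ClientMessage;
public sealed record EventStatus(string Id, TopicResponse Status) : ClientMessage;
public sealed record TopicEvent(string Id, string Data);

public static class FakeSidecar
{
    public static async Task<List<string>> RunAsync(
        ChannelReader<ClientMessage> requests, ChannelWriter<TopicEvent> events, int eventCount)
    {
        if (await requests.ReadAsync() is not InitialRequest)
        {
            throw new InvalidOperationException("The first message must be the initial request.");
        }

        // Send every event before waiting for any status: several are in flight at once.
        var pending = new HashSet<string>();
        for (var i = 0; i < eventCount; i++)
        {
            var id = $"evt-{i}";
            pending.Add(id);
            await events.WriteAsync(new TopicEvent(id, $"payload {i}"));
        }

        // An event only counts as processed once its status has come back.
        var acknowledged = new List<string>();
        while (pending.Count > 0)
        {
            if (await requests.ReadAsync() is EventStatus s && pending.Remove(s.Id))
            {
                acknowledged.Add($"{s.Id}={s.Status}");
            }
        }

        events.Complete();          // Demo only: end the stream so the client loop exits.
        await requests.Completion;  // Wait for the client to close its side.
        return acknowledged;
    }
}
```

The point of the sketch is the ordering constraint: the initial request comes first, statuses are correlated to events by id, and nothing is treated as processed until its status has been received.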