In-order routing and late ack #52
One potential concern with this is that the processing runs on a single thread. This could be an issue where the handler is storing the message in a remote database (or some other high-latency process). I cannot see how you can process multiple messages simultaneously (because `routePublishPackets` will block until `Route` completes). Allowing for this may be a bit complicated due to the spec requirement that acknowledgments be sent "in the order in which the corresponding PUBLISH packets were received". I don't have a solution to this right now (will think about it) but wanted to raise it (it would be great to get the interface right on the first attempt!). Great PR by the way!
---
Hey, thanks! If you want, you can just avoid blocking inside your router:
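Something along these lines (a minimal sketch using this client's single-handler router; `doSomething` stands in for your own logic):

```go
package example

import "github.com/eclipse/paho.golang/paho"

// doSomething is a placeholder for your own (possibly slow) processing,
// e.g. storing the message in a remote database.
func doSomething(p *paho.Publish) {}

func newRouter() paho.Router {
	return paho.NewSingleHandlerRouter(func(p *paho.Publish) {
		// The handler returns immediately, so routePublishPackets never
		// blocks and the acks still go out in order.
		go doSomething(p)
	})
}
```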
This way at least you have both options, blocking and non-blocking, plus you can handle that on your side. Of course, if you do that, you lose the benefit of having the message redelivered by the broker in case you weren't able to complete your "side-effect" operation successfully.
---
Sorry, I probably need to add a bit of context to this. I'm starting work on adding persistence and, initially anyway, we (discussed with @alsm) plan on implementing outgoing persistence only (on the basis that, with your change, the broker effectively handles inbound persistence; I realise there are QoS 2 issues but...). On that basis I was keen to have a way of processing incoming publish packets in parallel whilst retaining control of the ACK (meaning your suggestion of `go doSomething(p)` is not quite sufficient), because this is something I'd need in production in the absence of another form of persistence (a high volume of messages going into a database).

The idea I'm currently mulling is to expand the approach you have taken and send ALL packets (both sent and received) via a channel to a `router`, which can take whatever action it wants (which may be just sending a packet, calling a handler, sending a response to a received packet, etc.; I need to write this up in more detail but will probably do it as a test implementation because it's probably easier to see what works that way). There would be a generic implementation of this which just sends the standard response (probably with a delay to allow the handler to run in the case of a publish packet), but users would be able to replace the `router`, thus enabling the implementation of persistence via a wrapper (rather than adding significant complexity to the main `paho` package) and whatever other message-handling needs users may have (e.g. issue #40 would be covered).

Very keen to hear your thoughts on this (it's probably going to take a few iterations to get right, but I figured it was worth mentioning because it may influence this code and I believe you have more experience using it than most!).
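Very roughly, the shape I have in mind is something like this (all names here are illustrative; nothing is implemented):

```go
package example

import "github.com/eclipse/paho.golang/packets"

// Hypothetical: every packet, sent and received, flows through one channel
// to a replaceable router that decides what to do with it.
type routedPacket struct {
	packet   *packets.ControlPacket
	outbound bool // true when the client is about to send this packet
}

type packetRouter interface {
	// e.g. send the standard response, call a handler, persist, etc.
	route(rp routedPacket)
}

// The client would run a single pump so ordering is preserved; persistence
// could then be implemented by swapping in a wrapping packetRouter.
func pump(ch <-chan routedPacket, r packetRouter) {
	for rp := range ch {
		r.route(rp)
	}
}
```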
---
Yes, I understand your point and I think it's an interesting approach. I was hoping I could get things done in different PRs though; if that doesn't work, of course it's OK, just expressing my preference. That's why I proposed the `go doSomething(p)` in the router: that way we can still manage to merge this PR and potentially have exactly what we have now on `master` (with the difference that we go async in the router rather than in the client), plus the in-order delivery if you avoid blocking. So with this approach we would still be quite backwards compatible and manage to merge an improvement.

Then I had another idea for increasing performance, and I spent some time last week testing my approach, which I haven't proposed yet (I was planning to do so in another PR).
My use case is a bridge from the MQTT broker to Apache Pulsar. As you can imagine, doing one message at a time leads to very poor results, especially considering that Pulsar is distributed and there's consensus involved as well, so multiple nodes need to acknowledge before they get back to the client with an ack. On my local machine we're talking ~675 messages per second (quite poor).

So I started experimenting with the idea of batching. If instead of pushing a single message at a time I buffer them and then write them as a batch to Pulsar (similar to a MongoDB InsertMany), then on the same machine I get more than 100,000 messages per second (vs 675, quite an improvement!).

At this point acknowledging in order on the MQTT side would be quite simple: we just buffer, say, the first 1000 messages, then stop, write the batch, ack them in order and carry on.
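To illustrate (a sketch, not real client API: `writeBatchToPulsar` is a placeholder for the batched write, and `ack` is exactly the per-message acknowledgement under discussion here):

```go
package example

import "github.com/eclipse/paho.golang/paho"

const batchSize = 1000

// buf is safe as a plain slice because, with this PR, the router calls the
// handler sequentially.
var buf []*paho.Publish

// Placeholders: writeBatchToPulsar is one batched write (cf. MongoDB's
// InsertMany); ack is the hypothetical manual acknowledgement.
func writeBatchToPulsar(ms []*paho.Publish) {}
func ack(m *paho.Publish)                   {}

func onPublish(p *paho.Publish) {
	buf = append(buf, p)
	if len(buf) < batchSize {
		return
	}
	writeBatchToPulsar(buf) // one write instead of batchSize round-trips
	for _, m := range buf {
		ack(m) // then ack in the order the PUBLISHes arrived
	}
	buf = buf[:0]
}
```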
With this in mind I started thinking of a way to give users an API that avoids messing up the ack order, and then I thought of cumulative acking. Here's the idea:

- an `EnableCumulativeAcking` client option;
- if `EnableCumulativeAcking` is `true`, the client doesn't ack automatically; instead it keeps track of the `packetID` and QoS of every packet it comes across and stores them, in order, in a slice (the memory footprint is minimal: at most 65535 times a struct with just those two attributes, a `uint16` and an `int` if I remember well);
- a `CumulativeAck()` method that, given a `packetID`, looks for it in the slice and acks everything from the beginning up to the packet it has found.

With an API like that, once you write your batch (which performs even better than going in parallel in most of the benchmarks I wrote), you just `CumulativeAck()` the last one and then the client does the magic for you, with what I think is little complexity.

Of course this might require some thinking when designing your application, since things like batching usually can have a single target only, for example a single topic on Pulsar or a single collection on MongoDB. Thus the need to make sure on your side that whatever comes out of a topic goes into a single place (e.g. collection/topic) only, in order to get the most out of it.

Main advantage: a single `CumulativeAck()` call acknowledges a whole batch while the client takes care of the required ordering.
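In code, I imagine it looking roughly like this (purely a sketch of the idea above; none of it is implemented):

```go
package example

import "fmt"

// Hypothetical sketch only; nothing like this exists in the client today.
type pendingAck struct {
	packetID uint16
	qos      int
}

type Client struct {
	EnableCumulativeAcking bool
	pending                []pendingAck // received PUBLISHes, in arrival order
}

// sendAck would send a PUBACK (QoS 1) or PUBREC (QoS 2) for the packet.
func (c *Client) sendAck(a pendingAck) {}

// CumulativeAck looks for packetID in the ordered slice and acks everything
// from the head up to and including that packet.
func (c *Client) CumulativeAck(packetID uint16) error {
	for i, p := range c.pending {
		if p.packetID != packetID {
			continue
		}
		for _, a := range c.pending[:i+1] {
			c.sendAck(a) // acks go out in the original arrival order
		}
		c.pending = c.pending[i+1:]
		return nil
	}
	return fmt.Errorf("packet %d is not pending", packetID)
}
```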
Anyway, sorry for the long post 😅
I just wanted to share some thoughts, performance is indeed quite important for me as well.
Next week I'll try to review the issue you linked and put more thought into your approach. If you have some code to share please do; I tend to reason better with implementations. The way you describe it seems legit 👍
---
Thanks very much for the detailed explanation! My (slight) concern was that this change does alter the API slightly (moving from `go c.Router.Route(pb)` to `c.Router.Route(pb)`), which would require end users to alter their code if they wish to avoid sequential processing (a small change, and we are pre-1.0 so to be expected, but this does provide the opportunity to make a larger change that could simplify things going forward). I'm currently still on 3.1 because persistence is a requirement for me, so I cannot really comment too much and, for the avoidance of doubt, have no objections to this PR going ahead as-is (but perhaps consider the thoughts below).

Fleshing out my concern a bit: `routePublishPackets` will block, so you could not wait for all goroutines to exit.

Ref your proposed "API to avoid messing up the ack order": I do wonder if this ends up over-complicating things within the `paho` package (I've made quite a few changes to the v3 client and really like the simplicity of the v5 client!). My feeling is that anyone who is batching messages should be fairly competent and could take care of ACK order themselves (or they may choose to ignore this requirement in the spec; my guess is that most brokers would handle out-of-order ACKs whatever the spec says!).

I wonder if, rather than passing a `Publish` to the handler, we passed something like the first sketch below (note: somewhat off the top of my head; I have not implemented this!). As an example of how this could be used, the second sketch shows what would be needed to maintain compatibility with the current functionality.
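Purely illustrative (the exact fields and names are my guesses at a shape; nothing is implemented):

```go
package example

import "github.com/eclipse/paho.golang/paho"

// PublishWithAck wraps the received Publish so the handler, rather than the
// client, decides when the acknowledgement goes out.
type PublishWithAck struct {
	*paho.Publish
	ack func() error // sends the PUBACK/PUBREC for this packet
}

// Ack releases the acknowledgement for this message; per the spec, Acks
// should still be issued in the order the PUBLISH packets were received.
func (p *PublishWithAck) Ack() error { return p.ack() }
```

And to keep today's behaviour, the default router would simply ack as soon as the handler returns (again, hypothetical code):

```go
// A hypothetical router adapter built on the wrapper above; acking
// immediately after the handler returns reproduces the current behaviour.
func compatHandler(handle func(*paho.Publish)) func(*PublishWithAck) {
	return func(p *PublishWithAck) {
		handle(p.Publish) // the user's handler, exactly as today
		_ = p.Ack()       // then the ack is sent straight away, in order
	}
}
```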
I believe that this would support your batching use-case (you could add the packets to a slice and, once it hits a predetermined threshold, send them all to Pulsar and then call the `.Ack` functions in order). If we wanted to simplify things further for the user, then a configurable attribute `DisableAutoPublishAck` could be added (so unless this is set, the functionality is pretty much as now, except that the router is called sequentially and passed a different type). Note: this is kind-of similar to the 3.1 client (except that it does not support turning off the auto-ack, so it's pretty useless there!).
---
Yes, I agree it's not optimal, but given the MQTT specs we've been given I'm trying to split the work up in a reasonable way. At the moment this client isn't guaranteeing in-order delivery; with this change, if we consider what we're merging against, we fix that and keep the rest relatively the same. As you mention, I wouldn't block this PR, but all the things you say should indeed be addressed.

The batching idea comes mostly from the concerns @alsm expressed here. My first proposition was indeed just an `Ack()` method, then letting the user handle the different cases. Also, as you mention, there are MQTT brokers that do not require acknowledgments in order (e.g. VerneMQ).

If you're OK with this, I say we continue this discussion in a new dedicated issue where we try to tackle the acknowledgements. I don't mind creating the issue myself; please let me know what you think.

As a sidenote, I'll go as far as to say that we should try to understand why that requirement was defined in MQTT in the first place and see if we can change it in a new version. Without the requirement of acknowledging in order, and with in-order delivery guaranteed over shared subscriptions, the MQTT protocol would be able to serve a lot more use cases more efficiently (for example, EMQX does in-order delivery over shared subscriptions by routing via a hash of the topic or the client ID, very similar to what Kafka and Pulsar do with key partitioning).
Thanks!
---
Thanks Francesco,
Thinking about this further, I can implement the changes mentioned as an option with minimal impact on users, so no issues there.

Ref why it was done this way, see this. Basically I would see the rationale as something along the lines of "a QoS 2 message should only be forwarded when PUBCOMP has been sent and, thus, PUBREC and PUBREL must be sent in order". In reality this does not really impact the end recipient if messages are passed on when the PUB arrives (as happens in this client). In response to @alsm's question re this, Ian Craggs commented, so it does not appear that he feels the ordering is essential...
Matt
---
@MattBrittan As I said before, I'm all for giving the users the ability to `Ack()` themselves. Even with the API that you're proposing, though (i.e. `PublishWithAck`), it's easy to mess up the order. I'm cool with it, because I think the user should be able to handle that. So as long as we document it properly, and note that some brokers do not need the acks in order after all, then I'm fine with it (I checked with the EMQX guys and, at least for now, they don't care about the order either).

I can also try something if you guys think it is required in this PR after all, but before doing any work I would like to hear from @alsm, who has been silent for a bit now. I think it'd be beneficial if we can all agree on this before doing the changes.

Another option, as I proposed here, is to have a `func (c *Client) Ack(pb *packets.Publish) error`, which seems to be a fairly common approach amongst clients.

A third option is to have the router take a second parameter, which would be a lambda function to ack that packet:
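Roughly like this (the signature is illustrative only, and `handle` is a made-up handler):

```go
package example

import "github.com/eclipse/paho.golang/packets"

// Sketch of the third option: the router hands the handler an ack callback
// alongside the packet, so the packet itself stays dumb.
type ackingRouter interface {
	Route(p *packets.Publish, ack func() error)
}

// A handler is then free to ack whenever it is ready:
func handle(p *packets.Publish, ack func() error) {
	// ... process the message (e.g. add it to a batch) ...
	_ = ack() // acknowledge once the side effect has completed
}
```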
My preference for now would be to have the `Ack()` in the client and leave the packets as dumb as possible. Something that I like about the packets as they are now is that you can safely pass them by value as well, which is a plus for me: sometimes I prefer a copy on the stack rather than an allocation on the heap, which might add pressure on the garbage collector. I haven't done any escape analysis to verify the impact, but it's something we should keep in mind in case we end up putting stuff inside packets that can't be copied just because, in order for them to expose an `.Ack()` method, they end up having pointers to mutexes, clients, connections, etc...

Thoughts? 🙂
---
@fracasula I'll raise a new issue re this because I think manual handling of `ACK`s is outside the scope of your original PR and the discussion is likely to get lost if it remains here. As mentioned before, I have no issues with this PR going forward but will leave this comment unresolved because, as you say, input from @alsm would be great.
---
I'd been away so just catching up on all this... woah 😄

Initial thoughts:

As a receiving client it should make no difference to the server in which order I ack the incoming messages: for QoS 1 it just means the packet id can be reused; for QoS 2 most clients shortcut and pass the message to the application on PUBREC rather than PUBCOMP. The only thing here would be if a server doesn't consider a message sent to it to be "published" until the client has sent the PUBREL, but that doesn't have to matter here as we're looking at receiving.

As is, though, this PR addresses the issue of ordered processing of messages, so I'm happy for it to proceed as-is, and we can look at the acking and performance issues in another issue.