In-order routing and late ack #52
Conversation
Signed-off-by: fracasula <[email protected]>
Thanks for this, I'll take a closer look tomorrow.
@alsm I also wanted to discuss the possibility of letting the user acknowledge messages when they want. One possible solution would be to add an `Ack()` method to the client. Currently we have:

```go
func (c *Client) routePublishPackets() {
	for pb := range c.publishPackets {
		c.Router.Route(pb)
		switch pb.QoS {
		case 1:
			pa := packets.Puback{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			c.debug.Println("sending PUBACK")
			_, err := pa.WriteTo(c.Conn)
			if err != nil {
				c.errors.Printf("failed to send PUBACK for %d: %s", pb.PacketID, err)
			}
		case 2:
			pr := packets.Pubrec{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			c.debug.Println("sending PUBREC")
			_, err := pr.WriteTo(c.Conn)
			if err != nil {
				c.errors.Printf("failed to send PUBREC for %d: %s", pb.PacketID, err)
			}
		}
	}
}
```

We could do:

```go
func (c *Client) routePublishPackets() {
	for pb := range c.publishPackets {
		c.Router.Route(pb)
	}
}
```
```go
// Ack is used to send a PUBACK back to the server for QoS 1 and 2 messages
func (c *Client) Ack(pb *packets.Publish) error {
	switch pb.QoS {
	case 1:
		pa := packets.Puback{
			Properties: &packets.Properties{},
			PacketID:   pb.PacketID,
		}
		c.debug.Println("sending PUBACK")
		_, err := pa.WriteTo(c.Conn)
		if err != nil {
			c.errors.Printf("failed to send PUBACK for %d: %s", pb.PacketID, err)
		}
		return err
	case 2:
		pr := packets.Pubrec{
			Properties: &packets.Properties{},
			PacketID:   pb.PacketID,
		}
		c.debug.Println("sending PUBREC")
		_, err := pr.WriteTo(c.Conn)
		if err != nil {
			c.errors.Printf("failed to send PUBREC for %d: %s", pb.PacketID, err)
		}
		return err
	}
	return nil
}
```

This way users can still create go routines to process messages concurrently, but still in order according to their own business logic. For example, say a consumer is subscribed to a sensors topic: in the client the messages will come in order thanks to the buffered channel, but the user's router can still spawn one go routine per sensor.
I am against allowing applications to have any control over the acking of messages. I let this into the v3 client and I don't think it was really beneficial, for a couple of reasons. Firstly, the contract regarding handling messages is between the server and the client; once the client has invoked a handler for a message its responsibility ends. If the handler subsequently crashes, blocks, or loses the message, that's unfortunate but not really the client's issue. Secondly, it's almost certainly going to cause problems with spec compliance and performance.
Ordering is effectively single threaded through the client, not separated by topic or anything else. You mention the risk of acknowledging a message that you are not done with, but I'm not sure I see the problem here: message acknowledgments are only between the client and the server, not end to end with the original sender. I can see wanting to co-opt the ack mechanism to effectively have the server handle state/persistence for you; I'm not sure that's the right thing to be doing though. That's a topic we should probably bring up on the paho-dev mailing list. On your PR, I like the way of ordering, although again I'm not sure about blocking the ack until the message handler (or handlers) comes back. This also exposes another problem with `Ack()` being something applications can call: multiple handlers can match a single publish, and as soon as a PUBACK has been sent the packet id can be reused. It would be very hard to coordinate potentially multiple handlers calling `Ack()` for the same message while ensuring it didn't interfere with another message too.
Yes, I can see how difficult it could become to maintain the order of acknowledgments if we were to expose an `Ack()` method.

Going back to the original proposal though: if we block on the acking part we're not breaking anything. We're still following the spec, and we're sending the acks in the order they were received by the client. We are just letting the users decide whether it is the case for them to block or not in their router. Performance wise it's always possible to decrease the receive maximum.

One more thing about performance: I think it matters, but on a different level, and that is a combination of application (not client) design and client/server settings. First of all we have to make sure that we meet the requirements (e.g. avoid message loss); then, if performance becomes an issue, one should either relax the requirements or use a different topology. Shared subscriptions and clustering are examples of how a topology can change to meet performance needs.

My problem with the current approach is that the ack/routing behaviour is hard-coded in the client just because it might cause performance issues in some use cases (with what brokers and what settings, btw? we've been doing this in production for months with VMQ and we are satisfied with the performance; we have 26k agents connected simultaneously sending messages). Also, I couldn't find anything in the spec saying that the acknowledgments must be sent immediately.

Also, if you don't like this approach, do you have any other idea about how we can avoid message loss on ungraceful client termination?
You're of course right that as currently written your PR doesn't make a huge difference to the way the client runs, and I wrote the client such that we could have pluggable router implementations; unlike the v3 client we aren't tied to a single style. I guess the consideration is what should be the default, what is the least surprising behaviour. Clearly out-of-order reception is not it, so I definitely want the ordering you offer. On the delay in responding to a PUBLISH, this is in the spec in section 3.3.4:
This doesn't actually say that they should be sent immediately, but is waiting for a message handler to return a delay? I would say that it is, as that interaction between the client library and an application is outside the scope of the spec. Then again, most message handlers will return fairly quickly, and how would the server know any different? I can see the attraction of doing it this way: it negates the need for any incoming persistence store in the client. I'm leaning towards accepting it as it's a beneficial change; I just want to think a little more on it and talk it over with some other people.
Just wanted to chime in that as active users of the v3 client, we actually rely on the ability to control when messages are acknowledged.
@alsm thanks for getting back to me, all this has been very informative for me so far 🙇 Regarding the spec you quote, I'm not sure I understand it fully; let's see if you can help me out a bit:

Am I correct in assuming that if the Client is merely consuming and not publishing anything (thus not sending any `PUBLISH` packets) this part does not apply? Anyway, sorry for the little digression, I just wanted to make sure that I'm getting this right. Thanks again, and in the meantime I'll wait for you to get back to us.
Discussing this with @icraggs, I am happy for this to go in; it matches the way the Paho C client handles this situation. Looking at the code I wonder if
It seems not everyone reads this section the same way :) I read it as "The client must not delay the sending of any packets, other than publish packets that want to be sent but cannot due to there being receive maximum publishes already inflight with the server." With Ian's reading it doesn't seem to be applicable to your case at all.
That's great to hear @alsm!
It makes sense yeah, I pushed some more changes that make the routine exit on
I took the liberty of adding a
I also tried to add the wait in the
My question is, where do you think it makes more sense to wait for the workers? I think in
Note: please have a look at the new test, I also added some stuff to the test server to spy on the
```go
if !open {
	return
}
c.Router.Route(pb)
```
One potential concern with this is that the processing runs on a single thread. This could be an issue where the handler is storing the message to a remote database (or other high latency process). I cannot see how you can process multiple messages simultaneously (because `routePublishPackets` will block until `Route` completes). Allowing for this may be a bit complicated due to the spec requirement that acknowledgments be sent "in the order in which the corresponding PUBLISH packets were received". I don't have a solution to this right now (will think about it) but wanted to raise it (it would be great to get the interface right on the first attempt!). Great PR by the way!
Hey, thanks! If you want you can just avoid blocking inside your router:

```go
c := NewClient(ClientConfig{
	Conn: ts.ClientConn(),
	Router: NewSingleHandlerRouter(func(p *Publish) {
		go doSomething(p)
	}),
})
```

This way at least you have both options, blocking and non-blocking, and you can handle that on your side. Of course, if you do that, you lose the benefit of having the message redelivered by the broker in case you weren't able to complete your "side-effect" operation successfully.
Sorry, I probably need to add a bit of context to this. I'm starting work on adding persistence and, initially anyway, we (discussed with @alsm) plan on implementing outgoing persistence only (on the basis that, with your change, the broker effectively handles inbound persistence; I realise there are QoS 2 issues but...). On that basis I was keen to have a way of processing incoming publish packets in parallel whilst retaining control of the ACK (meaning your suggestion of `go doSomething(p)` is not quite sufficient), because this is something I'd need in production in the absence of another form of persistence (high volume of messages going into a database).

The idea I'm currently mulling is to expand the approach you have taken and send ALL packets (both sent and received) via a channel to a `router`, which can take whatever action it wants (which may be just sending a packet, calling a handler, sending a response to a received packet, etc.; I need to write this up in more detail but will probably do it as a test implementation because it's probably easier to see what works that way). There would be a generic implementation of this which just sends the standard response (probably with a delay to allow the handler to run in the case of a publish packet), but users would be able to replace the `router`, thus enabling the implementation of persistence via a wrapper (rather than adding significant complexity to the main `paho` package) and whatever other message handling needs users may have (e.g. issue #40 would be covered).

Very keen to hear your thoughts on this (it's probably going to take a few iterations to get right, but I figured it was worth mentioning because it may influence this code and I believe you have more experience using it than most!).
Yes, I understand your point and I think it's an interesting approach. I was hoping I could get things done in different PRs though; if that doesn't work, of course it's OK, I'm just expressing my preference. That's why I proposed the `go doSomething(p)` in the router: that way we can still merge this PR and have exactly what we have now on `master` (with the difference that we go async in the router rather than in the client), plus in-order delivery if you avoid blocking. With this approach we would still be quite backwards compatible and manage to merge an improvement.
Then I had another idea for increasing performance and I spent some time last week testing my approach, which I haven't proposed yet (I was planning to do so in another PR).
My use case is a bridge from the MQTT broker to Apache Pulsar. As you can imagine, doing one message at a time leads to very poor results, especially considering that Pulsar is distributed and there's consensus involved as well, so multiple nodes need to acknowledge before an ack goes back to the client. On my local machine we're talking ~675 messages per second (quite poor).

So I started experimenting with the idea of batching. If instead of pushing a single message at a time I buffer them and then write them as a batch to Pulsar (similar to a MongoDB InsertMany), then on the same machine I get more than 100000 messages per second (vs 675, quite an improvement!).

At this point acknowledging in order on the MQTT side would be quite simple: we buffer, say, the first 1000 messages, then stop, write the batch, ack them in order, and carry on.
With this in mind I started thinking of a way to give users an API that avoids messing up the ack order, and I thought of cumulative acking. Here's the idea:

- the client by default behaves like in this PR
- it has a configurable attribute `EnableCumulativeAcking`
- if `EnableCumulativeAcking` is `true` then the client doesn't ack automatically; it keeps track of the `packetID` and QoS of every packet it comes across and stores them in order in a slice (the memory footprint is minimal: at most 65535 times a struct with just those two attributes, a `uint16` and an `int` if I remember well)
- then we expose a `CumulativeAck()` method that, given a `packetID`, looks for it in the slice and acks from the beginning up to the packet it has found

With an API like that, once you write your batch (which performs even better than going in parallel in most of the benchmarks I wrote), you just `CumulativeAck()` the last one and then the client does the magic for you, with what I think is little complexity.
Of course this might require some thinking when designing your application, since things like batching usually have a single target, for example one topic on Pulsar or one collection on MongoDB. Thus the need to make sure, on your side, that whatever comes out of a topic goes to a single place (e.g. one collection/topic) in order to get the most out of it.
Main advantages:

- keeps the default as is (for backwards compatibility)
- you can still ack a single message with `CumulativeAck()`
- given the method name, the requirement of acking in order becomes pretty clear for early users as well
- performs quite nicely
- decreases complexity vs a pure in-parallel approach (given the requirements). Say you process 100 messages in parallel but only one doesn't succeed: you hold back the other 99 for as long as a timeout. There might be retries involved, and if you decide to ack anyway even though you couldn't apply the side effect, you might lose the message. With batching it's all or none: easier to debug, and timeouts add pressure mostly on the broker rather than the client (and the broker usually has plenty more resources than a client, and hopefully clustering, which should help redistribute load). If faster processing is needed then shared subscriptions can be used, with the advantage of keeping a single client as simple as possible.
Anyway, sorry for the long post 😅
I just wanted to share some thoughts, performance is indeed quite important for me as well.
Next week I'll try to review the issue you linked and put more thought into your approach. If you have some code to share please do, I tend to reason better with implementations. The way you describe it seems legit 👍
Thanks very much for the detailed explanation! My (slight) concern was that this change does alter the API slightly (moving from `go c.Router.Route(pb)` to `c.Router.Route(pb)`), which would require end users to alter their code if they wish to avoid sequential processing (a small change, and we are pre-1.0 so to be expected, but this does provide the opportunity to make a larger change that could simplify things going forward). I'm currently still on 3.1 because persistence is a requirement for me, so I cannot really comment too much and, for the avoidance of doubt, have no objections to this PR going ahead as-is (but perhaps consider the thoughts below).
Fleshing out my concern a bit:

- If users wish to maintain the current functionality (multiple simultaneous processing of inbound messages) they will need to change their code (possibly not a breaking change, but it could have a major impact in high volume uses).
- It does not provide a way to process multiple messages simultaneously AND control the ACK.
- If you decide you do not want to acknowledge a packet, the only option I can see is to terminate the MQTT connection before returning (and that is problematic because `routePublishPackets` will block, so you could not wait for all go routines to exit).
- A change would be needed if inbound persistence is implemented in the future (there is no way to say "don't send an ACK" at all).
Ref your proposed "API to avoid messing up the ack order": I do wonder if this ends up overcomplicating things within the `paho` package (I've made quite a few changes to the v3 client and really like the simplicity of the v5 client!). My feeling is that anyone who is batching messages should be fairly competent and could take care of ACK order themselves (or they may choose to ignore this requirement in the spec; my guess is that most brokers would handle out-of-order ACKs whatever the spec says!).
I wonder if, rather than passing a `Publish` to the handler, we passed something like (note: somewhat off the top of my head; I have not implemented this!):

```go
// note: in order to maintain compliance with the MQTT spec, `Ack` must be
// called in the order in which packets are received.
type PublishWithAck struct {
	*Publish
	defaultAck io.WriterTo // possibly not needed, but would make the Ack function simpler
}

// Ack acknowledges the Publish packet.
// Passing nil results in the default acknowledgement being sent. It might be
// important for the user to know if the ACK could not be sent (this has been
// done with issue #40 in mind; I'm thinking that longer term it will be
// possible to request all packets in this way).
func (p *PublishWithAck) Ack(w io.WriterTo) error {
	// todo... need to ensure only one ACK is sent (so may need a mutex etc.)
}
```
As an example of how this could be used, the following would maintain compatibility with the current functionality:

```go
Router: NewSingleHandlerRouter(func(p *PublishWithAck) {
	go doSomething(p.Publish)
	p.Ack(nil)
}),
```
I believe that this would support your batching use-case (you could add the packets to a slice and, once it hits a predetermined threshold, send them all to Pulsar and then call the `.Ack` functions in order). If we wanted to simplify things further for the user, a configurable attribute `DisableAutoPublishAck` could be added (so unless this is set, the functionality is pretty much as now, except that the router is called sequentially and passed a different type). Note: this is kind-of similar to the 3.1 client (except that it does not support turning off the auto-ack, so it's pretty useless there!).
Yes, I agree it's not optimal, but given the MQTT spec we have I'm trying to split the work in a reasonable way.

At the moment this client isn't guaranteeing in-order delivery; with this change, considering what we're merging against, we fix that and keep the rest relatively the same. As you mention, I wouldn't block this PR, but all the things you say should indeed be addressed.
The batching idea comes mostly from @alsm's concerns expressed here. My first proposition was indeed just an `Ack()` method, then letting the user handle the different cases.
Also, as you mention, there are MQTT brokers that do not require acknowledgments in order (e.g. VerneMQ).
If you're OK with this I say we continue this discussion in a new dedicated issue where we try to tackle the acknowledgements. I don't mind creating an issue myself, please let me know what you think.
As a sidenote, I'll go as far as to say that we should try to understand why that requirement was defined in MQTT in the first place and see if we can change it in a new version. Without the requirement of acknowledging in order and with in-order delivery guaranteed over shared-subscriptions, the MQTT protocol would be able to serve a lot more use cases more efficiently (for example EMQX does the in-order delivery over shared-subscriptions by routing via a hash of the topic or the client ID, very similar to what Kafka and Pulsar do with key partitioning).
Thanks!
Thanks Francesco,
Thinking about this further I can implement the changes mentioned as an option with minimal impact to users so no issues there.
Ref why it was done this way, see this. Basically I would see the rationale as something along the lines of "a QoS 2 message should only be forwarded when PUBCOMP has been sent and, thus, PUBREC and PUBREL must be sent in order". In reality this does not really impact the end recipient if messages are passed on when the PUBLISH arrives (as happens in this client). In response to @alsm's question re this, Ian Craggs commented:

> I do think however there is a question of ordering, that the ack for a message shouldn't be delayed so long that the ack for a subsequently received PUBLISH is sent first, at least not routinely.

so it does not appear that he feels the ordering is essential...
Matt
@MattBrittan As I said before, I'm all for giving users the ability to `Ack()` themselves. Even with the API that you're proposing (i.e. `PublishWithAck`), it's easy to mess up the order. I'm cool with it though, because I think the user should be able to handle that. So as long as we document it properly, and note that some brokers don't need the acks in order after all, then I'm fine with it (I checked with the EMQX guys and at least for now they don't care about the order either).

I can also try something if you think it is required in this PR after all, but before doing any work I would like to hear from @alsm, who has been silent for a bit now. I think it'd be beneficial if we can all agree on this before doing the changes.

Another option, as I proposed here, is to have a `func (c *Client) Ack(pb *packets.Publish) error`, which seems to be a fairly common approach amongst clients.

A third option is to have the router take a second parameter, a lambda function that acks the packet:
```go
case packets.PUBLISH:
	pb := recv.Content.(*packets.Publish)
	ack := func() error {
		switch pb.QoS {
		case 1:
			pa := packets.Puback{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			return c.write(ctx, &pa)
		case 2:
			pr := packets.Pubrec{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			return c.write(ctx, &pr)
		}
		return nil
	}
	if c.Router != nil {
		go c.Router.Route(pb, ack)
	} else {
		_ = ack()
	}
```
My preference for now would be to have the `Ack()` in the client and leave the packets as dumb as possible. Something I like about the packets, as they are now, is that you can safely pass them by value as well, which is a plus for me: sometimes I prefer a copy on the stack rather than the heap, which might otherwise add pressure on the garbage collector. I haven't done any escape analysis to verify the impact, but it's something we should keep in mind in case we end up putting things inside packets that can't be copied just because, in order for them to expose an `.Ack()` method, they end up holding pointers to mutexes, the client, connections, etc.

Thoughts? 🙂
@fracasula I'll raise a new issue re this, because I think manual handling of the ACK is outside the scope of your original PR and the discussion is likely to get lost if it remains here. As mentioned before, I have no issues with this PR going forward, but I will leave this comment unresolved because, as you say, input from @alsm would be great.
I'd been away so just catching up on all this...woah 😄
Initial thoughts;
As a receiving client it should make no difference to the server in which order I ack the incoming messages: for QoS 1 it just means the packet id can be reused; for QoS 2 most clients shortcut and pass the message to the application on PUBREC rather than PUBCOMP. The only issue here would be if a server doesn't consider a message sent to it to be "published" until the client has sent the PUBREL, but that doesn't have to matter here as we're looking at receiving.
As is, though, this PR addresses the issue of ordered processing of messages, so I'm happy for it to proceed as-is and we can look at the acking and performance questions in another issue.
@alsm is there anything else that you think needs to be done here? Any thoughts on the test coverage? I ran some integration tests on my end; the rest is the unit tests you see in this PR.
No, it's all good, I was just checking we were all done with it.
In-order routing and late ack
This PR aims to solve two issues.

In order to avoid blocking the `Incoming()` loop (which would lead to unexpected behaviours such as client disconnections etc...) you are routing the `PUBLISH` packets asynchronously, i.e. by creating a go routine: `go c.Router.Route(pb)`. Thus a go routine is created for each `PUBLISH` packet, which changes the order of the messages as received from the server. See the example in the Go Playground.

The solution I'm proposing is to queue the `PUBLISH` packets, in the same order as they are received, in a buffered channel of the size of the "receive maximum" (worst case the buffer size would be `65535`).

We then create a new worker in `Connect()` that listens on the new buffered channel and calls the router without creating a go routine. It is up to the user to create a routine in their message handler if they don't want to block. This way the client gives users maximum flexibility when building routers (blocking vs non-blocking APIs).

Since the call to the router in this PR is blocking, we can now make sure that the `PUBACK` is sent to the server only after the message handler is done with the message. In case of abnormal termination of the client the `PUBACK` won't be sent, so the server will redeliver the related `PUBLISH` packet (according to the "clean start" and "session expiry interval" settings).
This has been tested with running instances of VerneMQ with these settings:

The consumer is clearly swamped with messages but, given the "receive maximum" setting, it receives only 65535 `PUBLISH` packets, which fit perfectly in the buffered channel, so the `Incoming()` loop is never blocked.

Testing
I'm not sure about test coverage here. I'd write an integration test with a real broker running, but it doesn't look like this project has support for integration tests. Do you think an additional unit test would be useful here? Any pointers?