
In-order routing and late ack #52

Merged · 7 commits · Apr 29, 2021
Conversation

fracasula
Contributor

@fracasula fracasula commented Mar 25, 2021

This PR aims to solve two issues:

  1. Message ordering: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901240
  2. Message acknowledgment only after the message processing has completed to ensure re-delivery of "unacked" messages from the server in case of abnormal client termination

To avoid blocking the Incoming() loop (which would lead to unexpected behaviour such as client disconnections), you are currently routing the PUBLISH packets asynchronously (i.e. by creating a goroutine: go c.Router.Route(pb)). A goroutine is thus created for each PUBLISH packet, which loses the order in which the messages were received from the server. See the example in the Go Playground.

The solution I'm proposing is to queue the PUBLISH packets in the same order as they are being received in a buffered channel of the size of the "receive maximum" (worst case the buffer size would be 65535).

We then create a new worker in Connect() that listens on the new buffered channel and calls the router without creating a goroutine. It is up to users to spawn a goroutine in their message handler if they don't want to block. This way the client gives users maximum flexibility when building routers (blocking vs non-blocking APIs).

Since the call to the router in this PR is blocking we can now make sure that the PUBACK is sent to the server only after the message handler is done with the message. In case of abnormal termination of the client the PUBACK won't be sent to the server which will redeliver the related PUBLISH packet (according to "clean start" and "session expiry interval" settings).

This has been tested with running instances of VerneMQ with these settings:

  • 10 producers publishing 100k messages each (concurrently)
  • a single consumer with receive maximum 65535 and session expiry interval 0xffffffff

The consumer is clearly swamped with messages, but given the "receive maximum" setting it receives only 65535 PUBLISH packets, which fit perfectly in the buffered channel, so the Incoming() loop is never blocked.

Testing

I'm not sure about test coverage here. I'd write an integration test with a real broker running, but it doesn't look like this project has support for integration tests. Do you think an additional unit test could be useful here? Any pointers?

Signed-off-by: fracasula <[email protected]>
@alsm
Contributor

alsm commented Mar 25, 2021

Thanks for this, I'll take a closer look tomorrow.

@fracasula
Contributor Author

@alsm I also wanted to discuss the possibility of letting users acknowledge messages when they want. One possible solution would be to add an Ack() method to the Client, so instead of:

func (c *Client) routePublishPackets() {
	for pb := range c.publishPackets {
		c.Router.Route(pb)
		switch pb.QoS {
		case 1:
			pa := packets.Puback{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			c.debug.Println("sending PUBACK")
			_, err := pa.WriteTo(c.Conn)
			if err != nil {
				c.errors.Printf("failed to send PUBACK for %d: %s", pb.PacketID, err)
			}
		case 2:
			pr := packets.Pubrec{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			c.debug.Printf("sending PUBREC")
			_, err := pr.WriteTo(c.Conn)
			if err != nil {
				c.errors.Printf("failed to send PUBREC for %d: %s", pb.PacketID, err)
			}
		}
	}
}

We could do:

func (c *Client) routePublishPackets() {
	for pb := range c.publishPackets {
		c.Router.Route(pb)
	}
}

// Ack is used to send a PUBACK back to the server for QoS 1 and 2 messages
func (c *Client) Ack(pb *packets.Publish) error {
	switch pb.QoS {
	case 1:
		pa := packets.Puback{
			Properties: &packets.Properties{},
			PacketID:   pb.PacketID,
		}
		c.debug.Println("sending PUBACK")
		_, err := pa.WriteTo(c.Conn)
		if err != nil {
			c.errors.Printf("failed to send PUBACK for %d: %s", pb.PacketID, err)
		}
		return err
	case 2:
		pr := packets.Pubrec{
			Properties: &packets.Properties{},
			PacketID:   pb.PacketID,
		}
		c.debug.Printf("sending PUBREC")
		_, err := pr.WriteTo(c.Conn)
		if err != nil {
			c.errors.Printf("failed to send PUBREC for %d: %s", pb.PacketID, err)
		}
		return err
	}
	return nil
}

This way users can still create goroutines to process messages concurrently, yet in order, according to their own business logic.

For example, let's say that a consumer is subscribed to +/temperature where the + wildcard is the client name e.g. sensor-1/temperature, sensor-2/temperature etcetera.

In the client the messages will arrive in order thanks to the buffered channel, but the user's router can still spawn one goroutine per sensor, one for sensor-1 and one for sensor-2 (or have dedicated channels per sensor), and still process the messages in order for each sensor without risking acknowledging a message that we are not done with.

@alsm
Contributor

alsm commented Mar 30, 2021

I am against allowing applications to have any control over the acking of messages. I let this into the v3 client and I don't think it was really beneficial, for a couple of reasons. Firstly, the contract as regards handling messages is between the server and the client; once the client has invoked a handler for a message its responsibility ends. If the handler subsequently crashes, blocks, or loses the message, that's unfortunate but not really the client's issue. Secondly, it's almost certainly going to cause problems with spec compliance and performance.

The Client MUST send PUBACK packets in the order in which the corresponding PUBLISH packets were received (QoS 1 messages) [MQTT-4.6.0-2]
The Client MUST send PUBREC packets in the order in which the corresponding PUBLISH packets were received (QoS 2 messages) [MQTT-4.6.0-3]
The Client MUST send PUBREL packets in the order in which the corresponding PUBREC packets were received (QoS 2 messages) [MQTT-4.6.0-4]

Ordering is effectively single threaded through the client, not separated by topic or anything else. You mention the risk of acknowledging a message that you are not done with, but I'm not sure I see the problem here: message acknowledgments are only between the client and the server, not end to end with the original sender. I can see wanting to co-opt the ack mechanism to effectively have the server handle state/persistence for you; I'm not sure if that's the right thing to be doing though. That's a topic that we should probably bring up on the paho-dev mailing list.

On your PR, I like the way of ordering, although again I'm not sure about blocking the ack until the message handler (or handlers) come back. This also exposes another problem with Ack() being something applications can call: multiple handlers can match a single publish, and as soon as a PUBACK has been sent the message ID can be reused; it would be very hard to coordinate potentially multiple handlers calling Ack() for the same message and ensure that it didn't interfere with another message too.

@fracasula
Contributor Author

fracasula commented Mar 30, 2021

Yes, I can see how difficult it could become to maintain the order of acknowledgments if we were to expose an Ack() method. I will put more thought into this to see if I can come up with something; in the meantime, thank you.

Going back to the original proposal though: if we block on the acking part we're not breaking anything, we're still following the spec and sending the acks in the order the messages were received by the client. We are just letting users decide whether it makes sense for them to block or not in their router. Performance-wise, it's always possible to decrease the ReceiveMaximum window and avoid prefetching too many messages if you're planning to block, for example. There are many things that can be done on the broker side as well.

One more thing about performance. I think it matters, but on a different level: it's a combination of application (not client) design and client/server settings. First we have to make sure we meet the requirements (e.g. avoid message loss); then, if performance becomes an issue, one should either relax the requirements or use a different topology. Shared subscriptions and clustering are examples of how a topology can change to meet performance needs.

My problem with the current approach is that the ack/routing behaviour is hard-coded in the client just because it might cause performance issues in some use cases? (With what brokers and what settings, by the way? We've been doing this in production for months with VerneMQ and we are satisfied with the performance: we have 26k agents connected simultaneously sending messages.)

Also, I couldn't find anything in the spec saying that the PUBACK should be sent immediately after a PUBLISH with no delay. Is there such a requirement? If so, please let me know.

Also, if you don't like this approach, do you have any other idea about how we can avoid message loss on ungraceful client termination?

@alsm
Contributor

alsm commented Mar 30, 2021

You're of course right that, as currently written, your PR doesn't make a huge difference to the way the client runs, and I wrote the client such that we could have pluggable router implementations; unlike the v3 client we aren't tied to a single style. I guess the consideration is what the default should be, i.e. the least surprising behaviour. Clearly out-of-order reception is not it, so I definitely want the ordering you offer. On the delay in responding to a PUBLISH, this is in the spec in section 3.3.4:

The Client MUST NOT delay the sending of any packets other than PUBLISH packets due to having sent Receive Maximum PUBLISH packets without receiving acknowledgements for them [MQTT-3.3.4-8]. The value of Receive Maximum applies only to the current Network Connection.

This doesn't actually say that they should be sent immediately, but is waiting for a message handler to return a delay? I would say that it is, as that interaction between the client library and an application is outside the scope of the spec. Then again, most message handlers will return fairly quickly, and how would the server know any different? I can see the attraction of doing it this way: it negates the need for any incoming persistence store for the client. I'm leaning towards accepting it as it's a beneficial change; I just want to think a little more on it and talk it over with some other people.

@dambrisco

I am against allowing applications to have any control over the acking of messages. I let this into the v3 client and I don't think it was really beneficial, for a couple of reasons. Firstly, the contract as regards handling messages is between the server and the client; once the client has invoked a handler for a message its responsibility ends. If the handler subsequently crashes, blocks, or loses the message, that's unfortunate but not really the client's issue. Secondly, it's almost certainly going to cause problems with spec compliance and performance.

Just wanted to chime in that as active users of the v3 client, we actually rely on the ability to ack messages ourselves. Don't mean to derail the thread or insinuate that supporting such a feature is necessary, but fwiw our perspective is that paho is simply a library we use to write our own client.

@fracasula
Contributor Author

@alsm thanks for getting back to me, all this has been very informative for me so far 🙇

Regarding the spec you quoted, I'm not sure I understand it fully; let's see if you can help me out a bit:

The Client MUST NOT delay the sending of any packets other than PUBLISH packets due to having sent Receive Maximum PUBLISH packets without receiving acknowledgements for them [MQTT-3.3.4-8]. The value of Receive Maximum applies only to the current Network Connection.

Am I correct in assuming that if the Client is merely consuming, and not publishing anything (thus not sending any PUBLISH packets), then the condition does not apply? I'm referring to the fact that it must not delay packets due to having sent PUBLISH packets, so if it doesn't send them then the rule doesn't apply? I'm not sure it's that simple, but if it is, then that's the kind of thing I'm trying to build: an MQTT bridge to Apache Pulsar.

Anyway, sorry for the little digression, I just wanted to make sure that I'm getting this right. Thanks again and in the meantime I'll wait for you to get back to us.

@alsm
Contributor

alsm commented Mar 31, 2021

Discussing this with @icraggs, I am happy for this to go in; it matches the way the Paho C client handles this situation. Looking at the code I wonder if routePublishPackets() should be doing a select on c.stop as well: if the channel has packets in it but the client is disconnected, do we want to continue to process the already received messages? Unacked messages will be resent by the server when the client reconnects. I've also just noticed I'm not actually waiting on the client waitgroup to return, but I should, and when I add that, waiting for processing of received messages to complete could block notification that the client is disconnected.

@alsm
Contributor

alsm commented Mar 31, 2021

Am I correct in assuming that if the Client is merely consuming, and not publishing anything (thus not sending any PUBLISH packets), then the condition does not apply? I'm referring to the fact that it must not delay packets due to having sent PUBLISH packets, so if it doesn't send them then the rule doesn't apply? I'm not sure it's that simple, but if it is, then that's the kind of thing I'm trying to build: an MQTT bridge to Apache Pulsar.

Anyway, sorry for the little digression, I just wanted to make sure that I'm getting this right. Thanks again and in the meantime I'll wait for you to get back to us.

It seems not everyone reads this section the same way :) I read it as "The client must not delay the sending of any packets, other than publish packets that want to be sent but cannot due to there being receive maximum publishes already inflight with the server."
@icraggs - "I read that spec statement as referring to the receive maximum limit, making it clear that it applies to PUBLISH packets only, not PUBACKs, PUBRECs, PUBRELs or PUBCOMPs. I do think however there is a question of ordering, that the ack for a message shouldn't be delayed so long that the ack for a subsequently received PUBLISH is sent first, at least not routinely."

With Ian's reading it doesn't seem to be applicable to your case at all.

@fracasula
Contributor Author

Discussing this with @icraggs I am happy for this to go in, it matches the way the paho C client handles this situation.

That's great to hear @alsm!

Looking at the code I wonder if routePublishPackets() should be doing a select on c.stop as well

Yeah, it makes sense; I pushed some more changes that make the goroutine exit on <-c.stop.

I've just noticed I'm not actually waiting on the client waitgroup to return, but I should and when I add that waiting for processing of received messages to complete could block notification that the client is disconnected.

I took the liberty of adding a defer c.workers.Wait() in the cleanup closure defined in the Connect() method. Let me know if it makes sense.

I also tried to add the wait in the close() method, but that can lead to a deadlock, for example when the Incoming loop comes across an error. In that case c.Error() is called, which calls c.close(). So if c.close() were to wait on the workers it would never finish, given that one of the workers is Incoming().

My question is, where do you think it makes more sense to wait for the workers? I think in close(), if you agree we might need to rethink a few things in order to avoid the deadlock though. Anyway, it should be done in a different PR I guess.

Note: please have a look at the new test, I also added some stuff to the test server to spy on the PUBACK and PUBREC packets sent by the client.

if !open {
return
}
c.Router.Route(pb)
Contributor

One potential concern with this is that the processing runs on a single thread. This could be an issue where the handler is storing the message in a remote database (or another high-latency process). I cannot see how you could process multiple messages simultaneously (because routePublishPackets will block until Route completes). Allowing for this may be a bit complicated due to the spec requirement that acknowledgments be sent "in the order in which the corresponding PUBLISH packets were received". I don't have a solution to this right now (I will think about it) but wanted to raise it (it would be great to get the interface right on the first attempt!). Great PR, by the way!

Contributor Author

Hey, thanks! If you want you can just avoid blocking inside your router:

c := NewClient(ClientConfig{
	Conn: ts.ClientConn(),
	Router: NewSingleHandlerRouter(func(p *Publish) {
		go doSomething(p)
	}),
})

This way at least you have both options, blocking and non-blocking, plus you can handle it on your side. Of course, if you do that, you lose the benefit of having the message redelivered by the broker in case you weren't able to complete your "side-effect" operation successfully.

Contributor

@MattBrittan MattBrittan Apr 17, 2021

Sorry, I probably need to add a bit of context to this. I'm starting work on adding persistence and, initially anyway, we (discussed with @alsm) plan on implementing outgoing persistence only (on the basis that, with your change, the broker effectively handles inbound persistence; I realise there are QoS 2 issues but...). On that basis I was keen to have a way of processing incoming PUBLISH packets in parallel whilst retaining control of the ACK (meaning your suggestion of go doSomething(p) is not quite sufficient), because this is something I'd need in production in the absence of another form of persistence (a high volume of messages going into a database).
The idea I'm currently mulling is to expand the approach you have taken and send ALL packets (both sent and received) via a channel to a router which can take whatever action it wants (which may be just sending a packet, calling a handler, sending a response to a received packet, etc.). I need to write this up in more detail, but will probably do it as a test implementation because it's probably easier to see what works that way. There would be a generic implementation of this which just sends the standard response (probably with a delay to allow the handler to run in the case of a publish packet), but users would be able to replace the router, thus enabling the implementation of persistence via a wrapper (rather than adding significant complexity to the main paho package) and whatever other message-handling needs users may have (e.g. issue #40 would be covered).
Very keen to hear your thoughts on this (it's probably going to take a few iterations to get right but I figured it was worth mentioning because it may influence this code and I believe you have more experience using it than most!).

Contributor Author

Yes, I understand your point and I think it's an interesting approach. I was hoping I could get things done in different PRs though; if that doesn't work, of course it's OK, I'm just expressing my preference. That's why I proposed the go doSomething(p) in the router: that way we can still merge this PR and potentially have exactly what we have now on master (with the difference that we go async in the router rather than in the client), plus in-order delivery if you avoid blocking. So with this approach we would still be fairly backwards compatible and manage to merge an improvement.

Then I had another idea for increasing performance and I spent some time last week testing my approach, which I haven't proposed yet (I was planning to do so in another PR).

My use case is a bridge from the MQTT broker to Apache Pulsar. As you can imagine doing one message at a time leads to very poor results, especially considering that Pulsar is distributed and there's consensus involved as well so multiple nodes need to acknowledge before they get back with an ack to the client. On my local machine we're talking ~675 messages per second (quite poor).

So I started experimenting with the idea of batching. If instead of pushing a single message at a time I buffer them and then write them as a batch to Pulsar (similar to a MongoDB InsertMany), then on the same machine I get more than 100000 messages done per second (vs 675, quite an improvement!).

At this point acknowledging in order on the MQTT side would be quite simple: we buffer, say, the first 1000 messages, then stop, write the batch, ack them in order and carry on.

With this in mind I started thinking of a way to give users an API that avoids messing up the ack order, and then I thought of cumulative acking. Here's the idea:

  • the client by default behaves like in this PR
  • it has a configurable attribute EnableCumulativeAcking
  • if EnableCumulativeAcking is true then the client doesn't ack automatically; it keeps track of the packetID and QoS of every packet it comes across and stores them in order in a slice (the memory footprint is minimal: at most 65535 times a struct with just those two attributes, a uint16 and an int if I remember correctly)
  • then we expose a CumulativeAck() method that, given a packetID, looks for it in the slice and acks everything from the beginning up to and including that packet

With an API like that, once you write your batch (which performs even better than going in parallel in most of the benchmarks I wrote), you just CumulativeAck() the last one and the client does the magic for you, with what I think is little complexity.

Of course this might require some thinking when designing your application, since things like batching usually have a single target, for example one topic on Pulsar or one collection on MongoDB. So you need to make sure on your side that whatever comes out of a topic goes to a single place (e.g. collection/topic) in order to get the most out of it.

Main advantages:

  • keeps the default as is (for backwards compatibility)
  • you can still ack a single message with CumulativeAck()
  • given the method name, the requirement of acking in order becomes pretty clear for early users as well
  • performs quite nicely
  • decreases complexity vs a pure in-parallel approach (given the requirements). Say you process 100 messages in parallel but one doesn't succeed: you hold back the other 99 for up to a timeout, there might be retries involved, and if you decide to ack it anyway, even though you couldn't apply the side effect, you might lose the message. With batching it's all or none, easier to debug, and timeouts put pressure mostly on the broker rather than on the client (and the broker usually has plenty more resources than a client, and hopefully clustering, which helps redistribute load). If faster processing is needed then shared subscriptions can be used, with the advantage of keeping a single client as simple as possible.

Anyway, sorry for the long post 😅
I just wanted to share some thoughts, performance is indeed quite important for me as well.

Next week I'll try to review the issue you linked and put more thought into your approach. If you have some code to share please do; I tend to reason better with implementations. The way you describe it seems legit 👍

Contributor

Thanks very much for the detailed explanation! My (slight) concern was that this change does alter the API slightly (moving from go c.Router.Route(pb) to c.Router.Route(pb)), which would require end users to alter their code if they wish to avoid sequential processing (a small change, and we are pre-1.0 so to be expected, but this does provide the opportunity to make a larger change that could simplify things going forward). I'm currently still on 3.1 because persistence is a requirement for me, so I cannot really comment too much and, for the avoidance of doubt, have no objections to this PR going ahead as-is (but perhaps consider the thoughts below).

Fleshing out my concern a bit:

  • If users wish to maintain the current functionality (multiple simultaneous processing of inbound messages) they will need to change their code (possibly not a breaking change but could have a major impact in high volume uses).
  • It does not provide a way to process multiple messages simultaneously AND control the ACK.
  • If you decide you do not want to acknowledge a packet, the only option I can see is to terminate the MQTT connection before returning (and that is problematic because routePublishPackets will block, so you could not wait for all goroutines to exit).
  • A change would be needed if inbound persistence is implemented in the future (there is no way to say "don't send an ACK" at all).

Re your proposed "API to avoid messing up the ack order": I do wonder if this ends up over-complicating things within the paho package (I've made quite a few changes to the v3 client and really like the simplicity of the v5 client!). My feeling is that anyone who is batching messages should be fairly competent and could take care of ACK order themselves (or they may choose to ignore this requirement in the spec; my guess is that most brokers would handle out-of-order ACKs whatever the spec says!).
I wonder if, rather than passing a Publish to the handler we passed something like (note: somewhat off the top of my head; I have not implemented this!):

// note: In order to maintain compliance with the MQTT spec, `Ack` must be called in the order in which packets are received.
type PublishWithAck struct {
	*Publish

	defaultAck io.WriterTo // Possibly not needed but would make the Ack function simpler!
}

// Ack - Acknowledge the Publish packet.
// Passing nil will result in the default acknowledgement being sent. It might be important for the user to know if the ACK could not be sent (this has been done with issue #40 in mind; I'm thinking that longer term it will be possible to request all packets in this way).
func (p *PublishWithAck) Ack(ack io.WriterTo) error {
	// todo... Need to ensure only one ACK is sent (so may need a mutex etc.)
	return nil
}

As an example of how this could be used the following would be needed to maintain compatibility with the current functionality:

Router: NewSingleHandlerRouter(func(p *PublishWithAck) {
		go doSomething(p.Publish)
		p.Ack(nil)
	}),

I believe that this would support your batching use-case (you could add the packets to a slice and, once it hits a predetermined threshold, send them all to Pulsar and then call the .Ack functions in order). If we wanted to simplify things further for the user then a configurable attribute DisableAutoPublishAck could be added (so unless this is set the functionality is pretty much as now, except that the router is called sequentially and passed a different type). Note: this is kind of similar to the 3.1 client (except that doesn't support turning off the auto-ack, so it's pretty useless there!).

Contributor Author

Yes, I agree it's not optimal, but given the MQTT specs we are given I'm trying to split the work in a reasonable way.
At the moment this client isn't guaranteeing in-order delivery; with this change, considering what we're merging against, we fix that and keep the rest relatively the same. As you mention, I wouldn't block this PR, but all the things you say should indeed be addressed.

The batching idea comes mostly from @alsm's concerns expressed here. My first proposition was indeed just an Ack() method, letting the user handle the different cases.

Also, as you mention, there are MQTT brokers that do not require acknowledgments in order (e.g. VerneMQ).

If you're OK with this I say we continue this discussion in a new dedicated issue where we try to tackle the acknowledgements. I don't mind creating an issue myself, please let me know what you think.

As a sidenote, I'll go as far as to say that we should try to understand why that requirement was defined in MQTT in the first place and see if we can change it in a new version. Without the requirement of acknowledging in order and with in-order delivery guaranteed over shared-subscriptions, the MQTT protocol would be able to serve a lot more use cases more efficiently (for example EMQX does the in-order delivery over shared-subscriptions by routing via a hash of the topic or the client ID, very similar to what Kafka and Pulsar do with key partitioning).

Thanks!

Contributor

Thanks Francesco,
Thinking about this further, I can implement the changes mentioned as an option with minimal impact to users, so no issues there.
Re why it was done this way, see this. Basically I would see the rationale as something along the lines of "a QoS 2 message should only be forwarded when PUBCOMP has been sent and, thus, PUBREC and PUBREL must be sent in order". In reality this does not really impact the end recipient if messages are passed on when the PUBLISH arrives (as happens in this client). In response to @alsm's question about this, Ian Craggs commented:

I do think however there is a question of ordering, that the ack for a message shouldn't be delayed so long that the ack for a subsequently received PUBLISH is sent first, at least not routinely.

so it does not appear that he feels the ordering is essential...

Matt

Contributor Author

@fracasula fracasula Apr 20, 2021

@MattBrittan As I said before, I'm all for giving users the ability to Ack() themselves. Even with the API that you're proposing (i.e. PublishWithAck), it's easy to mess up the order. I'm cool with that though, because I think the user should be able to handle it. So as long as we document it properly, and note that some brokers do not need the acks in order after all, then I'm fine with it (I checked with the EMQX folks and at least for now they don't care about the order either).

I can also try something if you think it is required in this PR after all, but before doing any work I would like to hear from @alsm, who has been quiet for a bit now. I think it'd be beneficial if we could all agree on this before making the changes.

Another option, as I proposed here, is to have a func (c *Client) Ack(pb *packets.Publish) error which seems to be a fairly common approach amongst clients.

A third option is to have the router take a second parameter, a function that acks the packet:

case packets.PUBLISH:
	pb := recv.Content.(*packets.Publish)
	ack := func() error {
		switch pb.QoS {
		case 1:
			pa := packets.Puback{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			return c.write(ctx, &pa)
		case 2:
			pr := packets.Pubrec{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			return c.write(ctx, &pr)
		}

		return nil
	}

	if c.Router != nil {
		go c.Router.Route(pb, ack)
	} else {
		_ = ack()
	}

My preference for now would be to have Ack() on the client and leave the packets as dumb as possible. Something I like about the packets as they are now is that you can safely pass them by value, which is a plus for me; sometimes I prefer a copy on the stack rather than the heap, which might otherwise add pressure on the garbage collector. I haven't done any escape analysis to verify the impact, but it's something we should keep in mind in case we end up putting things inside packets that can't be copied, just because, in order for them to expose an Ack() method, they end up holding pointers to mutexes, the client, connections, etc.

Thoughts? 🙂

Contributor

@fracasula I'll raise a new issue re this because I think manual handling of ACK is outside the scope of your original PR and the discussion is likely to get lost if it remains here. As mentioned before I have no issues with this PR going forward but will leave this comment unresolved because, as you say, input from @alsm would be great.

Contributor

I'd been away so just catching up on all this...woah 😄
Initial thoughts;
As a receiving client, it should make no difference to the server in which order I ack the incoming messages: for QoS 1 it just means the packet ID can be reused; for QoS 2 most clients shortcut and pass the message to the application on PUBREC rather than PUBCOMP. The only thing here would be if a server doesn't consider a message sent to it to be "published" until the client has sent the PUBREL, but that doesn't have to matter here as we're looking at receiving.
As is, though, this PR addresses the issue of ordered processing of messages, so I'm happy for it to proceed and we can look at the acking and performance questions in another issue.

@fracasula fracasula marked this pull request as ready for review April 19, 2021 08:39
@fracasula
Contributor Author

@alsm is there anything else that you think needs to be done here? Any thoughts on the test coverage? I ran some integration tests on my end; the rest is the unit tests you see in this PR.

@alsm alsm merged commit 6f81099 into eclipse-paho:master Apr 29, 2021
@alsm
Contributor

alsm commented Apr 29, 2021

No, it's all good, I was just checking we were all done with it.

@fracasula fracasula deleted the in-order-routing branch June 4, 2021 07:50
bkneis pushed a commit to bkneis/paho.golang that referenced this pull request Aug 9, 2022