In-order routing and late ack #52

Merged 7 commits (Apr 29, 2021)
1 change: 1 addition & 0 deletions go.mod
@@ -3,6 +3,7 @@ module github.com/eclipse/paho.golang
go 1.15

require (
github.com/google/go-cmp v0.5.5
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
)
5 changes: 4 additions & 1 deletion go.sum
@@ -1,13 +1,16 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
2 changes: 1 addition & 1 deletion packets/properties_test.go
@@ -135,7 +135,7 @@ func TestPropertiess(t *testing.T) {
}
}

func TestInvalidPropertiess(t *testing.T) {
func TestInvalidProperties(t *testing.T) {
if ValidateID(PUBLISH, PropRequestResponseInfo) {
t.Fatalf("'requestReplyInfo' is invalid for 'PUBLISH' packets")
}
100 changes: 70 additions & 30 deletions paho/client.go
@@ -3,6 +3,7 @@ package paho
import (
"context"
"fmt"
"math"
"net"
"strings"
"sync"
@@ -54,6 +55,7 @@ type (
caCtx *caContext
raCtx *CPContext
stop chan struct{}
publishPackets chan *packets.Publish
workers sync.WaitGroup
serverProps CommsProperties
clientProps CommsProperties
@@ -145,21 +147,33 @@ func NewClient(conf ClientConfig) *Client {
// returned. Otherwise the failure Connack (if there is one) is returned
// along with an error indicating the reason for the failure to connect.
func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
if c.Conn == nil {
return nil, fmt.Errorf("client connection is nil")
}

cleanup := func() {
c.mu.Lock()
defer c.mu.Unlock()
defer c.workers.Wait()

select {
case <-c.stop:
//already shutting down, do nothing
default:
close(c.stop)
close(c.publishPackets)
}
c.Conn.Close()
}
if c.Conn == nil {
return nil, fmt.Errorf("client connection is nil")
_ = c.Conn.Close()
}

c.stop = make(chan struct{})

var publishPacketsSize uint16 = math.MaxUint16
if cp.Properties != nil && cp.Properties.ReceiveMaximum != nil {
publishPacketsSize = *cp.Properties.ReceiveMaximum
}
c.publishPackets = make(chan *packets.Publish, publishPacketsSize)

c.debug.Println("connecting")
c.mu.Lock()
defer c.mu.Unlock()
@@ -181,10 +195,19 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
}
}

c.debug.Println("starting publish packets loop")
c.workers.Add(1)
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from publish packets loop worker")
c.routePublishPackets()
}()

c.debug.Println("starting Incoming")
c.workers.Add(1)
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from incoming worker")
c.Incoming()
}()

@@ -262,12 +285,49 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
c.workers.Add(1)
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from ping handler worker")
c.PingHandler.Start(c.Conn, time.Duration(keepalive)*time.Second)
}()

return ca, nil
}

func (c *Client) routePublishPackets() {
for {
select {
case <-c.stop:
return
case pb, open := <-c.publishPackets:
if !open {
return
}
c.Router.Route(pb)
Contributor:

One potential concern with this is that the processing runs on a single thread. This could be an issue where the handler is storing the message to a remote database (or other high-latency process). I cannot see how you can process multiple messages simultaneously (because routePublishPackets will block until Route completes). Allowing for this may be a bit complicated due to the spec requirement that acknowledgments be sent "in the order in which the corresponding PUBLISH packets were received". I don't have a solution to this right now (will think about it) but wanted to raise it (would be great to get the interface right on the first attempt!). Great PR by the way!

Contributor Author:

Hey, thanks! If you want you can just avoid blocking inside your router:

c := NewClient(ClientConfig{
	Conn: ts.ClientConn(),
	Router: NewSingleHandlerRouter(func(p *Publish) {
		go doSomething(p)
	}),
})

This way at least you have both options, blocking and non-blocking, plus you can handle that on your side. Of course, if you do that, you lose the benefit of having the message redelivered by the broker in case you weren't able to complete your "side-effect" operation successfully.

Contributor (@MattBrittan, Apr 17, 2021):

Sorry, I probably need to add a bit of context to this. I'm starting work on adding persistence and, initially anyway, we (discussed with @alsm) plan on implementing outgoing persistence only (on the basis that, with your change, the broker effectively handles inbound persistence - realise there are QOS2 issues but...). On that basis I was keen to have a way of processing incoming publish packets in parallel whilst retaining control of the ACK (meaning your suggestion of go doSomething(p) is not quite sufficient), because this is something I'd need in production in the absence of another form of persistence (high volume of messages going into a database).
The idea I'm currently mulling is to expand the approach you have taken and send ALL packets (both sent and received) via a channel to a router which can take whatever action it wants (which may be just sending a packet, calling a handler, sending a response to a received packet, etc.). I need to write this up in more detail, but I will probably do it as a test implementation because it's easier to see what works that way. There would be a generic implementation of this which just sends the standard response (probably with a delay to allow the handler to run in the case of a publish packet), but users would be able to replace the router, thus enabling the implementation of persistence via a wrapper (rather than adding significant complexity to the main paho package) and whatever other message handling needs users may have (e.g. issue #40 would be covered).
Very keen to hear your thoughts on this (it's probably going to take a few iterations to get right but I figured it was worth mentioning because it may influence this code and I believe you have more experience using it than most!).
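
A very rough sketch of what such a router contract could look like (hypothetical names only; neither PacketDirection nor AllPacketsRouter exist in the paho API, this is just to make the idea concrete):

// Very rough sketch, hypothetical names only; nothing like this exists in paho today.
import "github.com/eclipse/paho.golang/packets"

type PacketDirection int

const (
	PacketInbound PacketDirection = iota
	PacketOutbound
)

// AllPacketsRouter would be handed every control packet, sent and received, in order,
// and could take whatever action it wants: call a handler, persist the packet,
// send the standard response, or hold the response back for a late/manual ack.
type AllPacketsRouter interface {
	RoutePacket(dir PacketDirection, cp *packets.ControlPacket)
}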

Contributor Author:

Yes, I understand your point and I think it's an interesting approach. I was hoping I could get things done in different PRs though. If that doesn't work of course it's OK, just expressing my preference. That's why I proposed the go doSomething(p) in the router: that way we can still manage to merge this PR and potentially have exactly what we have now on master (with the difference that we go async in the router rather than in the client), plus the in-order delivery if you avoid blocking. So with this approach we would still be quite backwards compatible and manage to merge an improvement.

Then I had another idea for increasing performance and I spent some time last week testing my approach, which I haven't proposed yet (I was planning to do so in another PR).

My use case is a bridge from the MQTT broker to Apache Pulsar. As you can imagine doing one message at a time leads to very poor results, especially considering that Pulsar is distributed and there's consensus involved as well so multiple nodes need to acknowledge before they get back with an ack to the client. On my local machine we're talking ~675 messages per second (quite poor).

So I started experimenting with the idea of batching. If instead of pushing a single message at a time I buffer them and then write them as a batch to Pulsar (similar to a MongoDB InsertMany), then on the same machine I get more than 100000 messages done per second (vs 675, quite an improvement!).

At this point acknowledging in order on the MQTT side would be quite simple because we just buffered, say the first 1000 messages, then stop, write the batch, ack them in order and carry on.

With this in mind I started thinking of a way to give users an API to avoid messing up with the ack order. And then I thought of cumulative acking. Here's the idea:

  • the client by default behaves like in this PR
  • it has a configurable attribute EnableCumulativeAcking
  • if EnableCumulativeAcking is true then the client doesn't ack automatically and keeps track of the packetID and QoS of every packet it comes across, storing them in order in a slice (the memory footprint is minimal: at most 65535 times a struct with just those two attributes, a uint16 and an int if I remember well)
  • then we expose a CumulativeAck() method that, given a packetID, looks for it in the slice and starts acking from the beginning up to and including that packet

With an API like that, once you write your batch (which performs even better than going in parallel in most of the benchmarks I wrote), you just CumulativeAck() the last one and then the client does the magic for you with what I think is little complexity.
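
For illustration, a minimal sketch of the tracking side of this idea (hypothetical names; EnableCumulativeAcking and CumulativeAck are not part of this PR, and the ack function is left abstract):

import (
	"fmt"
	"sync"
)

type pendingAck struct {
	packetID uint16
	qos      byte
}

type ackTracker struct {
	mu      sync.Mutex
	pending []pendingAck // kept in the order the PUBLISH packets were received
}

func (t *ackTracker) track(packetID uint16, qos byte) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.pending = append(t.pending, pendingAck{packetID: packetID, qos: qos})
}

// cumulativeAck acknowledges every tracked packet up to and including packetID,
// in order, using the supplied ack function (e.g. one that writes a PUBACK or PUBREC).
func (t *ackTracker) cumulativeAck(packetID uint16, ack func(pendingAck) error) error {
	t.mu.Lock()
	defer t.mu.Unlock()

	idx := -1
	for i, p := range t.pending {
		if p.packetID == packetID {
			idx = i
			break
		}
	}
	if idx < 0 {
		return fmt.Errorf("packet id %d is not tracked", packetID)
	}
	for i := 0; i <= idx; i++ {
		if err := ack(t.pending[i]); err != nil {
			t.pending = t.pending[i:] // keep whatever is still un-acked
			return err
		}
	}
	t.pending = t.pending[idx+1:]
	return nil
}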

Of course this might require some thinking when designing your application, since things like batching usually can have a single target only, for example a single topic on Pulsar or a single collection on MongoDB. Thus the need to make sure on your side that whatever comes out of a topic goes into a single place (e.g. collection/topic) only, in order to get the most out of it.

Main advantages:

  • keeps the default as is (for backwards compatibility)
  • you can still ack a single message with CumulativeAck()
  • given the method name, the requirement of acking in order becomes pretty clear for early users as well
  • performs quite nicely
  • decreases complexity vs a pure in-parallel approach (given the requirements). Say that you process 100 messages in parallel but only one doesn't succeed: you hold back the other 99 for as long as a timeout, there might be retries involved, and if you decide to ack it anyway even though you couldn't apply the side effect you might lose the message. With batching it's all or none, which is easier to debug, and timeouts would add pressure mostly on the broker rather than on the client (and the broker usually has plenty more resources than a client, and hopefully clustering, which should help redistribute load). If faster processing is needed then shared subscriptions can be used, with the advantage of keeping a single client as simple as possible.

Anyway, sorry for the long post 😅
I just wanted to share some thoughts, performance is indeed quite important for me as well.

Next week I'll try to review the issue you linked and I'll put more thought on your approach. If you have some code to share please do, I tend to reason better with implementations. The way you describe it seems legit 👍

Contributor:

Thanks very much for the detailed explanation! My (slight) concern was that this change does alter the API slightly (moving from go c.Router.Route(pb) to c.Router.Route(pb)), which would require end users to alter their code if they wish to avoid sequential processing (a small change, and we are pre 1.0 so to be expected, but this does provide the opportunity to make a larger change that could simplify things going forward). I'm currently still on 3.1 because persistence is a requirement for me so I cannot really comment too much and, for the avoidance of doubt, have no objections to this PR going ahead as-is (but perhaps consider the thoughts below).

Fleshing out my concern a bit:

  • If users wish to maintain the current functionality (multiple simultaneous processing of inbound messages) they will need to change their code (possibly not a breaking change but could have a major impact in high volume uses).
  • It does not provide a way to process multiple messages simultaneously AND control the ACK.
  • If you decide you do not want to acknowledge a packet the only option I can see is to terminate the MQTT connection before returning (and that is problematic because routePublishPackets will block so you could not wait for all go routines to exit).
  • A change would be needed if inbound persistence is implemented in the future (there is no way to say "don't send an ACK" at all).

Ref your proposed "API to avoid messing up with the ack order"; I do wonder if this ends up over-complicating things within the paho package (I've made quite a few changes to the v3 client and really like the simplicity of the v5 client!). My feeling is that anyone who is batching messages should be fairly competent and could take care of ACK order themselves (or they may choose to ignore this requirement in the spec; my guess is that most brokers would handle out-of-order ACKs whatever the spec says!).
I wonder if, rather than passing a Publish to the handler, we passed something like (note: somewhat off the top of my head; I have not implemented this!):

// note: In order to maintain compliance with the MQTT spec, `Ack` must be called in the order in which packets are received.
type PublishWithAck struct {
	*Publish

	defaultAck io.WriterTo // Possibly not needed but would make the Ack function simpler!
}

// Ack - Acknowledge the Publish packet.
// Passing nil will result in the default acknowledgement being sent. Might be important for the user to know if the ACK could not be sent (this has been done with issue #40 in mind; I'm thinking that longer term it will be possible to request all packets in this way).
func (p *PublishWithAck) Ack(ack io.WriterTo) error {
	// todo... Need to ensure only one ACK is sent (so may need a mutex etc)
	return nil
}

As an example of how this could be used the following would be needed to maintain compatibility with the current functionality:

Router: NewSingleHandlerRouter(func(p *PublishWithAck) {
		go doSomething(p.Publish)
		p.Ack(nil)
	}),

I believe that this would support your batching use-case (you could add the packets to a slice and, once it hits a predetermined threshold, send them all to Pulsar and then call the .Ack functions in order). If we wanted to simplify things further for the user then a configurable attribute DisableAutoPublishAck could be added (so unless this is set the functionality is pretty much as now, except that the router is called sequentially and passed a different type). Note: this is kind-of similar to the 3.1 client (except that does not support turning off the auto ack, so it's pretty useless there!).
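
For illustration only, a sketch of that batching use-case on top of the proposed (still hypothetical) PublishWithAck type and an assumed bulk-write sink: buffer the packets, flush the batch, then ack in the order the packets were received.

// Sketch only: assumes the PublishWithAck type proposed above and some batch
// sink such as a bulk write to Pulsar or MongoDB (hypothetical).
type batchingHandler struct {
	buf   []*PublishWithAck
	limit int
	sink  func([]*Publish) error // bulk write to the downstream system
}

func (b *batchingHandler) handle(p *PublishWithAck) {
	b.buf = append(b.buf, p)
	if len(b.buf) < b.limit {
		return
	}
	msgs := make([]*Publish, len(b.buf))
	for i, bp := range b.buf {
		msgs[i] = bp.Publish
	}
	if err := b.sink(msgs); err != nil {
		// Batch failed: don't ack, so the broker can redeliver after a reconnect.
		return
	}
	for _, bp := range b.buf {
		_ = bp.Ack(nil) // ack in the order the PUBLISH packets were received
	}
	b.buf = b.buf[:0]
}

Because routePublishPackets calls the router sequentially, the buffer would be filled and acked in received order without extra synchronisation.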

Contributor Author:

Yes, I agree it's not optimal, but given the MQTT spec we have I'm trying to split the work in a reasonable way.
At the moment this client isn't guaranteeing in-order delivery and with this change, if we consider what we're merging against, we fix that and keep the rest relatively the same. As you mention, I wouldn't block this PR, but all the things you say should be addressed indeed.

The batching idea comes mostly from @alsm's concerns expressed here. My first proposition was indeed just an Ack() method and then let the user handle the different cases.

Also, as you mention, there are MQTT brokers that do not require acknowledgments in order (e.g. VerneMQ).

If you're OK with this I say we continue this discussion in a new dedicated issue where we try to tackle the acknowledgements. I don't mind creating an issue myself, please let me know what you think.

As a sidenote, I'll go as far as to say that we should try to understand why that requirement was defined in MQTT in the first place and see if we can change it in a new version. Without the requirement of acknowledging in order and with in-order delivery guaranteed over shared-subscriptions, the MQTT protocol would be able to serve a lot more use cases more efficiently (for example EMQX does the in-order delivery over shared-subscriptions by routing via a hash of the topic or the client ID, very similar to what Kafka and Pulsar do with key partitioning).

Thanks!

Contributor:

Thanks Francesco,
Thinking about this further I can implement the changes mentioned as an option with minimal impact to users so no issues there.
Ref why it was done this way, see this. Basically I would see the rationale as something along the lines of "A QOS2 message should only be forwarded when PUBCOMP has been sent and, thus, PUBREC and PUBREL must be sent in order". In reality this does not really impact the end recipient if messages are passed on when the PUB arrives (as happens in this client). In response to @alsm's question re this, Ian Craggs commented:

I do think however there is a question of ordering, that the ack for a message shouldn't be delayed so long that the ack for a subsequently received PUBLISH is sent first, at least not routinely.

so it does not appear that he feels the ordering is essential...

Matt

Contributor Author (@fracasula, Apr 20, 2021):

@MattBrittan As I said before, I'm all for giving the users the ability to Ack() themselves. Even with the API that you're proposing (i.e. PublishWithAck), it's easy to mess up the order. I'm cool with that though, because I think the user should be able to handle it. So as long as we document it properly, and note that some brokers don't need the acks in order after all, then I'm fine with it (I checked with the EMQX guys and at least for now they don't care about the order either).

I can also try something if you guys think it is required in this PR after all, but before doing any work I would like to hear from @alsm, who has been silent for a bit now. I think it'd be beneficial if we can all agree on this before doing the changes.

Another option, as I proposed here, is to have a func (c *Client) Ack(pb *packets.Publish) error which seems to be a fairly common approach amongst clients.

A third option, is to have the router take a 2nd parameter which would be a lambda function to ack that packet:

case packets.PUBLISH:
	pb := recv.Content.(*packets.Publish)
	ack := func() error {
		switch pb.QoS {
		case 1:
			pa := packets.Puback{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			return c.write(ctx, &pa)
		case 2:
			pr := packets.Pubrec{
				Properties: &packets.Properties{},
				PacketID:   pb.PacketID,
			}
			return c.write(ctx, &pr)
		}

		return nil
	}

	if c.Router != nil {
		go c.Router.Route(pb, ack)
	} else {
		_ = ack()
	}

My preference for now would be to have the Ack() in the client and leave the packets as dumb as possible. Something that I like about the packets, as they are now, is that you can safely pass them by value as well, which is a plus for me. Sometimes I prefer a copy on the stack rather than an allocation on the heap, which might add pressure on the garbage collector. I haven't done any escape analysis to verify the impact, but it's something we should keep in mind in case we end up putting stuff inside packets that can't be copied just because, in order for them to expose an .Ack() method, they end up having pointers to mutexes, clients, connections etc...
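
To illustrate that concern (hypothetical type, not something proposed for the packets package): once a packet embeds a mutex or a connection back-reference it can no longer be copied safely, and go vet's copylocks check flags any attempt to pass it by value.

import (
	"net"
	"sync"
)

// Hypothetical example only, not a proposal for the packets package.
type ackablePublish struct {
	PacketID uint16
	QoS      byte

	mu   sync.Mutex // would be needed to guarantee the ACK is only sent once
	conn net.Conn   // back-reference used to write the ACK
}

// Passing this by value copies the mutex; go vet reports something like
// "handleByValue passes lock by value: ackablePublish contains sync.Mutex",
// which is exactly the copyability we would lose.
func handleByValue(p ackablePublish) {}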

Thoughts? 🙂

Contributor:

@fracasula I'll raise a new issue re this because I think manual handling of ACK is outside the scope of your original PR and the discussion is likely to get lost if it remains here. As mentioned before I have no issues with this PR going forward but will leave this comment unresolved because, as you say, input from @alsm would be great.

Contributor:

I'd been away so just catching up on all this...woah 😄
Initial thoughts;
As a receiving client it should make no difference to the server the order in which I ack the incoming messages: for QoS1 it just means the packet id can be reused, and for QoS2 most clients shortcut and pass the message to the application on PUBREC rather than PUBCOMP. The only thing here would be if a server doesn't consider a message sent to it to be "published" until the client has sent the PUBREL, but that doesn't have to matter here as we're looking at receiving.
As is though this PR addresses the issue of ordered processing of messages so I'm happy for it to proceed as is and we can look at the acking and performance issues in another issue.

switch pb.QoS {
case 1:
pa := packets.Puback{
Properties: &packets.Properties{},
PacketID: pb.PacketID,
}
c.debug.Println("sending PUBACK")
_, err := pa.WriteTo(c.Conn)
if err != nil {
c.errors.Printf("failed to send PUBACK for %d: %s", pb.PacketID, err)
}
case 2:
pr := packets.Pubrec{
Properties: &packets.Properties{},
PacketID: pb.PacketID,
}
c.debug.Printf("sending PUBREC")
_, err := pr.WriteTo(c.Conn)
if err != nil {
c.errors.Printf("failed to send PUBREC for %d: %s", pb.PacketID, err)
}
}
}
}
}

// Incoming is the Client function that reads and handles incoming
// packets from the server. The function is started as a goroutine
// from Connect(), it exits when it receives a server initiated
@@ -314,29 +374,7 @@ func (c *Client) Incoming() {
case packets.PUBLISH:
pb := recv.Content.(*packets.Publish)
c.debug.Printf("received QoS%d PUBLISH", pb.QoS)
go c.Router.Route(pb)
switch pb.QoS {
case 1:
pa := packets.Puback{
Properties: &packets.Properties{},
PacketID: pb.PacketID,
}
c.debug.Println("sending PUBACK")
_, err := pa.WriteTo(c.Conn)
if err != nil {
c.errors.Printf("failed to send PUBACK for %d: %s", pa.PacketID, err)
}
case 2:
pr := packets.Pubrec{
Properties: &packets.Properties{},
PacketID: pb.PacketID,
}
c.debug.Printf("sending PUBREC")
_, err := pr.WriteTo(c.Conn)
if err != nil {
c.errors.Printf("failed to send PUBREC for %d: %s", pr.PacketID, err)
}
}
c.publishPackets <- pb
case packets.PUBACK, packets.PUBCOMP, packets.SUBACK, packets.UNSUBACK:
c.debug.Printf("received %s packet with id %d", recv.PacketType(), recv.PacketID())
if cpCtx := c.MIDs.Get(recv.PacketID()); cpCtx != nil {
@@ -412,18 +450,20 @@ func (c *Client) Incoming() {

func (c *Client) close() {
c.mu.Lock()
defer c.mu.Unlock()

select {
case <-c.stop:
//already shutting down, do nothing
default:
close(c.stop)
close(c.publishPackets)
}
c.debug.Println("client stopped")
c.PingHandler.Stop()
c.debug.Println("ping stopped")
c.Conn.Close()
_ = c.Conn.Close()
c.debug.Println("conn closed")
c.mu.Unlock()
}

// Error is called to signify that an error situation has occurred, this