Skip to content

Commit

Permalink
Merge pull request eclipse-paho#52 from fracasula/in-order-routing
Browse files Browse the repository at this point in the history
In-order routing and late ack
  • Loading branch information
Al S-M authored Apr 29, 2021
2 parents 39b3dab + e13ab5a commit e35eb25
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 45 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/eclipse/paho.golang
go 1.15

require (
github.com/google/go-cmp v0.5.5
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
)
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
2 changes: 1 addition & 1 deletion packets/properties_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestPropertiess(t *testing.T) {
}
}

func TestInvalidPropertiess(t *testing.T) {
func TestInvalidProperties(t *testing.T) {
if ValidateID(PUBLISH, PropRequestResponseInfo) {
t.Fatalf("'requestReplyInfo' is invalid for 'PUBLISH' packets")
}
Expand Down
100 changes: 70 additions & 30 deletions paho/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package paho
import (
"context"
"fmt"
"math"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -54,6 +55,7 @@ type (
caCtx *caContext
raCtx *CPContext
stop chan struct{}
publishPackets chan *packets.Publish
workers sync.WaitGroup
serverProps CommsProperties
clientProps CommsProperties
Expand Down Expand Up @@ -145,21 +147,33 @@ func NewClient(conf ClientConfig) *Client {
// returned. Otherwise the failure Connack (if there is one) is returned
// along with an error indicating the reason for the failure to connect.
func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
if c.Conn == nil {
return nil, fmt.Errorf("client connection is nil")
}

cleanup := func() {
c.mu.Lock()
defer c.mu.Unlock()
defer c.workers.Wait()

select {
case <-c.stop:
//already shutting down, do nothing
//already shutting down, do nothing
default:
close(c.stop)
close(c.publishPackets)
}
c.Conn.Close()
}
if c.Conn == nil {
return nil, fmt.Errorf("client connection is nil")
_ = c.Conn.Close()
}

c.stop = make(chan struct{})

var publishPacketsSize uint16 = math.MaxUint16
if cp.Properties != nil && cp.Properties.ReceiveMaximum != nil {
publishPacketsSize = *cp.Properties.ReceiveMaximum
}
c.publishPackets = make(chan *packets.Publish, publishPacketsSize)

c.debug.Println("connecting")
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -181,10 +195,19 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
}
}

c.debug.Println("starting publish packets loop")
c.workers.Add(1)
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from publish packets loop worker")
c.routePublishPackets()
}()

c.debug.Println("starting Incoming")
c.workers.Add(1)
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from incoming worker")
c.Incoming()
}()

Expand Down Expand Up @@ -262,12 +285,49 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
c.workers.Add(1)
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from ping handler worker")
c.PingHandler.Start(c.Conn, time.Duration(keepalive)*time.Second)
}()

return ca, nil
}

func (c *Client) routePublishPackets() {
for {
select {
case <-c.stop:
return
case pb, open := <-c.publishPackets:
if !open {
return
}
c.Router.Route(pb)
switch pb.QoS {
case 1:
pa := packets.Puback{
Properties: &packets.Properties{},
PacketID: pb.PacketID,
}
c.debug.Println("sending PUBACK")
_, err := pa.WriteTo(c.Conn)
if err != nil {
c.errors.Printf("failed to send PUBACK for %d: %s", pb.PacketID, err)
}
case 2:
pr := packets.Pubrec{
Properties: &packets.Properties{},
PacketID: pb.PacketID,
}
c.debug.Printf("sending PUBREC")
_, err := pr.WriteTo(c.Conn)
if err != nil {
c.errors.Printf("failed to send PUBREC for %d: %s", pb.PacketID, err)
}
}
}
}
}

// Incoming is the Client function that reads and handles incoming
// packets from the server. The function is started as a goroutine
// from Connect(), it exits when it receives a server initiated
Expand Down Expand Up @@ -314,29 +374,7 @@ func (c *Client) Incoming() {
case packets.PUBLISH:
pb := recv.Content.(*packets.Publish)
c.debug.Printf("received QoS%d PUBLISH", pb.QoS)
go c.Router.Route(pb)
switch pb.QoS {
case 1:
pa := packets.Puback{
Properties: &packets.Properties{},
PacketID: pb.PacketID,
}
c.debug.Println("sending PUBACK")
_, err := pa.WriteTo(c.Conn)
if err != nil {
c.errors.Printf("failed to send PUBACK for %d: %s", pa.PacketID, err)
}
case 2:
pr := packets.Pubrec{
Properties: &packets.Properties{},
PacketID: pb.PacketID,
}
c.debug.Printf("sending PUBREC")
_, err := pr.WriteTo(c.Conn)
if err != nil {
c.errors.Printf("failed to send PUBREC for %d: %s", pr.PacketID, err)
}
}
c.publishPackets <- pb
case packets.PUBACK, packets.PUBCOMP, packets.SUBACK, packets.UNSUBACK:
c.debug.Printf("received %s packet with id %d", recv.PacketType(), recv.PacketID())
if cpCtx := c.MIDs.Get(recv.PacketID()); cpCtx != nil {
Expand Down Expand Up @@ -412,18 +450,20 @@ func (c *Client) Incoming() {

func (c *Client) close() {
c.mu.Lock()
defer c.mu.Unlock()

select {
case <-c.stop:
//already shutting down, do nothing
default:
close(c.stop)
close(c.publishPackets)
}
c.debug.Println("client stopped")
c.PingHandler.Stop()
c.debug.Println("ping stopped")
c.Conn.Close()
_ = c.Conn.Close()
c.debug.Println("conn closed")
c.mu.Unlock()
}

// Error is called to signify that an error situation has occurred, this
Expand Down
Loading

0 comments on commit e35eb25

Please sign in to comment.