Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to identify operations that should not be retried #226

Merged
merged 1 commit into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,12 @@ connectionLoop:
c.debug.Printf("publishing message from queue with topic %s", pub2.Topic)
if _, err = cli.PublishWithOptions(ctx, &pub2, paho.PublishOptions{Method: paho.PublishMethod_AsyncSend}); err != nil {
c.errors.Printf("error publishing from queue: %s", err)
if errors.Is(err, paho.ErrNetworkErrorAfterStored) { // Message in session so remove from queue
if errors.Is(err, paho.ErrInvalidArguments) { // Some errors should not be retried
if err := entry.Remove(); err != nil {
c.errors.Printf("error removing queue entry: %s", err)
}
// Need a way to notify the user of this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having a callback for PublishViaQueue would be useful so the user can be notified. That same callback could be passed to the call to PublishWithOptions if/when callbacks are added for async published in paho

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed - but I think that can be added at a later date (I think this should be done as part of #216; the idea being that the callback should be called whenever the publish is complete whether successfully or stopped due to error).

} else if errors.Is(err, paho.ErrNetworkErrorAfterStored) { // Message in session so remove from queue
if err := entry.Remove(); err != nil {
c.errors.Printf("error removing queue entry: %s", err)
}
Expand Down
20 changes: 20 additions & 0 deletions autopaho/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package autopaho
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
Expand All @@ -27,6 +28,7 @@ import (
"testing"
"time"

"github.com/eclipse/paho.golang/autopaho/queue"
memqueue "github.com/eclipse/paho.golang/autopaho/queue/memory"
"github.com/eclipse/paho.golang/internal/testserver"
"github.com/eclipse/paho.golang/packets"
Expand Down Expand Up @@ -145,7 +147,20 @@ func TestQueuedMessages(t *testing.T) {
if err != nil {
t.Fatalf("expected NewConnection success: %s", err)
}

testFmt := "Test%d"
// Start with an invalid message; this will be queued but then rejected by paho.PublishWithOptions, and it's
// important that it's discarded (otherwise it would be retried continually) - issue #214
if err = cm.PublishViaQueue(ctx, &QueuePublish{
Publish: &paho.Publish{
QoS: 3,
Topic: fmt.Sprintf(testFmt, 0),
Properties: nil,
Payload: []byte(fmt.Sprintf(testFmt, 0)),
},
}); err != nil {
t.Fatalf("invalid publish failed")
}

// Transmit first 100 messages (should go into queue)
for i := 1; i <= 100; i++ {
Expand Down Expand Up @@ -221,6 +236,11 @@ func TestQueuedMessages(t *testing.T) {
t.Fatal("test server did not shutdown within expected time")
}

// Check that the queue is empty
if _, err := cm.queue.Peek(); !errors.Is(err, queue.ErrEmpty) {
t.Error("queue should be empty")
}

// Check that we received the expected messages, in the expected order
for i := 1; i <= 200; i++ {
exp := fmt.Sprintf(testFmt, i)
Expand Down
16 changes: 9 additions & 7 deletions paho/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ var (
ErrManualAcknowledgmentDisabled = errors.New("manual acknowledgments disabled")
ErrNetworkErrorAfterStored = errors.New("error after packet added to state") // Could not send packet but its stored (and response will be sent on chan at some point in the future)
ErrConnectionLost = errors.New("connection lost after request transmitted") // We don't know whether the server received the request or not

ErrInvalidArguments = errors.New("invalid argument") // If included (errors.Join) in an error, there is a problem with the arguments passed. Retrying on the same connection with the same arguments will not succeed.
)

type (
Expand Down Expand Up @@ -652,17 +654,17 @@ func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) {
for _, sub := range s.Subscriptions {
if strings.ContainsAny(sub.Topic, "#+") {
// Using a wildcard in a subscription when not supported
return nil, fmt.Errorf("cannot subscribe to %s, server does not support wildcards", sub.Topic)
return nil, fmt.Errorf("%w: cannot subscribe to %s, server does not support wildcards", ErrInvalidArguments, sub.Topic)
}
}
}
if !c.serverProps.SubIDAvailable && s.Properties != nil && s.Properties.SubscriptionIdentifier != nil {
return nil, fmt.Errorf("cannot send subscribe with subID set, server does not support subID")
return nil, fmt.Errorf("%w: cannot send subscribe with subID set, server does not support subID", ErrInvalidArguments)
}
if !c.serverProps.SharedSubAvailable {
for _, sub := range s.Subscriptions {
if strings.HasPrefix(sub.Topic, "$share") {
return nil, fmt.Errorf("cannont subscribe to %s, server does not support shared subscriptions", sub.Topic)
return nil, fmt.Errorf("%w: cannont subscribe to %s, server does not support shared subscriptions", ErrInvalidArguments, sub.Topic)
}
}
}
Expand Down Expand Up @@ -824,18 +826,18 @@ type PublishOptions struct {
// Warning: Publish may outlive the connection when QOS1+ (managed in `session_state`)
func (c *Client) PublishWithOptions(ctx context.Context, p *Publish, o PublishOptions) (*PublishResponse, error) {
if p.QoS > c.serverProps.MaximumQoS {
return nil, fmt.Errorf("cannot send Publish with QoS %d, server maximum QoS is %d", p.QoS, c.serverProps.MaximumQoS)
return nil, fmt.Errorf("%w: cannot send Publish with QoS %d, server maximum QoS is %d", ErrInvalidArguments, p.QoS, c.serverProps.MaximumQoS)
}
if p.Properties != nil && p.Properties.TopicAlias != nil {
if c.serverProps.TopicAliasMaximum > 0 && *p.Properties.TopicAlias > c.serverProps.TopicAliasMaximum {
return nil, fmt.Errorf("cannot send publish with TopicAlias %d, server topic alias maximum is %d", *p.Properties.TopicAlias, c.serverProps.TopicAliasMaximum)
return nil, fmt.Errorf("%w: cannot send publish with TopicAlias %d, server topic alias maximum is %d", ErrInvalidArguments, *p.Properties.TopicAlias, c.serverProps.TopicAliasMaximum)
}
}
if !c.serverProps.RetainAvailable && p.Retain {
return nil, fmt.Errorf("cannot send Publish with retain flag set, server does not support retained messages")
return nil, fmt.Errorf("%w: cannot send Publish with retain flag set, server does not support retained messages", ErrInvalidArguments)
}
if (p.Properties == nil || p.Properties.TopicAlias == nil) && p.Topic == "" {
return nil, fmt.Errorf("cannot send a publish with no TopicAlias and no Topic set")
return nil, fmt.Errorf("%w: cannot send a publish with no TopicAlias and no Topic set", ErrInvalidArguments)
}

if c.config.PublishHook != nil {
Expand Down