
autopaho RPC Handler breaks on reconnections #124

Open
akindestam opened this issue Mar 23, 2023 · 7 comments

akindestam commented Mar 23, 2023

Version
Latest git version at time of writing.

Describe the bug
The autopaho RPC handler extension provided under autopaho/extensions/rpc only calls autopaho.ConnectionManager.Subscribe() when you call rpc.NewHandler(). However autopaho currently lacks any mechanism to automatically resubscribe to topics on connection, instead relying on the OnConnectionUp callback, which can only be supplied at creation with autopaho.NewConnection().

Now a potential workaround I figured was to recreate the handler in the OnConnectionUp callback or perhaps for every single request (randomizing some part of the response topic). However this leads to memory leaks or potentially worse issues as there is no way to un-create an RPC handler once it has already been created. In particular if one is using paho.StandardRouter it will just keep on adding new callbacks for the response topic when rpc.NewHandler() calls opts.Router.RegisterHandler(...).

To reproduce
I run the following test program against mosquitto and another application that can accept RPC calls. Restarting mosquitto, or otherwise disturbing the connection between the RPC caller and the broker, causes it to disconnect and reconnect as expected. However, after autopaho reconnects we never re-subscribe to the response topic, so we never receive the responses from the RPC callee application.

The test program:

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"net/url"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/eclipse/paho.golang/autopaho"
	"github.com/eclipse/paho.golang/autopaho/extensions/rpc"
	"github.com/eclipse/paho.golang/paho"
)

func init() {
	ic := make(chan os.Signal, 1)
	signal.Notify(ic, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ic
		os.Exit(0)
	}()
}

const _clientId = "autopaho-mqtt-rpc-tester"

func main() {
	server := flag.String("server", "127.0.0.1:1883", "The full URL of the MQTT server to connect to")
	rTopic := flag.String("rtopic", "rpc/request", "Topic for requests to go to")
	// username := flag.String("username", "", "A username to authenticate to the MQTT server")
	// password := flag.String("password", "", "Password to match username")
	message := flag.String("message", "", "The message to use in the request")
	flag.Parse()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	urlString := fmt.Sprintf("tcp://%s", *server)
	brokerUrl, err := url.Parse(urlString)
	if err != nil {
		log.Fatalf("Couldn't parse URL \"%s\": %v", urlString, err)
	}

	router := paho.NewStandardRouter()
	router.SetDebugLogger(logger{prefix: "router"})

	var cm *autopaho.ConnectionManager
	var handler *rpc.Handler

	cliCfg := autopaho.ClientConfig{
		BrokerUrls:        []*url.URL{brokerUrl},
		KeepAlive:         30,
		ConnectRetryDelay: 1 * time.Second,
		OnConnectionUp:    func(*autopaho.ConnectionManager, *paho.Connack) {
			log.Println("mqtt connection up")
		},
		OnConnectError:    func(err error) { log.Printf("error whilst attempting connection: %s\n", err) },
		Debug:             paho.NOOPLogger{},
		ClientConfig: paho.ClientConfig{
			Router: router,
			ClientID: _clientId,
			OnClientError: func(err error) {
				log.Println("OnClientError")
				log.Printf("server requested disconnect: %s\n", err)

			},
			OnServerDisconnect: func(d *paho.Disconnect) {
				log.Println("OnServerDisconnect")

				if d.Properties != nil {
					log.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
				} else {
					log.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
				}
			},
		},
	}

	cliCfg.Debug = logger{prefix: "autoPaho"}
	cliCfg.PahoDebug = logger{prefix: "paho"}

	cm, err = autopaho.NewConnection(ctx, cliCfg)
	if err != nil {
		log.Fatal(err)
	}

	err = cm.AwaitConnection(ctx)
	if err != nil { // Should only happen when context is cancelled
		log.Fatalf("publisher done (AwaitConnection: %s)\n", err)
	}

	handler, err = rpc.NewHandler(ctx, rpc.HandlerOpts{
		Conn: cm,
		Router: router,
		ResponseTopicFmt: "%s/responses",
		ClientID: _clientId,
	})
	if err != nil {
		log.Fatal(err)
	}

	cntr := 0

	for {
		log.Printf("Request: %d\n", cntr)
		cntr++

		func() {
			innerCtx, innerCancel := context.WithTimeout(ctx, 30 * time.Second)
			defer innerCancel()

			resp, err := handler.Request(innerCtx, &paho.Publish{
				Topic:   *rTopic,
				Payload: []byte(*message),
				// QoS: 1,
			})
			if err != nil {
				log.Printf("Request error: %s", err)
			} else {
				fmt.Printf("%s\n", string(resp.Payload))
			}

			// time.Sleep(3 * time.Second)
			// time.Sleep(10 * time.Second)
			time.Sleep(15 * time.Second)
		}()
	}
}


// logger implements the paho.Logger interface
type logger struct {
	prefix string
}

// Println implements the paho.Logger interface
func (l logger) Println(v ...interface{}) {
	fmt.Println(append([]interface{}{l.prefix + ":"}, v...)...)
}

// Printf implements the paho.Logger interface
func (l logger) Printf(format string, v ...interface{}) {
	if len(format) > 0 && format[len(format)-1] != '\n' {
		format = format + "\n" // some log calls in paho do not add \n
	}
	fmt.Printf(l.prefix+":"+format, v...)
}

Expected behaviour
The autopaho RPC handler should be able to survive reconnects.

Potential solutions

Fixing this would require either:

  1. autopaho to have a mechanism to re-subscribe automatically to topics on re-connects, or
  2. at least some way to add more OnConnectionUp callbacks on the fly to an autopaho.ConnectionManager so that rpc.NewHandler can add a callback to the ConnectionManager to ensure that the response topic gets re-subscribed in the event of a disconnect and re-connect.

I am opening this issue here to get discussion of potential solutions started before I make a PR.

Personally I think number 1 would be preferable, as I believe the most common usage pattern of autopaho would also benefit from this sort of re-subscribing mechanism. In my experiments I have made a crude version of this which I might refine a bit and send as a PR (I also need to sign the CLA first). I can post it as a patch file in this issue if it would help discussion.

@MattBrittan
Contributor

autopaho to have a mechanism to re-subscribe automatically to topics on re-connects, or

I'm somewhat reluctant to add this because it's something that can be handled fairly easily in the OnConnectionUp callback. Subscribing within OnConnectionUp is the recommended approach; this keeps things nice and clean. Adding code to autopaho that resubscribes will introduce a range of edge cases (e.g. what happens if the subscription fails? does Subscribe block until a SUBACK is received? etc.) and I'm not currently convinced that the benefits outweigh the additional complexity. Then again, you might come up with a PR that addresses all the issues in a simple manner.

at least some way to add more OnConnectionUp callbacks

Again this introduces complexity (potential deadlocks, changes requested whilst callbacks are being called) and could lead to intermittent, hard-to-locate bugs (often in the end user's code). It also ties the user to one implementation, which would probably lead to more options over time (e.g. are callbacks called in series or in goroutines? what happens if the connection drops while callbacks are in progress? etc.).

An alternative approach would be to (very draft!):

  • Create a type ConnectionUpHandler interface with functions Add(h func(*ConnectionManager, *paho.Connack)) and Remove(h func(*ConnectionManager, *paho.Connack)) and accept an instance of the above in HandlerOpts. RPC would then register a callback which subscribes to the needed topics.
  • Create an implementation of ConnectionUpHandler that returns a func(*ConnectionManager, *paho.Connack) that can be assigned to ClientConfig.OnConnectionUp. The implementation would manage a cbs []func(*ConnectionManager, *paho.Connack) and call them all when triggered.

I think this would be a relatively simple approach: fairly easy for the end user to understand, no magic, and easy for the user to replace if their use case differs (e.g. whether callbacks should be called in one goroutine or in separate goroutines). A benefit (from my perspective) is that it would not require any changes to autopaho itself.

Note that I'm not saying no to your ideas; I'm just very cautious when it comes to adding options, due to my experience managing the v3.1.1 client, which is really complex because of options added over the years.


akindestam commented Mar 28, 2023

That's actually not too bad of an idea. I might go ahead and implement it this way!
Although it doesn't strike me as very different in effect from my alternative 2 (turning ConnectionManager's OnConnectionUp callback into a list of callbacks), since the same sort of concurrency issues will have to be solved inside the ConnectionUpManager itself.

It does have the benefit of allowing a simple OnConnectionUp callback (when not using a ConnectionUpManager), and also, as you say, the ability for the end user to use another type of ConnectionUpManager with different semantics, for instance one where callbacks are called in goroutines. However, perhaps the biggest benefit is that it is backwards compatible and won't break anyone's autopaho-using code, except where they use the rpc Handler (but that is arguably the currently broken part we are trying to fix anyway).

The downside is that it will be a slightly less convenient API for my use case, but I think I can live with making my own helper func that sets up the Router, ConnectionUpHandler, rpc Handler and ConnectionManager, links them all together, and returns the Handler and ConnectionManager.

Where would this ConnectionUpManager belong in the source tree if I go ahead and implement it? Directly under autopaho, under extensions/rpc, or in its own folder such as extensions/callbackmanager, since it is not inherently tied to rpc?

What about generalizing it as a CallbackManager, since I think it might be useful for other callbacks as well? (Although here I might bump into limitations of Go's type system.)


akindestam commented Mar 28, 2023

Further question: What do you think about having cm.AwaitConnectionUp in the autopaho/extensions/rpc.Handler.Request and perhaps a few other places in there too? It would make sense in most use-cases to me, but perhaps it might limit the end-user?


akindestam commented Mar 28, 2023

Some shower thoughts later, I'm starting to find some potential issues:

Another particular issue we will see with the callback approach, as opposed to having auto-resubscribe functionality built in to autopaho, is that when the RPC handler is created it currently subscribes directly (it assumes the connection is up). With subscribing in a callback, rpc.NewHandler will have to figure out whether we are already up or not: if we are up we need to call cm.Subscribe directly, but if we are currently offline the subscription will be handled by the callback...

This might need a goroutine in the handler that subscribes immediately if the connection is currently up, but otherwise waits for the connection. After that it would need to block until the next reconnection (blocking even if currently online) before calling Subscribe again... Something like ConnectionManager.connUp, except a channel that does not stay closed once we connect.

This makes me start thinking that it might be better to handle this complexity directly in autopaho with auto-resubscribe functionality, or perhaps it can be made external in a similar way to the proposed ConnectionUpManager, just tailored to handling (re)subscriptions specifically. As a bonus, this (let's call it) AutoSubscriptionManager could take a Router as well and handle both topic (re)subscription and registering callbacks in the Router, for a streamlined way of setting up subscriptions and handlers in one call. (Although that might limit its usability for some edge use cases, so I can also see a case for it handling subscription only.)

@MattBrittan
Contributor

What about generalizing it as a CallbackManager,

That is quite possible using generics (though it probably makes things slightly more complicated). However, as this should only be around 10 lines of code, I'm not sure it's worth it.

when creating the RPC handler it currently subscribes directly (it assumes connection is up)

The best approach here will depend upon your use case. In many cases you would want to wait until NewHandler completes successfully (meaning the connection is up) before progressing to other things (e.g. making RPC calls). As such I'm not sure how much of an issue this is (and you run the risk of implementing a solution that works for you but not for others, so it is best kept outside of the library).

having cm.AwaitConnectionUp

I'm not really sure what you mean by this. But with a ConnectionUpHandler it would be very easy to add a handler that does something whenever the connection comes up (e.g. emit a value to a channel).

As I said previously, I'm not averse to looking at your proposed approach; however, I just wanted to caution you that any changes to the library should add minimal complexity and ideally not force users to take a particular approach (that leads to the need for yet more options :-) ). Please do feel free to submit a PR and we can discuss at that point (but I don't want you to do too much work without an understanding of how it will be assessed - I really want to avoid a repeat of the 3.1.1 client...).

As a bonus, perhaps the -- let's say -- AutoSubscriptionManager could take a Router as well and handle both setting topic (re)subscriptions and setting up callbacks in the Router...

Be cautious with this; when operating at QoS 1+ with CleanSession=false, subscriptions survive a disconnect/reconnect (e.g. an application restart). A common issue with the 3.1.1 client is that users just call Subscribe, not realising that messages could be arriving before the subscribe can be processed (because they are queued on the broker), so there is no handler in place, which can end up being fatal. This is one reason I quite like the current approach of keeping the two things separate.


akindestam commented Apr 6, 2023

EDIT: See post after this one first

So I started out trying to make the auto-resubscription feature as a separate structure that could plug into autopaho, just to see where it would go and what issues I'd run into. The main issue I bumped into was that I'd need some way to execute code before connUp in ConnectionManager gets closed (and thus before ConnectionManager.AwaitConnection(ctx) returns) to make my AwaitSubscribed function work without a race condition.

The idea with the AwaitSubscribed function was to use it in rpc.Handler.Request to delay making any requests until we are certain that we have subscribed to the response topic (and even providing the ability to detect failures to subscribe).

Note that even a simpler solution to response-topic re-subscription that ultimately relies on OnConnectionUp will have similar issues: we cannot reliably delay rpc.Handler.Request until the subscription is done. It is possible to reliably delay it for the first subscription only, which might be fine for QoS 1+ with CleanStart=false, as the broker would know of our client and queue any responses until the subscribe actually finishes, so any RPC requests sent before then would eventually get their responses.

I might make another go at fixing this using the simpler API you proposed. This would likely be the least messy, although it would somewhat depend on #116 and CleanStart=false for reliable operation. The other way forward I see is to continue down this path, but with my original idea of implementing the auto-resubscribe feature in ConnectionManager itself, so that AwaitSubscribed can be implemented reliably and we can be sure to make no RPC requests before the response topic is subscribed, avoiding lost responses even with CleanStart=true. A third option might be some sort of OnConnectionDown hook in ConnectionManager that we can use to recreate the subFinished channels.

For reference, this is the API and implementation I have come up with so far (a bit rough still; I mostly wrote it to sort my thoughts out, so it's not complete and the lock usage might not be correct or optimal yet):

package autosub

import (
	"context"
	"sync"
	"time"

	"github.com/eclipse/paho.golang/autopaho"
	"github.com/eclipse/paho.golang/paho"
)

type subscribeNotification struct {
	ack *paho.Suback
	err error
}

type SubscriptionManagerOptions struct {
	ConnectionManager *autopaho.ConnectionManager
	Router            paho.Router
	Debug             paho.Logger
	Timeout           time.Duration
}

type Subscription struct {
	sm          *SubscriptionManager
	suback      subscribeNotification
	subFinished chan struct{}
	subscribe   *paho.Subscribe
	mu          sync.Mutex
}

type SubscriptionManager struct {
	opts SubscriptionManagerOptions
	// TODO: change to list instead?
	subscriptions map[string]*Subscription
	mu            sync.Mutex
}

type Subscribe struct {
	Topic   string
	Handler paho.MessageHandler
	paho.SubscribeOptions
	paho.SubscribeProperties
}

func NewSubscriptionManager(opts SubscriptionManagerOptions) *SubscriptionManager {
	if opts.Timeout == 0 {
		opts.Timeout = 10 * time.Second
	}
	if opts.Debug == nil {
		opts.Debug = paho.NOOPLogger{}
	}
	return &SubscriptionManager{
		opts:          opts,
		subscriptions: make(map[string]*Subscription),
	}
}

func (sm *SubscriptionManager) doSubscribe(sub *Subscription, recreateChannel bool) {
	ctx, cancel := context.WithTimeout(context.Background(), sm.opts.Timeout)
	defer cancel()

	if recreateChannel {
		sub.mu.Lock()
		sub.subFinished = make(chan struct{})
		sub.mu.Unlock()
	}

	suback, err := sm.opts.ConnectionManager.Subscribe(ctx, sub.subscribe)
	sub.suback = subscribeNotification{suback, err}
	if err != nil {
		sm.opts.Debug.Printf("subscribe returned error: %s\n", err)
	}
	close(sub.subFinished)
}

func (sm *SubscriptionManager) GetOnConnectionUpCallback() func(*autopaho.ConnectionManager, *paho.Connack) {
	return func(*autopaho.ConnectionManager, *paho.Connack) {
		sm.mu.Lock()
		defer sm.mu.Unlock()

		for _, sub := range sm.subscriptions {
			sm.doSubscribe(sub, true)
		}
	}
}

func (sm *SubscriptionManager) AddSubscription(s *Subscribe) *Subscription {
	sub := &Subscription{
		sm:          sm,
		subFinished: make(chan struct{}),
		subscribe: &paho.Subscribe{
			Properties: &s.SubscribeProperties,
			Subscriptions: map[string]paho.SubscribeOptions{
				s.Topic: s.SubscribeOptions,
			},
		},
	}

	go func() {
		sm.opts.Router.RegisterHandler(s.Topic, s.Handler)

		ctx, cancel := context.WithTimeout(context.Background(), sm.opts.Timeout)
		defer cancel()
		sm.opts.ConnectionManager.AwaitConnection(ctx)
		sm.doSubscribe(sub, false)

		sm.mu.Lock()
		sm.subscriptions[s.Topic] = sub
		sm.mu.Unlock()
	}()

	return sub
}

func (s *Subscription) AwaitSubscribed(ctx context.Context) error {
	err := s.sm.opts.ConnectionManager.AwaitConnection(ctx)
	if err != nil {
		return err
	}
	// FIXME: race here since AwaitConnection will return before the OnConnectionUp handler gets called

	s.sm.mu.Lock()
	s.mu.Lock()
	ch := s.subFinished
	s.mu.Unlock()
	s.sm.mu.Unlock()

	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (s *Subscription) GetSubAck(ctx context.Context) (*paho.Suback, error) {
	if err := s.AwaitSubscribed(ctx); err != nil {
		return nil, err
	}
	return s.suback.ack, s.suback.err
}


akindestam commented Apr 6, 2023

Another route entirely: Remove the handling of subscribing to the response topic from autopaho/extensions/rpc.Handler and have the user handle subscription to the response topic themselves (they would have to make sure to subscribe to it in OnConnectionUp).
This would not need any particular auto-re-subscription magic, it would be the smallest solution to the problem in lines of code, and it lets the user select what QoS they want for the response topic as well, at the expense of a slightly less convenient RPC handler API.

Problem: we need to make sure the RPC response handler is registered in the router before subscribing, which likely means the RPC handler cannot take care of this either, leaving it up to the user to register the handler too.

This likely means we end up with some circular-reference fun that needs to be worked around, which is a bit inelegant.
