Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement autopaho.ClientManager with rpc extension #95

Closed
wants to merge 130 commits into from

Conversation

bkneis
Copy link
Contributor

@bkneis bkneis commented Aug 1, 2022

closes #92

This PR wraps the paho.Client and autopaho.ClientManager structs in a common interface PubSubClient. The paho extension RPC has then been updated to use this interface so that we can use either paho.Client or autopaho.ClientManager for RPC applications.

An example has been added to paho's cmd directory that uses rpc.Handler using a autopaho.ClientManager, as opposed to cmd/rpc that uses the paho.client.

Note that the autopaho.ClientManager also needed updating. After a client disconnects it is re initialised with a new paho.StandardRouter. Instead we now check if this router was not nil, if so we copy the existing router to the new client so that the RPC handlers are still registered.

Because the autopaho.ClientManager wraps the reference of the paho.Client with a mutex, we use a function variable wrapping the client's router in autopaho.ClientManager.UseRouter instead of returning a reference of the router to the caller. Since this memory access would then not be synchronised.

eclipsewebmaster and others added 30 commits November 1, 2018 11:41
I wasn't stopping the pingHandler when the client lost its connection.
I had left some properties as a write rather than writeBinary so they
couldn't be unpacked properly

Also have a working extension for request/response with demo
Add extension for Request/Response
Adding tests are overdue and it's showing up the areas that I haven't
got right, especially in the packets library

eclipse-paho#5
Don't add 0 byte buffers to net.Buffers
More tests, fixing issues with will message
Move to using config struct for client options
The call to the OnDisconnect handler was scheduled before the disconnect
process had completed, because the user may legitimately want to call
Connect() again in this handler don't call it until disconnected.
Fix tests and don't call OnDisconnect early
Cleanup client if fail to Connect
More tests and clean up Disconnection
As reported in eclipse-paho#19 the cleanup function in Connect will attempt to Stop
the PingHandler, as it happens the PingHandler will not have been
started at any point before when cleanup() may be called. However
Incoming() will have been started and will also attempt to Stop()
PingHandler which in the issue given will still have a nil channel
Also I've added some more error checks.
Check the ping channel is not nil in Stop()
alsm and others added 11 commits October 22, 2021 08:53
add string methods to all the packet definitions in the packets library

eclipse-paho#75
Not all net.Conn implementations are thread safe for net.Buffers.WriteTo method
because they don't implement private net.writeBuffers interface.

To fix this, we check if io.Writer in ControlPacket.WriteTo implements
sync.Locker, call Lock() before writing to connection and defer Unlock().
This requirement can be satisfied by embedding sync.Mutex as sync.Locker in
net.Conn wrapper struct.

Introduced a helper function NewThreadSafeConn to wrap net.Conn and fixed the
potential race conditions in autopaho for TLS and websocket connections (since
ControlPacket.WriteTo is the only method writing to net.Conn, we can safely
replace Write lock with sync.Locker called in WriteTo).
Introduce threadSafeConn wrapper for thead-safe writing to net.Conn
@bkneis bkneis changed the title Feature/improvements Implement autopaho.ClientManager with rpc extension Aug 1, 2022
Copy link
Contributor

@alsm alsm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your PR, I think there are useful ideas here but I disagree with the design decisions, additionally without signing the ECA I could not accept this

autopaho/auto.go Outdated
func (c *ConnectionManager) UseRouter(fn func(paho.Router) error) error {
c.mu.Lock()
cli := c.cli
c.mu.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to have this a deferred unlock of the mutex?

Copy link
Contributor Author

@bkneis bkneis Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the other functions in the autopaho.ClientManager followed this pattern and they needed to synchronise access to the client too, so I followed suit. It works since a copy by value occurs assigning a new instance of the paho.Client to the local variable cli, then releasing the lock protecting the client. This local instance of the client will then be used and cleaned up by the GC once the function returns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, after investigating further the assignment of the local cli variable will be a copy of the pointer's address. Therefore mutating cli will change c.cli. That means the existing pattern for accessing the client is prone to race conditions if the autopaho.ConnectionManager accesses the client while a reference is used after the unlock. I'll update this PR to defer the unlock for all functions in the connection manager accessing the client.

autopaho/auto.go Outdated
func (c *ConnectionManager) GetClientID() string {
c.mu.Lock()
cli := c.cli
c.mu.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above, should this be deferred

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

paho/client.go Outdated
@@ -27,6 +27,22 @@ var (
ErrManualAcknowledgmentDisabled = errors.New("manual acknowledgments disabled")
)

type Publisher interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the Go Code Review Comments https://github.com/golang/go/wiki/CodeReviewComments#interfaces I disagree with defining these interfaces in this package

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I have now moved to the package using them (paho/extensions/rpc)

@bkneis
Copy link
Contributor Author

bkneis commented Aug 4, 2022

Hi @alsm thanks for getting back to me on this PR. I wanted to introduce myself, I work for a company currently planning on using this client to migrate our existing RPC server over to MQTTv5. I'm anticipating working on this client with the rest of my team and plan on bringing in some improvements to the project in the near future. Our next objective was to develop a test harness to perform some functional tests under moderate load.

In regards to this PR, I am trying to fix my eclipse account. I had an old email on my github when I created the PR, I've updated now but the validation keeps using the old address. I may have to close and re open this PR as I can't see a way to delete / create my eclipse account. I'll add another comment once I fix this.

Other than moving the interfaces which is now done and the deferring of the mutex unlock, was there any other design issues you had before we could merge?

@MattBrittan
Copy link
Contributor

MattBrittan commented Aug 4, 2022

Just an FYI - The way the Mutex is used in autopaho was intentional and I don't believe there are any race conditions in the existing code (but can always be proved wrong!). The idea is that the Mutex is preventing simultaneous reads/writes of the c.cli variable. It is NOT attempting to prevent simultaneous use of c.cli (we just take a copy first to avoid the race); the following sequence of events is quite possible (intentionally):

  1. Call to Subscribe takes a copy of c.cli
  2. mainLoop completes the reconnection process and replaces c.cli with a new client
  3. Subscribe passes message to old paho.Client (will fail as that clients connection is down)

While using defer is a frequently used idiom I don't think it's appropriate here because it disguises the intention of the Mutex. This leads to unexpected changes in functionality (i.e. current master allows multiple simultaneous Subscribe calls; your change prevents this). Note that there is also a related bug - AwaitConnection - This is used to wait for a connection; by using defer c.mu.Unlock() you are preventing said connection from being completed (this would require a lock on the Mutex - see here). I'm really keen to avoid the trap the v3 client fell into (tried to do too much and it became difficult to make changes without introducing deadlocks or races).

I have some concerns with the direction this change takes us (not sure if this matches with ALSM's comment re "design decisions"). paho is a low level MQTT client, autopaho aims to make things as simple as possible for the developer (higher level, reduced functionality). The designs differ due to the different intended uses. Adding a common interface over the top seems like a good idea but I feel it's likely to be constraining if we need to maintain that "contract". For example UseRouter is likely to be confusing with autopaho because if the connection drops, and is re-established, the router will no longer be linked with the live connection (so gets no messages).

Sorry - should add a welcome! Please don't take the above as a criticism; I'm just keen to ensure any changes are well thought out (currently redesigning the handling of the connection status in the v3 client and it's not fun!). After completing that I really want to look at persistence here but that is going to be a fairly major change (that I've been meaning to get to for over a year!).

@alsm
Copy link
Contributor

alsm commented Aug 7, 2022

It's awesome that you're looking to use this client and especially that you're looking to take advantage of the v5 features to do RPC. It's always gratifying to see projects you work on get used.

To echo @MattBrittan paho and autopaho are designed to be different. I wrote the paho client with the intention of supporting as much of the specification as possible allowing maximum flexibility and customisation with the tradeoff of being not as user friendly. Autopaho (which Matt wrote so if I say anything incorrect about it, apologies) was designed to ease the most common use cases for an MQTT client and add features like auto reconnection. As such it's not appropriate for them to try and meet a shared interface and I wouldn't want to take changes like that, for example the GetClientID() function is unnecessary in the paho client, the struct member is public and it is not idiomatic with the rest of the client to have a helper function for it.

I'd suggest you look at the features of both clients and decide which would make more sense for you to build around and then we can see how that client meets your needs.

@bkneis
Copy link
Contributor Author

bkneis commented Aug 8, 2022

@alsm @MattBrittan Thanks for clarifying all of this with me. OK I see the difference you are trying to establish between the 2 packages. Could I update my PR then to move the RPC extension into the autopaho package? And assume the rpc.Handler is going to be provided a autopaho.ConnectionManager instead of an paho.Client? If this is not suitable can I replicate the extension to the autopaho package? So both paho and autopaho have a RPC extension that works with there corresponding interface?

I really think this is imperative for the RPC extension to be useful for real world use cases. MQTT clients are likely to be deployed in the field with poor internet so auto re connection for an RPC client is a must. It would be cumbersome to have the caller deal with disconnects and reassign the rpc.Handler to the new client.

Also I noticed the git tag of the repo still uses v0.X, is this project still considered unstable? Is a v1 release expected anytime soon? If so what would need to be implemented for this?

@autumnfound
Copy link

@bkneis EF webdev rep here! Your PR is failing the ECA check as the email that is set in the email field of your commits points to your old address instead of the new one. We read in the address that is set into commits, look it up in our system for an ECA, and validate it that way. To resolve this, you'll have to amend your existing commits to update the email, or alternatively squash and update the one new commits tracked email address.

While we do have bindings for Github ID in our systems, that is for setting your permissions and isn't present when making validation requests to the server. To prevent this from happening in the future, be sure to update your local Git settings as is defined as a requirement in the Eclipse Handbook. More instructions from Github are available as well

@bkneis
Copy link
Contributor Author

bkneis commented Aug 9, 2022

@autumnfound That was exactly the issue! Thank you very much, I have created another PR now with the updated email address at Superseded by #98

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement RPC extension using autopaho.ClientManager instead of paho.Client
9 participants