-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/implement autopaho rpc #98
Feature/implement autopaho rpc #98
Conversation
…thread safe manner
autopaho/cmd/rpc/.env
Outdated
export subdemo_connectRetryDelay=10000 | ||
export subdemo_writeToStdout="true" | ||
export subdemo_writeToDisk="false" | ||
export subdemo_OutputFile="/binds/receivedMessages.txt" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean to leave the .env file in the PR? If so I don't think a default like this starting from / is appropriate, even if it's not used by default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did, I thought it may be useful for the RPC example. I've removed it now
autopaho/auto.go
Outdated
@@ -326,3 +326,13 @@ func (c *ConnectionManager) Publish(ctx context.Context, p *paho.Publish) (*paho | |||
} | |||
return cli.Publish(ctx, p) | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exported functions need a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function has now been removed
The changes to the paho part are minor and fine. I'm not sure what @MattBrittan might think about the UseClient() function you've added to the autopaho client, if he's ok with the principle of it I might suggest that it needs a better name (UsingClient(), UsingRawClient(), something like to make it clearer) and it needs a comment to go with the function for the generated docs. On the question at the end of the earlier PR;
I don't consider it unstable, it's had quite a lot of use but the v1 release implies a stability of the API that I've been reticent to commit to. There are issues with the API design of the V3 client I originally worked on that we're now rather stuck with and in hindsight weren't great choices. I've been holding off for a long time for more feedback on the API design of the V5 client but maybe no complaints is the all the feedback I need. |
I can't say that I'm keen on this (especially without a fairly detailed comment!) because it's something that could easily be misused (I feel that if users need advanced functionality then they should use
I would really like to see persistence implemented before this hits V1. While we may be able to add this without changing the API (I'm not yet sure!) it's likely to be the most major change needed and the fact it's lacking this really limits where the client can be used (I was a little concerned to see the implementation in telegraph because users are not going to be aware of the implications of using V5 without persistence). |
@alsm @MattBrittan I have now removed the UseClient function as you suggested. I was previously using this so that I could update the "c.cli.Router" in autopaho.ConnectionManager to register handlers. Instead I now initialise the router and register my handlers before initialising the ConnectionManager and passing it through the config as you suggested. The only other change I have done is in auto.go where I pass the previously connected client's router to the new client so that the registered handlers are persisted through reconnects. That's great to hear this project is considered stable, as I mentioned the team I am in is currently working on replacing our existing RPC mechanism we use in production. @MattBrittan What needs to be done to complete the persistence feature? How much is already implemented? This is a feature we are keen to have and could potentially aid in development if useful. |
autopaho/auto.go
Outdated
@@ -207,8 +207,13 @@ func NewConnection(ctx context.Context, cfg ClientConfig) (*ConnectionManager, e | |||
if cli == nil { | |||
break mainLoop // Only occurs when context is cancelled | |||
} | |||
r := c.cli.Router // Get a reference to the previous client's Router |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess something like this is needed if ClientConfig.Router
is nil
- good catch; I had not considered that the temporary router would be thrown away in that case. However I wonder if a better solution would be to change NewConnection
to do:
if cfg.Router == nil { // Ensure router will survive reconnections
cfg.Router = paho.NewStandardRouter()
}
I'm not going to hold up your PR because of this but it feels cleaner to me and avoids creating then throwing away a router (I am being picky but think there is a potential, very unlikely to actually occur, race with the first packet from the broker and have been bitten by similar things in the past).
I have no issues with this being accepted but have made one suggestion (which would be nice to get into the finished code).
The time consuming thing is likely to be working out the architecture. I have put some thought into this in the past but did not come up with a design (really need to dedicate some time to thinking it through). Adding persistence would be relatively easy, but doing so in a way which does not contaminate the nice, clean, code that @alsm has written is the challenge. Note that whilst I say it's relatively easy it is also highly concurrent which is a challenge in itself (have seen a lot of subtle bugs in the V3 client caused by goroutine interactions). |
autopaho/auto.go
Outdated
c.mu.Lock() | ||
c.cli = cli | ||
if r != nil { | ||
if cfg.Router != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if you are implementing the change I suggested but Sorry - I meant before the cliCfg := cfg
. That way it will get used for every connection via:
establishBrokerConnection ->
paho.NewClient ->
c := &Client{ ... ClientConfig: conf, ...
(there would then be no need to set c.cli.Router
directly). It would make sense to do this at the top of the function (as we check/edit other parts of the config struct there = e.g. something like
if cfg.ConnectTimeout == 0 {
cfg.ConnectTimeout = 10 * time.Second
}
if cfg.Router == nil { // Important that the router survives reconnections
cfg.Router = paho.NewStandardRouter()
}
Sorry - it's late here so just had a really quick look so apologies if I misunderstood your intent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@MattBrittan I've just updated the PR, I did not realise that the cfg being passed to the NewClient would then persist the router, so there is no need to assign it explicitly as you mentioned. Please let let me know of any other changes you would like :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is actually something that should be changed (not necessarily as part of your PR). If the user passes a nil
Router then the router will be recreated every-time we reconnect which is likely to be unexpected. I have no issues with this change (its basically just introducing a new example now). @alsm are you happy with the changes to the paho
rpc demo and router_test
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@MattBrittan yeah I'm happy with the rpc changes. Sorry I didn't respond quicker, I'm away on vacation in Norway atm and will have intermittent connectivity for another week.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @alsm - I have gone ahead and merged. Suggested a couple of minor changes but they are not essential and can be handled in another PR (this one has been open long enough). I hope you have a great break!
return resp, nil | ||
select { | ||
case <-ctx.Done(): | ||
return nil, fmt.Errorf("context ended") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to return ctx.Error()
here (its fairly common to check for context errors).
@@ -68,7 +68,7 @@ func (h *Handler) Request(pb *paho.Publish) (*paho.Publish, error) { | |||
pb.Properties.ResponseTopic = fmt.Sprintf("%s/responses", h.c.ClientID) | |||
pb.Retain = false | |||
|
|||
_, err := h.c.Publish(context.Background(), pb) | |||
_, err := h.c.Publish(ctx, pb) | |||
if err != nil { | |||
return nil, err | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should really make the same change here as you have in AutoPaho (stop trying to receive if ctx.Done()).
Thanks very much - I made a few comments but have accepted the PR because it looks pretty good and I'm conscious of the fact that it's been open a while!. |
@MattBrittan Apologies I was on holiday the last week. Thanks for your comments, they look like very useful changes. I'll submit another PR to make these updates soon |
closes #92 #97
This PR supersedes my last 2 PRs since I did not want to try rewrite the git history of the branch.
The PR fixes the unit test for router_test and implements the RPC extension for autopaho. In addition to this I have replaced several un safe uses of context.Background and allowed the caller to provide a context to the function. The most significant issue was the rpc.Handler.Request function, in which the go routine could block indefinitely waiting for a response from the server.