From c6321f0b1e31492c2918d7cf0af273d2302d6dad Mon Sep 17 00:00:00 2001 From: cecile Date: Thu, 11 Nov 2021 09:43:55 +0100 Subject: [PATCH 1/2] Add tcp option to set keep alive value on the tcp layer Signed-off-by: cecile --- client.go | 2 +- netconn.go | 13 +++++++------ options.go | 8 ++++++++ tcp.go | 7 +++++++ 4 files changed, 23 insertions(+), 7 deletions(-) create mode 100644 tcp.go diff --git a/client.go b/client.go index 11d04de1..edd8c0ce 100644 --- a/client.go +++ b/client.go @@ -393,7 +393,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) } // Start by opening the network connection (tcp, tls, ws) etc - conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions) + conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.TcpOptions) if err != nil { ERROR.Println(CLI, err.Error()) WARN.Println(CLI, "failed to connect to broker, trying next") diff --git a/netconn.go b/netconn.go index c1238496..fc2b6f49 100644 --- a/netconn.go +++ b/netconn.go @@ -37,7 +37,7 @@ import ( // openConnection opens a network connection using the protocol indicated in the URL. // Does not carry out any MQTT specific handshakes. -func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions) (net.Conn, error) { +func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions, tcpOptions *TcpOptions) (net.Conn, error) { switch uri.Scheme { case "ws": conn, err := NewWebsocket(uri.String(), nil, timeout, headers, websocketOptions) @@ -48,7 +48,8 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade case "mqtt", "tcp": allProxy := os.Getenv("all_proxy") if len(allProxy) == 0 { - conn, err := net.DialTimeout("tcp", uri.Host, timeout) + dialer := net.Dialer{Timeout: timeout, KeepAlive: tcpOptions.KeepAlive} + conn, err := dialer.Dial("tcp", uri.Host) if err != nil { return nil, err } @@ -67,10 +68,11 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade // this check is preserved for compatibility with older versions // which used uri.Host only (it works for local paths, e.g. unix://socket.sock in current dir) + dialer := net.Dialer{Timeout: timeout, KeepAlive: tcpOptions.KeepAlive} if len(uri.Host) > 0 { - conn, err = net.DialTimeout("unix", uri.Host, timeout) + conn, err = dialer.Dial("unix", uri.Host) } else { - conn, err = net.DialTimeout("unix", uri.Path, timeout) + conn, err = dialer.Dial("unix", uri.Path) } if err != nil { @@ -80,14 +82,13 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade case "ssl", "tls", "mqtts", "mqtt+ssl", "tcps": allProxy := os.Getenv("all_proxy") if len(allProxy) == 0 { - conn, err := tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", uri.Host, tlsc) + conn, err := tls.DialWithDialer(&net.Dialer{Timeout: timeout, KeepAlive: tcpOptions.KeepAlive}, "tcp", uri.Host, tlsc) if err != nil { return nil, err } return conn, nil } proxyDialer := proxy.FromEnvironment() - conn, err := proxyDialer.Dial("tcp", uri.Host) if err != nil { return nil, err diff --git a/options.go b/options.go index d7a24e42..c7fb218e 100644 --- a/options.go +++ b/options.go @@ -96,6 +96,7 @@ type ClientOptions struct { HTTPHeaders http.Header WebsocketOptions *WebsocketOptions MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming + TcpOptions *TcpOptions } // NewClientOptions will create a new ClientClientOptions type with some @@ -137,6 +138,7 @@ func NewClientOptions() *ClientOptions { ResumeSubs: false, HTTPHeaders: make(map[string][]string), WebsocketOptions: &WebsocketOptions{}, + TcpOptions: &TcpOptions{}, } return o } @@ -419,3 +421,9 @@ func (o *ClientOptions) SetMaxResumePubInFlight(MaxResumePubInFlight int) *Clien o.MaxResumePubInFlight = MaxResumePubInFlight return o } + +// SetTcpOptions sets the additional tcp options used in a tcp connection +func (o *ClientOptions) SetTcpOptions(w *TcpOptions) *ClientOptions { + o.TcpOptions = w + return o +} diff --git a/tcp.go b/tcp.go new file mode 100644 index 00000000..5ec12ffd --- /dev/null +++ b/tcp.go @@ -0,0 +1,7 @@ +package mqtt + +import "time" + +type TcpOptions struct { + KeepAlive time.Duration +} From 6934fb5d37ccb799209c94a673224f6f29f13e59 Mon Sep 17 00:00:00 2001 From: cecile Date: Wed, 17 Nov 2021 12:40:22 +0100 Subject: [PATCH 2/2] use net.Dialer directly into ClientOptions package. --- client.go | 2 +- netconn.go | 6 ++---- options.go | 12 +++++++----- tcp.go | 7 ------- 4 files changed, 10 insertions(+), 17 deletions(-) delete mode 100644 tcp.go diff --git a/client.go b/client.go index edd8c0ce..ef8e4b7a 100644 --- a/client.go +++ b/client.go @@ -393,7 +393,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) } // Start by opening the network connection (tcp, tls, ws) etc - conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.TcpOptions) + conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer) if err != nil { ERROR.Println(CLI, err.Error()) WARN.Println(CLI, "failed to connect to broker, trying next") diff --git a/netconn.go b/netconn.go index fc2b6f49..7e3899e9 100644 --- a/netconn.go +++ b/netconn.go @@ -37,7 +37,7 @@ import ( // openConnection opens a network connection using the protocol indicated in the URL. // Does not carry out any MQTT specific handshakes. -func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions, tcpOptions *TcpOptions) (net.Conn, error) { +func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions, dialer *net.Dialer) (net.Conn, error) { switch uri.Scheme { case "ws": conn, err := NewWebsocket(uri.String(), nil, timeout, headers, websocketOptions) @@ -48,7 +48,6 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade case "mqtt", "tcp": allProxy := os.Getenv("all_proxy") if len(allProxy) == 0 { - dialer := net.Dialer{Timeout: timeout, KeepAlive: tcpOptions.KeepAlive} conn, err := dialer.Dial("tcp", uri.Host) if err != nil { return nil, err @@ -68,7 +67,6 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade // this check is preserved for compatibility with older versions // which used uri.Host only (it works for local paths, e.g. unix://socket.sock in current dir) - dialer := net.Dialer{Timeout: timeout, KeepAlive: tcpOptions.KeepAlive} if len(uri.Host) > 0 { conn, err = dialer.Dial("unix", uri.Host) } else { @@ -82,7 +80,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade case "ssl", "tls", "mqtts", "mqtt+ssl", "tcps": allProxy := os.Getenv("all_proxy") if len(allProxy) == 0 { - conn, err := tls.DialWithDialer(&net.Dialer{Timeout: timeout, KeepAlive: tcpOptions.KeepAlive}, "tcp", uri.Host, tlsc) + conn, err := tls.DialWithDialer(dialer, "tcp", uri.Host, tlsc) if err != nil { return nil, err } diff --git a/options.go b/options.go index c7fb218e..e745258d 100644 --- a/options.go +++ b/options.go @@ -23,6 +23,7 @@ package mqtt import ( "crypto/tls" + "net" "net/http" "net/url" "strings" @@ -96,7 +97,7 @@ type ClientOptions struct { HTTPHeaders http.Header WebsocketOptions *WebsocketOptions MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming - TcpOptions *TcpOptions + Dialer *net.Dialer } // NewClientOptions will create a new ClientClientOptions type with some @@ -138,7 +139,7 @@ func NewClientOptions() *ClientOptions { ResumeSubs: false, HTTPHeaders: make(map[string][]string), WebsocketOptions: &WebsocketOptions{}, - TcpOptions: &TcpOptions{}, + Dialer: &net.Dialer{Timeout: 30 * time.Second}, } return o } @@ -357,6 +358,7 @@ func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions { // Default 30 seconds. Currently only operational on TCP/TLS connections. func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions { o.ConnectTimeout = t + o.Dialer.Timeout = t return o } @@ -422,8 +424,8 @@ func (o *ClientOptions) SetMaxResumePubInFlight(MaxResumePubInFlight int) *Clien return o } -// SetTcpOptions sets the additional tcp options used in a tcp connection -func (o *ClientOptions) SetTcpOptions(w *TcpOptions) *ClientOptions { - o.TcpOptions = w +// SetDialer sets the tcp dialer options used in a tcp connection +func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions { + o.Dialer = dialer return o } diff --git a/tcp.go b/tcp.go deleted file mode 100644 index 5ec12ffd..00000000 --- a/tcp.go +++ /dev/null @@ -1,7 +0,0 @@ -package mqtt - -import "time" - -type TcpOptions struct { - KeepAlive time.Duration -}