From 3e3ea3f82edd4c5d906dc7fc0d1702be4a91b4b0 Mon Sep 17 00:00:00 2001 From: "yang.zhang4" Date: Fri, 19 Jul 2019 10:09:00 +0800 Subject: [PATCH] add MsgSendTimeout as a connection option, it specifies maximum blocking time for calling the Conn.Send function. --- conn.go | 19 ++++++++----------- conn_options.go | 17 +++++++++++++++++ errors.go | 2 +- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/conn.go b/conn.go index cc8f85b..9de9a74 100644 --- a/conn.go +++ b/conn.go @@ -17,7 +17,7 @@ import ( const DefaultHeartBeatError = 5 * time.Second // Default timeout of calling Conn.Send function -const DefaultSendTimeout = 10 * time.Second +const DefaultMsgSendTimeout = 10 * time.Second // A Conn is a connection to a STOMP server. Create a Conn using either // the Dial or Connect function. @@ -30,6 +30,7 @@ type Conn struct { server string readTimeout time.Duration writeTimeout time.Duration + msgSendTimeout time.Duration hbGracePeriodMultiplier float64 closed bool closeMutex *sync.Mutex @@ -175,6 +176,8 @@ func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error) } } + c.msgSendTimeout = options.MsgSendTimeout + // TODO(jpj): make any non-standard headers in the CONNECTED // frame available. This could be implemented as: // (a) a callback function supplied as an option; or @@ -424,11 +427,6 @@ func (c *Conn) MustDisconnect() error { // Any number of options can be specified in opts. See the examples for usage. Options include whether // to receive a RECEIPT, should the content-length be suppressed, and sending custom header entries. func (c *Conn) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error { - return c.SendWithTimeout(destination, contentType, body, DefaultSendTimeout, opts...) -} - -//like Send, but with timeout -func (c *Conn) SendWithTimeout(destination, contentType string, body []byte, timeout time.Duration, opts ...func(*frame.Frame) error) error { c.closeMutex.Lock() defer c.closeMutex.Unlock() if c.closed { @@ -447,8 +445,7 @@ func (c *Conn) SendWithTimeout(destination, contentType string, body []byte, tim C: make(chan *frame.Frame), } - //c.writeCh <- request - err := sendDataToWriteChWithTimeout(c.writeCh, request, timeout) + err := sendDataToWriteChWithTimeout(c.writeCh, request, c.msgSendTimeout) if err != nil { return err } @@ -459,8 +456,8 @@ func (c *Conn) SendWithTimeout(destination, contentType string, body []byte, tim } else { // no receipt required request := writeRequest{Frame: f} - //c.writeCh <- request - err := sendDataToWriteChWithTimeout(c.writeCh, request, timeout) + + err := sendDataToWriteChWithTimeout(c.writeCh, request, c.msgSendTimeout) if err != nil { return err } @@ -478,7 +475,7 @@ func sendDataToWriteChWithTimeout(ch chan writeRequest, request writeRequest, ti timer := time.NewTimer(timeout) select { case <-timer.C: - return ErrSendTimeout + return ErrMsgSendTimeout case ch <- request: timer.Stop() return nil diff --git a/conn_options.go b/conn_options.go index 0e0aed0..86020b8 100644 --- a/conn_options.go +++ b/conn_options.go @@ -16,6 +16,7 @@ type connOptions struct { ReadTimeout time.Duration WriteTimeout time.Duration HeartBeatError time.Duration + MsgSendTimeout time.Duration HeartBeatGracePeriodMultiplier float64 Login, Passcode string AcceptVersions []string @@ -30,6 +31,7 @@ func newConnOptions(conn *Conn, opts []func(*Conn) error) (*connOptions, error) WriteTimeout: time.Minute, HeartBeatGracePeriodMultiplier: 1.0, HeartBeatError: DefaultHeartBeatError, + MsgSendTimeout: DefaultMsgSendTimeout, } // This is a slight of hand, attach the options to the Conn long @@ -127,6 +129,14 @@ var ConnOpt struct { // shorter time duration during unit testing. HeartBeatError func(errorTimeout time.Duration) func(*Conn) error + // MsgSendTimeout is a connect option that allows the client to specify + // the timeout for the Conn.Send function. + // The msgSendTimeout parameter specifies maximum blocking time for calling + // the Conn.Send function. + // If not specified, this option defaults to 10 seconds. + // Less than or equal to zero means infinite + MsgSendTimeout func(msgSendTimeout time.Duration) func(*Conn) error + // HeartBeatGracePeriodMultiplier is used to calculate the effective read heart-beat timeout // the broker will enforce for each client’s connection. The multiplier is applied to // the read-timeout interval the client specifies in its CONNECT frame @@ -196,6 +206,13 @@ func init() { } } + ConnOpt.MsgSendTimeout = func(msgSendTimeout time.Duration) func(*Conn) error { + return func(c *Conn) error { + c.options.MsgSendTimeout = msgSendTimeout + return nil + } + } + ConnOpt.HeartBeatGracePeriodMultiplier = func(multiplier float64) func(*Conn) error { return func(c *Conn) error { c.options.HeartBeatGracePeriodMultiplier = multiplier diff --git a/errors.go b/errors.go index c61c8e3..7fb3f56 100644 --- a/errors.go +++ b/errors.go @@ -16,7 +16,7 @@ var ( ErrCompletedSubscription = newErrorMessage("subscription is unsubscribed") ErrClosedUnexpectedly = newErrorMessage("connection closed unexpectedly") ErrAlreadyClosed = newErrorMessage("connection already closed") - ErrSendTimeout = newErrorMessage("send timeout") + ErrMsgSendTimeout = newErrorMessage("msg send timeout") ErrNilOption = newErrorMessage("nil option") )