Skip to content

Commit

Permalink
add MsgSendTimeout as a connection option, it specifies maximum block…
Browse files Browse the repository at this point in the history
…ing time for calling the Conn.Send function.
  • Loading branch information
zhangyang0108 committed Jul 19, 2019
1 parent f75aff2 commit 3e3ea3f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
19 changes: 8 additions & 11 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
const DefaultHeartBeatError = 5 * time.Second

// Default timeout of calling Conn.Send function
const DefaultSendTimeout = 10 * time.Second
const DefaultMsgSendTimeout = 10 * time.Second

// A Conn is a connection to a STOMP server. Create a Conn using either
// the Dial or Connect function.
Expand All @@ -30,6 +30,7 @@ type Conn struct {
server string
readTimeout time.Duration
writeTimeout time.Duration
msgSendTimeout time.Duration
hbGracePeriodMultiplier float64
closed bool
closeMutex *sync.Mutex
Expand Down Expand Up @@ -175,6 +176,8 @@ func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error)
}
}

c.msgSendTimeout = options.MsgSendTimeout

// TODO(jpj): make any non-standard headers in the CONNECTED
// frame available. This could be implemented as:
// (a) a callback function supplied as an option; or
Expand Down Expand Up @@ -424,11 +427,6 @@ func (c *Conn) MustDisconnect() error {
// Any number of options can be specified in opts. See the examples for usage. Options include whether
// to receive a RECEIPT, should the content-length be suppressed, and sending custom header entries.
func (c *Conn) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error {
return c.SendWithTimeout(destination, contentType, body, DefaultSendTimeout, opts...)
}

//like Send, but with timeout
func (c *Conn) SendWithTimeout(destination, contentType string, body []byte, timeout time.Duration, opts ...func(*frame.Frame) error) error {
c.closeMutex.Lock()
defer c.closeMutex.Unlock()
if c.closed {
Expand All @@ -447,8 +445,7 @@ func (c *Conn) SendWithTimeout(destination, contentType string, body []byte, tim
C: make(chan *frame.Frame),
}

//c.writeCh <- request
err := sendDataToWriteChWithTimeout(c.writeCh, request, timeout)
err := sendDataToWriteChWithTimeout(c.writeCh, request, c.msgSendTimeout)
if err != nil {
return err
}
Expand All @@ -459,8 +456,8 @@ func (c *Conn) SendWithTimeout(destination, contentType string, body []byte, tim
} else {
// no receipt required
request := writeRequest{Frame: f}
//c.writeCh <- request
err := sendDataToWriteChWithTimeout(c.writeCh, request, timeout)

err := sendDataToWriteChWithTimeout(c.writeCh, request, c.msgSendTimeout)
if err != nil {
return err
}
Expand All @@ -478,7 +475,7 @@ func sendDataToWriteChWithTimeout(ch chan writeRequest, request writeRequest, ti
timer := time.NewTimer(timeout)
select {
case <-timer.C:
return ErrSendTimeout
return ErrMsgSendTimeout
case ch <- request:
timer.Stop()
return nil
Expand Down
17 changes: 17 additions & 0 deletions conn_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type connOptions struct {
ReadTimeout time.Duration
WriteTimeout time.Duration
HeartBeatError time.Duration
MsgSendTimeout time.Duration
HeartBeatGracePeriodMultiplier float64
Login, Passcode string
AcceptVersions []string
Expand All @@ -30,6 +31,7 @@ func newConnOptions(conn *Conn, opts []func(*Conn) error) (*connOptions, error)
WriteTimeout: time.Minute,
HeartBeatGracePeriodMultiplier: 1.0,
HeartBeatError: DefaultHeartBeatError,
MsgSendTimeout: DefaultMsgSendTimeout,
}

// This is a slight of hand, attach the options to the Conn long
Expand Down Expand Up @@ -127,6 +129,14 @@ var ConnOpt struct {
// shorter time duration during unit testing.
HeartBeatError func(errorTimeout time.Duration) func(*Conn) error

// MsgSendTimeout is a connect option that allows the client to specify
// the timeout for the Conn.Send function.
// The msgSendTimeout parameter specifies maximum blocking time for calling
// the Conn.Send function.
// If not specified, this option defaults to 10 seconds.
// Less than or equal to zero means infinite
MsgSendTimeout func(msgSendTimeout time.Duration) func(*Conn) error

// HeartBeatGracePeriodMultiplier is used to calculate the effective read heart-beat timeout
// the broker will enforce for each client’s connection. The multiplier is applied to
// the read-timeout interval the client specifies in its CONNECT frame
Expand Down Expand Up @@ -196,6 +206,13 @@ func init() {
}
}

ConnOpt.MsgSendTimeout = func(msgSendTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.MsgSendTimeout = msgSendTimeout
return nil
}
}

ConnOpt.HeartBeatGracePeriodMultiplier = func(multiplier float64) func(*Conn) error {
return func(c *Conn) error {
c.options.HeartBeatGracePeriodMultiplier = multiplier
Expand Down
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
ErrCompletedSubscription = newErrorMessage("subscription is unsubscribed")
ErrClosedUnexpectedly = newErrorMessage("connection closed unexpectedly")
ErrAlreadyClosed = newErrorMessage("connection already closed")
ErrSendTimeout = newErrorMessage("send timeout")
ErrMsgSendTimeout = newErrorMessage("msg send timeout")
ErrNilOption = newErrorMessage("nil option")
)

Expand Down

0 comments on commit 3e3ea3f

Please sign in to comment.