Skip to content

Commit

Permalink
Merge pull request #108 from codemedic/customisable-logger
Browse files Browse the repository at this point in the history
Make it possible to set logger
  • Loading branch information
worg authored Jul 16, 2021
2 parents 619e774 + 1c502e8 commit 0f8a368
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 17 deletions.
10 changes: 6 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package stomp
import (
"errors"
"io"
"log"
"net"
"strconv"
"sync"
Expand Down Expand Up @@ -39,6 +38,7 @@ type Conn struct {
closed bool
closeMutex *sync.Mutex
options *connOptions
log Logger
}

type writeRequest struct {
Expand All @@ -64,7 +64,7 @@ func Dial(network, addr string, opts ...func(*Conn) error) (*Conn, error) {

// Add option to set host and make it the first option in list,
// so that if host has been explicitly specified it will override.
opts = append([](func(*Conn) error){ConnOpt.Host(host)}, opts...)
opts = append([]func(*Conn) error{ConnOpt.Host(host)}, opts...)

return Connect(c, opts...)
}
Expand All @@ -87,6 +87,8 @@ func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error)
return nil, err
}

c.log = options.Logger

if options.ReadBufferSize > 0 {
reader = frame.NewReaderSize(conn, options.ReadBufferSize)
}
Expand Down Expand Up @@ -311,7 +313,7 @@ func processLoop(c *Conn, writer *frame.Writer) {
}

case frame.ERROR:
log.Println("received ERROR; Closing underlying connection")
c.log.Error("received ERROR; Closing underlying connection")
for _, ch := range channels {
ch <- f
close(ch)
Expand All @@ -329,7 +331,7 @@ func processLoop(c *Conn, writer *frame.Writer) {
if ch, ok := channels[id]; ok {
ch <- f
} else {
log.Println("ignored MESSAGE for subscription", id)
c.log.Infof("ignored MESSAGE for subscription: %s", id)
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions conn_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-stomp/stomp/v3/frame"
"github.com/go-stomp/stomp/v3/internal/log"
)

// ConnOptions is an opaque structure used to collection options
Expand All @@ -25,6 +26,7 @@ type connOptions struct {
ReadChannelCapacity, WriteChannelCapacity int
ReadBufferSize, WriteBufferSize int
ResponseHeadersCallback func(*frame.Header)
Logger Logger
}

func newConnOptions(conn *Conn, opts []func(*Conn) error) (*connOptions, error) {
Expand All @@ -36,6 +38,7 @@ func newConnOptions(conn *Conn, opts []func(*Conn) error) (*connOptions, error)
HeartBeatError: DefaultHeartBeatError,
MsgSendTimeout: DefaultMsgSendTimeout,
RcvReceiptTimeout: DefaultRcvReceiptTimeout,
Logger: log.StdLogger{},
}

// This is a slight of hand, attach the options to the Conn long
Expand Down Expand Up @@ -178,6 +181,9 @@ var ConnOpt struct {

// ResponseHeaders lets you provide a callback function to get the headers from the CONNECT response
ResponseHeaders func(func(*frame.Header)) func(*Conn) error

// Logger lets you provide a callback function that sets the logger used by a connection
Logger func(logger Logger) func(*Conn) error
}

func init() {
Expand Down Expand Up @@ -294,4 +300,14 @@ func init() {
return nil
}
}

ConnOpt.Logger = func(log Logger) func(*Conn) error {
return func(c *Conn) error {
if log != nil {
c.log = log
}

return nil
}
}
}
51 changes: 51 additions & 0 deletions internal/log/stdlogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package log

import (
"fmt"
stdlog "log"
)

var (
debugPrefix = "DEBUG: "
infoPrefix = "INFO: "
warnPrefix = "WARN: "
errorPrefix = "ERROR: "
)

func logf(prefix string, format string, value ...interface{}) {
_ = stdlog.Output(3, fmt.Sprintf(prefix+format+"\n", value...))
}

type StdLogger struct{}

func (s StdLogger) Debugf(format string, value ...interface{}) {
logf(debugPrefix, format, value...)
}

func (s StdLogger) Debug(message string) {
logf(debugPrefix, "%s", message)
}

func (s StdLogger) Infof(format string, value ...interface{}) {
logf(infoPrefix, format, value...)
}

func (s StdLogger) Info(message string) {
logf(infoPrefix, "%s", message)
}

func (s StdLogger) Warningf(format string, value ...interface{}) {
logf(warnPrefix, format, value...)
}

func (s StdLogger) Warning(message string) {
logf(warnPrefix, "%s", message)
}

func (s StdLogger) Errorf(format string, value ...interface{}) {
logf(errorPrefix, format, value...)
}

func (s StdLogger) Error(message string) {
logf(errorPrefix, "%s", message)
}
13 changes: 13 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package stomp

type Logger interface {
Debugf(format string, value ...interface{})
Infof(format string, value ...interface{})
Warningf(format string, value ...interface{})
Errorf(format string, value ...interface{})

Debug(message string)
Info(message string)
Warning(message string)
Error(message string)
}
5 changes: 5 additions & 0 deletions server/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package client

import (
"time"

"github.com/go-stomp/stomp/v3"
)

// Contains information the client package needs from the
Expand All @@ -17,4 +19,7 @@ type Config interface {
// 11 days, but less than 12 days), then it is truncated to the
// maximum permitted values.
HeartBeat() time.Duration

// Logger provides the logger for a client
Logger() stomp.Logger
}
17 changes: 9 additions & 8 deletions server/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package client
import (
"fmt"
"io"
"log"
"net"
"strconv"
"time"
Expand Down Expand Up @@ -40,6 +39,7 @@ type Conn struct {
subList *SubscriptionList // List of subscriptions requiring acknowledgement
subs map[string]*Subscription // All subscriptions, keyed by id
validator stomp.Validator // For validating STOMP frames
log stomp.Logger
}

// Creates a new client connection. The config parameter contains
Expand All @@ -58,6 +58,7 @@ func NewConn(config Config, rw net.Conn, ch chan Request) *Conn {
txStore: &txStore{},
subList: NewSubscriptionList(),
subs: make(map[string]*Subscription),
log: config.Logger(),
}
go c.readLoop()
go c.processLoop()
Expand Down Expand Up @@ -127,9 +128,9 @@ func (c *Conn) readLoop() {
f, err := reader.Read()
if err != nil {
if err == io.EOF {
log.Println("connection closed:", c.rw.RemoteAddr())
c.log.Errorf("connection closed: %s", c.rw.RemoteAddr())
} else {
log.Println("read failed:", err, ":", c.rw.RemoteAddr())
c.log.Errorf("read failed: %v : %s", err, c.rw.RemoteAddr())
}

// Close the read channel so that the processing loop will
Expand Down Expand Up @@ -246,7 +247,7 @@ func (c *Conn) processLoop() {
if c.validator != nil {
err := c.validator.Validate(f)
if err != nil {
log.Println("validation failed for", f.Command, "frame", err)
c.log.Warningf("validation failed for %s frame: %v", f.Command, err)
c.sendErrorImmediately(err, f)
return
}
Expand Down Expand Up @@ -475,28 +476,28 @@ func (c *Conn) handleConnect(f *frame.Frame) error {
passcode, _ := f.Header.Contains(frame.Passcode)
if !c.config.Authenticate(login, passcode) {
// sleep to slow down a rogue client a little bit
log.Println("authentication failed")
c.log.Error("authentication failed")
time.Sleep(time.Second)
return authenticationFailed
}

c.version, err = determineVersion(f)
if err != nil {
log.Println("protocol version negotiation failed")
c.log.Error("protocol version negotiation failed")
return err
}
c.validator = stomp.NewValidator(c.version)

if c.version == stomp.V10 {
// don't want to handle V1.0 at the moment
// TODO: get working for V1.0
log.Println("unsupported version", c.version)
c.log.Errorf("unsupported version %s", c.version)
return unsupportedVersion
}

cx, cy, err := getHeartBeat(f)
if err != nil {
log.Println("invalid heart-beat")
c.log.Error("invalid heart-beat")
return err
}

Expand Down
8 changes: 6 additions & 2 deletions server/processor.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package server

import (
"log"
"net"
"strings"
"time"

"github.com/go-stomp/stomp/v3"
"github.com/go-stomp/stomp/v3/frame"
"github.com/go-stomp/stomp/v3/server/client"
"github.com/go-stomp/stomp/v3/server/queue"
Expand Down Expand Up @@ -114,7 +114,7 @@ func (proc *requestProcessor) Listen(l net.Listener) {
if max := 5 * time.Second; timeout > max {
timeout = max
}
log.Printf("stomp: Accept error: %v; retrying in %v", err, timeout)
proc.server.Log.Infof("stomp: Accept error: %v; retrying in %v", err, timeout)
time.Sleep(timeout)
continue
}
Expand Down Expand Up @@ -152,3 +152,7 @@ func (c *config) Authenticate(login, passcode string) bool {
// no authentication defined
return true
}

func (c *config) Logger() stomp.Logger {
return c.server.Log
}
8 changes: 8 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package server
import (
"net"
"time"

"github.com/go-stomp/stomp/v3"
"github.com/go-stomp/stomp/v3/internal/log"
)

// The STOMP server has the concept of queues and topics. A message
Expand Down Expand Up @@ -41,6 +44,7 @@ type Server struct {
Authenticator Authenticator // Authenticates login/passcodes. If nil no authentication is performed
QueueStorage QueueStorage // Implementation of queue storage. If nil, in-memory queues are used.
HeartBeat time.Duration // Preferred value for heart-beat read/write timeout, if zero, then DefaultHeartBeat.
Log stomp.Logger
}

// ListenAndServe listens on the TCP network address addr and then calls Serve.
Expand Down Expand Up @@ -76,6 +80,10 @@ func (s *Server) ListenAndServe() error {
// service thread for each connection. The service threads read
// requests and then process each request.
func (s *Server) Serve(l net.Listener) error {
if s.Log == nil {
s.Log = log.StdLogger{}
}

proc := newRequestProcessor(s)
return proc.Serve(l)
}
5 changes: 2 additions & 3 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stomp

import (
"fmt"
"log"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -155,7 +154,7 @@ func (s *Subscription) readLoop(ch chan *frame.Frame) {
s.id,
s.destination,
message)
log.Println(text)
s.conn.log.Info(text)
contentType := f.Header.Get(frame.ContentType)
msg := &Message{
Err: &Error{
Expand All @@ -178,7 +177,7 @@ func (s *Subscription) readLoop(ch chan *frame.Frame) {
}
return
} else {
log.Printf("Subscription %s: %s: unsupported frame type: %+v\n", s.id, s.destination, f)
s.conn.log.Infof("Subscription %s: %s: unsupported frame type: %+v", s.id, s.destination, f)
}
}
}

0 comments on commit 0f8a368

Please sign in to comment.