diff --git a/README.md b/README.md index ad9e3f5..f68d99c 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Tailor, the library for tailing nginx access logs ----- [![Go Doc](https://godoc.org/github.com/un000/tailor?status.svg)](https://godoc.org/github.com/un000/tailor) -Tailor provides the functionality of tailing nginx access log under logrotate. +Tailor provides the functionality of tailing for e. g. nginx logs under logrotate. Tailor will follow a selected log file and reopen it if it's been rotated. Now, tailor doesn't require inotify, because it polls logs with a tiny delay. So the library can achieve cross-platform. @@ -40,7 +40,7 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - lines, errs, err := t.Run(ctx) + err := t.Run(ctx) if err != nil { panic(err) } @@ -48,13 +48,13 @@ func main() { fmt.Println("Tailing file:", t.FileName()) for { select { - case line, ok := <-lines: + case line, ok := <-t.Lines(): if !ok { return } fmt.Println(line.StringTrimmed()) - case err, ok := <-errs: + case err, ok := <-t.Errors(): if !ok { return } diff --git a/example/main.go b/example/main.go index 6d89989..65d8209 100644 --- a/example/main.go +++ b/example/main.go @@ -19,7 +19,7 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - lines, errs, err := t.Run(ctx) + err := t.Run(ctx) if err != nil { panic(err) } @@ -27,13 +27,13 @@ func main() { fmt.Println("Tailing file:", t.FileName()) for { select { - case line, ok := <-lines: + case line, ok := <-t.Lines(): if !ok { return } fmt.Println(line.StringTrimmed()) - case err, ok := <-errs: + case err, ok := <-t.Errors(): if !ok { return } diff --git a/tailor.go b/tailor.go index daeebc4..f79628c 100644 --- a/tailor.go +++ b/tailor.go @@ -29,6 +29,9 @@ type Tailor struct { lastSize int64 lag int64 + lines chan Line + errs chan error + working int32 } @@ -53,9 +56,9 @@ func New(filename string, opts ...Option) *Tailor { // If the file has been logrotated, Tailor will follow the first file to the end and after reopen it. // If error happens file will be closed. // Tailor makes an exponential sleep to reduce stat syscalls. -func (t *Tailor) Run(ctx context.Context) (<-chan Line, <-chan error, error) { +func (t *Tailor) Run(ctx context.Context) error { if !atomic.CompareAndSwapInt32(&t.working, 0, 1) { - return nil, nil, errors.New("already working") + return errors.New("already working") } finalizer := func() { @@ -69,47 +72,47 @@ func (t *Tailor) Run(ctx context.Context) (<-chan Line, <-chan error, error) { t.file, err = os.Open(t.fileName) if err != nil { finalizer() - return nil, nil, errors.Wrap(err, "can't open file for tailing") + return errors.Wrap(err, "can't open file for tailing") } _, err = t.file.Seek(t.opts.runOffset, t.opts.runWhence) if err != nil { finalizer() - return nil, nil, errors.Wrapf(err, "error seeking file %s", t.fileName) + return errors.Wrapf(err, "error seeking file %s", t.fileName) } err = t.seekToLineStart() if err != nil { finalizer() - return nil, nil, errors.Wrapf(err, "error seeking to the line beginning %s", t.fileName) + return errors.Wrapf(err, "error seeking to the line beginning %s", t.fileName) } err = t.updateFileStatus() if err != nil { finalizer() - return nil, nil, errors.Wrapf(err, "error getting file size %s", t.fileName) + return errors.Wrapf(err, "error getting file size %s", t.fileName) } - lines, errs := t.readLoop(ctx) + t.readLoop(ctx) - return lines, errs, nil + return nil } // readLoop starts goroutine, which reads the given file and send to the line chan tailed strings. -func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) { - lines := make(chan Line) - errs := make(chan error) +func (t *Tailor) readLoop(ctx context.Context) { + t.lines = make(chan Line) + t.errs = make(chan error) go func() { defer func() { if t.file != nil { err := t.file.Close() if err != nil { - errs <- errors.Wrap(err, "error closing file") + t.errs <- errors.Wrap(err, "error closing file") } } - close(lines) - close(errs) + close(t.lines) + close(t.errs) atomic.StoreInt32(&t.working, 0) }() @@ -125,7 +128,7 @@ func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) { case <-lagReporter.C: err := t.updateFileStatus() if err != nil { - errs <- errors.Wrap(err, "error getting file status") + t.errs <- errors.Wrap(err, "error getting file status") break } default: @@ -150,7 +153,7 @@ func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) { break } } else { - errs <- errors.Wrap(err, "error reading line") + t.errs <- errors.Wrap(err, "error reading line") return } @@ -171,34 +174,34 @@ func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) { if len(line) == 0 && err == io.EOF { isSameFile, err := t.isFileStillTheSame() if err != nil { - errs <- errors.Wrap(err, "error checking that file is the same") + t.errs <- errors.Wrap(err, "error checking that file is the same") return } if !isSameFile { err := t.file.Close() if err != nil { - errs <- errors.Wrap(err, "error closing current file") + t.errs <- errors.Wrap(err, "error closing current file") } t.file, err = os.Open(t.fileName) if err != nil { if os.IsNotExist(err) { - errs <- ErrFileNotExists + t.errs <- ErrFileNotExists return } - errs <- errors.Wrap(err, "error reopening file") + t.errs <- errors.Wrap(err, "error reopening file") return } err = t.updateFileStatus() if err != nil { - errs <- errors.Wrap(err, "error getting file status") + t.errs <- errors.Wrap(err, "error getting file status") return } err = t.seekToLineStart() if err != nil { - errs <- errors.Wrap(err, "error seeking to the line beginning") + t.errs <- errors.Wrap(err, "error seeking to the line beginning") return } @@ -212,20 +215,28 @@ func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) { continue } if err != nil && err != io.EOF { - errs <- errors.Wrap(err, "error reading line") + t.errs <- errors.Wrap(err, "error reading line") return } pollerTimeout = t.opts.pollerTimeout - lines <- Line{ + t.lines <- Line{ line: line, fileName: t.fileName, } } }() +} + +// Lines returns chanel of read lines. +func (t *Tailor) Lines() chan Line { + return t.lines +} - return lines, errs +// Errors returns chanel of errors, associated with reading files. +func (t *Tailor) Errors() chan error { + return t.errs } // exponentialSleep sleeps for pollerTimeout and returns new exponential grown timeout <= maxWait. diff --git a/tailor_test.go b/tailor_test.go index 0767e41..5fb7000 100644 --- a/tailor_test.go +++ b/tailor_test.go @@ -34,7 +34,7 @@ func TestTailFileFromStart(t *testing.T) { } ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) - lines, errs, err := f.Run(ctx) + err = f.Run(ctx) if err != nil { t.Error(err) } @@ -48,7 +48,7 @@ func TestTailFileFromStart(t *testing.T) { for ; i <= 3; i++ { select { - case line, ok := <-lines: + case line, ok := <-f.Lines(): if !ok { return } @@ -57,7 +57,7 @@ func TestTailFileFromStart(t *testing.T) { t.Error(err) } t.Log(line.StringTrimmed()) - case err, ok := <-errs: + case err, ok := <-f.Errors(): if !ok { return }