Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For Loki Sources position files: Use a best effort atomic rename #5772

Merged
merged 5 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ Main (unreleased)
resulting in remote write sending the exemplar first and Prometheus failing to ingest it due to missing
series. (@krajorama)

- `loki.source.windowsevent` and `loki.source.*` now write their positions files in a more robust way, preventing
corruption when the machine reboots while a positions file is being written. (@mattdurham)

v0.37.4 (2023-11-06)
-----------------

Expand Down
17 changes: 3 additions & 14 deletions component/common/loki/positions/write_positions_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,18 @@ package positions
// same place in case of a restart.

import (
"os"
"path/filepath"

"bytes"
"github.com/natefinch/atomic"
yaml "gopkg.in/yaml.v2"
)

// writePositionFile is a fallback for Windows because renameio does not support Windows.
// See https://github.com/google/renameio#windows-support
//
// It marshals positions to YAML, wrapped in a File value, and writes the result
// to filename through atomic.WriteFile (github.com/natefinch/atomic).
// It returns the marshalling error, or otherwise whatever atomic.WriteFile returns.
func writePositionFile(filename string, positions map[Entry]string) error {
buf, err := yaml.Marshal(File{
Positions: positions,
})
if err != nil {
return err
}
// The function always returns here.
return atomic.WriteFile(filename, bytes.NewReader(buf))

// NOTE(review): every statement below is unreachable because of the return
// above. It looks like the earlier implementation (write to "<target>-new",
// then os.Rename onto the target) left over from the diff rendering - confirm
// and delete it together with the "os" and "path/filepath" imports.
target := filepath.Clean(filename)
temp := target + "-new"

err = os.WriteFile(temp, buf, os.FileMode(positionFileMode))
if err != nil {
return err
}

return os.Rename(temp, target)
}
83 changes: 83 additions & 0 deletions component/loki/source/windowsevent/bookmark.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//go:build windows
// +build windows

// This code is copied from Promtail with minor changes.

package windowsevent

import (
"bytes"
"errors"
"github.com/natefinch/atomic"
"io"
"io/fs"
"os"

"github.com/grafana/loki/clients/pkg/promtail/targets/windows/win_eventlog"
)

// bookMark pairs a windows event log bookmark with the file it is persisted to.
type bookMark struct {
	// path is the file that save writes the rendered bookmark to.
	path string
	// handle is the bookmark handle obtained from win_eventlog.CreateBookmark.
	handle win_eventlog.EvtHandle
	// buf is the scratch buffer handed to win_eventlog.UpdateBookmark on save.
	buf []byte
	// isNew is true when the bookmark did not start from previously saved content.
	isNew bool
}

// newBookMark creates a new windows event bookmark.
// The bookmark will be saved at the given path. Use save to save the current position for a given event.
func newBookMark(path string) (*bookMark, error) {
// 16kb buffer for rendering bookmark
buf := make([]byte, 16<<10)

_, err := os.Stat(path)
// creates a new bookmark file if none exists.
if errors.Is(err, fs.ErrNotExist) {
_, err := os.Create(path)
if err != nil {
return nil, err
}
bm, err := win_eventlog.CreateBookmark("")
if err != nil {
return nil, err
}
return &bookMark{
handle: bm,
path: path,
isNew: true,
buf: buf,
}, nil
}
if err != nil {
return nil, err
}
// otherwise open the current one.
file, err := os.OpenFile(path, os.O_RDWR, 0666)
if err != nil {
return nil, err
}
fileContent, err := io.ReadAll(file)
if err != nil {
return nil, err
}
fileString := string(fileContent)
// load the current bookmark.
bm, err := win_eventlog.CreateBookmark(fileString)
if err != nil {
return nil, err
}
return &bookMark{
handle: bm,
path: path,
isNew: fileString == "",
buf: buf,
}, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this fragment of code few things can still go wrong and possibly the agent will be crashlooping until someone manually repairs the bookmark file or, more likely, deletes it.

Should we instead have on a high-level something like this?

func newBookmark() {
  if not fileExists {
    createNewOne()
  } else {
    err := useExistingFile()
    if err != nil {
      renameCorruptedFile()
      createNewOne()
    }
  }
}

The idea being that if we detect a corrupted file, we attempt to recover.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we error on the create bookmark then we could call it again with a blank string to force a new bookmark.

}

// save updates the bookmark to the position of event and persists the
// rendered bookmark to b.path through atomic.WriteFile.
// It returns the error from win_eventlog.UpdateBookmark or, failing that,
// the one from the write.
func (b *bookMark) save(event win_eventlog.EvtHandle) error {
	rendered, err := win_eventlog.UpdateBookmark(b.handle, event, b.buf)
	if err == nil {
		err = atomic.WriteFile(b.path, bytes.NewReader([]byte(rendered)))
	}
	return err
}
5 changes: 2 additions & 3 deletions component/loki/source/windowsevent/component_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/grafana/agent/component/common/loki/utils"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/windows"
)

func init() {
Expand All @@ -35,7 +34,7 @@ type Component struct {

mut sync.RWMutex
args Arguments
target *windows.Target
target *Target
handle *handler
receivers []loki.LogsReceiver
}
Expand Down Expand Up @@ -123,7 +122,7 @@ func (c *Component) Update(args component.Arguments) error {
_ = f.Close()
}

winTarget, err := windows.New(c.opts.Logger, c.handle, nil, convertConfig(newArgs))
winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs))
if err != nil {
return err
}
Expand Down
121 changes: 121 additions & 0 deletions component/loki/source/windowsevent/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//go:build windows
// +build windows

// This code is copied from Promtail with minor changes.

package windowsevent

import (
"fmt"
"syscall"

jsoniter "github.com/json-iterator/go"

"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/windows/win_eventlog"
)

// Event is the JSON representation of a windows event. formatLine fills it
// from a win_eventlog.Event and marshals it with jsoniter. Every field is
// tagged omitempty.
type Event struct {
Source string `json:"source,omitempty"`
Channel string `json:"channel,omitempty"`
Computer string `json:"computer,omitempty"`
EventID int `json:"event_id,omitempty"`
Version int `json:"version,omitempty"`

Level int `json:"level,omitempty"`
Task int `json:"task,omitempty"`
Opcode int `json:"opCode,omitempty"`

LevelText string `json:"levelText,omitempty"`
TaskText string `json:"taskText,omitempty"`
OpcodeText string `json:"opCodeText,omitempty"`

Keywords string `json:"keywords,omitempty"`
TimeCreated string `json:"timeCreated,omitempty"`
EventRecordID int `json:"eventRecordID,omitempty"`
// Correlation and Execution are pointers that formatLine leaves nil unless
// the event carries a correlation activity ID or a non-zero process ID.
Correlation *Correlation `json:"correlation,omitempty"`
Execution *Execution `json:"execution,omitempty"`

// Security is only set by formatLine when the event has a user ID.
Security *Security `json:"security,omitempty"`
// UserData, EventData and Message are filled by formatLine unless the
// matching Exclude* option of the target config is set.
UserData string `json:"user_data,omitempty"`
EventData string `json:"event_data,omitempty"`
Message string `json:"message,omitempty"`
}

// Security is the security section of an Event.
type Security struct {
// UserID is copied from the event's security user ID.
UserID string `json:"userId,omitempty"`
// UserName is set by formatLine to domain\username when the account lookup
// for UserID succeeds; it stays empty otherwise.
UserName string `json:"userName,omitempty"`
}

// Execution is the execution section of an Event.
type Execution struct {
ProcessID uint32 `json:"processId,omitempty"`
ThreadID uint32 `json:"threadId,omitempty"`
// ProcessName is set by formatLine when win_eventlog.GetFromSnapProcess
// succeeds for ProcessID; it stays empty otherwise.
ProcessName string `json:"processName,omitempty"`
}

// Correlation is the correlation section of an Event. formatLine attaches it
// when at least one of the two IDs on the source event is non-empty.
type Correlation struct {
ActivityID string `json:"activityID,omitempty"`
RelatedActivityID string `json:"relatedActivityID,omitempty"`
}

// formatLine renders a windows event as the JSON log line for Loki.
//
// The identifying fields of the event are always copied. Event data, user data
// and the message are included unless cfg excludes them. The correlation,
// security and execution sections are attached only when the event carries a
// correlation activity ID, a user ID or a non-zero process ID respectively.
// Account-name and process-name lookups are best effort: a failed lookup
// leaves the name empty rather than failing the line.
//
// The returned string and error are those of jsoniter.MarshalToString.
func formatLine(cfg *scrapeconfig.WindowsEventsTargetConfig, event win_eventlog.Event) (string, error) {
	var line Event
	line.Source = event.Source.Name
	line.Channel = event.Channel
	line.Computer = event.Computer
	line.EventID = event.EventID
	line.Version = event.Version
	line.Level = event.Level
	line.Task = event.Task
	line.Opcode = event.Opcode
	line.LevelText = event.LevelText
	line.TaskText = event.TaskText
	line.OpcodeText = event.OpcodeText
	line.Keywords = event.Keywords
	line.TimeCreated = event.TimeCreated.SystemTime
	line.EventRecordID = event.EventRecordID

	// Optional payloads, each controlled by its own exclusion flag.
	if !cfg.ExcludeEventData {
		line.EventData = string(event.EventData.InnerXML)
	}
	if !cfg.ExcludeUserData {
		line.UserData = string(event.UserData.InnerXML)
	}
	if !cfg.ExcludeEventMessage {
		line.Message = event.Message
	}

	// Correlation is attached as soon as either activity ID is present.
	if corr := event.Correlation; !(corr.ActivityID == "" && corr.RelatedActivityID == "") {
		line.Correlation = &Correlation{
			ActivityID:        corr.ActivityID,
			RelatedActivityID: corr.RelatedActivityID,
		}
	}

	// best effort to get the username of the event.
	if sidText := event.Security.UserID; sidText != "" {
		sec := &Security{UserID: sidText}
		if sid, err := syscall.StringToSid(sidText); err == nil {
			if account, domain, _, err := sid.LookupAccount(""); err == nil {
				sec.UserName = fmt.Sprintf(`%s\%s`, domain, account)
			}
		}
		line.Security = sec
	}

	// The process name is resolved on a best-effort basis as well.
	if pid := event.Execution.ProcessID; pid != 0 {
		exec := &Execution{
			ProcessID: pid,
			ThreadID:  event.Execution.ThreadID,
		}
		if _, _, name, err := win_eventlog.GetFromSnapProcess(pid); err == nil {
			exec.ProcessName = name
		}
		line.Execution = exec
	}

	return jsoniter.MarshalToString(line)
}
Loading