Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For Loki Sources position files: Use a best effort atomic rename #5772

Merged
merged 5 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ Main (unreleased)

- Change User-Agent header for outbound requests to include agent-mode, goos, and deployment mode. Example `GrafanaAgent/v0.38.0 (flow; linux; docker)` (@captncraig)

- `loki.source.windowsevent` and `loki.source.*` changed to use a more robust positions file to prevent corruption on reboots when writing
the positions file. (@mattdurham)

v0.37.4 (2023-11-06)
-----------------

Expand Down
17 changes: 3 additions & 14 deletions component/common/loki/positions/write_positions_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,18 @@ package positions
// same place in case of a restart.

import (
"os"
"path/filepath"

"bytes"
"github.com/natefinch/atomic"
yaml "gopkg.in/yaml.v2"
)

// writePositionFile is a fallback for Windows because renameio does not support Windows.
// See https://github.com/google/renameio#windows-support
func writePositionFile(filename string, positions map[Entry]string) error {
buf, err := yaml.Marshal(File{
Positions: positions,
})
if err != nil {
return err
}
return atomic.WriteFile(filename, bytes.NewReader(buf))

target := filepath.Clean(filename)
temp := target + "-new"

err = os.WriteFile(temp, buf, os.FileMode(positionFileMode))
if err != nil {
return err
}

return os.Rename(temp, target)
}
89 changes: 89 additions & 0 deletions component/loki/source/windowsevent/bookmark.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//go:build windows
// +build windows

// This code is copied from Promtail v1.6.2-0.20231004111112-07cbef92268a with minor changes.

package windowsevent

import (
"bytes"
"errors"
"github.com/natefinch/atomic"
"io"
"io/fs"
"os"

"github.com/grafana/loki/clients/pkg/promtail/targets/windows/win_eventlog"
)

type bookMark struct {
handle win_eventlog.EvtHandle
isNew bool
path string
buf []byte
}

// newBookMark creates a new windows event bookmark.
// The bookmark will be saved at the given path. Use save to save the current position for a given event.
func newBookMark(path string) (*bookMark, error) {
// 16kb buffer for rendering bookmark
buf := make([]byte, 16<<10)

_, err := os.Stat(path)
// creates a new bookmark file if none exists.
if errors.Is(err, fs.ErrNotExist) {
_, err := os.Create(path)
if err != nil {
return nil, err
}
bm, err := win_eventlog.CreateBookmark("")
if err != nil {
return nil, err
}
return &bookMark{
handle: bm,
path: path,
isNew: true,
buf: buf,
}, nil
}
if err != nil {
return nil, err
}
// otherwise open the current one.
file, err := os.OpenFile(path, os.O_RDWR, 0666)
if err != nil {
return nil, err
}
fileContent, err := io.ReadAll(file)
if err != nil {
return nil, err
}
fileString := string(fileContent)
// load the current bookmark.
bm, err := win_eventlog.CreateBookmark(fileString)
if err != nil {
// If we errored likely due to incorrect data then create a blank one
bm, err = win_eventlog.CreateBookmark("")
fileString = ""
// This should never fail but just in case.
if err != nil {
return nil, err
}
}
return &bookMark{
handle: bm,
path: path,
isNew: fileString == "",
buf: buf,
}, nil
}

// save Saves the bookmark at the current event position.
func (b *bookMark) save(event win_eventlog.EvtHandle) error {
newBookmark, err := win_eventlog.UpdateBookmark(b.handle, event, b.buf)
if err != nil {
return err
}
return atomic.WriteFile(b.path, bytes.NewReader([]byte(newBookmark)))
}
5 changes: 2 additions & 3 deletions component/loki/source/windowsevent/component_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/grafana/agent/component/common/loki/utils"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/windows"
)

func init() {
Expand All @@ -35,7 +34,7 @@ type Component struct {

mut sync.RWMutex
args Arguments
target *windows.Target
target *Target
handle *handler
receivers []loki.LogsReceiver
}
Expand Down Expand Up @@ -123,7 +122,7 @@ func (c *Component) Update(args component.Arguments) error {
_ = f.Close()
}

winTarget, err := windows.New(c.opts.Logger, c.handle, nil, convertConfig(newArgs))
winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs))
if err != nil {
return err
}
Expand Down
121 changes: 121 additions & 0 deletions component/loki/source/windowsevent/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//go:build windows
// +build windows

// This code is copied from Promtail v1.6.2-0.20231004111112-07cbef92268a with minor changes.

package windowsevent

import (
"fmt"
"syscall"

jsoniter "github.com/json-iterator/go"

"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/windows/win_eventlog"
)

type Event struct {
Source string `json:"source,omitempty"`
Channel string `json:"channel,omitempty"`
Computer string `json:"computer,omitempty"`
EventID int `json:"event_id,omitempty"`
Version int `json:"version,omitempty"`

Level int `json:"level,omitempty"`
Task int `json:"task,omitempty"`
Opcode int `json:"opCode,omitempty"`

LevelText string `json:"levelText,omitempty"`
TaskText string `json:"taskText,omitempty"`
OpcodeText string `json:"opCodeText,omitempty"`

Keywords string `json:"keywords,omitempty"`
TimeCreated string `json:"timeCreated,omitempty"`
EventRecordID int `json:"eventRecordID,omitempty"`
Correlation *Correlation `json:"correlation,omitempty"`
Execution *Execution `json:"execution,omitempty"`

Security *Security `json:"security,omitempty"`
UserData string `json:"user_data,omitempty"`
EventData string `json:"event_data,omitempty"`
Message string `json:"message,omitempty"`
}

type Security struct {
UserID string `json:"userId,omitempty"`
UserName string `json:"userName,omitempty"`
}

type Execution struct {
ProcessID uint32 `json:"processId,omitempty"`
ThreadID uint32 `json:"threadId,omitempty"`
ProcessName string `json:"processName,omitempty"`
}

type Correlation struct {
ActivityID string `json:"activityID,omitempty"`
RelatedActivityID string `json:"relatedActivityID,omitempty"`
}

// formatLine format a Loki log line from a windows event.
func formatLine(cfg *scrapeconfig.WindowsEventsTargetConfig, event win_eventlog.Event) (string, error) {
structuredEvent := Event{
Source: event.Source.Name,
Channel: event.Channel,
Computer: event.Computer,
EventID: event.EventID,
Version: event.Version,
Level: event.Level,
Task: event.Task,
Opcode: event.Opcode,
LevelText: event.LevelText,
TaskText: event.TaskText,
OpcodeText: event.OpcodeText,
Keywords: event.Keywords,
TimeCreated: event.TimeCreated.SystemTime,
EventRecordID: event.EventRecordID,
}

if !cfg.ExcludeEventData {
structuredEvent.EventData = string(event.EventData.InnerXML)
}
if !cfg.ExcludeUserData {
structuredEvent.UserData = string(event.UserData.InnerXML)
}
if !cfg.ExcludeEventMessage {
structuredEvent.Message = event.Message
}
if event.Correlation.ActivityID != "" || event.Correlation.RelatedActivityID != "" {
structuredEvent.Correlation = &Correlation{
ActivityID: event.Correlation.ActivityID,
RelatedActivityID: event.Correlation.RelatedActivityID,
}
}
// best effort to get the username of the event.
if event.Security.UserID != "" {
var userName string
usid, err := syscall.StringToSid(event.Security.UserID)
if err == nil {
username, domain, _, err := usid.LookupAccount("")
if err == nil {
userName = fmt.Sprint(domain, "\\", username)
}
}
structuredEvent.Security = &Security{
UserID: event.Security.UserID,
UserName: userName,
}
}
if event.Execution.ProcessID != 0 {
structuredEvent.Execution = &Execution{
ProcessID: event.Execution.ProcessID,
ThreadID: event.Execution.ThreadID,
}
_, _, processName, err := win_eventlog.GetFromSnapProcess(event.Execution.ProcessID)
if err == nil {
structuredEvent.Execution.ProcessName = processName
}
}
return jsoniter.MarshalToString(structuredEvent)
}
Loading