Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add -json flag #14

Merged
merged 3 commits into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.23"
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v4
with:
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ jobs:
strategy:
matrix:
go:
- "1.21"
- "1.22"
- "1.23"
name: Build
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
.envrc
cmd/tracer/tracer
dist/
tracer
./tracer
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ tracer [options] [cluster] [task-id]

-duration duration
fetch logs duration from created / before stopping (default 1m0s)
-json
output as JSON lines
-sns string
SNS topic ARN
-stdout
output to stdout (default true)
-version
show the version
show the version
```

Environment variable `AWS_REGION` is required.
Expand Down
8 changes: 7 additions & 1 deletion cmd/tracer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"log/slog"
"os"
"strings"
"time"
Expand Down Expand Up @@ -43,9 +44,14 @@ func main() {
flag.BoolVar(&showVersion, "version", false, "show the version")
flag.BoolVar(&opt.Stdout, "stdout", true, "output to stdout")
flag.StringVar(&opt.SNSTopicArn, "sns", "", "SNS topic ARN")
flag.BoolVar(&opt.JSON, "json", false, "output as JSON lines")
flag.VisitAll(envToFlag)
flag.Parse()

if opt.JSON {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, nil)))
}

if showVersion {
fmt.Println("tracer", Version)
return
Expand All @@ -60,7 +66,7 @@ func main() {
copy(args, flag.Args())

if err := t.Run(ctx, args[0], args[1], &opt); err != nil {
fmt.Fprintln(os.Stderr, err)
slog.Error(err.Error())
os.Exit(1)
}
}
Expand Down
4 changes: 2 additions & 2 deletions lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package tracer

import (
"context"
"fmt"
"log/slog"
"strings"

"github.com/aws/aws-sdk-go-v2/aws/arn"
)

func (t *Tracer) LambdaHandlerFunc(opt *RunOption) func(ctx context.Context, event *ECSTaskEvent) error {
return func(ctx context.Context, event *ECSTaskEvent) error {
fmt.Println(event.String())
slog.Info("event", "payload", event.String())
lastStatus := event.Detail.LastStatus
if lastStatus != "STOPPED" {
return nil
Expand Down
3 changes: 2 additions & 1 deletion lambda/function.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
FunctionName: 'tracer',
MemorySize: 128,
Handler: 'index.handler',
Role: 'arn:aws:iam::{account_id}:role/{role_name}',
// Role: 'arn:aws:iam::{account_id}:role/{role_name}',
Role: 'arn:aws:iam::314472643515:role/tracer',
Runtime: 'provided.al2',
}
165 changes: 125 additions & 40 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package tracer
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"sort"
"strings"
Expand Down Expand Up @@ -71,7 +73,7 @@ func (tl *Timeline) Add(event *TimelineEvent) {
tl.events = append(tl.events, event)
}

func (tl *Timeline) Print(w io.Writer) (int, error) {
func (tl *Timeline) Print(w io.Writer, json bool) (int, error) {
tl.mu.Lock()
defer tl.mu.Unlock()

Expand All @@ -81,16 +83,23 @@ func (tl *Timeline) Print(w io.Writer) (int, error) {
return tls[i].Timestamp.Before(tls[j].Timestamp)
})
n := 0
toString := func(e *TimelineEvent) string {
if json {
return e.JSON()
}
return e.String()
}
for _, e := range tls {
s := e.String()
if !tl.seen[s] {
l, err := fmt.Fprint(w, e.String())
if err != nil {
return n, err
}
n += l
tl.seen[s] = true
s := toString(e)
if tl.seen[s] {
continue
}
l, err := fmt.Fprint(w, s)
if err != nil {
return n, err
}
n += l
tl.seen[s] = true
}
return n, nil
}
Expand All @@ -106,6 +115,20 @@ func (e *TimelineEvent) String() string {
return fmt.Sprintf("%s\t%s\t%s\n", ts.Format(TimeFormat), e.Source, e.Message)
}

func (e *TimelineEvent) JSON() string {
ts := e.Timestamp.In(time.Local)
b, _ := json.Marshal(struct {
Time string `json:"time"`
Source string `json:"src"`
Message string `json:"msg"`
}{
Time: ts.Format(TimeFormat),
Source: e.Source,
Message: e.Message,
})
return string(b) + "\n"
}

func New(ctx context.Context) (*Tracer, error) {
region := os.Getenv("AWS_REGION")
awscfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
Expand Down Expand Up @@ -138,6 +161,7 @@ type RunOption struct {
Stdout bool
SNSTopicArn string
Duration time.Duration
JSON bool
}

func (t *Tracer) SetOutput(w io.Writer) {
Expand All @@ -151,7 +175,7 @@ func (t *Tracer) Run(ctx context.Context, cluster string, taskID string, opt *Ru
defer func() { t.report(ctx, cluster, taskID) }()

if cluster == "" {
return t.listClusters(ctx)
return t.listClusters(ctx, opt)
}

if taskID == "" {
Expand All @@ -162,6 +186,12 @@ func (t *Tracer) Run(ctx context.Context, cluster string, taskID string, opt *Ru
if err != nil {
return err
}

defer func() {
if _, err := t.timeline.Print(t.buf, opt.JSON); err != nil {
slog.Error("failed to print timeline", "error", err)
}
}()
if err := t.traceLogs(ctx, task); err != nil {
return err
}
Expand All @@ -172,14 +202,19 @@ func (t *Tracer) Run(ctx context.Context, cluster string, taskID string, opt *Ru
func (t *Tracer) report(ctx context.Context, cluster, taskID string) {
opt := t.option
if opt.Stdout {
fmt.Fprintln(t.w, subject(cluster, taskID))
sub := &subject{cluster, taskID}
if opt.JSON {
fmt.Fprintln(t.w, sub.JSON())
} else {
fmt.Fprintln(t.w, sub.String())
}
if _, err := t.WriteTo(t.w); err != nil {
fmt.Fprintln(os.Stderr, err)
slog.Error("failed to write to output", "error", err)
}
}
if opt.SNSTopicArn != "" {
if err := t.Publish(ctx, opt.SNSTopicArn, cluster, taskID); err != nil {
fmt.Fprintln(os.Stderr, err)
slog.Error("failed to publish to SNS", "error", err)
}
}
}
Expand All @@ -189,19 +224,29 @@ func (t *Tracer) WriteTo(w io.Writer) (int64, error) {
return int64(n), err
}

func subject(cluster, taskID string) string {
s := "Tracer:"
if taskID != "" {
s += " " + taskID
} else if cluster != "" {
s += " tasks"
type subject struct {
Cluster string `json:"cluster"`
TaskID string `json:"task_id"`
}

func (s *subject) JSON() string {
b, _ := json.Marshal(s)
return string(b)
}

func (s *subject) String() string {
str := "Tracer:"
if s.TaskID != "" {
str += " " + s.TaskID
} else if s.Cluster != "" {
str += " tasks"
}
if cluster != "" {
s += " on " + cluster
if s.Cluster != "" {
str += " on " + s.Cluster
} else {
s += " clusters"
str += " clusters"
}
return s
return str
}

const (
Expand All @@ -215,7 +260,7 @@ func (t *Tracer) Publish(ctx context.Context, topicArn, cluster, taskID string)
msg = msg[:snsMaxPayloadSize]
}

s := subject(cluster, taskID)
s := (&subject{cluster, taskID}).String()
if len(s) > snsSubjectLimitLength {
s = s[0:snsSubjectLimitLength-len(ellipsisString)] + ellipsisString
}
Expand All @@ -236,7 +281,7 @@ func (t *Tracer) traceTask(ctx context.Context, cluster string, taskID string) (
return nil, fmt.Errorf("failed to describe tasks: %w", err)
}
if len(res.Tasks) == 0 {
return nil, fmt.Errorf("no tasks found: %w", err)
return nil, fmt.Errorf("no tasks found. cluster: %s, task_id: %s", cluster, taskID)
}
task := res.Tasks[0]

Expand Down Expand Up @@ -278,8 +323,6 @@ func (t *Tracer) traceTask(ctx context.Context, cluster string, taskID string) (
}

func (t *Tracer) traceLogs(ctx context.Context, task *ecsTypes.Task) error {
defer t.timeline.Print(t.buf)

res, err := t.ecs.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{
TaskDefinition: task.TaskDefinitionArn,
})
Expand Down Expand Up @@ -393,7 +436,7 @@ func (t *Tracer) listAllTasks(ctx context.Context, cluster string) error {
return nil
}

func (t *Tracer) listClusters(ctx context.Context) error {
func (t *Tracer) listClusters(ctx context.Context, opt *RunOption) error {
res, err := t.ecs.ListClusters(ctx, &ecs.ListClustersInput{})
if err != nil {
return err
Expand All @@ -403,6 +446,17 @@ func (t *Tracer) listClusters(ctx context.Context) error {
clusters = append(clusters, arnToName(c))
}
sort.Strings(clusters)
if opt.JSON {
err := json.NewEncoder(t.buf).Encode(
struct {
Clusters []string `json:"clusters"`
}{clusters},
)
if err != nil {
return fmt.Errorf("failed to encode JSON: %w", err)
}
return nil
}
for _, c := range clusters {
t.buf.WriteString(c)
t.buf.WriteByte('\n')
Expand Down Expand Up @@ -431,9 +485,13 @@ func (t *Tracer) listTasks(ctx context.Context, cluster string, status ecsTypes.
if err != nil {
return fmt.Errorf("failed to describe tasks: %w", err)
}
for _, task := range res.Tasks {
t.buf.WriteString(strings.Join(taskToColumns(&task), "\t"))
t.buf.WriteRune('\n')
for _, ts := range res.Tasks {
task := newTask(&ts)
if t.option.JSON {
t.buf.WriteString(task.JSON())
} else {
t.buf.WriteString(task.String())
}
}
if nextToken = listRes.NextToken; nextToken == nil {
break
Expand Down Expand Up @@ -480,14 +538,41 @@ func arnToName(arn string) string {
return arn[strings.LastIndex(arn, "/")+1:]
}

func taskToColumns(task *ecsTypes.Task) []string {
return []string{
arnToName(*task.TaskArn),
arnToName(*task.TaskDefinitionArn),
aws.ToString(task.LastStatus),
aws.ToString(task.DesiredStatus),
task.CreatedAt.In(time.Local).Format(time.RFC3339),
aws.ToString(task.Group),
string(task.LaunchType),
type task struct {
Arn string `json:"arn"`
TaskDefinition string `json:"task_definition"`
LastStatus string `json:"last_status"`
DesiredStatus string `json:"desired_status"`
CreatedAt string `json:"created_at"`
Group string `json:"group"`
LaunchType string `json:"launch_type"`
}

func newTask(t *ecsTypes.Task) *task {
return &task{
Arn: arnToName(*t.TaskArn),
TaskDefinition: arnToName(*t.TaskDefinitionArn),
LastStatus: aws.ToString(t.LastStatus),
DesiredStatus: aws.ToString(t.DesiredStatus),
CreatedAt: t.CreatedAt.In(time.Local).Format(time.RFC3339),
Group: aws.ToString(t.Group),
LaunchType: string(t.LaunchType),
}
}

func (t *task) String() string {
return strings.Join([]string{
t.Arn,
t.TaskDefinition,
t.LastStatus,
t.DesiredStatus,
t.CreatedAt,
t.Group,
t.LaunchType,
}, "\t") + "\n"
}

func (t *task) JSON() string {
b, _ := json.Marshal(t)
return string(b) + "\n"
}
Loading
Loading