diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 3e1bf49..e208499 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -12,11 +12,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v4 + uses: actions/setup-go@v5 with: - go-version: "1.20" + go-version: "1.23" - name: Run GoReleaser uses: goreleaser/goreleaser-action@v4 with: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5473c37..fb4e29f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,6 @@ jobs: strategy: matrix: go: - - "1.21" - "1.22" - "1.23" name: Build diff --git a/.gitignore b/.gitignore index 3fb7227..a15c617 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,4 @@ .envrc cmd/tracer/tracer dist/ -tracer +./tracer diff --git a/README.md b/README.md index f28a12c..ac5612b 100644 --- a/README.md +++ b/README.md @@ -22,8 +22,14 @@ tracer [options] [cluster] [task-id] -duration duration fetch logs duration from created / before stopping (default 1m0s) + -json + output as JSON lines + -sns string + SNS topic ARN + -stdout + output to stdout (default true) -version - show the version + show the version ``` Environment variable `AWS_REGION` is required. diff --git a/cmd/tracer/main.go b/cmd/tracer/main.go index d6d071d..b52c1f6 100644 --- a/cmd/tracer/main.go +++ b/cmd/tracer/main.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "log/slog" "os" "strings" "time" @@ -43,9 +44,14 @@ func main() { flag.BoolVar(&showVersion, "version", false, "show the version") flag.BoolVar(&opt.Stdout, "stdout", true, "output to stdout") flag.StringVar(&opt.SNSTopicArn, "sns", "", "SNS topic ARN") + flag.BoolVar(&opt.JSON, "json", false, "output as JSON lines") flag.VisitAll(envToFlag) flag.Parse() + if opt.JSON { + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, nil))) + } + if showVersion { fmt.Println("tracer", Version) return @@ -60,7 +66,7 @@ func main() { copy(args, flag.Args()) if err := t.Run(ctx, args[0], args[1], &opt); err != nil { - fmt.Fprintln(os.Stderr, err) + slog.Error(err.Error()) os.Exit(1) } } diff --git a/lambda.go b/lambda.go index e63a098..3f4a2bc 100644 --- a/lambda.go +++ b/lambda.go @@ -2,7 +2,7 @@ package tracer import ( "context" - "fmt" + "log/slog" "strings" "github.com/aws/aws-sdk-go-v2/aws/arn" @@ -10,7 +10,7 @@ import ( func (t *Tracer) LambdaHandlerFunc(opt *RunOption) func(ctx context.Context, event *ECSTaskEvent) error { return func(ctx context.Context, event *ECSTaskEvent) error { - fmt.Println(event.String()) + slog.Info("event", "payload", event.String()) lastStatus := event.Detail.LastStatus if lastStatus != "STOPPED" { return nil diff --git a/lambda/function.jsonnet b/lambda/function.jsonnet index c1581c8..f85e3dc 100644 --- a/lambda/function.jsonnet +++ b/lambda/function.jsonnet @@ -2,6 +2,7 @@ FunctionName: 'tracer', MemorySize: 128, Handler: 'index.handler', - Role: 'arn:aws:iam::{account_id}:role/{role_name}', + // Role: 'arn:aws:iam::{account_id}:role/{role_name}', + Role: 'arn:aws:iam::314472643515:role/tracer', Runtime: 'provided.al2', } diff --git a/tracer.go b/tracer.go index 26b0520..466b053 100644 --- a/tracer.go +++ b/tracer.go @@ -3,8 +3,10 @@ package tracer import ( "bytes" "context" + "encoding/json" "fmt" "io" + "log/slog" "os" "sort" "strings" @@ -71,7 +73,7 @@ func (tl *Timeline) Add(event *TimelineEvent) { tl.events = append(tl.events, event) } -func (tl *Timeline) Print(w io.Writer) (int, error) { +func (tl *Timeline) Print(w io.Writer, json bool) (int, error) { tl.mu.Lock() defer tl.mu.Unlock() @@ -81,16 +83,23 @@ func (tl *Timeline) Print(w io.Writer) (int, error) { return tls[i].Timestamp.Before(tls[j].Timestamp) }) n := 0 + toString := func(e *TimelineEvent) string { + if json { + return e.JSON() + } + return e.String() + } for _, e := range tls { - s := e.String() - if !tl.seen[s] { - l, err := fmt.Fprint(w, e.String()) - if err != nil { - return n, err - } - n += l - tl.seen[s] = true + s := toString(e) + if tl.seen[s] { + continue } + l, err := fmt.Fprint(w, s) + if err != nil { + return n, err + } + n += l + tl.seen[s] = true } return n, nil } @@ -106,6 +115,20 @@ func (e *TimelineEvent) String() string { return fmt.Sprintf("%s\t%s\t%s\n", ts.Format(TimeFormat), e.Source, e.Message) } +func (e *TimelineEvent) JSON() string { + ts := e.Timestamp.In(time.Local) + b, _ := json.Marshal(struct { + Time string `json:"time"` + Source string `json:"src"` + Message string `json:"msg"` + }{ + Time: ts.Format(TimeFormat), + Source: e.Source, + Message: e.Message, + }) + return string(b) + "\n" +} + func New(ctx context.Context) (*Tracer, error) { region := os.Getenv("AWS_REGION") awscfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) @@ -138,6 +161,7 @@ type RunOption struct { Stdout bool SNSTopicArn string Duration time.Duration + JSON bool } func (t *Tracer) SetOutput(w io.Writer) { @@ -151,7 +175,7 @@ func (t *Tracer) Run(ctx context.Context, cluster string, taskID string, opt *Ru defer func() { t.report(ctx, cluster, taskID) }() if cluster == "" { - return t.listClusters(ctx) + return t.listClusters(ctx, opt) } if taskID == "" { @@ -162,6 +186,12 @@ func (t *Tracer) Run(ctx context.Context, cluster string, taskID string, opt *Ru if err != nil { return err } + + defer func() { + if _, err := t.timeline.Print(t.buf, opt.JSON); err != nil { + slog.Error("failed to print timeline", "error", err) + } + }() if err := t.traceLogs(ctx, task); err != nil { return err } @@ -172,14 +202,19 @@ func (t *Tracer) Run(ctx context.Context, cluster string, taskID string, opt *Ru func (t *Tracer) report(ctx context.Context, cluster, taskID string) { opt := t.option if opt.Stdout { - fmt.Fprintln(t.w, subject(cluster, taskID)) + sub := &subject{cluster, taskID} + if opt.JSON { + fmt.Fprintln(t.w, sub.JSON()) + } else { + fmt.Fprintln(t.w, sub.String()) + } if _, err := t.WriteTo(t.w); err != nil { - fmt.Fprintln(os.Stderr, err) + slog.Error("failed to write to output", "error", err) } } if opt.SNSTopicArn != "" { if err := t.Publish(ctx, opt.SNSTopicArn, cluster, taskID); err != nil { - fmt.Fprintln(os.Stderr, err) + slog.Error("failed to publish to SNS", "error", err) } } } @@ -189,19 +224,29 @@ func (t *Tracer) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func subject(cluster, taskID string) string { - s := "Tracer:" - if taskID != "" { - s += " " + taskID - } else if cluster != "" { - s += " tasks" +type subject struct { + Cluster string `json:"cluster"` + TaskID string `json:"task_id"` +} + +func (s *subject) JSON() string { + b, _ := json.Marshal(s) + return string(b) +} + +func (s *subject) String() string { + str := "Tracer:" + if s.TaskID != "" { + str += " " + s.TaskID + } else if s.Cluster != "" { + str += " tasks" } - if cluster != "" { - s += " on " + cluster + if s.Cluster != "" { + str += " on " + s.Cluster } else { - s += " clusters" + str += " clusters" } - return s + return str } const ( @@ -215,7 +260,7 @@ func (t *Tracer) Publish(ctx context.Context, topicArn, cluster, taskID string) msg = msg[:snsMaxPayloadSize] } - s := subject(cluster, taskID) + s := (&subject{cluster, taskID}).String() if len(s) > snsSubjectLimitLength { s = s[0:snsSubjectLimitLength-len(ellipsisString)] + ellipsisString } @@ -236,7 +281,7 @@ func (t *Tracer) traceTask(ctx context.Context, cluster string, taskID string) ( return nil, fmt.Errorf("failed to describe tasks: %w", err) } if len(res.Tasks) == 0 { - return nil, fmt.Errorf("no tasks found: %w", err) + return nil, fmt.Errorf("no tasks found. cluster: %s, task_id: %s", cluster, taskID) } task := res.Tasks[0] @@ -278,8 +323,6 @@ func (t *Tracer) traceTask(ctx context.Context, cluster string, taskID string) ( } func (t *Tracer) traceLogs(ctx context.Context, task *ecsTypes.Task) error { - defer t.timeline.Print(t.buf) - res, err := t.ecs.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{ TaskDefinition: task.TaskDefinitionArn, }) @@ -393,7 +436,7 @@ func (t *Tracer) listAllTasks(ctx context.Context, cluster string) error { return nil } -func (t *Tracer) listClusters(ctx context.Context) error { +func (t *Tracer) listClusters(ctx context.Context, opt *RunOption) error { res, err := t.ecs.ListClusters(ctx, &ecs.ListClustersInput{}) if err != nil { return err @@ -403,6 +446,17 @@ func (t *Tracer) listClusters(ctx context.Context) error { clusters = append(clusters, arnToName(c)) } sort.Strings(clusters) + if opt.JSON { + err := json.NewEncoder(t.buf).Encode( + struct { + Clusters []string `json:"clusters"` + }{clusters}, + ) + if err != nil { + return fmt.Errorf("failed to encode JSON: %w", err) + } + return nil + } for _, c := range clusters { t.buf.WriteString(c) t.buf.WriteByte('\n') @@ -431,9 +485,13 @@ func (t *Tracer) listTasks(ctx context.Context, cluster string, status ecsTypes. if err != nil { return fmt.Errorf("failed to describe tasks: %w", err) } - for _, task := range res.Tasks { - t.buf.WriteString(strings.Join(taskToColumns(&task), "\t")) - t.buf.WriteRune('\n') + for _, ts := range res.Tasks { + task := newTask(&ts) + if t.option.JSON { + t.buf.WriteString(task.JSON()) + } else { + t.buf.WriteString(task.String()) + } } if nextToken = listRes.NextToken; nextToken == nil { break @@ -480,14 +538,41 @@ func arnToName(arn string) string { return arn[strings.LastIndex(arn, "/")+1:] } -func taskToColumns(task *ecsTypes.Task) []string { - return []string{ - arnToName(*task.TaskArn), - arnToName(*task.TaskDefinitionArn), - aws.ToString(task.LastStatus), - aws.ToString(task.DesiredStatus), - task.CreatedAt.In(time.Local).Format(time.RFC3339), - aws.ToString(task.Group), - string(task.LaunchType), +type task struct { + Arn string `json:"arn"` + TaskDefinition string `json:"task_definition"` + LastStatus string `json:"last_status"` + DesiredStatus string `json:"desired_status"` + CreatedAt string `json:"created_at"` + Group string `json:"group"` + LaunchType string `json:"launch_type"` +} + +func newTask(t *ecsTypes.Task) *task { + return &task{ + Arn: arnToName(*t.TaskArn), + TaskDefinition: arnToName(*t.TaskDefinitionArn), + LastStatus: aws.ToString(t.LastStatus), + DesiredStatus: aws.ToString(t.DesiredStatus), + CreatedAt: t.CreatedAt.In(time.Local).Format(time.RFC3339), + Group: aws.ToString(t.Group), + LaunchType: string(t.LaunchType), } } + +func (t *task) String() string { + return strings.Join([]string{ + t.Arn, + t.TaskDefinition, + t.LastStatus, + t.DesiredStatus, + t.CreatedAt, + t.Group, + t.LaunchType, + }, "\t") + "\n" +} + +func (t *task) JSON() string { + b, _ := json.Marshal(t) + return string(b) + "\n" +} diff --git a/tracer_test.go b/tracer_test.go index 08bf4ab..224081d 100644 --- a/tracer_test.go +++ b/tracer_test.go @@ -48,6 +48,11 @@ var ( 2021-01-02T03:04:05.123Z test_source 3 test message 3 2021-01-02T03:04:05.123Z test_source 5 test message 5 2021-01-02T03:04:06.123Z test_source 2 test message 2 +` + expectedJSONOutput = `{"time":"2021-01-02T03:04:05.123Z","src":"test_source 1","msg":"test message 1"} +{"time":"2021-01-02T03:04:05.123Z","src":"test_source 3","msg":"test message 3"} +{"time":"2021-01-02T03:04:05.123Z","src":"test_source 5","msg":"test message 5"} +{"time":"2021-01-02T03:04:06.123Z","src":"test_source 2","msg":"test message 2"} ` ) @@ -71,15 +76,30 @@ func TestTimeline(t *testing.T) { ev := ev tl.Add(&ev) } - b := new(strings.Builder) - n, err := tl.Print(b) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if n != len(expectedOutput) { - t.Errorf("unexpected length: %d", n) - } - if b.String() != expectedOutput { - t.Errorf("unexpected output: %s", b.String()) - } + t.Run("Print(plaintext)", func(t *testing.T) { + b := new(strings.Builder) + n, err := tl.Print(b, false) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if n != len(expectedOutput) { + t.Errorf("unexpected length: %d", n) + } + if b.String() != expectedOutput { + t.Errorf("unexpected output: %s", b.String()) + } + }) + t.Run("Print(json)", func(t *testing.T) { + b := new(strings.Builder) + n, err := tl.Print(b, true) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if n != len(expectedJSONOutput) { + t.Errorf("unexpected length: %d", n) + } + if b.String() != expectedJSONOutput { + t.Errorf("unexpected output: %s", b.String()) + } + }) }