Skip to content

Commit

Permalink
fix(flow): ensure format consistency for all emitted logs
Browse files Browse the repository at this point in the history
Signed-off-by: hainenber <[email protected]>
  • Loading branch information
hainenber committed Dec 17, 2023
1 parent fadf21f commit 211479f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ Main (unreleased)

- Add staleness tracking to labelstore to reduce memory usage. (@mattdurham)

- Fix inconsistently formatted agent logs. (@hainenber)

### Other changes

- Bump github.com/IBM/sarama from v1.41.2 to v1.42.1
Expand Down
32 changes: 16 additions & 16 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ func (fr *flowRun) Run(configPath string) error {
// injected.
otel.SetTracerProvider(t)

level.Info(l).Log("boringcrypto enabled", boringcrypto.Enabled)

// Immediately start the tracer.
go func() {
err := t.Run(ctx)
Expand Down Expand Up @@ -288,20 +286,6 @@ func (fr *flowRun) Run(configPath string) error {
}()
}

// Report usage of enabled components
if !fr.disableReporting {
reporter, err := usagestats.NewReporter(l)
if err != nil {
return fmt.Errorf("failed to create reporter: %w", err)
}
go func() {
err := reporter.Start(ctx, getEnabledComponentsFunc(f))
if err != nil {
level.Error(l).Log("msg", "failed to start reporter", "err", err)
}
}()
}

// Perform the initial reload. This is done after starting the HTTP server so
// that /metric and pprof endpoints are available while the Flow controller
// is loading.
Expand All @@ -325,6 +309,22 @@ func (fr *flowRun) Run(configPath string) error {
return err
}

level.Info(l).Log("boringcrypto enabled", boringcrypto.Enabled)

// Report usage of enabled components
if !fr.disableReporting {
reporter, err := usagestats.NewReporter(l)
if err != nil {
return fmt.Errorf("failed to create reporter: %w", err)
}
go func() {
err := reporter.Start(ctx, getEnabledComponentsFunc(f))
if err != nil {
level.Error(l).Log("msg", "failed to start reporter", "err", err)
}
}()
}

// By now, have either joined or started a new cluster.
// Nodes initially join in the Viewer state. After the graph has been
// loaded successfully, we can move to the Participant state to signal that
Expand Down
29 changes: 28 additions & 1 deletion pkg/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co
defer span.End()

logger := log.With(l.log, "trace_id", span.SpanContext().TraceID())
level.Info(logger).Log("msg", "starting complete graph evaluation")
defer func() {
span.SetStatus(codes.Ok, "")

Expand All @@ -154,6 +153,34 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co

l.cache.ClearModuleExports()

// Prioritize evaluate the logging block to ensure logging format conformity
for _, leave := range newGraph.Leaves() {
if leave.NodeID() != "logging" {
continue
}

n := leave.(BlockNode)

_, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", n.NodeID()))
if err := l.evaluate(logger, n); err != nil {
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: fmt.Sprintf("Failed to evaluate node for config block: %s", err),
StartPos: ast.StartPos(n.Block()).Position(),
EndPos: ast.EndPos(n.Block()).Position(),
})
span.SetStatus(codes.Error, err.Error())
}
if exp, ok := n.(*ExportConfigNode); ok {
l.cache.CacheModuleExportValue(exp.Label(), exp.Value())
}
level.Info(logger).Log("msg", "finished node evaluation", "node_id", n.NodeID(), "duration", time.Since(start))
span.SetStatus(codes.Ok, "")
}

level.Info(logger).Log("msg", "starting complete graph evaluation")

// Evaluate all the components.
_ = dag.WalkTopological(&newGraph, newGraph.Leaves(), func(n dag.Node) error {
_, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
Expand Down

0 comments on commit 211479f

Please sign in to comment.