Skip to content

Commit

Permalink
cmd/telemetrygen: add HTTP export for logs (open-telemetry#29078)
Browse files Browse the repository at this point in the history
**Description:**

Closes
open-telemetry#18867

**Testing:**

Ran opentelemetry-collector locally with debug exporter, then used
telemetrygen with `--otlp-http` with and without `--otlp-insecure`.

**Documentation:** None
  • Loading branch information
axw authored Nov 13, 2023
1 parent c9f1f25 commit 770f1c0
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 36 deletions.
27 changes: 27 additions & 0 deletions .chloggen/telemetrygen-logs-http.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/telemetrygen

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for --otlp-http for telemetrygen logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [18867]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
4 changes: 2 additions & 2 deletions cmd/telemetrygen/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

// TestConfig_HTTPPath verifies that the HTTPPath configuration defaults are correctly set for each sub-command.
func TestConfig_HTTPPath(t *testing.T) {
t.Run("LogsConfigEmptyDefaultUrlPath", func(t *testing.T) {
assert.Equal(t, "", logsCfg.HTTPPath)
t.Run("LogsConfigValidDefaultUrlPath", func(t *testing.T) {
assert.Equal(t, "/v1/logs", logsCfg.HTTPPath)
})

t.Run("MetricsConfigValidDefaultUrlPath", func(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions cmd/telemetrygen/internal/logs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Config struct {
// Flags registers config flags.
func (c *Config) Flags(fs *pflag.FlagSet) {
c.CommonFlags(fs)

fs.StringVar(&c.HTTPPath, "otlp-http-url-path", "/v1/logs", "Which URL path to write to")

fs.IntVar(&c.NumLogs, "logs", 1, "Number of logs to generate in each worker (ignored if duration is provided)")
fs.StringVar(&c.Body, "body", "the message", "Body of the log")
}
93 changes: 93 additions & 0 deletions cmd/telemetrygen/internal/logs/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package logs

import (
"bytes"
"context"
"fmt"
"io"
"net/http"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type exporter interface {
export(plog.Logs) error
}

func newExporter(ctx context.Context, cfg *Config) (exporter, error) {
if cfg.UseHTTP {
return &httpClientExporter{
client: http.DefaultClient,
cfg: cfg,
}, nil
}

if !cfg.Insecure {
return nil, fmt.Errorf("'telemetrygen logs' only supports insecure gRPC")
}
// only support grpc in insecure mode
clientConn, err := grpc.DialContext(ctx, cfg.Endpoint(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
return &gRPCClientExporter{client: plogotlp.NewGRPCClient(clientConn)}, nil
}

type gRPCClientExporter struct {
client plogotlp.GRPCClient
}

func (e *gRPCClientExporter) export(logs plog.Logs) error {
req := plogotlp.NewExportRequestFromLogs(logs)
if _, err := e.client.Export(context.Background(), req); err != nil {
return err
}
return nil
}

type httpClientExporter struct {
client *http.Client
cfg *Config
}

func (e *httpClientExporter) export(logs plog.Logs) error {
scheme := "https"
if e.cfg.Insecure {
scheme = "http"
}
path := e.cfg.HTTPPath
url := fmt.Sprintf("%s://%s%s", scheme, e.cfg.Endpoint(), path)

req := plogotlp.NewExportRequestFromLogs(logs)
body, err := req.MarshalProto()
if err != nil {
return fmt.Errorf("failed to marshal logs to protobuf: %w", err)
}

httpReq, err := http.NewRequestWithContext(context.Background(), "POST", url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("failed to create logs HTTP request: %w", err)
}
for k, v := range e.cfg.Headers {
httpReq.Header.Set(k, v)
}
httpReq.Header.Set("Content-Type", "application/x-protobuf")
resp, err := e.client.Do(httpReq)
if err != nil {
return fmt.Errorf("failed to execute logs HTTP request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
var respData bytes.Buffer
_, _ = io.Copy(&respData, resp.Body)
return fmt.Errorf("log request failed with status %s (%s)", resp.Status, respData.String())
}

return nil
}
36 changes: 2 additions & 34 deletions cmd/telemetrygen/internal/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,59 +10,27 @@ import (
"sync/atomic"
"time"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common"
)

type exporter interface {
export(plog.Logs) error
}

type gRPCClientExporter struct {
client plogotlp.GRPCClient
}

func (e *gRPCClientExporter) export(logs plog.Logs) error {
req := plogotlp.NewExportRequestFromLogs(logs)
if _, err := e.client.Export(context.Background(), req); err != nil {
return err
}
return nil
}

// Start starts the log telemetry generator
func Start(cfg *Config) error {
logger, err := common.CreateLogger(cfg.SkipSettingGRPCLogger)
if err != nil {
return err
}

if cfg.UseHTTP {
return fmt.Errorf("http is not supported by 'telemetrygen logs'")
}

if !cfg.Insecure {
return fmt.Errorf("'telemetrygen logs' only supports insecure gRPC")
}

// only support grpc in insecure mode
clientConn, err := grpc.DialContext(context.TODO(), cfg.Endpoint(), grpc.WithTransportCredentials(insecure.NewCredentials()))
e, err := newExporter(context.Background(), cfg)
if err != nil {
return err
}
exporter := &gRPCClientExporter{
client: plogotlp.NewGRPCClient(clientConn),
}

if err = Run(cfg, exporter, logger); err != nil {
if err = Run(cfg, e, logger); err != nil {
logger.Error("failed to stop the exporter", zap.Error(err))
return err
}
Expand Down

0 comments on commit 770f1c0

Please sign in to comment.