Skip to content

Commit

Permalink
Add monitoringjob receiver component outline (#1227)
Browse files Browse the repository at this point in the history
* Add monitorinjob receiver component outline

Includes configuration and documentation for the new monitoringjob
receiver.  Also includes the necessary plumbing to build and start a
stanza pipeline based off the receiver's output configuration with an
exposed custom input to that pipeline - a pattern that is unique to this
receiver.

Signed-off-by: Christian Kruse <[email protected]>
  • Loading branch information
c-kruse authored Sep 6, 2023
1 parent 92ab782 commit 5dadb4b
Show file tree
Hide file tree
Showing 16 changed files with 1,364 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/receiver/jobreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
133 changes: 133 additions & 0 deletions pkg/receiver/jobreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Monitoring Job Receiver

| Status | |
| ------------- | ----------- |
| Stability | [development]: logs |

This receiver makes it possible to collect telemetry data from sources that
do not instrument well. The monitoring job receiver executes a script or
executable at defined intervals, and propagates the output from that process
as log events. In addition, the monitoring job receiver simplifies the process
of downloading runtime assets necessary to run a particular monitoring job.

## Configuration

| Configuration | Default | Description
| ------------ | ------------ | ------------
| exec | required | A `exec` configuration block. See details [below](#execution-configuration)
| schedule | required | A `schedule` configuration block. See details [below](#schedule-configuration)
| output | | An `output` configuration block. See details [below](#output-configuration)

### Execution Configuration

| Configuration | Default | Description
| ------------ | ------------ | ------------
| command | required | The `command` to run. Should start a binary that writes to stdout and/or stderr
| arguments | | A list of string arguments to pass the command
| timeout | | [Time](#time-parameters) to wait for the process to exit before attempting to make it exit
| runtime_assets | | A list of `runtime_assets` required for the monitoring job

### Schedule Configuration

The scheduling configuration block currently only supports a single **required**
`interval` [Time](#time-parameters) parameter. Counting from collector startup, the command will be
scheduled every `interval`.

### Output Configuration

The monitoringjob receiver output is a configurable as a [Stanza][stanza]
pipeline. This allows complex operators to be chained to parse the command
output.

| Configuration | Default | Description
| ------------ | ------------ | ------------
| `type` | `event` | The output handler type [see below](#output-handler-type). Valid values are `event` and `log_entries`. |
| `event` | {} | When `type` == `event` this block may be configured with additional properties as described [below](#output-handler-type). |
| `log_entries` | {} | When `type` == `log_entries` this block may be configured with additional properties as described [below](#output-handler-type). |
| `operators` | [] | An array of [stanza][stanza] operators to act on the output. |
| `retry_on_failure.enabled` | `false` | If `true`, the receiver will pause reading a file and attempt to resend the current batch of logs if it encounters an error from downstream components. |
| `retry_on_failure.initial_interval` | `1s` | [Time](#time-parameters) to wait after the first failure before retrying. |
| `retry_on_failure.max_interval` | `30s` | Upper bound on retry backoff [interval](#time-parameters). Once this value is reached the delay between consecutive retries will remain constant at the specified value. |
| `retry_on_failure.max_elapsed_time` | `5m` | Maximum amount of [time](#time-parameters) (including retries) spent trying to send a logs batch to a downstream consumer. Once this value is reached, the data is discarded. Retrying never stops if set to `0`. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |

#### Output Handler Type

The monitoringjob receiver can feed output into the stanza pipeline in one of
two ways depending on the configured `type`.

By default, the `type: 'event'` output handler is used. When configured with
the event output type, command output will be buffered until the process exits,
at which time the receiver will emit a single structured event capturing the
command output and optionally metadata about the execution such as exit code
and duration. This is useful for true monitoring jobs, such as using a
monitoring plugin not supported natively by OTEL.

When configured with the `log_entries` output type, command output will be
emitted as distinct log entries line-by-line. This can be used to act as a
bridge to third party logs and other log events that may not be straightforward
to access.

[stanza]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/docs/operators/README.md

#### Output Configuration - event

| Configuration | Default | Description
| ------------ | ------------ | ------------
| include_command_name | true | When set, includes the attribute `command.name` in the log event.
| include_command_status | true | When set, includes the attribute `command.status` in the log event.
| include_command_duration | true | When set, includes the attribute `command.duration` in the log event.
| max_body_size | | When set, restricts the length of command output to a specified [ByteSize](#bytesize-parameters).

#### Output Configuration - log_entries

| Configuration | Default | Description
| ------------ | ------------ | ------------
| include_command_name | true | When set, includes the attribute `command.name` in the log event.
| include_stream_name | true | When set, includes the attribute `command.stream.name` in the log event. Indicating `stdin` or `stderr` as the origin of the event.
| max_log_size | | When set, restricts the length of a log entry to a specified [ByteSize](#bytesize-parameters).
| encoding | utf-8 | Encoding to expect from the command output. Used to detect log entry boundaries.
| multiline | | Used to override the default newline delimited log entries.
| multiline.line_start_pattern | | Regex pattern for the beginning of a log entry. Mutualy exclusive with end pattern.
| multiline.line_end_pattern | | Regex pattern for the ending of a log entry. Mutualy exclusive with start pattern.

### Runtime Asset Configuration

//todo(ck)

## Additional Features

### Time Parameters

Time parameters are expressed as go duration strings as defined by
[time.ParseDuration](https://pkg.go.dev/time#ParseDuration).

Examples: `60s`, `45m`, `2h30m40s`.

### ByteSize Parameters

ByteSize parameters can be specified either as an integer number or as a string
starting with decimal numbers optionally followed by a common byte unit
prefixes. e.g. `16kb`, `32MiB`, `1GB`.

## Example configuration

```yaml
monitoringjob:
schedule:
interval: 1h
exec:
command: check_ntp_time
arguments:
- "-H"
- time.nist.gov
timeout: 8s
output:
type: event
event:
include_command_name: false
include_command_status: true
include_command_duration: true
max_body_size: '32kib'
```
24 changes: 24 additions & 0 deletions pkg/receiver/jobreceiver/asset/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package asset

import "errors"

// Spec for asset fetching
type Spec struct {
// Name is the name of the asset
Name string `mapstructure:"name"`
// Path is the absolute path to where the asset should be installed
Path string `mapstructure:"path"`
// Url is the remote address used for fetching the asset
URL string `mapstructure:"url"`
// SHA512 is the hash of the asset tarball
SHA512 string `mapstructure:"sha512"`
}

// Validate checks an asset ID is valid, but does not attempt to fetch
// the asset or verify the integrity of its hash
func (a Spec) Validate() error {
if a.Name == "" {
return errors.New("asset name cannot be empty")
}
return nil
}
69 changes: 69 additions & 0 deletions pkg/receiver/jobreceiver/builder/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package builder

import (
"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output"
"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"go.uber.org/zap"
)

type JobRunnerBuilder interface {
Build(logger *zap.SugaredLogger, consumer consumer.Interface) (JobRunner, error)
}

type JobRunner interface {
Start(operator.Persister) error
Stop() error
}

// NewOperatorBuilder builds a stanza operator.Builder from monitoring job
// configuration objects.
func NewOperatorBuilder(outputCfg output.Config, executorCfg JobRunnerBuilder) operator.Builder {
return &pipelineInputConfig{
InputConfig: outputCfg.InputConfig,
ConsumerBuilder: outputCfg.Builder,
JobRunnerBuilder: executorCfg,
}
}

type pipelineInputConfig struct {
helper.InputConfig `mapstructure:",squash"`
ConsumerBuilder consumer.Builder

JobRunnerBuilder JobRunnerBuilder
}

// Build the stanza input operator.
func (cfg *pipelineInputConfig) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputBase, err := cfg.InputConfig.Build(logger)
if err != nil {
return nil, err
}
inputOp := &inputOperator{
InputOperator: inputBase,
}

// point the consumer at this input operator
consumer, err := cfg.ConsumerBuilder.Build(logger, inputOp)
if err != nil {
return nil, err
}
// point the job runner at the consumer
runner, err := cfg.JobRunnerBuilder.Build(logger, consumer)
if err != nil {
return nil, err
}
inputOp.JobRunner = runner
return inputOp, nil
}

// inputOperator is the actual stanza input operator implementation.
type inputOperator struct {
helper.InputOperator
JobRunner JobRunner
}

func (op *inputOperator) Start(p operator.Persister) error {
return op.JobRunner.Start(p)
}
53 changes: 53 additions & 0 deletions pkg/receiver/jobreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package jobreceiver

import (
"time"

"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/asset"
"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output"
)

const defaultTimeout = time.Second * 10

// Config for monitoringjob receiver
type Config struct {
Exec ExecutionConfig `mapstructure:"exec"`
Schedule ScheduleConfig `mapstructure:"schedule"`
Output output.Config `mapstructure:"output"`
}

// ExecutionConfig defines the configuration for execution of a monitorinjob
// process
type ExecutionConfig struct {
// Command is the name of the binary to be executed
Command string `mapstructure:"command"`
// Arguments to pass to the command
Arguments []string `mapstructure:"arguments"`
// RuntimeAssets made available to the execution context
RuntimeAssets []asset.Spec `mapstructure:"runtime_assets"`
// Timeout is the time to wait for the process to exit before attempting
// to make it stop
Timeout time.Duration `mapstructure:"timeout,omitempty"`
}

// ScheduleConfig defines configuration for the scheduling of the monitoringjob
// receiver
type ScheduleConfig struct {
// Interval to schedule monitoring job at
Interval time.Duration `mapstructure:"interval,omitempty"`
}

// Validate checks the configuration is valid
func (cfg *Config) Validate() error {
return nil
}

func newDefaultExecutionConfig() ExecutionConfig {
return ExecutionConfig{
Timeout: defaultTimeout,
}
}

func newDefaultScheduleConfig() ScheduleConfig {
return ScheduleConfig{}
}
98 changes: 98 additions & 0 deletions pkg/receiver/jobreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package jobreceiver

import (
"path/filepath"
"testing"
"time"

"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/asset"
"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/event"
"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/logentries"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestConfigValidate(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

factory := NewFactory()

testCases := []struct {
Name string
Expected func() component.Config
}{
{
Name: "minimal",
Expected: func() component.Config {
c := factory.CreateDefaultConfig().(*Config)
c.Schedule.Interval = time.Hour
c.Exec.Command = "echo"
c.Exec.Arguments = []string{"hello world"}
return c
},
},
{
Name: "log_ntp",
Expected: func() component.Config {
c := factory.CreateDefaultConfig().(*Config)
c.Schedule.Interval = time.Hour
c.Exec.Command = "check_ntp_time"
c.Exec.RuntimeAssets = []asset.Spec{
{
Name: "monitoring-plugins",
URL: "https://assets.bonsai.sensu.io/asset.zip",
Path: "/opt/monitoring-plugins",
},
}
c.Exec.Arguments = []string{"-H", "time.nist.gov"}
c.Exec.Timeout = time.Second * 8
c.Output.InputConfig.OperatorType = "log_entries"
c.Output.InputConfig.Attributes = map[string]helper.ExprStringConfig{"label": "foo"}
c.Output.InputConfig.Resource = map[string]helper.ExprStringConfig{"label": "bar"}
c.Output.Builder = &logentries.LogEntriesConfig{
IncludeCommandName: true,
IncludeStreamName: true,
MaxLogSize: 16 * 1000,
Encoding: "utf-8",
Multiline: tokenize.MultilineConfig{
LineStartPattern: "$start",
},
}
return c
},
},
{
Name: "event_ntp",
Expected: func() component.Config {
c := factory.CreateDefaultConfig().(*Config)
c.Schedule.Interval = time.Hour
c.Exec.Command = "check_ntp_time"
c.Exec.Arguments = []string{"-H", "time.nist.gov"}
c.Exec.Timeout = time.Second * 8
eventCfg := c.Output.Builder.(*event.EventConfig)
require.NotNil(t, eventCfg)
eventCfg.IncludeCommandName = false
eventCfg.IncludeCommandStatus = false
eventCfg.IncludeDuration = false
eventCfg.MaxBodySize = 32 << 10
return c
},
},
}

for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
actual := factory.CreateDefaultConfig()
expected := tc.Expected()
sub, err := cm.Sub(component.NewIDWithName("monitoringjob", tc.Name).String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, actual))
assert.Equal(t, expected, actual)
})
}
}
Loading

0 comments on commit 5dadb4b

Please sign in to comment.