Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service] Split component.Host functionality into separate serviceHost struct #5292

Merged
merged 2 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestCollectorReportError(t *testing.T) {
return Running == col.GetState()
}, 2*time.Second, 200*time.Millisecond)

col.service.ReportFatalError(errors.New("err2"))
col.service.host.ReportFatalError(errors.New("err2"))

wg.Wait()
assert.Equal(t, Closed, col.GetState())
Expand Down
66 changes: 66 additions & 0 deletions service/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service // import "go.opentelemetry.io/collector/service"

import (
"go.opentelemetry.io/contrib/zpages"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/internal/builder"
"go.opentelemetry.io/collector/service/internal/extensions"
)

var _ component.Host = (*serviceHost)(nil)

type serviceHost struct {
asyncErrorChannel chan error
factories component.Factories
zPagesSpanProcessor *zpages.SpanProcessor

builtExporters builder.Exporters
builtReceivers builder.Receivers
builtPipelines builder.BuiltPipelines
builtExtensions extensions.Extensions
}

// ReportFatalError is used to report to the host that the receiver encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
func (host *serviceHost) ReportFatalError(err error) {
host.asyncErrorChannel <- err
}

func (host *serviceHost) GetFactory(kind component.Kind, componentType config.Type) component.Factory {
switch kind {
case component.KindReceiver:
return host.factories.Receivers[componentType]
case component.KindProcessor:
return host.factories.Processors[componentType]
case component.KindExporter:
return host.factories.Exporters[componentType]
case component.KindExtension:
return host.factories.Extensions[componentType]
}
return nil
}

func (host *serviceHost) GetExtensions() map[config.ComponentID]component.Extension {
return host.builtExtensions.ToMap()
}

func (host *serviceHost) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
return host.builtExporters.ToMapByDataType()
}
87 changes: 26 additions & 61 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"

"go.opentelemetry.io/contrib/zpages"
"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
Expand All @@ -29,49 +28,44 @@ import (

// service represents the implementation of a component.Host.
type service struct {
factories component.Factories
buildInfo component.BuildInfo
config *config.Config
telemetry component.TelemetrySettings
zPagesSpanProcessor *zpages.SpanProcessor
asyncErrorChannel chan error

builtExporters builder.Exporters
builtReceivers builder.Receivers
builtPipelines builder.BuiltPipelines
builtExtensions extensions.Extensions
buildInfo component.BuildInfo
config *config.Config
telemetry component.TelemetrySettings
host *serviceHost
}

func newService(set *svcSettings) (*service, error) {
srv := &service{
factories: set.Factories,
buildInfo: set.BuildInfo,
config: set.Config,
telemetry: set.Telemetry,
zPagesSpanProcessor: set.ZPagesSpanProcessor,
asyncErrorChannel: set.AsyncErrorChannel,
buildInfo: set.BuildInfo,
config: set.Config,
telemetry: set.Telemetry,
host: &serviceHost{
factories: set.Factories,
zPagesSpanProcessor: set.ZPagesSpanProcessor,
asyncErrorChannel: set.AsyncErrorChannel,
},
}

var err error
if srv.builtExtensions, err = extensions.Build(srv.telemetry, srv.buildInfo, srv.config, srv.factories.Extensions); err != nil {
if srv.host.builtExtensions, err = extensions.Build(srv.telemetry, srv.buildInfo, srv.config, srv.host.factories.Extensions); err != nil {
return nil, fmt.Errorf("cannot build extensions: %w", err)
}

// Pipeline is built backwards, starting from exporters, so that we create objects
// which are referenced before objects which reference them.

// First create exporters.
if srv.builtExporters, err = builder.BuildExporters(srv.telemetry, srv.buildInfo, srv.config, srv.factories.Exporters); err != nil {
if srv.host.builtExporters, err = builder.BuildExporters(srv.telemetry, srv.buildInfo, srv.config, srv.host.factories.Exporters); err != nil {
return nil, fmt.Errorf("cannot build exporters: %w", err)
}

// Create pipelines and their processors and plug exporters to the end of the pipelines.
if srv.builtPipelines, err = builder.BuildPipelines(srv.telemetry, srv.buildInfo, srv.config, srv.builtExporters, srv.factories.Processors); err != nil {
if srv.host.builtPipelines, err = builder.BuildPipelines(srv.telemetry, srv.buildInfo, srv.config, srv.host.builtExporters, srv.host.factories.Processors); err != nil {
return nil, fmt.Errorf("cannot build pipelines: %w", err)
}

// Create receivers and plug them into the start of the pipelines.
if srv.builtReceivers, err = builder.BuildReceivers(srv.telemetry, srv.buildInfo, srv.config, srv.builtPipelines, srv.factories.Receivers); err != nil {
if srv.host.builtReceivers, err = builder.BuildReceivers(srv.telemetry, srv.buildInfo, srv.config, srv.host.builtPipelines, srv.host.factories.Receivers); err != nil {
return nil, fmt.Errorf("cannot build receivers: %w", err)
}

Expand All @@ -80,33 +74,33 @@ func newService(set *svcSettings) (*service, error) {

func (srv *service) Start(ctx context.Context) error {
srv.telemetry.Logger.Info("Starting extensions...")
if err := srv.builtExtensions.StartAll(ctx, srv); err != nil {
if err := srv.host.builtExtensions.StartAll(ctx, srv.host); err != nil {
return fmt.Errorf("failed to start extensions: %w", err)
}

srv.telemetry.Logger.Info("Starting exporters...")
if err := srv.builtExporters.StartAll(ctx, srv); err != nil {
if err := srv.host.builtExporters.StartAll(ctx, srv.host); err != nil {
return fmt.Errorf("cannot start exporters: %w", err)
}

srv.telemetry.Logger.Info("Starting processors...")
if err := srv.builtPipelines.StartProcessors(ctx, srv); err != nil {
if err := srv.host.builtPipelines.StartProcessors(ctx, srv.host); err != nil {
return fmt.Errorf("cannot start processors: %w", err)
}

srv.telemetry.Logger.Info("Starting receivers...")
if err := srv.builtReceivers.StartAll(ctx, srv); err != nil {
if err := srv.host.builtReceivers.StartAll(ctx, srv.host); err != nil {
return fmt.Errorf("cannot start receivers: %w", err)
}

return srv.builtExtensions.NotifyPipelineReady()
return srv.host.builtExtensions.NotifyPipelineReady()
}

func (srv *service) Shutdown(ctx context.Context) error {
// Accumulate errors and proceed with shutting down remaining components.
var errs error

if err := srv.builtExtensions.NotifyPipelineNotReady(); err != nil {
if err := srv.host.builtExtensions.NotifyPipelineNotReady(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err))
}

Expand All @@ -115,53 +109,24 @@ func (srv *service) Shutdown(ctx context.Context) error {
// time should be part of configuration.

srv.telemetry.Logger.Info("Stopping receivers...")
if err := srv.builtReceivers.ShutdownAll(ctx); err != nil {
if err := srv.host.builtReceivers.ShutdownAll(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown receivers: %w", err))
}

srv.telemetry.Logger.Info("Stopping processors...")
if err := srv.builtPipelines.ShutdownProcessors(ctx); err != nil {
if err := srv.host.builtPipelines.ShutdownProcessors(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown processors: %w", err))
}

srv.telemetry.Logger.Info("Stopping exporters...")
if err := srv.builtExporters.ShutdownAll(ctx); err != nil {
if err := srv.host.builtExporters.ShutdownAll(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown exporters: %w", err))
}

srv.telemetry.Logger.Info("Stopping extensions...")
if err := srv.builtExtensions.ShutdownAll(ctx); err != nil {
if err := srv.host.builtExtensions.ShutdownAll(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err))
}

return errs
}

// ReportFatalError is used to report to the host that the receiver encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
func (srv *service) ReportFatalError(err error) {
srv.asyncErrorChannel <- err
}

func (srv *service) GetFactory(kind component.Kind, componentType config.Type) component.Factory {
switch kind {
case component.KindReceiver:
return srv.factories.Receivers[componentType]
case component.KindProcessor:
return srv.factories.Processors[componentType]
case component.KindExporter:
return srv.factories.Exporters[componentType]
case component.KindExtension:
return srv.factories.Extensions[componentType]
}
return nil
}

func (srv *service) GetExtensions() map[config.ComponentID]component.Extension {
return srv.builtExtensions.ToMap()
}

func (srv *service) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
return srv.builtExporters.ToMapByDataType()
}
22 changes: 11 additions & 11 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@ func TestService_GetFactory(t *testing.T) {
assert.NoError(t, srv.Shutdown(context.Background()))
})

assert.Nil(t, srv.GetFactory(component.KindReceiver, "wrongtype"))
assert.Equal(t, factories.Receivers["nop"], srv.GetFactory(component.KindReceiver, "nop"))
assert.Nil(t, srv.host.GetFactory(component.KindReceiver, "wrongtype"))
assert.Equal(t, factories.Receivers["nop"], srv.host.GetFactory(component.KindReceiver, "nop"))

assert.Nil(t, srv.GetFactory(component.KindProcessor, "wrongtype"))
assert.Equal(t, factories.Processors["nop"], srv.GetFactory(component.KindProcessor, "nop"))
assert.Nil(t, srv.host.GetFactory(component.KindProcessor, "wrongtype"))
assert.Equal(t, factories.Processors["nop"], srv.host.GetFactory(component.KindProcessor, "nop"))

assert.Nil(t, srv.GetFactory(component.KindExporter, "wrongtype"))
assert.Equal(t, factories.Exporters["nop"], srv.GetFactory(component.KindExporter, "nop"))
assert.Nil(t, srv.host.GetFactory(component.KindExporter, "wrongtype"))
assert.Equal(t, factories.Exporters["nop"], srv.host.GetFactory(component.KindExporter, "nop"))

assert.Nil(t, srv.GetFactory(component.KindExtension, "wrongtype"))
assert.Equal(t, factories.Extensions["nop"], srv.GetFactory(component.KindExtension, "nop"))
assert.Nil(t, srv.host.GetFactory(component.KindExtension, "wrongtype"))
assert.Equal(t, factories.Extensions["nop"], srv.host.GetFactory(component.KindExtension, "nop"))

// Try retrieve non existing component.Kind.
assert.Nil(t, srv.GetFactory(42, "nop"))
assert.Nil(t, srv.host.GetFactory(42, "nop"))
}

func TestService_GetExtensions(t *testing.T) {
Expand All @@ -64,7 +64,7 @@ func TestService_GetExtensions(t *testing.T) {
assert.NoError(t, srv.Shutdown(context.Background()))
})

extMap := srv.GetExtensions()
extMap := srv.host.GetExtensions()

assert.Len(t, extMap, 1)
assert.Contains(t, extMap, config.NewComponentID("nop"))
Expand All @@ -80,7 +80,7 @@ func TestService_GetExporters(t *testing.T) {
assert.NoError(t, srv.Shutdown(context.Background()))
})

expMap := srv.GetExporters()
expMap := srv.host.GetExporters()
assert.Len(t, expMap, 3)
assert.Len(t, expMap[config.TracesDataType], 1)
assert.Contains(t, expMap[config.TracesDataType], config.NewComponentID("nop"))
Expand Down
2 changes: 1 addition & 1 deletion service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (tel *colTelemetry) initOnce(col *Collector) error {
}

func (tel *colTelemetry) initOpenCensus(col *Collector, instanceID string) (http.Handler, error) {
processMetricsViews, err := telemetry2.NewProcessMetricsViews(getBallastSize(col.service))
processMetricsViews, err := telemetry2.NewProcessMetricsViews(getBallastSize(col.service.host))
if err != nil {
return nil, err
}
Expand Down
22 changes: 11 additions & 11 deletions service/zpages.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ const (
zExtensionName = "zextensionname"
)

func (srv *service) RegisterZPages(mux *http.ServeMux, pathPrefix string) {
mux.Handle(path.Join(pathPrefix, tracezPath), otelzpages.NewTracezHandler(srv.zPagesSpanProcessor))
mux.HandleFunc(path.Join(pathPrefix, servicezPath), srv.handleServicezRequest)
mux.HandleFunc(path.Join(pathPrefix, pipelinezPath), srv.handlePipelinezRequest)
func (host *serviceHost) RegisterZPages(mux *http.ServeMux, pathPrefix string) {
mux.Handle(path.Join(pathPrefix, tracezPath), otelzpages.NewTracezHandler(host.zPagesSpanProcessor))
mux.HandleFunc(path.Join(pathPrefix, servicezPath), host.handleServicezRequest)
mux.HandleFunc(path.Join(pathPrefix, pipelinezPath), host.handlePipelinezRequest)
mux.HandleFunc(path.Join(pathPrefix, featurezPath), handleFeaturezRequest)
mux.HandleFunc(path.Join(pathPrefix, extensionzPath), func(w http.ResponseWriter, r *http.Request) {
handleExtensionzRequest(srv, w, r)
handleExtensionzRequest(host, w, r)
})
}

func (srv *service) handleServicezRequest(w http.ResponseWriter, r *http.Request) {
func (host *serviceHost) handleServicezRequest(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "service"})
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Expand All @@ -72,15 +72,15 @@ func (srv *service) handleServicezRequest(w http.ResponseWriter, r *http.Request
zpages.WriteHTMLPageFooter(w)
}

func (srv *service) handlePipelinezRequest(w http.ResponseWriter, r *http.Request) {
func (host *serviceHost) handlePipelinezRequest(w http.ResponseWriter, r *http.Request) {
qValues := r.URL.Query()
pipelineName := qValues.Get(zPipelineName)
componentName := qValues.Get(zComponentName)
componentKind := qValues.Get(zComponentKind)

w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Pipelines"})
zpages.WriteHTMLPipelinesSummaryTable(w, srv.getPipelinesSummaryTableData())
zpages.WriteHTMLPipelinesSummaryTable(w, host.getPipelinesSummaryTableData())
if pipelineName != "" && componentName != "" && componentKind != "" {
fullName := componentName
if componentKind == "processor" {
Expand All @@ -94,11 +94,11 @@ func (srv *service) handlePipelinezRequest(w http.ResponseWriter, r *http.Reques
zpages.WriteHTMLPageFooter(w)
}

func (srv *service) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData {
func (host *serviceHost) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData {
data := zpages.SummaryPipelinesTableData{}

data.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(srv.builtPipelines))
for c, p := range srv.builtPipelines {
data.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(host.builtPipelines))
for c, p := range host.builtPipelines {
// TODO: Change the template to use ID.
var recvs []string
for _, recvID := range p.Config.Receivers {
Expand Down