Skip to content

Commit

Permalink
flow: decouple component HTTP API (#4361)
Browse files Browse the repository at this point in the history
* component: add ParseID as a common function

Splitting a global component ID into its module ID and local ID will be
a commonly used utility which makes sense to include as part of the API
for components.

* service/http: cross-module component handler

This changes the component handler for the HTTP service to be able to
handle requests across all modules from the same root Flow controller.

* flow: remove unnecessary HTTP handler

This removes the HTTP handlers from the Flow controller and modules, as
all HTTP handlers are now handled centrally from the HTTP service.
  • Loading branch information
rfratto authored Jul 6, 2023
1 parent f6241a1 commit 85098fa
Show file tree
Hide file tree
Showing 14 changed files with 280 additions and 126 deletions.
30 changes: 28 additions & 2 deletions component/component_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,37 @@ package component

import (
"encoding/json"
"errors"
"strings"
"time"

"github.com/grafana/agent/pkg/river/encoding/riverjson"
)

var (
// ErrComponentNotFound is returned by [Provider.GetComponent] when the
// specified component isn't found.
ErrComponentNotFound = errors.New("component not found")

// ErrModuleNotFound is returned by [Provider.ListComponents] when the
// specified module isn't found.
ErrModuleNotFound = errors.New("module not found")
)

// A Provider is a system which exposes a list of running components.
type Provider interface {
// GetComponent returns information about an individual running component
// given its global ID. The provided opts field configures how much detail to
// return; see [InfoOptions] for more information.
//
// GetComponent returns an error if a component is not found.
// GetComponent returns ErrComponentNotFound if a component is not found.
GetComponent(id ID, opts InfoOptions) (*Info, error)

// ListComponents returns the list of active components. The provided opts
// field configures how much detail to return; see [InfoOptions] for more
// information.
//
// Returns an error if the provided moduleID doesn't exist.
// Returns ErrModuleNotFound if the provided moduleID doesn't exist.
ListComponents(moduleID string, opts InfoOptions) ([]*Info, error)
}

Expand All @@ -30,6 +42,20 @@ type ID struct {
LocalID string // Local ID of the component, unique to the module it is running in.
}

// ParseID parses an input string of the form "LOCAL_ID" or
// "MODULE_ID/LOCAL_ID" into an ID. The final slash character is used to
// separate the ModuleID and LocalID.
func ParseID(input string) ID {
slashIndex := strings.LastIndexByte(input, '/')
if slashIndex == -1 {
return ID{LocalID: input}
}
return ID{
ModuleID: input[:slashIndex],
LocalID: input[slashIndex+1:],
}
}

// InfoOptions is used by to determine how much information to return with
// [Info].
type InfoOptions struct {
Expand Down
7 changes: 0 additions & 7 deletions component/module/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package file

import (
"context"
"net/http"
"sync"

"go.uber.org/atomic"
Expand Down Expand Up @@ -55,7 +54,6 @@ type Component struct {
var (
_ component.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
_ component.HTTPComponent = (*Component)(nil)
)

// New creates a new module.file component.
Expand Down Expand Up @@ -140,11 +138,6 @@ func (c *Component) Update(args component.Arguments) error {
return c.mod.LoadFlowContent(newArgs.Arguments, c.getContent().Value)
}

// Handler implements component.HTTPComponent.
func (c *Component) Handler() http.Handler {
return c.mod.HTTPHandler()
}

// CurrentHealth implements component.HealthComponent.
func (c *Component) CurrentHealth() component.Health {
leastHealthy := component.LeastHealthy(
Expand Down
7 changes: 0 additions & 7 deletions component/module/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package git

import (
"context"
"net/http"
"path/filepath"
"reflect"
"sync"
Expand Down Expand Up @@ -70,7 +69,6 @@ type Component struct {
var (
_ component.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
_ component.HTTPComponent = (*Component)(nil)
)

// New creates a new module.git component.
Expand Down Expand Up @@ -238,11 +236,6 @@ func (c *Component) CurrentHealth() component.Health {
return component.LeastHealthy(c.health, c.mod.CurrentHealth())
}

// Handler implements component.HTTPComponent.
func (c *Component) Handler() http.Handler {
return c.mod.HTTPHandler()
}

// DebugInfo implements component.DebugComponent.
func (c *Component) DebugInfo() interface{} {
type DebugInfo struct {
Expand Down
6 changes: 0 additions & 6 deletions component/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package module
import (
"context"
"fmt"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -72,11 +71,6 @@ func (c *ModuleComponent) CurrentHealth() component.Health {
return c.health
}

// HTTPHandler returns the underlying module handle for the UI.
func (c *ModuleComponent) HTTPHandler() http.Handler {
return c.mod.ComponentHandler()
}

// SetHealth contains the implementation details for setHealth in a module component.
func (c *ModuleComponent) setHealth(h component.Health) {
c.mut.Lock()
Expand Down
6 changes: 0 additions & 6 deletions component/module/string/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package string

import (
"context"
"net/http"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/module"
Expand Down Expand Up @@ -39,7 +38,6 @@ type Component struct {
var (
_ component.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
_ component.HTTPComponent = (*Component)(nil)
)

// New creates a new module.string component.
Expand Down Expand Up @@ -71,10 +69,6 @@ func (c *Component) Update(args component.Arguments) error {
return c.mod.LoadFlowContent(newArgs.Arguments, newArgs.Content.Value)
}

func (c *Component) Handler() http.Handler {
return c.mod.HTTPHandler()
}

// CurrentHealth implements component.HealthComponent.
func (c *Component) CurrentHealth() component.Health {
return c.mod.CurrentHealth()
Expand Down
5 changes: 0 additions & 5 deletions component/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net"
"net/http"
"reflect"
"strings"

Expand Down Expand Up @@ -53,10 +52,6 @@ type Module interface {
// Run blocks until the provided context is canceled. The ID of a module as defined in
// ModuleController.NewModule will not be released until Run returns.
Run(context.Context)

// ComponentHandler returns an HTTP handler which exposes endpoints of
// components managed by the Module.
ComponentHandler() http.Handler
}

// ExportFunc is used for onExport of the Module
Expand Down
6 changes: 3 additions & 3 deletions pkg/flow/flow_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (f *Flow) GetComponent(id component.ID, opts component.InfoOptions) (*compo
if id.ModuleID != "" {
mod, ok := f.modules.Get(id.ModuleID)
if !ok {
return nil, fmt.Errorf("component %q does not exist", id)
return nil, component.ErrComponentNotFound
}

return mod.f.GetComponent(component.ID{LocalID: id.LocalID}, opts)
Expand All @@ -26,7 +26,7 @@ func (f *Flow) GetComponent(id component.ID, opts component.InfoOptions) (*compo

node := graph.GetByID(id.LocalID)
if node == nil {
return nil, fmt.Errorf("component %q does not exist", id)
return nil, component.ErrComponentNotFound
}

cn, ok := node.(*controller.ComponentNode)
Expand All @@ -45,7 +45,7 @@ func (f *Flow) ListComponents(moduleID string, opts component.InfoOptions) ([]*c
if moduleID != "" {
mod, ok := f.modules.Get(moduleID)
if !ok {
return nil, fmt.Errorf("module %q does not exist", moduleID)
return nil, component.ErrModuleNotFound
}

return mod.f.ListComponents("", opts)
Expand Down
42 changes: 0 additions & 42 deletions pkg/flow/flow_http.go

This file was deleted.

18 changes: 0 additions & 18 deletions pkg/flow/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package flow
import (
"context"
"fmt"
"net/http"
"path"
"sync"

"github.com/gorilla/mux"
"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/cluster"
"github.com/grafana/agent/pkg/flow/internal/controller"
Expand Down Expand Up @@ -144,22 +142,6 @@ func (c *module) Run(ctx context.Context) {
c.f.Run(ctx)
}

// ComponentHandler returns an HTTP handler which exposes endpoints of
// components managed by the underlying flow system.
func (c *module) ComponentHandler() (_ http.Handler) {
r := mux.NewRouter()

r.PathPrefix("/{id}/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Re-add the full path to ensure that nested controllers propagate
// requests properly.
r.URL.Path = path.Join(c.o.HTTPPath, r.URL.Path)

c.f.ComponentHandler().ServeHTTP(w, r)
})

return r
}

// moduleControllerOptions holds static options for module controller.
type moduleControllerOptions struct {
// Logger to use for controller logs and components. A no-op logger will be
Expand Down
30 changes: 14 additions & 16 deletions service/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *Service) Run(ctx context.Context, host service.Host) error {
r.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux)
}

r.PathPrefix(s.componentHttpPathPrefix + "{id}/").Handler(s.componentHandler(host))
r.PathPrefix(s.componentHttpPathPrefix).Handler(s.componentHandler(host))

if s.node != nil {
cr, ch := s.node.Handler()
Expand Down Expand Up @@ -196,20 +196,18 @@ func (s *Service) Run(ctx context.Context, host service.Host) error {
}

func (s *Service) componentHandler(host service.Host) http.HandlerFunc {
// TODO(rfratto): make this work across modules. Right now this handler
// only works for the top-level module, and forwards requests to inner
// modules.
//
// This means that the Flow controller still has HTTP logic until this is
// resolved.

return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id := vars["id"]
// Trim the path prefix to get our full path.
trimmedPath := strings.TrimPrefix(r.URL.Path, s.componentHttpPathPrefix)

// splitURLPath should only fail given an unexpected path.
componentID, componentPath, err := splitURLPath(host, trimmedPath)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "failed to parse URL path %q: %s\n", r.URL.Path, err)
}

// We pass no options in component.InfoOptions, because we're only
// interested in the component instance.
info, err := host.GetComponent(component.ID{LocalID: id}, component.InfoOptions{})
info, err := host.GetComponent(componentID, component.InfoOptions{})
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
Expand All @@ -226,9 +224,9 @@ func (s *Service) componentHandler(host service.Host) http.HandlerFunc {
return
}

// Remove prefix from path, so each component can handle paths from their
// own root path.
r.URL.Path = strings.TrimPrefix(r.URL.Path, path.Join(s.componentHttpPathPrefix, id))
// Send just the remaining path to our component so each component can
// handle paths from their own root path.
r.URL.Path = componentPath
handler.ServeHTTP(w, r)
}
}
Expand Down
Loading

0 comments on commit 85098fa

Please sign in to comment.