Skip to content

Commit

Permalink
mutate routes from listeners
Browse files Browse the repository at this point in the history
Signed-off-by: Maksim Paskal <[email protected]>
  • Loading branch information
maksim-paskal committed Oct 25, 2023
1 parent 1949735 commit 427c651
Show file tree
Hide file tree
Showing 13 changed files with 411 additions and 264 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ runRaceDetection:
-grpc.address=127.0.0.1:18080 \
-web.https.address=127.0.0.1:18081 \
-web.http.address=127.0.0.1:18082

# to load some specific configmap
# -namespace=namespace \
# -configmap.names=configmap \
runCli:
go run ./cmd/cli -debug -namespace=1 -pod=2 \
-tls.CA=certs/CA.crt \
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ services:
ports:
- 127.0.0.1:18081:18081
command:
- /app/envoy-control-plane
- -log.level=INFO
- -log.pretty
- -kubeconfig.path=/app/kubeconfig
Expand Down
20 changes: 14 additions & 6 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ func schedule(ctx context.Context) {
func syncAll(ctx context.Context) {
log.Infof("syncAll every %s", *config.Get().EndpointCheckPeriod)

for {
time.Sleep(*config.Get().EndpointCheckPeriod)

for ctx.Err() == nil {
configstore.StoreMap.Range(func(k, v interface{}) bool {
cs, ok := v.(*configstore.ConfigStore)

Expand All @@ -165,15 +163,19 @@ func syncAll(ctx context.Context) {

return true
})

select {
case <-time.After(*config.Get().EndpointCheckPeriod):
case <-ctx.Done():
break
}
}
}

func rotateCertificates(ctx context.Context) {
log.Infof("syncAll every %s", *config.Get().SSLRotationPeriod)

for {
time.Sleep(*config.Get().SSLRotationPeriod)

for ctx.Err() == nil {
configstore.StoreMap.Range(func(k, v interface{}) bool {
cs, ok := v.(*configstore.ConfigStore)

Expand All @@ -193,5 +195,11 @@ func rotateCertificates(ctx context.Context) {

return true
})

select {
case <-time.After(*config.Get().SSLRotationPeriod):
case <-ctx.Done():
break
}
}
}
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Type struct {
LogReportCaller *bool `yaml:"logReportCaller"`
ConfigFile *string
ConfigMapLabels *string `yaml:"configMapLabels"`
ConfigMapNames *string `yaml:"configMapNames"`
KubeConfigFile *string `yaml:"kubeConfigFile"`
WatchNamespaced *bool `yaml:"watchNamespaced"`
LeaderElection *bool `yaml:"leaderElection"`
Expand Down Expand Up @@ -73,6 +74,7 @@ var config = Type{
LogReportCaller: flag.Bool("log.reportCaller", true, "log file name and line number"),
ConfigFile: flag.String("config", getEnvDefault("CONFIG", "config.yaml"), "load config from file"),
ConfigMapLabels: flag.String("configmap.labels", "app=envoy-control-plane", "config directory"),
ConfigMapNames: flag.String("configmap.names", "", "name of configmap to import, comma separated"),
KubeConfigFile: flag.String("kubeconfig.path", "", "kubeconfig path"),
WatchNamespaced: flag.Bool("namespaced", true, "watch pod in one namespace"),
LeaderElection: flag.Bool("leaderElection", true, "leader election"),
Expand Down
208 changes: 204 additions & 4 deletions pkg/config/configType.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@ import (
"strings"
"text/template"

cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/maksim-paskal/envoy-control-plane/pkg/resources"
"github.com/maksim-paskal/utils-go"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/anypb"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -64,6 +76,8 @@ type ConfigType struct { //nolint: golint,revive
Secrets []interface{} `yaml:"secrets"`
// extensions.transport_sockets.tls.v3.CertificateValidationContext
Validation interface{} `yaml:"validation"`
// internal resources
clusters, routes, listeners, secrets []types.Resource
}

func (c *ConfigType) HasClusterWeights() bool {
Expand All @@ -76,6 +90,22 @@ func (c *ConfigType) HasClusterWeights() bool {
return false
}

func (c *ConfigType) GetClusters() []types.Resource {
return c.clusters
}

func (c *ConfigType) GetRoutes() []types.Resource {
return c.routes
}

func (c *ConfigType) GetListeners() []types.Resource {
return c.listeners
}

func (c *ConfigType) GetSecrets() []types.Resource {
return c.secrets
}

type ClusterWeight struct {
Value int64
}
Expand All @@ -94,15 +124,63 @@ func (c *ConfigType) GetClusterWeight(name string) (*ClusterWeight, error) {
return nil, nil //nolint: nilnil
}

func ParseConfigYaml(nodeID string, text string, data interface{}) (ConfigType, error) {
func (c *ConfigType) SaveResources() error {
clusters, err := resources.YamlToResources(c.Clusters, cluster.Cluster{})
if err != nil {
return errors.Wrap(err, "error parsing clusters")
}

routes, err := resources.YamlToResources(c.Routes, route.RouteConfiguration{})
if err != nil {
return errors.Wrap(err, "error parsing routes")
}

listeners, err := resources.YamlToResources(c.Listeners, listener.Listener{})
if err != nil {
return errors.Wrap(err, "error parsing listeners")
}

secrets, err := resources.YamlToResources(c.Secrets, tls.Secret{})
if err != nil {
return errors.Wrap(err, "error parsing secrets")
}

c.clusters = clusters
c.routes = routes
c.listeners = listeners
c.secrets = secrets

// update cluster weights
if c.HasClusterWeights() {
if err := mutateWeightedRoutesInListeners(c, c.listeners); err != nil {
return errors.Wrap(err, "errors in mutateWeightedRoutesInListeners")
}

if err := mutateWeightedRoutes(c, c.routes); err != nil {
return errors.Wrap(err, "errors in mutateWeightedRoutes")
}
}

// remove all require_client_certificate from listiners
if *Get().SSLDoNotUseValidation {
err = filterCertificates(c.listeners)
if err != nil {
return errors.Wrap(err, "errors in filterCertificates")
}
}

return nil
}

func ParseConfigYaml(nodeID string, text string, data interface{}) (*ConfigType, error) {
t := template.New(nodeID)
templates := template.Must(t.Funcs(utils.GoTemplateFunc(t)).Parse(text))

var tpl bytes.Buffer

err := templates.ExecuteTemplate(&tpl, path.Base(nodeID), data)
if err != nil {
return ConfigType{}, errors.Wrap(err, "templates.ExecuteTemplate")
return nil, errors.Wrap(err, "templates.ExecuteTemplate")
}

config := ConfigType{
Expand All @@ -111,8 +189,130 @@ func ParseConfigYaml(nodeID string, text string, data interface{}) (ConfigType,

err = yaml.Unmarshal(tpl.Bytes(), &config)
if err != nil {
return ConfigType{}, errors.Wrap(err, "yaml.Unmarshal")
return nil, errors.Wrap(err, "yaml.Unmarshal")
}

return &config, nil
}

var errUnknownClass = errors.New("unknown class")

// remove require_client_certificate from all listeners.
func filterCertificates(listiners []types.Resource) error {
for _, listiner := range listiners {
c, ok := listiner.(*listener.Listener)
if !ok {
return errUnknownClass
}

for _, filterChain := range c.GetFilterChains() {
s := filterChain.GetTransportSocket()
if s != nil {
if s.GetName() == wellknown.TransportSocketTLS {
r := tls.DownstreamTlsContext{}

err := s.GetTypedConfig().UnmarshalTo(&r)
if err != nil {
return err
}

if r.GetRequireClientCertificate() != nil {
r.RequireClientCertificate.Value = false
}

pbst, err := anypb.New(&r)
if err != nil {
return err
}

s.ConfigType = &core.TransportSocket_TypedConfig{
TypedConfig: pbst,
}
}
}
}
}

return nil
}

// routes can be stored in listener filters.
func mutateWeightedRoutesInListeners(configType *ConfigType, listiners []types.Resource) error {
for _, listiner := range listiners {
l, ok := listiner.(*listener.Listener)
if !ok {
return errUnknownClass
}

for _, fc := range l.GetFilterChains() {
for _, f := range fc.GetFilters() {
if f.GetName() != wellknown.HTTPConnectionManager {
continue
}

m := hcm.HttpConnectionManager{}

if err := f.GetTypedConfig().UnmarshalTo(&m); err != nil {
return errors.Wrap(err, "error unmarshal to HttpConnectionManager")
}

if err := mutateWeightedRouteConfiguration(configType, m.GetRouteConfig()); err != nil {
return errors.Wrap(err, "error mutateWeightedRouteConfiguration")
}

pbst, err := anypb.New(&m)
if err != nil {
return errors.Wrap(err, "error anypb.New")
}

f.ConfigType = &listener.Filter_TypedConfig{
TypedConfig: pbst,
}
}
}
}

return nil
}

func mutateWeightedRoutes(configType *ConfigType, routes []types.Resource) error {
for _, item := range routes {
r, ok := item.(*route.RouteConfiguration)
if !ok {
return errUnknownClass
}

if err := mutateWeightedRouteConfiguration(configType, r); err != nil {
return errors.Wrap(err, "error mutateWeightedRouteConfiguration")
}
}

return nil
}

func mutateWeightedRouteConfiguration(configType *ConfigType, r *route.RouteConfiguration) error {
if r == nil {
return nil
}

for _, v := range r.GetVirtualHosts() {
for _, vr := range v.GetRoutes() {
if wc := vr.GetRoute().GetWeightedClusters(); wc != nil {
for _, c := range wc.GetClusters() {
weight, err := configType.GetClusterWeight(c.GetName())
if err != nil {
return err
}

if weight != nil && c.GetWeight().GetValue() != uint32(weight.Value) {
log.Warnf("mutateWeightedRoutes: %s, weight: %d -> %d", c.GetName(), c.GetWeight().GetValue(), weight)

c.Weight = &wrappers.UInt32Value{Value: uint32(weight.Value)}
}
}
}
}
}

return config, nil
return nil
}
18 changes: 17 additions & 1 deletion pkg/configmapsstore/configMapsStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,25 @@ import (
"github.com/maksim-paskal/envoy-control-plane/pkg/config"
"github.com/maksim-paskal/envoy-control-plane/pkg/configstore"
"github.com/maksim-paskal/envoy-control-plane/pkg/controlplane"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)

var mutex sync.Mutex

func checkConfigMapLabels(cm *v1.ConfigMap) bool {
// if config has exact names, it only loads them
if len(*config.Get().ConfigMapNames) != 0 {
for _, name := range strings.Split(*config.Get().ConfigMapNames, ",") {
if cm.Name == name {
return true
}
}

return false
}

label := strings.Split(*config.Get().ConfigMapLabels, "=")

return (cm.Labels[label[0]] == label[1])
Expand Down Expand Up @@ -85,9 +97,13 @@ func NewConfigMap(ctx context.Context, cm *v1.ConfigMap) error {
cs.Stop()
}

if err := config.SaveResources(); err != nil {
return errors.Wrap(err, "error in config.SaveResources")
}

log.Infof("Create configStore %s", config.ID)

newConfigStore, err := configstore.New(ctx, &config)
newConfigStore, err := configstore.New(ctx, config)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 427c651

Please sign in to comment.