Skip to content

Commit

Permalink
allow split data-plane and control plane
Browse files Browse the repository at this point in the history
  • Loading branch information
ArthurHlt committed Sep 9, 2023
1 parent e60b4c3 commit 3601701
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 58 deletions.
85 changes: 62 additions & 23 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ type App struct {
geoLoc *geolocs.GeoLoc
hcHandler *healthchecks.HcHandler
grpcServer *grpc.Server
onlyServeDns bool
noServeDns bool
}

func NewApp(cnf *config.Config) (*App, error) {
func NewApp(cnf *config.Config, onlyServeDns, noServeDns bool) (*App, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
app := &App{
entry: log.WithField("component", "app"),
cnf: cnf,
ctx: ctx,
cancelFunc: cancelFunc,
entry: log.WithField("component", "app"),
cnf: cnf,
ctx: ctx,
cancelFunc: cancelFunc,
onlyServeDns: onlyServeDns,
noServeDns: noServeDns,
}
err := app.loadConsulClient()
if err != nil {
Expand Down Expand Up @@ -113,28 +117,52 @@ func (a *App) loadConsulClient() error {
}

func (a *App) loadConsulDiscoverer() error {
consulDisco := disco.NewConsulDiscoverer(a.consulClient, a.cnf.DcName, a.cnf.HealthCheckConfig.HealthcheckAddress)
if a.onlyServeDns {
a.entry.Info("Only serve DNS: no consul discoverer")
return nil
}
consulDisco := disco.NewConsulDiscoverer(
a.consulClient,
a.cnf.HealthCheckConfig.HealthcheckAuth,
a.cnf.DcName,
a.cnf.HealthCheckConfig.HealthcheckAddress,
)
a.consulDisco = consulDisco
return nil
}

func (a *App) loadRetriever() error {
retriever := rets.NewRetriever(a.cnf.DcName, 10, time.Duration(*a.cnf.ConsulConfig.ScrapInterval), a.consulClient)
if a.noServeDns {
retriever.DisableCatalogPolling()
}
a.retriever = retriever
return nil
}

func (a *App) loadGeoLoc() error {
if a.noServeDns {
a.entry.Info("No dns server: no geoloc loaded")
return nil
}
a.geoLoc = geolocs.NewGeoLoc(a.cnf.GeoLoc.DcPositions, a.cnf.GeoLoc.GeoDb.Reader)
return nil
}

func (a *App) loadLbFactory() error {
if a.noServeDns {
a.entry.Info("No dns server: no lb factory loaded")
return nil
}
a.lbFactory = lb.NewLBFactory(a.geoLoc, a.cnf.DNSServer.TrustEdns)
return nil
}

func (a *App) loadGSLBHandler() error {
if a.noServeDns {
a.entry.Info("No dns server: no gslb handler loaded")
return nil
}
_, local4, _ := net.ParseCIDR("127.0.0.1/32") // nolint:errcheck
_, local6, _ := net.ParseCIDR("::1/128") // nolint:errcheck
allowed := []*config.CIDR{
Expand All @@ -150,14 +178,22 @@ func (a *App) loadGSLBHandler() error {
}

func (a *App) loadHcHandler() error {
a.hcHandler = healthchecks.NewHcHandler()
if a.onlyServeDns {
a.entry.Info("Only serve DNS: no healthcheck handler")
return nil
}
a.hcHandler = healthchecks.NewHcHandler(a.cnf.HealthCheckConfig)
return nil
}

func (a *App) register() error {
regs.DefaultRegCatalog.Register(a.gslbHandler)
regs.DefaultRegKV.Register(a.consulDisco)
regs.DefaultRegMember.Register(a.hcHandler)
if !a.noServeDns {
regs.DefaultRegCatalog.Register(a.gslbHandler)
}
if !a.onlyServeDns {
regs.DefaultRegKV.Register(a.consulDisco)
regs.DefaultRegMember.Register(a.hcHandler)
}
return nil
}

Expand All @@ -175,19 +211,22 @@ func (a *App) Run() error {
log.Panicf("retriever.Run: %v", err)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
dnsServer := servers.NewDNSServer(a.cnf.DNSServer, a.gslbHandler)
dnsServer.Run(a.ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
grpcServer := servers.NewHTTPServer(a.cnf.HTTPServer, a.hcHandler, a.grpcServer)
grpcServer.Run(a.ctx)
}()

if !a.noServeDns {
wg.Add(1)
go func() {
defer wg.Done()
dnsServer := servers.NewDNSServer(a.cnf.DNSServer, a.gslbHandler)
dnsServer.Run(a.ctx)
}()
}
if !a.onlyServeDns {
wg.Add(1)
go func() {
defer wg.Done()
grpcServer := servers.NewHTTPServer(a.cnf.HTTPServer, a.hcHandler, a.grpcServer)
grpcServer.Run(a.ctx)
}()
}
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
s := <-sig
Expand Down
3 changes: 3 additions & 0 deletions app/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (a *App) makeGrpcOptions() ([]grpc.ServerOption, error) {
}

func (a *App) loadGrpcServer() error {
if a.onlyServeDns {
return nil
}
grpcOptions, err := a.makeGrpcOptions()
if err != nil {
return fmt.Errorf("agent: failed to make grpc options: %v", err)
Expand Down
39 changes: 36 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"encoding/base64"
"fmt"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -61,7 +62,13 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err != nil {
return fmt.Errorf("split host port: %w", err)
}
c.HealthCheckConfig.HealthcheckAddress = fmt.Sprintf("127.0.0.1:%s", port)
scheme := "https"
if c.HTTPServer.ListenLocalPort != 0 {
scheme = "http"
port = fmt.Sprintf("%d", c.HTTPServer.ListenLocalPort)
}
c.HealthCheckConfig.HealthcheckAddress = fmt.Sprintf("%s://127.0.0.1:%s", scheme, port)

}
if c.ConsulConfig == nil {
return fmt.Errorf("consul_config is required")
Expand All @@ -76,8 +83,9 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
}

type HTTPServerConfig struct {
Listen string `yaml:"listen"`
TLSPem TLSPem `yaml:"tls_pem"`
Listen string `yaml:"listen"`
ListenLocalPort uint `yaml:"listen_local_port"`
TLSPem TLSPem `yaml:"tls_pem"`
}

func (c *HTTPServerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
Expand All @@ -103,6 +111,31 @@ type TLSPem struct {
PrivateKeyPath string `yaml:"private_key_path"`
}

type BasicAuth struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
}

func (c *BasicAuth) UnmarshalYAML(unmarshal func(interface{}) error) error {
type plain BasicAuth
err := unmarshal((*plain)(c))
if err != nil {
return err
}
if c.Username == "" {
return fmt.Errorf("username is required")
}
if c.Password == "" {
return fmt.Errorf("password is required")
}
return nil
}

func (c *BasicAuth) GetBasicAuth() string {
auth := c.Username + ":" + c.Password
return base64.StdEncoding.EncodeToString([]byte(auth))
}

type ConsulConfig struct {
Addr string `yaml:"addr"`
Scheme string `yaml:"scheme"`
Expand Down
4 changes: 3 additions & 1 deletion config/hc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package config

type HealthCheckConfig struct {
HealthcheckAddress string `yaml:"healthcheck_address"`
HealthcheckAddress string `yaml:"healthcheck_address"`
HealthcheckAuth *BasicAuth `yaml:"healthcheck_auth"`
AllowOnlyLocalhost bool `yaml:"allow_only_localhost"`
}

func (c *HealthCheckConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
Expand Down
1 change: 1 addition & 0 deletions contexes/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type gslocCtxKey int
const (
DNSMsg gslocCtxKey = iota
RemoteAddr
FromLocalhost
)

func SetDNSMsg(ctx context.Context, msg *dns.Msg) context.Context {
Expand Down
10 changes: 8 additions & 2 deletions disco/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ type ConsulDiscoverer struct {
consulClient *consul.Client
hcAddr string
dcName string
headers map[string][]string
}

func NewConsulDiscoverer(consulClient *consul.Client, dcName, hcAddr string) *ConsulDiscoverer {
func NewConsulDiscoverer(consulClient *consul.Client, basicAuth *config.BasicAuth, dcName, hcAddr string) *ConsulDiscoverer {
headers := make(map[string][]string)
if basicAuth != nil {
headers["Authorization"] = []string{fmt.Sprintf("Basic %s", basicAuth.GetBasicAuth())}
}
return &ConsulDiscoverer{
consulClient: consulClient,
hcAddr: hcAddr,
Expand Down Expand Up @@ -69,8 +74,9 @@ func (cd *ConsulDiscoverer) registerMembers(entry *entries.SignedEntry, members
Interval: entry.GetHealthcheck().GetInterval().AsDuration().String(),
Timeout: entry.GetHealthcheck().GetTimeout().AsDuration().String(),
TLSSkipVerify: true,
HTTP: fmt.Sprintf("https://%s/hc/%s/member/%s", cd.hcAddr, entry.GetEntry().GetFqdn(), member.GetIp()),
HTTP: fmt.Sprintf("%s/hc/%s/member/%s", cd.hcAddr, entry.GetEntry().GetFqdn(), member.GetIp()),
Method: "POST",
Header: cd.headers,
Body: string(hcBytes),
},
})
Expand Down
45 changes: 44 additions & 1 deletion healthchecks/handler.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package healthchecks

import (
"crypto/sha256"
"crypto/subtle"
"fmt"
"github.com/gorilla/mux"
hcconf "github.com/orange-cloudfoundry/gsloc-go-sdk/gsloc/api/config/healthchecks/v1"
"github.com/orange-cloudfoundry/gsloc/config"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protojson"
"io"
"net/http"
"strings"
"sync"
)

type HcHandler struct {
disabledEntIp *sync.Map
cnf *config.HealthCheckConfig
}

func NewHcHandler() *HcHandler {
func NewHcHandler(cnf *config.HealthCheckConfig) *HcHandler {
return &HcHandler{
disabledEntIp: &sync.Map{},
cnf: cnf,
}
}

Expand All @@ -31,7 +37,44 @@ func (h *HcHandler) EnableEntryIp(fqdn, ip string) {
h.disabledEntIp.Delete(fmt.Sprintf("%s-%s", fqdn, ip))
}

func (h *HcHandler) checkAuth(req *http.Request) bool {
if h.cnf.HealthcheckAuth == nil {
return true
}
username, password, ok := req.BasicAuth()
if !ok {
return false
}
usernameHash := sha256.Sum256([]byte(username))
passwordHash := sha256.Sum256([]byte(password))
expectedUsernameHash := sha256.Sum256([]byte(h.cnf.HealthcheckAuth.Username))
expectedPasswordHash := sha256.Sum256([]byte(h.cnf.HealthcheckAuth.Password))

usernameMatch := (subtle.ConstantTimeCompare(usernameHash[:], expectedUsernameHash[:]) == 1)
passwordMatch := (subtle.ConstantTimeCompare(passwordHash[:], expectedPasswordHash[:]) == 1)

return usernameMatch && passwordMatch
}

func (h *HcHandler) isFromLocalhost(req *http.Request) bool {
if strings.HasPrefix(req.RemoteAddr, "127.0.0.1:") {
return true
}
if strings.HasPrefix(req.RemoteAddr, "[::1]:") {
return true
}
return false
}

func (h *HcHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if h.cnf.AllowOnlyLocalhost && !h.isFromLocalhost(req) {
http.Error(w, "only localhost allowed", http.StatusForbidden)
return
}
if !h.isFromLocalhost(req) && !h.checkAuth(req) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
b, err := io.ReadAll(req.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
6 changes: 4 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ var (
)

type ServeCmd struct {
ConfigPath string `required:"" short:"c" help:"path to the configuration file" type:"path" default:"./config.yml"`
ConfigPath string `required:"" short:"c" help:"path to the configuration file" type:"path" default:"./config.yml"`
OnlyServeDns bool `help:"only serve dns and remove api, healthcheck and consul registering. This is for separate dns (data-plane) and backend/api (control-plane)" default:"false"`
NoServeDns bool `help:"do not serve dns server and only have api, healthcheck and and consul registering. This is for separate dns (data-plane) and backend/api (control-plane)" default:"false"`
}

func (r *ServeCmd) Run() error {
cnf, err := config.LoadConfig(r.ConfigPath)
if err != nil {
return fmt.Errorf("read config: %s", err)
}
appRun, err := app.NewApp(cnf)
appRun, err := app.NewApp(cnf, r.OnlyServeDns, r.NoServeDns)
if err != nil {
return fmt.Errorf("create app: %s", err)
}
Expand Down
Loading

0 comments on commit 3601701

Please sign in to comment.