Skip to content

Commit

Permalink
init cached client and use it for ToolchainConfig loading (#460)
Browse files Browse the repository at this point in the history
  • Loading branch information
MatousJobanek authored Sep 12, 2024
1 parent 685016f commit c7890b8
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 63 deletions.
81 changes: 45 additions & 36 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
Expand All @@ -20,15 +18,16 @@ import (
"github.com/codeready-toolchain/registration-service/pkg/server"
"github.com/codeready-toolchain/toolchain-common/pkg/cluster"
commonconfig "github.com/codeready-toolchain/toolchain-common/pkg/configuration"

errs "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
runtimecluster "sigs.k8s.io/controller-runtime/pkg/cluster"
controllerlog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand Down Expand Up @@ -63,16 +62,16 @@ func main() {
os.Exit(1)
}

// create runtime client
cl, err := configClient(cfg)
ctx := controllerruntime.SetupSignalHandler()

// create cached runtime client
cl, err := newCachedClient(ctx, cfg)
if err != nil {
panic(err.Error())
}

crtConfig, err := configuration.ForceLoadRegistrationServiceConfig(cl)
if err != nil {
panic(fmt.Sprintf("failed to initialize configuration: %s", err.Error()))
}
configuration.SetClient(cl)
crtConfig := configuration.GetRegistrationServiceConfig()
crtConfig.Print()

if crtConfig.Verification().CaptchaEnabled() {
Expand Down Expand Up @@ -154,43 +153,25 @@ func main() {
}
}()

// update cache every 2 seconds
go func() {
for {
if _, err := configuration.ForceLoadRegistrationServiceConfig(cl); err != nil {
log.Error(nil, err, "failed to update the configuration cache")
}
time.Sleep(2 * time.Second)
}
}()

gracefulShutdown(configuration.GracefulTimeout, regsvcSrv.HTTPServer(), regsvcMetricsSrv, proxySrv, proxyMetricsSrv)
gracefulShutdown(ctx, configuration.GracefulTimeout, regsvcSrv.HTTPServer(), regsvcMetricsSrv, proxySrv, proxyMetricsSrv)
}

func gracefulShutdown(timeout time.Duration, hs ...*http.Server) {
// For a channel used for notification of just one signal value, a buffer of
// size 1 is sufficient.
stop := make(chan os.Signal, 1)

// We'll accept graceful shutdowns when quit via SIGINT (Ctrl+C) or SIGTERM
// (Ctrl+/). SIGKILL, SIGQUIT will not be caught.
signal.Notify(stop, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
sigReceived := <-stop
log.Infof(nil, "Signal received: %+v", sigReceived.String())

ctx, cancel := context.WithTimeout(context.Background(), timeout)
func gracefulShutdown(ctx context.Context, timeout time.Duration, hs ...*http.Server) {
<-ctx.Done()
// We are done
ctxTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
log.Infof(nil, "Shutdown with timeout: %s", timeout.String())
for _, s := range hs {
if err := s.Shutdown(ctx); err != nil {
if err := s.Shutdown(ctxTimeout); err != nil {
log.Errorf(nil, err, "Shutdown error")
} else {
log.Info(nil, "Server stopped.")
}
}
}

func configClient(cfg *rest.Config) (client.Client, error) {
func newCachedClient(ctx context.Context, cfg *rest.Config) (client.Client, error) {
scheme := runtime.NewScheme()
var AddToSchemes runtime.SchemeBuilder
addToSchemes := append(AddToSchemes,
Expand All @@ -200,9 +181,37 @@ func configClient(cfg *rest.Config) (client.Client, error) {
if err != nil {
return nil, err
}
return client.New(cfg, client.Options{
Scheme: scheme,

hostCluster, err := runtimecluster.New(cfg, func(options *runtimecluster.Options) {
options.Scheme = scheme
// cache only in the host-operator namespace
options.Namespace = configuration.Namespace()
})
if err != nil {
return nil, err
}
go func() {
if err := hostCluster.Start(ctx); err != nil {
panic(fmt.Errorf("failed to create cached client: %w", err))
}
}()

if !hostCluster.GetCache().WaitForCacheSync(ctx) {
return nil, fmt.Errorf("unable to sync the cache of the client")
}

// populate the cache backed by shared informers that are initialized lazily on the first call
// for the given GVK with all resources we are interested in from the host-operator namespace
objectsToList := []client.ObjectList{&toolchainv1alpha1.ToolchainConfigList{}, &corev1.SecretList{}}
for i := range objectsToList {
if err := hostCluster.GetClient().List(ctx, objectsToList[i], client.InNamespace(configuration.Namespace())); err != nil {
return nil, err
}
}

log.Info(nil, "Informer caches synced")

return hostCluster.GetClient(), nil
}

func createCaptchaFileFromSecret(cfg configuration.RegistrationServiceConfig) error {
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/emicklei/go-restful/v3 v3.8.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
Expand Down Expand Up @@ -86,10 +87,13 @@ require (
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/arch v0.7.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/api v0.177.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 // indirect
google.golang.org/grpc v1.63.2 // indirect
k8s.io/apiextensions-apiserver v0.25.0 // indirect
k8s.io/component-base v0.25.0 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
)

Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/gin-contrib/cors v1.6.0 h1:0Z7D/bVhE6ja07lI8CTjTonp6SB07o8bNuFyRbsBUQg=
Expand Down Expand Up @@ -442,6 +444,7 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -492,6 +495,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
google.golang.org/api v0.177.0 h1:8a0p/BbPa65GlqGWtUKxot4p0TV8OGOfyTjtmkXNXmk=
google.golang.org/api v0.177.0/go.mod h1:srbhue4MLjkjbkux5p3dw/ocYOSZTaIEvf7bCOnFQDw=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
Expand Down Expand Up @@ -558,13 +562,15 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
k8s.io/api v0.25.0 h1:H+Q4ma2U/ww0iGB78ijZx6DRByPz6/733jIuFpX70e0=
k8s.io/api v0.25.0/go.mod h1:ttceV1GyV1i1rnmvzT3BST08N6nGt+dudGrquzVQWPk=
k8s.io/apiextensions-apiserver v0.25.0 h1:CJ9zlyXAbq0FIW8CD7HHyozCMBpDSiH7EdrSTCZcZFY=
k8s.io/apiextensions-apiserver v0.25.0/go.mod h1:3pAjZiN4zw7R8aZC5gR0y3/vCkGlAjCazcg1me8iB/E=
k8s.io/apimachinery v0.25.0 h1:MlP0r6+3XbkUG2itd6vp3oxbtdQLQI94fD5gCS+gnoU=
k8s.io/apimachinery v0.25.0/go.mod h1:qMx9eAk0sZQGsXGu86fab8tZdffHbwUfsvzqKn4mfB0=
k8s.io/apiserver v0.25.0 h1:8kl2ifbNffD440MyvHtPaIz1mw4mGKVgWqM0nL+oyu4=
k8s.io/apiserver v0.25.0/go.mod h1:BKwsE+PTC+aZK+6OJQDPr0v6uS91/HWxX7evElAH6xo=
k8s.io/client-go v0.25.0 h1:CVWIaCETLMBNiTUta3d5nzRbXvY5Hy9Dpl+VvREpu5E=
k8s.io/client-go v0.25.0/go.mod h1:lxykvypVfKilxhTklov0wz1FoaUZ8X4EwbhS6rpRfN8=
k8s.io/component-base v0.25.0 h1:haVKlLkPCFZhkcqB6WCvpVxftrg6+FK5x1ZuaIDaQ5Y=
k8s.io/component-base v0.25.0/go.mod h1:F2Sumv9CnbBlqrpdf7rKZTmmd2meJq0HizeyY/yAFxk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
Expand Down
20 changes: 5 additions & 15 deletions pkg/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ func Namespace() string {
return os.Getenv(commonconfig.WatchNamespaceEnvVar)
}

// GetRegistrationServiceConfig returns a RegistrationServiceConfig using the cache, or if the cache was not initialized
// then retrieves the latest config using the provided client and updates the cache
// GetRegistrationServiceConfig returns a RegistrationServiceConfig reflecting the current state of the ToolchainConfig CR and the associated secrets
func GetRegistrationServiceConfig() RegistrationServiceConfig {
if configurationClient == nil {
logger.Error(fmt.Errorf("configuration client is not initialized"), "using default configuration")
return RegistrationServiceConfig{cfg: &toolchainv1alpha1.ToolchainConfigSpec{}}
}
config, secrets, err := commonconfig.GetConfig(configurationClient, &toolchainv1alpha1.ToolchainConfig{})
config, secrets, err := commonconfig.LoadLatest(configurationClient, &toolchainv1alpha1.ToolchainConfig{})
if err != nil {
// return default config
logger.Error(err, "failed to retrieve RegistrationServiceConfig, using default configuration")
Expand All @@ -79,18 +78,9 @@ func GetRegistrationServiceConfig() RegistrationServiceConfig {
return NewRegistrationServiceConfig(config, secrets)
}

// ForceLoadRegistrationServiceConfig updates the cache using the provided client and returns the latest RegistrationServiceConfig
func ForceLoadRegistrationServiceConfig(cl client.Client) (RegistrationServiceConfig, error) {
if configurationClient == nil {
configurationClient = cl
}
config, secrets, err := commonconfig.LoadLatest(cl, &toolchainv1alpha1.ToolchainConfig{})
if err != nil {
// return default config
logger.Error(err, "failed to force load RegistrationServiceConfig")
return RegistrationServiceConfig{cfg: &toolchainv1alpha1.ToolchainConfigSpec{}}, err
}
return NewRegistrationServiceConfig(config, secrets), nil
// SetClient sets the client to be used to fetch the configuration
func SetClient(cl client.Client) {
configurationClient = cl
}

type RegistrationServiceConfig struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ func (p *Proxy) StartProxy(port string) *http.Server {
// listen concurrently to allow for graceful shutdown
go func() {
if err := srv.ListenAndServe(); err != nil {
log.Error(nil, err, err.Error())
if errors.Is(err, http.ErrServerClosed) {
log.Info(nil, fmt.Sprintf("%s - this is expected when server shutdown has been initiated", err.Error()))
} else {
log.Error(nil, err, err.Error())
}
}
}()

Expand Down
19 changes: 8 additions & 11 deletions test/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,9 @@ func (s *UnitTestSuite) SetConfig(opts ...testconfig.ToolchainConfigOption) conf
err = s.ConfigClient.Create(context.TODO(), newcfg)
require.NoError(s.T(), err)

// update config cache
cfg, err := configuration.ForceLoadRegistrationServiceConfig(s.ConfigClient)
require.NoError(s.T(), err)
return cfg
// set client & get the config
configuration.SetClient(s.ConfigClient)
return configuration.GetRegistrationServiceConfig()
}

func (s *UnitTestSuite) SetSecret(secret *corev1.Secret) {
Expand All @@ -112,10 +111,8 @@ func (s *UnitTestSuite) SetSecret(secret *corev1.Secret) {
require.True(s.T(), errors.IsNotFound(err), "unexpected error")
err = s.ConfigClient.Create(context.TODO(), secret)
require.NoError(s.T(), err)
// update config cache
cfg, err := configuration.ForceLoadRegistrationServiceConfig(s.ConfigClient)
require.NoError(s.T(), err)
require.NotEmpty(s.T(), cfg)
// set client
configuration.SetClient(s.ConfigClient)
}

func (s *UnitTestSuite) DefaultConfig() configuration.RegistrationServiceConfig {
Expand All @@ -125,9 +122,9 @@ func (s *UnitTestSuite) DefaultConfig() configuration.RegistrationServiceConfig
obj := testconfig.NewToolchainConfigObj(s.T(), testconfig.RegistrationService().Environment("unit-tests"))
err := s.ConfigClient.Create(context.TODO(), obj)
require.NoError(s.T(), err)
cfg, err := configuration.ForceLoadRegistrationServiceConfig(s.ConfigClient)
require.NoError(s.T(), err)
return cfg
// set client & get the config
configuration.SetClient(s.ConfigClient)
return configuration.GetRegistrationServiceConfig()
}

func (s *UnitTestSuite) WithFactoryOption(opt factory.Option) {
Expand Down

0 comments on commit c7890b8

Please sign in to comment.