Skip to content

Commit

Permalink
Merge pull request #1602 from sttts/sttts-ddsif-no-polling
Browse files Browse the repository at this point in the history
0.6: ddsif: replace discovery polling
  • Loading branch information
sttts authored Jul 22, 2022
2 parents 02739c5 + c9a83ca commit 80450a6
Show file tree
Hide file tree
Showing 8 changed files with 384 additions and 252 deletions.
392 changes: 215 additions & 177 deletions pkg/informer/informer.go

Large diffs are not rendered by default.

115 changes: 115 additions & 0 deletions pkg/informer/informer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
Copyright 2022 The KCP Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informer

import (
"reflect"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/genericcontrolplanescheme"
_ "k8s.io/kubernetes/pkg/genericcontrolplane/apis/install"
)

// TestBuiltInInformableTypes tests that there is no drift between actual built-in types and the list that is hard-coded
// in builtInInformableTypes.
func TestBuiltInInformableTypes(t *testing.T) {
builtInGVRs := map[schema.GroupVersionResource]struct{}{}

// In the scheme, but not actual resources
kindsToIgnore := sets.NewString(
"List",
"CreateOptions",
"DeleteOptions",
"GetOptions",
"ListOptions",
"PatchOptions",
"UpdateOptions",
"WatchEvent",
)

// Internal types and/or things that are not list/watchable
gvksToIgnore := map[schema.GroupVersionKind]struct{}{
{Version: "v1", Kind: "APIGroup"}: {},
{Version: "v1", Kind: "APIVersions"}: {},
{Version: "v1", Kind: "RangeAllocation"}: {},
{Version: "v1", Kind: "SerializedReference"}: {},
{Version: "v1", Kind: "Status"}: {},
{Group: "authentication.k8s.io", Version: "v1", Kind: "TokenRequest"}: {},
{Group: "authentication.k8s.io", Version: "v1", Kind: "TokenReview"}: {},
{Group: "authorization.k8s.io", Version: "v1", Kind: "LocalSubjectAccessReview"}: {},
{Group: "authorization.k8s.io", Version: "v1", Kind: "SelfSubjectAccessReview"}: {},
{Group: "authorization.k8s.io", Version: "v1", Kind: "SelfSubjectRulesReview"}: {},
{Group: "authorization.k8s.io", Version: "v1", Kind: "SubjectAccessReview"}: {},
}

gvsToIgnore := map[schema.GroupVersion]struct{}{
// Covered by Group=""
{Group: "core", Version: "v1"}: {},

// These are alpha/beta versions that are not preferred (they all have v1)
{Group: "admissionregistration.k8s.io", Version: "v1beta1"}: {},
{Group: "authentication.k8s.io", Version: "v1beta1"}: {},
{Group: "authorization.k8s.io", Version: "v1beta1"}: {},
{Group: "certificates.k8s.io", Version: "v1beta1"}: {},
{Group: "coordination.k8s.io", Version: "v1beta1"}: {},
{Group: "events.k8s.io", Version: "v1beta1"}: {},
{Group: "flowcontrol.apiserver.k8s.io", Version: "v1alpha1"}: {},
{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta1"}: {},
{Group: "rbac.authorization.k8s.io", Version: "v1alpha1"}: {},
{Group: "rbac.authorization.k8s.io", Version: "v1beta1"}: {},
}

allKnownTypes := genericcontrolplanescheme.Scheme.AllKnownTypes()

// CRDs are not included in the genericcontrolplane scheme (because they're part of the apiextensions apiserver),
// so we have to manually add them
allKnownTypes[schema.GroupVersionKind{Group: "apiextensions.k8s.io", Version: "v1", Kind: "CustomResourceDefinition"}] = reflect.TypeOf(struct{}{})

for gvk := range allKnownTypes {
if kindsToIgnore.Has(gvk.Kind) {
continue
}

if _, found := gvsToIgnore[gvk.GroupVersion()]; found {
continue
}

if _, found := gvksToIgnore[gvk]; found {
continue
}

if strings.HasSuffix(gvk.Kind, "List") {
continue
}
if gvk.Version == "__internal" {
continue
}

resourceName := strings.ToLower(gvk.Kind) + "s"
gvr := gvk.GroupVersion().WithResource(resourceName)

builtInGVRs[gvr] = struct{}{}
}

require.Empty(t, cmp.Diff(builtInGVRs, builtInInformableTypes()))
}
1 change: 0 additions & 1 deletion pkg/server/options/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ var (
"tracing-config-file", // File with apiserver tracing configuration.

// KCP flags
"discovery-poll-interval", // Polling interval for dynamic discovery informers.
"profiler-address", // [Address]:port to bind the profiler to
"root-directory", // Root directory.
"shard-base-url", // Base URL to this kcp shard. Defaults to external address.
Expand Down
5 changes: 0 additions & 5 deletions pkg/server/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func (o *Options) rawFlags() cliflag.NamedFlagSets {
fs.StringVar(&o.Extra.ShardExternalURL, "shard-external-url", o.Extra.ShardExternalURL, "URL used by outside clients to talk to this kcp shard. Defaults to external address.")
fs.StringVar(&o.Extra.ShardName, "shard-name", o.Extra.ShardName, "A name of this kcp shard. Defaults to the \"root\" name.")
fs.StringVar(&o.Extra.RootDirectory, "root-directory", o.Extra.RootDirectory, "Root directory.")
fs.DurationVar(&o.Extra.DiscoveryPollInterval, "discovery-poll-interval", o.Extra.DiscoveryPollInterval, "Polling interval for dynamic discovery informers.")

fs.BoolVar(&o.Extra.ExperimentalBindFreePort, "experimental-bind-free-port", o.Extra.ExperimentalBindFreePort, "Bind to a free port. --secure-port must be 0. Use the admin.kubeconfig to extract the chosen port.")
fs.MarkHidden("experimental-bind-free-port") // nolint:errcheck
Expand All @@ -182,10 +181,6 @@ func (o *CompletedOptions) Validate() []error {
errs = append(errs, o.AdminAuthentication.Validate()...)
errs = append(errs, o.Virtual.Validate()...)

if o.Extra.DiscoveryPollInterval == 0 {
errs = append(errs, fmt.Errorf("--discovery-poll-interval not set"))
}

return errs
}

Expand Down
21 changes: 8 additions & 13 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ import (
kcpfeatures "github.com/kcp-dev/kcp/pkg/features"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/informer"
metadataclient "github.com/kcp-dev/kcp/pkg/metadata"
boostrap "github.com/kcp-dev/kcp/pkg/server/bootstrap"
kcpserveroptions "github.com/kcp-dev/kcp/pkg/server/options"
"github.com/kcp-dev/kcp/pkg/server/requestinfo"
Expand Down Expand Up @@ -346,17 +345,13 @@ func (s *Server) Run(ctx context.Context) error {
),
)

metadataClusterClient, err := metadataclient.NewDynamicMetadataClusterClientForConfig(server.LoopbackClientConfig)
if err != nil {
return err
}
s.dynamicDiscoverySharedInformerFactory = informer.NewDynamicDiscoverySharedInformerFactory(
s.kcpSharedInformerFactory.Tenancy().V1alpha1().ClusterWorkspaces().Lister(),
kubeClusterClient.DiscoveryClient,
metadataClusterClient.Cluster(logicalcluster.Wildcard),
func(obj interface{}) bool { return true }, s.options.Extra.DiscoveryPollInterval,
s.dynamicDiscoverySharedInformerFactory, err = informer.NewDynamicDiscoverySharedInformerFactory(
server.LoopbackClientConfig,
func(obj interface{}) bool { return true },
s.apiextensionsSharedInformerFactory.Apiextensions().V1().CustomResourceDefinitions(),
indexers.NamespaceScoped(),
)
if err := s.dynamicDiscoverySharedInformerFactory.AddIndexers(indexers.NamespaceScoped()); err != nil {
if err != nil {
return err
}

Expand Down Expand Up @@ -446,8 +441,8 @@ func (s *Server) Run(ctx context.Context) error {

klog.Infof("Finished starting (remaining) kcp informers")

klog.Infof("Starting dynamic metadata informer")
s.dynamicDiscoverySharedInformerFactory.StartPolling(goContext(ctx))
klog.Infof("Starting dynamic metadata informer worker")
go s.dynamicDiscoverySharedInformerFactory.StartWorker(goContext(ctx))

klog.Infof("Synced all informers. Ready to start controllers")
close(s.syncedCh)
Expand Down
49 changes: 27 additions & 22 deletions test/e2e/conformance/cross_logical_cluster_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextensionsv1client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
apiextensionsexternalversions "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -43,10 +44,9 @@ import (
tenancyapi "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/informer"
metadataclient "github.com/kcp-dev/kcp/pkg/metadata"
bootstrap "github.com/kcp-dev/kcp/pkg/server/bootstrap"
"github.com/kcp-dev/kcp/test/e2e/fixtures/apifixtures"
"github.com/kcp-dev/kcp/test/e2e/framework"
)
Expand Down Expand Up @@ -160,8 +160,7 @@ func TestCRDCrossLogicalClusterListPartialObjectMetadata(t *testing.T) {

// Make sure the informers aren't throttled because dynamic informers do lots of discovery which slows down tests
cfg := server.DefaultConfig(t)
cfg.QPS = 500
cfg.Burst = 1000
cfg.QPS = -1

crdClusterClient, err := apiextensionsclient.NewClusterForConfig(cfg)
require.NoError(t, err, "failed to construct apiextensions client for server")
Expand Down Expand Up @@ -218,26 +217,32 @@ func TestCRDCrossLogicalClusterListPartialObjectMetadata(t *testing.T) {
_, err = rootShardMetadataClusterClient.Cluster(logicalcluster.Wildcard).Resource(sheriffsGVR).List(ctx, metav1.ListOptions{})
require.NoError(t, err, "expected wildcard list to work with metadata client even though schemas are different")

t.Log("Start dynamic metadata informers")
identityRootCfg, resolve := bootstrap.NewConfigWithWildcardIdentities(rootCfg, bootstrap.KcpRootGroupExportNames, bootstrap.KcpRootGroupResourceExportNames, kcpClusterClient.Cluster(tenancyv1alpha1.RootCluster))
require.Eventually(t, func() bool {
return resolve(ctx) == nil
}, wait.ForeverTestTimeout, time.Millisecond*100)
identityRootKcpClusterClient, err := kcpclientset.NewClusterForConfig(identityRootCfg)
require.NoError(t, err, "failed to construct kcp cluster client for server with identitis")
rootShardKcpInformer := kcpinformers.NewSharedInformerFactoryWithOptions(identityRootKcpClusterClient.Cluster(logicalcluster.Wildcard), 0)
rootShardKcpInformer.Tenancy().V1alpha1().ClusterWorkspaces().Lister()
rootShardKcpInformer.Start(ctx.Done())
rootShardKcpInformer.WaitForCacheSync(ctx.Done())

informerFactory := informer.NewDynamicDiscoverySharedInformerFactory(
rootShardKcpInformer.Tenancy().V1alpha1().ClusterWorkspaces().Lister(),
identityRootKcpClusterClient.DiscoveryClient,
rootShardMetadataClusterClient.Cluster(logicalcluster.Wildcard),
rootShardCRDClusterClient, err := apiextensionsclient.NewClusterForConfig(rootCfg)
require.NoError(t, err, "error creating root shard crd client")

apiExtensionsInformerFactory := apiextensionsexternalversions.NewSharedInformerFactoryWithOptions(
rootShardCRDClusterClient.Cluster(logicalcluster.Wildcard),
0,
)

informerFactory, err := informer.NewDynamicDiscoverySharedInformerFactory(
rootCfg,
func(obj interface{}) bool { return true },
time.Second*2,
apiExtensionsInformerFactory.Apiextensions().V1().CustomResourceDefinitions(),
indexers.NamespaceScoped(),
)
informerFactory.StartPolling(ctx)
require.NoError(t, err, "error creating DynamicDiscoverySharedInformerFactory")

// Have to start this after informer.NewDynamicDiscoverySharedInformerFactory() is invoked, as that adds an
// index to the crd informer that is required for the dynamic factory to work correctly.
t.Log("Start apiextensions informers")
apiExtensionsInformerFactory.Start(ctx.Done())
cacheSyncCtx, cacheSyncCancel := context.WithTimeout(ctx, wait.ForeverTestTimeout)
t.Cleanup(cacheSyncCancel)
apiExtensionsInformerFactory.WaitForCacheSync(cacheSyncCtx.Done())

t.Log("Start dynamic metadata informers")
go informerFactory.StartWorker(ctx)

t.Logf("Wait for the sheriff to show up in the informer")
// key := "default/" + clusters.ToClusterAwareKey(wsNormalCRD1a, "john-hicks-adams")
Expand Down
1 change: 0 additions & 1 deletion test/e2e/framework/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func TestServerArgs() []string {
// start a test server with the given token auth file.
func TestServerArgsWithTokenAuthFile(tokenAuthFile string) []string {
return []string{
"--discovery-poll-interval=5s",
"-v=4",
"--token-auth-file", tokenAuthFile,
}
Expand Down
52 changes: 19 additions & 33 deletions test/e2e/watchcache/watchcache_enabled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

v1 "k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextensionsexternalversions "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -40,14 +41,9 @@ import (
kubernetesclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
kcpexternalversions "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
"github.com/kcp-dev/kcp/pkg/informer"
metadataclient "github.com/kcp-dev/kcp/pkg/metadata"
boostrap "github.com/kcp-dev/kcp/pkg/server/bootstrap"
"github.com/kcp-dev/kcp/test/e2e/fixtures/apifixtures"
"github.com/kcp-dev/kcp/test/e2e/fixtures/wildwest"
wildwestv1alpha1 "github.com/kcp-dev/kcp/test/e2e/fixtures/wildwest/apis/wildwest/v1alpha1"
Expand Down Expand Up @@ -252,49 +248,39 @@ func collectCacheHitsFor(ctx context.Context, t *testing.T, config *rest.Config,
return totalCacheHits, prefixCacheHit
}

const resyncPeriod = 10 * time.Hour
const byWorkspace = "byWorkspace"

func testDynamicDiscoverySharedInformerFactory(ctx context.Context, t *testing.T, config *rest.Config, expectedGVR schema.GroupVersionResource, expectedResName string, expectedClusterName logicalcluster.Name) {
nonIdentityKcpClusterClient, err := kcpclientset.NewClusterForConfig(config) // can only used for wildcard requests of apis.kcp.dev
require.NoError(t, err)
require.NoError(t, err, "failed to construct apiextensions client")

// since wildcard request are only allowed against a shard
// create a cfg that points to the root shard and use it to create ddsif
rootConfig := framework.ShardConfig(t, nonIdentityKcpClusterClient, "root", config)
rootConfig.QPS = 100
rootConfig.Burst = 200

// resolve identities for system APIBindings
identityRootConfig, resolveIdentities := boostrap.NewConfigWithWildcardIdentities(rootConfig, boostrap.KcpRootGroupExportNames, boostrap.KcpRootGroupResourceExportNames, nonIdentityKcpClusterClient.Cluster(tenancyv1alpha1.RootCluster))
require.Eventually(t, func() bool {
if err := resolveIdentities(ctx); err != nil {
klog.Errorf("failed to resolve identities, keeping trying: %v", err)
return false
}
return true
}, wait.ForeverTestTimeout, time.Millisecond*100)
crdClusterClient, err := apiextensionsclient.NewClusterForConfig(rootConfig)
require.NoError(t, err, "failed to construct apiextensions client")

rootKcpClusterClient, err := kcpclientset.NewClusterForConfig(identityRootConfig)
require.NoError(t, err)
rootKcpSharedInformerFactory := kcpexternalversions.NewSharedInformerFactoryWithOptions(rootKcpClusterClient.Cluster(logicalcluster.Wildcard), resyncPeriod)
rootMetadataClusterClient, err := metadataclient.NewDynamicMetadataClusterClientForConfig(rootConfig) // no identites necessary for partial metadata
require.NoError(t, err)
ddsif := informer.NewDynamicDiscoverySharedInformerFactory(
rootKcpSharedInformerFactory.Tenancy().V1alpha1().ClusterWorkspaces().Lister(),
rootKcpClusterClient.DiscoveryClient,
rootMetadataClusterClient.Cluster(logicalcluster.Wildcard),
func(obj interface{}) bool { return true }, 5*time.Second,
apiExtensionsInformerFactory := apiextensionsexternalversions.NewSharedInformerFactoryWithOptions(
crdClusterClient.Cluster(logicalcluster.Wildcard),
0,
)
err = ddsif.AddIndexers(cache.Indexers{byWorkspace: indexByWorkspace})
require.NoError(t, err)

t.Log("Starting KCP Shared Informer Factory")
rootKcpSharedInformerFactory.Start(ctx.Done())
t.Log("Waiting for KCP Shared Informer Factory to sync caches")
rootKcpSharedInformerFactory.WaitForCacheSync(ctx.Done())
ddsif, err := informer.NewDynamicDiscoverySharedInformerFactory(
rootConfig,
func(obj interface{}) bool { return true },
apiExtensionsInformerFactory.Apiextensions().V1().CustomResourceDefinitions(),
cache.Indexers{byWorkspace: indexByWorkspace},
)
require.NoError(t, err, "error creating DynamicDiscoverySharedInformerFactory")

t.Log("Starting apiextensions shared informer factory")
apiExtensionsInformerFactory.Start(ctx.Done())

t.Log("Starting DynamicDiscoverySharedInformerFactory")
ddsif.StartPolling(context.Background())
go ddsif.StartWorker(ctx)

t.Logf("Checking if DynamicDiscoverySharedInformerFactory has %v with name %v in cluster %v", expectedGVR.String(), expectedResName, expectedClusterName)
framework.Eventually(t, func() (success bool, reason string) {
Expand Down

0 comments on commit 80450a6

Please sign in to comment.