Skip to content

Commit

Permalink
Merge branch 'main' into alex/remote-disco
Browse files Browse the repository at this point in the history
  • Loading branch information
adleong committed Aug 9, 2023
2 parents 6127d8c + 62a8407 commit a03941f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 46 deletions.
6 changes: 2 additions & 4 deletions controller/api/destination/watcher/cluster_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,8 @@ func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error
stopCh,
}

go func() {
remoteAPI.Sync(stopCh)
metadataAPI.Sync(stopCh)
}()
go remoteAPI.Sync(stopCh)
go metadataAPI.Sync(stopCh)

cs.log.Infof("Added cluster %s to ClusterStore", clusterName)

Expand Down
46 changes: 22 additions & 24 deletions controller/api/destination/watcher/cluster_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,32 +132,30 @@ func TestClusterStoreHandlers(t *testing.T) {
}

// Handle delete events
if len(tt.deleteClusters) != 0 {
for k := range tt.deleteClusters {
watcher, _, found := cs.Get(k)
if !found {
t.Fatalf("Unexpected error: watcher %s should exist in the cache", k)
}
// Unfortunately, mock k8s client does not propagate
// deletes, so we have to call remove directly.
cs.removeCluster(k)
// Leave it to do its thing and gracefully shutdown
time.Sleep(50 * time.Millisecond)
var hasStopped bool
if tt.enableEndpointSlices {
hasStopped = watcher.k8sAPI.ES().Informer().IsStopped()
} else {
hasStopped = watcher.k8sAPI.Endpoint().Informer().IsStopped()
}
if !hasStopped {
t.Fatalf("Unexpected error: informers for watcher %s should be stopped", k)
}

if _, _, found := cs.Get(k); found {
t.Fatalf("Unexpected error: watcher %s should have been removed from the cache", k)
}
for k := range tt.deleteClusters {
watcher, _, found := cs.Get(k)
if !found {
t.Fatalf("Unexpected error: watcher %s should exist in the cache", k)
}
// Unfortunately, mock k8s client does not propagate
// deletes, so we have to call remove directly.
cs.removeCluster(k)
// Leave it to do its thing and gracefully shutdown
time.Sleep(50 * time.Millisecond)
var hasStopped bool
if tt.enableEndpointSlices {
hasStopped = watcher.k8sAPI.ES().Informer().IsStopped()
} else {
hasStopped = watcher.k8sAPI.Endpoint().Informer().IsStopped()
}
if !hasStopped {
t.Fatalf("Unexpected error: informers for watcher %s should be stopped", k)
}

if _, _, found := cs.Get(k); found {
t.Fatalf("Unexpected error: watcher %s should have been removed from the cache", k)
}

}
})
}
Expand Down
18 changes: 1 addition & 17 deletions controller/k8s/metadata_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,7 @@ func InitializeMetadataAPI(kubeConfig string, resources ...APIResource) (*Metada
if err != nil {
return nil, fmt.Errorf("error configuring Kubernetes API client: %w", err)
}

client, err := metadata.NewForConfig(config)
if err != nil {
return nil, err
}

api, err := newClusterScopedMetadataAPI(client, resources...)
if err != nil {
return nil, err
}

for _, gauge := range api.gauges {
if err := prometheus.Register(gauge); err != nil {
log.Warnf("failed to register Prometheus gauge %s: %s", gauge.Desc().String(), err)
}
}
return api, nil
return InitializeMetadataAPIForConfig(config, resources...)
}

func InitializeMetadataAPIForConfig(kubeConfig *rest.Config, resources ...APIResource) (*MetadataAPI, error) {
Expand Down
2 changes: 1 addition & 1 deletion controller/k8s/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewFakeAPI(configs ...string) (*API, error) {

// NewFakeMetadataAPI provides a mock Kubernetes API for testing.
func NewFakeMetadataAPI(configs []string) (*MetadataAPI, error) {
sch := clientsetscheme.Scheme
sch := runtime.NewScheme()
metav1.AddMetaToScheme(sch)

var objs []runtime.Object
Expand Down

0 comments on commit a03941f

Please sign in to comment.