Skip to content

Commit

Permalink
Add E2E test for clusterset IP
Browse files Browse the repository at this point in the history
Also detect if clusterset IP is enabled globally by inspecting the
env var passed to the pod container in the LH agent deployemnt and, if
so, skip the other ClusterIP service discovery tests since they will
fail.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Sep 23, 2024
1 parent 70fcb84 commit b4df550
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 20 deletions.
77 changes: 77 additions & 0 deletions test/e2e/discovery/clusterset_ip_enabled.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
SPDX-License-Identifier: Apache-2.0
Copyright Contributors to the Submariner project.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"fmt"
"strconv"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
lhframework "github.com/submariner-io/lighthouse/test/e2e/framework"
"github.com/submariner-io/shipyard/test/e2e/framework"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

var _ = Describe("Test Service Discovery Across Clusters", Label(TestLabel), func() {
f := lhframework.NewFramework("discovery")

When("clusterset IP is enabled for an exported service", func() {
It("should resolve the allocated clusterset IP", func() {
RunClusterSetIPTest(f)
})
})
})

func RunClusterSetIPTest(f *lhframework.Framework) {
clusterAName := framework.TestContext.ClusterIDs[framework.ClusterA]
clusterBName := framework.TestContext.ClusterIDs[framework.ClusterB]

framework.By(fmt.Sprintf("Creating an Nginx Deployment on on %q", clusterBName))
f.NewNginxDeployment(framework.ClusterB)

framework.By(fmt.Sprintf("Creating a Nginx Service on %q", clusterBName))

nginxServiceClusterB := f.NewNginxService(framework.ClusterB)

f.CreateServiceExport(framework.ClusterB, &mcsv1a1.ServiceExport{
ObjectMeta: metav1.ObjectMeta{
Name: nginxServiceClusterB.Name,
Namespace: nginxServiceClusterB.Namespace,
// TODO - use constants.UseClustersetIP
Annotations: map[string]string{"lighthouse.submariner.io/use-clusterset-ip": strconv.FormatBool(true)},
},
})

f.AwaitServiceExportedStatusCondition(framework.ClusterB, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace)

framework.By(fmt.Sprintf("Creating a Netshoot Deployment on %q", clusterAName))

netshootPodList := f.NewNetShootDeployment(framework.ClusterA)

svc, err := f.GetService(framework.ClusterB, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace)
Expect(err).NotTo(HaveOccurred())

nginxServiceClusterB = svc
serviceImport := f.AwaitAggregatedServiceImport(framework.ClusterA, nginxServiceClusterB, 1)
Expect(serviceImport.Spec.IPs).To(HaveLen(1), "ServiceImport was not allocated an IP")

f.VerifyIPWithDig(framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, "", serviceImport.Spec.IPs[0], true)
}
6 changes: 6 additions & 0 deletions test/e2e/discovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ var checkedDomains = lhframework.CheckedDomains
var _ = Describe("Test Service Discovery Across Clusters", Label(TestLabel), func() {
f := lhframework.NewFramework("discovery")

BeforeEach(func() {
if lhframework.ClusterSetIPEnabled {
Skip("The clusterset IP feature is enabled globally - skipping the test")
}
})

When("a pod tries to resolve a service in a remote cluster", func() {
It("should be able to discover the remote service successfully", func() {
RunServiceDiscoveryTest(f)
Expand Down
71 changes: 51 additions & 20 deletions test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/names"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/slices"
"github.com/submariner-io/lighthouse/pkg/constants"
"github.com/submariner-io/shipyard/test/e2e/framework"
Expand Down Expand Up @@ -61,9 +64,10 @@ type Framework struct {
}

var (
MCSClients []*mcsClientset.Clientset
EndpointClients []dynamic.ResourceInterface
SubmarinerClients []dynamic.ResourceInterface
MCSClients []*mcsClientset.Clientset
EndpointClients []dynamic.ResourceInterface
SubmarinerClients []dynamic.ResourceInterface
ClusterSetIPEnabled = false
)

func init() {
Expand Down Expand Up @@ -97,6 +101,25 @@ func beforeSuite() {
}

framework.DetectGlobalnet()

for _, client := range framework.KubeClients {
framework.AwaitUntil("find lighthouse agent deployment", func() (interface{}, error) {
return client.AppsV1().Deployments(framework.TestContext.SubmarinerNamespace).Get(context.TODO(),
names.ServiceDiscoveryComponent, metav1.GetOptions{})
}, func(result interface{}) (bool, string, error) {
d := result.(*appsv1.Deployment)
for i := range d.Spec.Template.Spec.Containers[0].Env {
if d.Spec.Template.Spec.Containers[0].Env[i].Name == "SUBMARINER_CLUSTERSET_IP_ENABLED" {
ClusterSetIPEnabled = d.Spec.Template.Spec.Containers[0].Env[i].Value == strconv.FormatBool(true)
if ClusterSetIPEnabled {
break
}
}
}

return true, "", nil
})
}
}

func createLighthouseClient(restConfig *rest.Config) *mcsClientset.Clientset {
Expand Down Expand Up @@ -131,18 +154,23 @@ func createSubmarinerClientSet(restConfig *rest.Config) dynamic.ResourceInterfac
}

func (f *Framework) NewServiceExport(cluster framework.ClusterIndex, name, namespace string) *mcsv1a1.ServiceExport {
nginxServiceExport := mcsv1a1.ServiceExport{
return f.CreateServiceExport(cluster, &mcsv1a1.ServiceExport{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: name,
Namespace: namespace,
},
}
se := MCSClients[cluster].MulticlusterV1alpha1().ServiceExports(namespace)
framework.By(fmt.Sprintf("Creating serviceExport %s.%s on %q", name, namespace, framework.TestContext.ClusterIDs[cluster]))
serviceExport := framework.AwaitUntil("create serviceExport", func() (interface{}, error) {
return se.Create(context.TODO(), &nginxServiceExport, metav1.CreateOptions{})
}, framework.NoopCheckResult).(*mcsv1a1.ServiceExport)
})
}

func (f *Framework) CreateServiceExport(cluster framework.ClusterIndex, serviceExport *mcsv1a1.ServiceExport) *mcsv1a1.ServiceExport {
se := MCSClients[cluster].MulticlusterV1alpha1().ServiceExports(serviceExport.Namespace)

return serviceExport
framework.By(fmt.Sprintf("Creating serviceExport %s.%s on %q", serviceExport.Name, serviceExport.Namespace,
framework.TestContext.ClusterIDs[cluster]))

return framework.AwaitUntil("create serviceExport", func() (interface{}, error) {
return se.Create(context.TODO(), serviceExport, metav1.CreateOptions{})
}, framework.NoopCheckResult).(*mcsv1a1.ServiceExport)
}

func (f *Framework) AwaitServiceExportedStatusCondition(cluster framework.ClusterIndex, name, namespace string) {
Expand Down Expand Up @@ -186,14 +214,17 @@ func (f *Framework) GetService(cluster framework.ClusterIndex, name, namespace s
return framework.KubeClients[cluster].CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}

func (f *Framework) AwaitAggregatedServiceImport(targetCluster framework.ClusterIndex, svc *v1.Service, clusterCount int) {
func (f *Framework) AwaitAggregatedServiceImport(targetCluster framework.ClusterIndex, svc *v1.Service, clusterCount int,
) *mcsv1a1.ServiceImport {
framework.By(fmt.Sprintf("Retrieving ServiceImport for %q in ns %q on %q", svc.Name, svc.Namespace,
framework.TestContext.ClusterIDs[targetCluster]))

si := MCSClients[targetCluster].MulticlusterV1alpha1().ServiceImports(svc.Namespace)
var si *mcsv1a1.ServiceImport

siClient := MCSClients[targetCluster].MulticlusterV1alpha1().ServiceImports(svc.Namespace)

framework.AwaitUntil("retrieve ServiceImport", func() (interface{}, error) {
obj, err := si.Get(context.TODO(), svc.Name, metav1.GetOptions{})
obj, err := siClient.Get(context.TODO(), svc.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil, nil //nolint:nilnil // Intentional
}
Expand All @@ -212,7 +243,7 @@ func (f *Framework) AwaitAggregatedServiceImport(targetCluster framework.Cluster
return false, "ServiceImport not found", nil
}

si := result.(*mcsv1a1.ServiceImport)
si = result.(*mcsv1a1.ServiceImport)

if len(si.Status.Clusters) != clusterCount {
return false, fmt.Sprintf("Actual cluster count %d does not match expected %d",
Expand All @@ -231,14 +262,14 @@ func (f *Framework) AwaitAggregatedServiceImport(targetCluster framework.Cluster
if svc.Spec.ClusterIP != v1.ClusterIPNone && !slices.Equivalent(expPorts, si.Spec.Ports, func(p mcsv1a1.ServicePort) string {
return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port)
}) {
s1, _ := json.MarshalIndent(expPorts, "", " ")
s2, _ := json.MarshalIndent(si.Spec.Ports, "", " ")

return false, fmt.Sprintf("ServiceImport ports do not match. Expected: %s, Actual: %s", s1, s2), nil
return false, fmt.Sprintf("ServiceImport ports do not match. Expected: %s, Actual: %s",
resource.ToJSON(expPorts), resource.ToJSON(si.Spec.Ports)), nil
}

return true, "", nil
})

return si
}

func (f *Framework) NewHeadlessServiceWithParams(name, portName string, protcol v1.Protocol,
Expand Down

0 comments on commit b4df550

Please sign in to comment.