Skip to content

Commit

Permalink
Handle multiple EndpointSlices in the resolver
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Jul 21, 2023
1 parent 95a9207 commit 708b185
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 103 deletions.
60 changes: 53 additions & 7 deletions coredns/resolver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
Expand All @@ -34,8 +35,9 @@ import (
var logger = log.Logger{Logger: logf.Log.WithName("Resolver")}

type controller struct {
resolver *Interface
stopCh chan struct{}
resolver *Interface
resourceWatcher watcher.Interface
stopCh chan struct{}
}

func NewController(r *Interface) *controller {
Expand Down Expand Up @@ -72,12 +74,14 @@ func (c *controller) Start(config watcher.Config) error {
},
}

resourceWatcher, err := watcher.New(&config)
var err error

c.resourceWatcher, err = watcher.New(&config)
if err != nil {
return errors.Wrap(err, "error creating the resource watcher")
}

err = resourceWatcher.Start(c.stopCh)
err = c.resourceWatcher.Start(c.stopCh)
if err != nil {
return errors.Wrap(err, "error starting the resource watcher")
}
Expand All @@ -92,12 +96,49 @@ func (c *controller) Stop() {
}

func (c *controller) onEndpointSliceCreateOrUpdate(obj runtime.Object, _ int) bool {
return c.resolver.PutEndpointSlices(obj.(*discovery.EndpointSlice))
endpointSlice := obj.(*discovery.EndpointSlice)
if ignoreEndpointSlice(endpointSlice) {
return false
}

if !isHeadless(endpointSlice) {
return c.resolver.PutEndpointSlices(endpointSlice)
}

return c.resolver.PutEndpointSlices(c.getAllEndpointSlices(endpointSlice)...)
}

func (c *controller) getAllEndpointSlices(forEPS *discovery.EndpointSlice) []*discovery.EndpointSlice {
list := c.resourceWatcher.ListResources(&discovery.EndpointSlice{}, k8slabels.SelectorFromSet(map[string]string{
constants.LabelSourceNamespace: forEPS.Labels[constants.LabelSourceNamespace],
mcsv1a1.LabelServiceName: forEPS.Labels[mcsv1a1.LabelServiceName],
constants.MCSLabelSourceCluster: forEPS.Labels[constants.MCSLabelSourceCluster],
}))

epSlices := make([]*discovery.EndpointSlice, len(list))
for i := range list {
epSlices[i] = list[i].(*discovery.EndpointSlice)
}

return epSlices
}

func (c *controller) onEndpointSliceDelete(obj runtime.Object, _ int) bool {
c.resolver.RemoveEndpointSlice(obj.(*discovery.EndpointSlice))
return false
endpointSlice := obj.(*discovery.EndpointSlice)
if ignoreEndpointSlice(endpointSlice) {
return false
}

if !isHeadless(endpointSlice) {
c.resolver.RemoveEndpointSlice(endpointSlice)
}

epSlices := c.getAllEndpointSlices(endpointSlice)
if len(epSlices) == 0 {
c.resolver.RemoveEndpointSlice(endpointSlice)
}

return c.resolver.PutEndpointSlices(epSlices...)
}

func (c *controller) onServiceImportCreateOrUpdate(obj runtime.Object, _ int) bool {
Expand All @@ -109,3 +150,8 @@ func (c *controller) onServiceImportDelete(obj runtime.Object, _ int) bool {
c.resolver.RemoveServiceImport(obj.(*mcsv1a1.ServiceImport))
return false
}

func ignoreEndpointSlice(eps *discovery.EndpointSlice) bool {
isOnBroker := eps.Namespace != eps.Labels[constants.LabelSourceNamespace]
return isOnBroker
}
127 changes: 121 additions & 6 deletions coredns/resolver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package resolver_test
import (
"context"

"github.com/submariner-io/lighthouse/coredns/constants"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/syncer/test"
Expand All @@ -34,13 +36,13 @@ import (
var _ = Describe("Controller", func() {
t := newTestDriver()

expDNSRecord := resolver.DNSRecord{
IP: serviceIP1,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
}
When("a ClusterIP service EndpointSlice is created", func() {
expDNSRecord := resolver.DNSRecord{
IP: serviceIP1,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
}

When("an EndpointSlice is created", func() {
var endpointSlice *discovery.EndpointSlice

JustBeforeEach(func() {
Expand Down Expand Up @@ -100,4 +102,117 @@ var _ = Describe("Controller", func() {
})
})
})

When("there's multiple EndpointSlices for a headless service", func() {
var epsName1, epsName2 string

JustBeforeEach(func() {
t.createServiceImport(newHeadlessAggregatedServiceImport(namespace1, service1))

eps := newEndpointSlice(namespace1, service1, clusterID1, []mcsv1a1.ServicePort{port1},
discovery.Endpoint{
Addresses: []string{endpointIP1},
Conditions: discovery.EndpointConditions{Ready: &ready},
},
discovery.Endpoint{
Addresses: []string{endpointIP2},
Conditions: discovery.EndpointConditions{Ready: &ready},
},
)
epsName1 = eps.Name
t.createEndpointSlice(eps)

eps = newEndpointSlice(namespace1, service1, clusterID1, []mcsv1a1.ServicePort{port2},
discovery.Endpoint{
Addresses: []string{endpointIP3},
Conditions: discovery.EndpointConditions{Ready: &ready},
},
discovery.Endpoint{
Addresses: []string{endpointIP4},
Conditions: discovery.EndpointConditions{Ready: &ready},
},
)
epsName2 = eps.Name
t.createEndpointSlice(eps)
})

Specify("GetDNSRecords should return their DNS record", func() {
t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", true,
resolver.DNSRecord{
IP: endpointIP1,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
},
resolver.DNSRecord{
IP: endpointIP2,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
},
resolver.DNSRecord{
IP: endpointIP3,
Ports: []mcsv1a1.ServicePort{port2},
ClusterName: clusterID1,
},
resolver.DNSRecord{
IP: endpointIP4,
Ports: []mcsv1a1.ServicePort{port2},
ClusterName: clusterID1,
})
})

Context("and one is deleted", func() {
Specify("GetDNSRecords should return the remaining DNS records", func() {
t.awaitDNSRecords(namespace1, service1, clusterID1, "", true)

Expect(t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), epsName1, metav1.DeleteOptions{})).To(Succeed())

t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", true,
resolver.DNSRecord{
IP: endpointIP3,
Ports: []mcsv1a1.ServicePort{port2},
ClusterName: clusterID1,
},
resolver.DNSRecord{
IP: endpointIP4,
Ports: []mcsv1a1.ServicePort{port2},
ClusterName: clusterID1,
})
})
})

Context("and both are deleted", func() {
Specify("GetDNSRecords should return no DNS records", func() {
t.awaitDNSRecords(namespace1, service1, clusterID1, "", true)

Expect(t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), epsName1, metav1.DeleteOptions{})).To(Succeed())
Expect(t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), epsName2, metav1.DeleteOptions{})).To(Succeed())

t.awaitDNSRecords(namespace1, service1, clusterID1, "", false)
})
})
})

When("an EndpointSlice is on the broker", func() {
JustBeforeEach(func() {
t.createServiceImport(newAggregatedServiceImport(namespace1, service1))
t.createEndpointSlice(&discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: test.RemoteNamespace,
Labels: map[string]string{
constants.MCSLabelSourceCluster: "test",
mcsv1a1.LabelServiceName: "test",
constants.LabelSourceNamespace: namespace1,
},
},
})
})

It("should not process it", func() {
Consistently(func() bool {
t.awaitDNSRecords(namespace1, service1, clusterID1, "", false)
return true
}).Should(BeTrue())
})
})
})
Loading

0 comments on commit 708b185

Please sign in to comment.