Skip to content

Commit

Permalink
Handle multiple EndpointSlices in the resolver
Browse files Browse the repository at this point in the history
For headless services, the controller retrieves all the EndpointSlices and,
on add/update, passes them to PutEndpointSlices. On delete, i there's no
more PutEndpointSlices, it calls RemoveEndpointSlice, otherwise
PutEndpointSlices.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Jul 26, 2023
1 parent 8a4369f commit 03004c6
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 103 deletions.
60 changes: 53 additions & 7 deletions coredns/resolver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
Expand All @@ -34,8 +35,9 @@ import (
var logger = log.Logger{Logger: logf.Log.WithName("Resolver")}

type controller struct {
resolver *Interface
stopCh chan struct{}
resolver *Interface
resourceWatcher watcher.Interface
stopCh chan struct{}
}

func NewController(r *Interface) *controller {
Expand Down Expand Up @@ -72,12 +74,14 @@ func (c *controller) Start(config watcher.Config) error {
},
}

resourceWatcher, err := watcher.New(&config)
var err error

c.resourceWatcher, err = watcher.New(&config)
if err != nil {
return errors.Wrap(err, "error creating the resource watcher")
}

err = resourceWatcher.Start(c.stopCh)
err = c.resourceWatcher.Start(c.stopCh)
if err != nil {
return errors.Wrap(err, "error starting the resource watcher")
}
Expand All @@ -92,12 +96,49 @@ func (c *controller) Stop() {
}

func (c *controller) onEndpointSliceCreateOrUpdate(obj runtime.Object, _ int) bool {
return c.resolver.PutEndpointSlices(obj.(*discovery.EndpointSlice))
endpointSlice := obj.(*discovery.EndpointSlice)
if ignoreEndpointSlice(endpointSlice) {
return false
}

if !isHeadless(endpointSlice) {
return c.resolver.PutEndpointSlices(endpointSlice)
}

return c.resolver.PutEndpointSlices(c.getAllEndpointSlices(endpointSlice)...)
}

func (c *controller) getAllEndpointSlices(forEPS *discovery.EndpointSlice) []*discovery.EndpointSlice {
list := c.resourceWatcher.ListResources(&discovery.EndpointSlice{}, k8slabels.SelectorFromSet(map[string]string{
constants.LabelSourceNamespace: forEPS.Labels[constants.LabelSourceNamespace],
mcsv1a1.LabelServiceName: forEPS.Labels[mcsv1a1.LabelServiceName],
constants.MCSLabelSourceCluster: forEPS.Labels[constants.MCSLabelSourceCluster],
}))

epSlices := make([]*discovery.EndpointSlice, len(list))
for i := range list {
epSlices[i] = list[i].(*discovery.EndpointSlice)
}

return epSlices
}

func (c *controller) onEndpointSliceDelete(obj runtime.Object, _ int) bool {
c.resolver.RemoveEndpointSlice(obj.(*discovery.EndpointSlice))
return false
endpointSlice := obj.(*discovery.EndpointSlice)
if ignoreEndpointSlice(endpointSlice) {
return false
}

if !isHeadless(endpointSlice) {
c.resolver.RemoveEndpointSlice(endpointSlice)
}

epSlices := c.getAllEndpointSlices(endpointSlice)
if len(epSlices) == 0 {
c.resolver.RemoveEndpointSlice(endpointSlice)
}

return c.resolver.PutEndpointSlices(epSlices...)
}

func (c *controller) onServiceImportCreateOrUpdate(obj runtime.Object, _ int) bool {
Expand All @@ -109,3 +150,8 @@ func (c *controller) onServiceImportDelete(obj runtime.Object, _ int) bool {
c.resolver.RemoveServiceImport(obj.(*mcsv1a1.ServiceImport))
return false
}

func ignoreEndpointSlice(eps *discovery.EndpointSlice) bool {
isOnBroker := eps.Namespace != eps.Labels[constants.LabelSourceNamespace]
return isOnBroker
}
127 changes: 121 additions & 6 deletions coredns/resolver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package resolver_test
import (
"context"

"github.com/submariner-io/lighthouse/coredns/constants"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/syncer/test"
Expand All @@ -34,13 +36,13 @@ import (
var _ = Describe("Controller", func() {
t := newTestDriver()

expDNSRecord := resolver.DNSRecord{
IP: serviceIP1,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
}
When("a ClusterIP service EndpointSlice is created", func() {
expDNSRecord := resolver.DNSRecord{
IP: serviceIP1,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
}

When("an EndpointSlice is created", func() {
var endpointSlice *discovery.EndpointSlice

JustBeforeEach(func() {
Expand Down Expand Up @@ -100,4 +102,117 @@ var _ = Describe("Controller", func() {
})
})
})

When("there's multiple EndpointSlices for a headless service", func() {
var epsName1, epsName2 string

JustBeforeEach(func() {
t.createServiceImport(newHeadlessAggregatedServiceImport(namespace1, service1))

eps := newEndpointSlice(namespace1, service1, clusterID1, []mcsv1a1.ServicePort{port1},
discovery.Endpoint{
Addresses: []string{endpointIP1},
Conditions: discovery.EndpointConditions{Ready: &ready},
},
discovery.Endpoint{
Addresses: []string{endpointIP2},
Conditions: discovery.EndpointConditions{Ready: &ready},
},
)
epsName1 = eps.Name
t.createEndpointSlice(eps)

eps = newEndpointSlice(namespace1, service1, clusterID1, []mcsv1a1.ServicePort{port2},
discovery.Endpoint{
Addresses: []string{endpointIP3},
Conditions: discovery.EndpointConditions{Ready: &ready},
},
discovery.Endpoint{
Addresses: []string{endpointIP4},
Conditions: discovery.EndpointConditions{Ready: &ready},
},
)
epsName2 = eps.Name
t.createEndpointSlice(eps)
})

Specify("GetDNSRecords should return their DNS record", func() {
t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", true,
resolver.DNSRecord{
IP: endpointIP1,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
},
resolver.DNSRecord{
IP: endpointIP2,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
},
resolver.DNSRecord{
IP: endpointIP3,
Ports: []mcsv1a1.ServicePort{port2},
ClusterName: clusterID1,
},
resolver.DNSRecord{
IP: endpointIP4,
Ports: []mcsv1a1.ServicePort{port2},
ClusterName: clusterID1,
})
})

Context("and one is deleted", func() {
Specify("GetDNSRecords should return the remaining DNS records", func() {
t.awaitDNSRecords(namespace1, service1, clusterID1, "", true)

Expect(t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), epsName1, metav1.DeleteOptions{})).To(Succeed())

t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", true,
resolver.DNSRecord{
IP: endpointIP3,
Ports: []mcsv1a1.ServicePort{port2},
ClusterName: clusterID1,
},
resolver.DNSRecord{
IP: endpointIP4,
Ports: []mcsv1a1.ServicePort{port2},
ClusterName: clusterID1,
})
})
})

Context("and both are deleted", func() {
Specify("GetDNSRecords should return no DNS records", func() {
t.awaitDNSRecords(namespace1, service1, clusterID1, "", true)

Expect(t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), epsName1, metav1.DeleteOptions{})).To(Succeed())
Expect(t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), epsName2, metav1.DeleteOptions{})).To(Succeed())

t.awaitDNSRecords(namespace1, service1, clusterID1, "", false)
})
})
})

When("an EndpointSlice is on the broker", func() {
JustBeforeEach(func() {
t.createServiceImport(newAggregatedServiceImport(namespace1, service1))
t.createEndpointSlice(&discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: test.RemoteNamespace,
Labels: map[string]string{
constants.MCSLabelSourceCluster: "test",
mcsv1a1.LabelServiceName: "test",
constants.LabelSourceNamespace: namespace1,
},
},
})
})

It("should not process it", func() {
Consistently(func() bool {
t.awaitDNSRecords(namespace1, service1, clusterID1, "", false)
return true
}).Should(BeTrue())
})
})
})
Loading

0 comments on commit 03004c6

Please sign in to comment.