Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement clusterset IP in the CoredDNS plugin #1641

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions coredns/plugin/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var _ = Describe("Lighthouse DNS plugin Handler", func() {
Context("Headless services", testHeadlessService)
Context("Local services", testLocalService)
Context("Service with multiple ports", testSRVMultiplePorts)
Context("Service with clusterset IP", testClusterSetIP)
})

type FailingResponseWriter struct {
Expand Down Expand Up @@ -959,6 +960,58 @@ func testSRVMultiplePorts() {
})
}

func testClusterSetIP() {
const clusterSetIP = "243.1.0.1"

qname := fmt.Sprintf("%s.%s.svc.clusterset.local.", service1, namespace1)

var (
rec *dnstest.Recorder
t *handlerTestDriver
)

BeforeEach(func() {
t = newHandlerTestDriver()

si := newServiceImport(namespace1, service1, mcsv1a1.ClusterSetIP)
si.Spec.IPs = []string{clusterSetIP}
si.Spec.Ports = []mcsv1a1.ServicePort{port1, port2}

t.lh.Resolver.PutServiceImport(si)

t.lh.Resolver.PutEndpointSlices(newEndpointSlice(namespace1, service1, clusterID, []mcsv1a1.ServicePort{port1},
newEndpoint(serviceIP, "", true)))

t.lh.Resolver.PutEndpointSlices(newEndpointSlice(namespace1, service1, clusterID2, []mcsv1a1.ServicePort{port2},
newEndpoint(serviceIP2, "", true)))

rec = dnstest.NewRecorder(&test.ResponseWriter{})
})

Specify("DNS query of Type A record should succeed and write an A record response", func() {
t.executeTestCase(rec, test.Case{
Qname: qname,
Qtype: dns.TypeA,
Rcode: dns.RcodeSuccess,
Answer: []dns.RR{
test.A(fmt.Sprintf("%s 5 IN A %s", qname, clusterSetIP)),
},
})
})

Specify("DNS query of Type SRV should succeed and write an SRV record response", func() {
t.executeTestCase(rec, test.Case{
Qname: qname,
Qtype: dns.TypeSRV,
Rcode: dns.RcodeSuccess,
Answer: []dns.RR{
test.SRV(fmt.Sprintf("%s 5 IN SRV 0 50 %d %s", qname, port2.Port, qname)),
test.SRV(fmt.Sprintf("%s 5 IN SRV 0 50 %d %s", qname, port1.Port, qname)),
},
})
})
}

type handlerTestDriver struct {
mockCs *fakecs.ClusterStatus
lh *lighthouse.Lighthouse
Expand Down
37 changes: 37 additions & 0 deletions coredns/resolver/clusterip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _ = Describe("GetDNSRecords", func() {
When("a service is present in one cluster", testClusterIPServiceInOneCluster)
When("a service is present in two clusters", testClusterIPServiceInTwoClusters)
When("a service is present in three clusters", testClusterIPServiceInThreeClusters)
Context("with a clusterset IP", testClusterSetIP)

testClusterIPServiceMisc()
})
Expand Down Expand Up @@ -441,3 +442,39 @@ func testClusterIPServiceMisc() {
})
})
}

func testClusterSetIP() {
const clusterSetIP = "243.1.0.1"

t := newTestDriver()

BeforeEach(func() {
si := newAggregatedServiceImport(namespace1, service1)
si.Spec.IPs = []string{clusterSetIP}
si.Spec.Ports = []mcsv1a1.ServicePort{port1, port2}

t.resolver.PutServiceImport(si)

t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID1, serviceIP1, true, port1))
t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID2, serviceIP2, true, port2))
})

Context("and no specific cluster is requested", func() {
It("should return the clusterset IP DNS record", func() {
t.assertDNSRecordsFound(namespace1, service1, "", "", false, resolver.DNSRecord{
IP: clusterSetIP,
Ports: []mcsv1a1.ServicePort{port1, port2},
})
})
})

Context("and a cluster is requested", func() {
It("should return its DNS record", func() {
t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, resolver.DNSRecord{
IP: serviceIP1,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
})
})
})
}
4 changes: 2 additions & 2 deletions coredns/resolver/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (i *Interface) PutEndpointSlices(endpointSlices ...*discovery.EndpointSlice
return true
}

if !serviceInfo.isHeadless {
if !serviceInfo.isHeadless() {
return i.putClusterIPEndpointSlice(key, clusterID, endpointSlices[0], serviceInfo)
}

Expand Down Expand Up @@ -264,7 +264,7 @@ func (i *Interface) RemoveEndpointSlice(endpointSlice *discovery.EndpointSlice)

if len(serviceInfo.clusters) == 0 && !serviceInfo.isExported {
delete(i.serviceMap, key)
} else if !serviceInfo.isHeadless {
} else if !serviceInfo.isHeadless() {
serviceInfo.mergePorts()
serviceInfo.resetLoadBalancing()
}
Expand Down
9 changes: 8 additions & 1 deletion coredns/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (i *Interface) GetDNSRecords(namespace, name, clusterID, hostname string) (
return nil, false, false
}

if !serviceInfo.isHeadless {
if !serviceInfo.isHeadless() {
record, found := i.getClusterIPRecord(serviceInfo, clusterID)
if record != nil {
return []DNSRecord{*record}, false, true
Expand All @@ -64,6 +64,13 @@ func (i *Interface) getClusterIPRecord(serviceInfo *serviceInfo, clusterID strin
return &clusterInfo.endpointRecords[0], true
}

if len(serviceInfo.spec.IPs) > 0 {
return &DNSRecord{
IP: serviceInfo.spec.IPs[0],
Ports: serviceInfo.spec.Ports,
}, true
}

// If we are aware of the local cluster and we found some accessible IP, we shall return it.
localClusterID := i.clusterStatus.GetLocalClusterID()
if localClusterID != "" {
Expand Down
11 changes: 7 additions & 4 deletions coredns/resolver/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,20 @@ func (i *Interface) PutServiceImport(serviceImport *mcsv1a1.ServiceImport) {

if !found {
svcInfo = &serviceInfo{
clusters: make(map[string]*clusterInfo),
balancer: loadbalancer.NewSmoothWeightedRR(),
isHeadless: serviceImport.Spec.Type == mcsv1a1.Headless,
clusters: make(map[string]*clusterInfo),
balancer: loadbalancer.NewSmoothWeightedRR(),
}

if !isLegacy {
svcInfo.spec = serviceImport.Spec
}

i.serviceMap[key] = svcInfo
}

svcInfo.isExported = true

if svcInfo.isHeadless || !isLegacy {
if svcInfo.isHeadless() || !isLegacy {
return
}

Expand Down
4 changes: 4 additions & 0 deletions coredns/resolver/service_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,7 @@ func (si *serviceInfo) selectIP(checkCluster func(string) bool) *DNSRecord {

return nil
}

func (si *serviceInfo) isHeadless() bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: I know it's a small function, but performance-wise - isn't it better to keep the current code ? I mean calculate it once when serviceInfo is created and not every time DNS resolver is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it a method for readability and to avoid duplicate storage. It might be a little faster as a variable but the difference would be extremely negligible (probably in nanosec). Plus compilers are pretty smart about optimizing and caching frequently accessed code.

return si.spec.Type == mcsv1a1.Headless
}
2 changes: 1 addition & 1 deletion coredns/resolver/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type clusterInfo struct {
type serviceInfo struct {
clusters map[string]*clusterInfo
balancer loadbalancer.Interface
isHeadless bool
isExported bool
ports []mcsv1a1.ServicePort
spec mcsv1a1.ServiceImportSpec
}
Loading