Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote-discovery service mirroring #11201

Merged
merged 2 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ spec:
type: array
items:
type: string
remoteDiscoverySelector:
mateiidavid marked this conversation as resolved.
Show resolved Hide resolved
description: Selector for Services to mirror in remote discovery mode
type: object
properties:
matchLabels:
type: object
x-kubernetes-preserve-unknown-fields: true
matchExpressions:
description: List of selector requirements
type: array
items:
description: A selector item requires a key and an operator
type: object
required:
- key
- operator
properties:
key:
description: Label key that selector should apply to
type: string
operator:
description: Evaluation of a label in relation to set
type: string
enum: [In, NotIn, Exists, DoesNotExist]
values:
type: array
items:
type: string
targetClusterName:
description: Name of target cluster to link to
type: string
Expand Down
9 changes: 9 additions & 0 deletions multicluster/cmd/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type (
controlPlaneVersion string
dockerRegistry string
selector string
remoteDiscoverySelector string
gatewayAddresses string
gatewayPort uint32
ha bool
Expand Down Expand Up @@ -229,6 +230,11 @@ A full list of configurable values can be found at https://github.com/linkerd/li
return err
}

remoteDiscoverySelector, err := metav1.ParseToLabelSelector(opts.remoteDiscoverySelector)
if err != nil {
return err
}

link := mc.Link{
Name: opts.clusterName,
Namespace: opts.namespace,
Expand All @@ -241,6 +247,7 @@ A full list of configurable values can be found at https://github.com/linkerd/li
GatewayIdentity: gatewayIdentity,
ProbeSpec: probeSpec,
Selector: *selector,
RemoteDiscoverySelector: *remoteDiscoverySelector,
}

obj, err := link.ToUnstructured()
Expand Down Expand Up @@ -303,6 +310,7 @@ A full list of configurable values can be found at https://github.com/linkerd/li
cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry,
fmt.Sprintf("Docker registry to pull service mirror controller image from ($%s)", flags.EnvOverrideDockerRegistry))
cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror")
cmd.Flags().StringVar(&opts.remoteDiscoverySelector, "remote-discovery-selector", opts.remoteDiscoverySelector, "Selector (label query) to filter which services in the target cluster to mirror in remote discovery mode")
cmd.Flags().StringVar(&opts.gatewayAddresses, "gateway-addresses", opts.gatewayAddresses, "If specified, overwrites gateway addresses when gateway service is not type LoadBalancer (comma separated list)")
cmd.Flags().Uint32Var(&opts.gatewayPort, "gateway-port", opts.gatewayPort, "If specified, overwrites gateway port when gateway service is not type LoadBalancer")
cmd.Flags().BoolVar(&opts.ha, "ha", opts.ha, "Enable HA configuration for the service-mirror deployment (default false)")
Expand Down Expand Up @@ -401,6 +409,7 @@ func newLinkOptionsWithDefault() (*linkOptions, error) {
logLevel: defaults.LogLevel,
logFormat: defaults.LogFormat,
selector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true"),
remoteDiscoverySelector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "remote-discovery"),
gatewayAddresses: "",
gatewayPort: 0,
ha: false,
Expand Down
28 changes: 28 additions & 0 deletions multicluster/cmd/testdata/install_default.golden

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions multicluster/cmd/testdata/install_ha.golden

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions multicluster/cmd/testdata/install_psp.golden

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

115 changes: 88 additions & 27 deletions multicluster/service-mirror/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceLabels(remoteService
return labels
}

if rcsw.isRemoteDiscovery(remoteService) {
labels[consts.RemoteDiscoveryLabel] = rcsw.link.TargetClusterName
labels[consts.RemoteServiceLabel] = remoteService.GetName()
}

for key, value := range remoteService.ObjectMeta.Labels {
if strings.HasPrefix(key, consts.SvcMirrorPrefix) {
continue
Expand Down Expand Up @@ -430,27 +435,50 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context.
// new gateway being assigned or additional ports exposed. This method takes care of that.
func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context.Context, ev *RemoteServiceUpdated) error {
rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name)
gatewayAddresses, err := rcsw.resolveGatewayAddress()
if err != nil {
return err
}

copiedEndpoints := ev.localEndpoints.DeepCopy()
copiedEndpoints.Subsets = []corev1.EndpointSubset{
{
Addresses: gatewayAddresses,
Ports: rcsw.getEndpointsPorts(ev.remoteUpdate),
},
}
if rcsw.isRemoteDiscovery(ev.remoteUpdate) {
// The service is mirrored in remote discovery mode and any local
// endpoints for it should be deleted if they exist.
if ev.localEndpoints != nil {
err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.localService.Namespace).Delete(ctx, ev.localService.Name, metav1.DeleteOptions{})
if err != nil {
return RetryableError{[]error{
fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", ev.localService.Namespace, ev.localService.Name, err),
}}
}
}
} else if ev.localEndpoints == nil {
// The service is mirrored in gateway mode and gateway endpoints should
// be created for it.
err := rcsw.createGatewayEndpoints(ctx, ev.remoteUpdate)
if err != nil {
return err
}
} else {
// The service is mirrored in gateway mode and gateway endpoints already
// exist for it but may need to be updated.
gatewayAddresses, err := rcsw.resolveGatewayAddress()
if err != nil {
return err
}

if copiedEndpoints.Annotations == nil {
copiedEndpoints.Annotations = make(map[string]string)
}
copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
copiedEndpoints := ev.localEndpoints.DeepCopy()
copiedEndpoints.Subsets = []corev1.EndpointSubset{
{
Addresses: gatewayAddresses,
Ports: rcsw.getEndpointsPorts(ev.remoteUpdate),
},
}

err = rcsw.updateMirrorEndpoints(ctx, copiedEndpoints)
if err != nil {
return RetryableError{[]error{err}}
if copiedEndpoints.Annotations == nil {
copiedEndpoints.Annotations = make(map[string]string)
}
copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity

err = rcsw.updateMirrorEndpoints(ctx, copiedEndpoints)
if err != nil {
return RetryableError{[]error{err}}
}
}

ev.localService.Labels = rcsw.getMirroredServiceLabels(ev.remoteUpdate)
Expand Down Expand Up @@ -518,6 +546,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.
}
}

if rcsw.isRemoteDiscovery(remoteService) {
// For remote discovery services, skip creating gateway endpoints.
return nil
}
return rcsw.createGatewayEndpoints(ctx, remoteService)
}

Expand Down Expand Up @@ -657,15 +689,19 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S
lastMirroredRemoteVersion, ok := localService.Annotations[consts.RemoteResourceVersionAnnotation]
if ok && lastMirroredRemoteVersion != service.ResourceVersion {
endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(service.Namespace).Get(localName)
if err == nil {
rcsw.eventsQueue.Add(&RemoteServiceUpdated{
localService: localService,
localEndpoints: endpoints,
remoteUpdate: service,
})
return nil
if err != nil {
if kerrors.IsNotFound(err) {
endpoints = nil
} else {
return RetryableError{[]error{err}}
}
}
return RetryableError{[]error{err}}
rcsw.eventsQueue.Add(&RemoteServiceUpdated{
localService: localService,
localEndpoints: endpoints,
remoteUpdate: service,
})
return nil
}

return nil
Expand Down Expand Up @@ -1174,10 +1210,35 @@ func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpo
}

func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool {
// Treat an empty selector as "Nothing" instead of "Everything" so that
// when the selector field is unset, we don't export all Services.
if len(rcsw.link.Selector.MatchExpressions)+len(rcsw.link.Selector.MatchLabels) == 0 {
return false
}
selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector)
if err != nil {
rcsw.log.Errorf("Invalid selector: %s", err)
return false
}
return selector.Matches(labels.Set(l))
remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector)
if err != nil {
rcsw.log.Errorf("Invalid selector: %s", err)
return false
}
return selector.Matches(labels.Set(l)) || remoteDiscoverySelector.Matches(labels.Set(l))
}

func (rcsw *RemoteClusterServiceWatcher) isRemoteDiscovery(svc *corev1.Service) bool {
// Treat an empty remoteDisocverySelector as "Nothing" instead of
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Treat an empty remoteDisocverySelector as "Nothing" instead of
// Treat an empty remoteDiscoverySelector as "Nothing" instead of

// "Everything" so that when the remoteDiscoverySelector field is unset, we
// don't export all Services.
if len(rcsw.link.RemoteDiscoverySelector.MatchExpressions)+len(rcsw.link.RemoteDiscoverySelector.MatchLabels) == 0 {
return false
}
remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector)
if err != nil {
rcsw.log.Errorf("Invalid selector: %s", err)
return false
}
return remoteDiscoverySelector.Matches(labels.Set(svc.Labels))
}
Loading
Loading