Skip to content

Commit

Permalink
Update to v3 (#201)
Browse files Browse the repository at this point in the history
* Update to jcp version with v3 support

* Kick dockerhub

* Use only one snapshot collecting callback

* Fix ktlint and detekt

* Remove optional operator

* Change java-control-plane version to stable

* Remove duplicate snapshot collecting callback

* Migrate all v2 resources to v3.

* Remove setSni no longer needed for Envoys <= 1.14.0-dev

* Use one cache for resources

* Remove ApiVersion.V3 from RDS

* Restore v2/v3 caches

* Fix v2 cache metric

* tmp

* Added logic to generate V3 clusters

* Changed name

* Keep v3 clusters

* Fix v3 api

* Removed unused import

* Changed debug endpoint path, fixed deprecated fields

* Fixed metrics

* Code review changes, copied v3 cache test, change debug endpoint

* Fixed produces to work with older tools

Co-authored-by: slonka <[email protected]>
Co-authored-by: Lukasz Dziedziak <[email protected]>
  • Loading branch information
3 people authored Nov 9, 2020
1 parent 7aa1030 commit e73204a
Show file tree
Hide file tree
Showing 64 changed files with 1,827 additions and 632 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ allprojects {
apply plugin: 'kotlin-spring'

project.ext.versions = [

kotlin : '1.3.72',
java_controlplane : '0.1.24-optimization-SNAPSHOT',
java_controlplane : '0.1.24',
spring_boot : '2.3.4.RELEASE',
grpc : '1.21.0',
jaxb : '2.3.1',
Expand Down
2 changes: 1 addition & 1 deletion docs/features/load_balancing.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ egress http connection manager in Envoy static config:
key: canary
type: STRING
remove: false
- name: envoy.router
- name: envoy.filters.http.router
```
The `envoy.filters.http.header_to_metadata` should be added before
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package pl.allegro.tech.servicemesh.envoycontrol;

import io.envoyproxy.controlplane.cache.CacheStatusInfo;
import io.envoyproxy.controlplane.cache.StatusInfo;
import java.util.ArrayList;
import java.util.Collection;
import javax.annotation.concurrent.ThreadSafe;

/**
* {@code GroupCacheStatusInfo} provides an implementation of {@link StatusInfo} for a group of {@link CacheStatusInfo}.
* This class is copy of {@link io.envoyproxy.controlplane.cache.GroupCacheStatusInfo}
*/
@ThreadSafe
class GroupCacheStatusInfo<T> implements StatusInfo<T> {
private final Collection<CacheStatusInfo<T>> statuses;

public GroupCacheStatusInfo(Collection<CacheStatusInfo<T>> statuses) {
this.statuses = new ArrayList<>(statuses);
}

/**
* {@inheritDoc}
*/
@Override
public long lastWatchRequestTime() {
return statuses.stream().mapToLong(CacheStatusInfo::lastWatchRequestTime).max().orElse(0);
}

/**
* {@inheritDoc}
*/
@Override
public T nodeGroup() {
return statuses.stream().map(CacheStatusInfo::nodeGroup).findFirst().orElse(null);
}

/**
* {@inheritDoc}
*/
@Override
public int numWatches() {
return statuses.stream().mapToInt(CacheStatusInfo::numWatches).sum();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.servicemesh.envoycontrol;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.protobuf.Message;
Expand All @@ -13,8 +14,8 @@
import io.envoyproxy.controlplane.cache.StatusInfo;
import io.envoyproxy.controlplane.cache.Watch;
import io.envoyproxy.controlplane.cache.WatchCancelledException;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.controlplane.cache.XdsRequest;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,10 +35,12 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static io.envoyproxy.controlplane.cache.Resources.RESOURCE_TYPES_IN_ORDER;

/**
* This class is copy of {@link io.envoyproxy.controlplane.cache.SimpleCache}
*/
public class SimpleCache<T> implements SnapshotCache<T> {
public class SimpleCache<T, U extends Snapshot> implements SnapshotCache<T, U> {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleCache.class);

Expand All @@ -49,8 +52,8 @@ public class SimpleCache<T> implements SnapshotCache<T> {
private final Lock writeLock = lock.writeLock();

@GuardedBy("lock")
private final Map<T, Snapshot> snapshots = new HashMap<>();
private final ConcurrentMap<T, CacheStatusInfo<T>> statuses = new ConcurrentHashMap<>();
private final Map<T, U> snapshots = new HashMap<>();
private final ConcurrentMap<T, ConcurrentMap<Resources.ResourceType, CacheStatusInfo<T>>> statuses = new ConcurrentHashMap<>();

private AtomicLong watchCount = new AtomicLong();

Expand All @@ -73,10 +76,10 @@ public boolean clearSnapshot(T group) {
// we take a writeLock to prevent watches from being created
writeLock.lock();
try {
CacheStatusInfo<T> status = statuses.get(group);
Map<Resources.ResourceType, CacheStatusInfo<T>> status = statuses.get(group);

// If we don't know about this group, do nothing.
if (status != null && status.numWatches() > 0) {
if (status != null && status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum() > 0) {
LOGGER.warn("tried to clear snapshot for group with existing watches, group={}", group);

return false;
Expand All @@ -97,21 +100,30 @@ public boolean clearSnapshot(T group) {
@Override
public Watch createWatch(
boolean ads,
DiscoveryRequest request,
XdsRequest request,
Set<String> knownResourceNames,
Consumer<Response> responseConsumer,
boolean hasClusterChanged) {
Resources.ResourceType requestResourceType = request.getResourceType();
Preconditions.checkNotNull(requestResourceType, "unsupported type URL %s",
request.getTypeUrl());
T group;
if (request.v3Request() != null) {
group = groups.hash(request.v3Request().getNode());
} else {
group = groups.hash(request.v2Request().getNode());
}

T group = groups.hash(request.getNode());
// even though we're modifying, we take a readLock to allow multiple watches to be created in parallel since it
// doesn't conflict
readLock.lock();
try {
CacheStatusInfo<T> status = statuses.computeIfAbsent(group, g -> new CacheStatusInfo<>(group));
CacheStatusInfo<T> status = statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
.computeIfAbsent(requestResourceType, s -> new CacheStatusInfo<>(group));
status.setLastWatchRequestTime(System.currentTimeMillis());

Snapshot snapshot = snapshots.get(group);
String version = snapshot == null ? "" : snapshot.version(request.getTypeUrl(), request.getResourceNamesList());
U snapshot = snapshots.get(group);
String version = snapshot == null ? "" : snapshot.version(requestResourceType, request.getResourceNamesList());

Watch watch = new Watch(ads, request, responseConsumer);

Expand All @@ -124,15 +136,15 @@ public Watch createWatch(

// If any of the newly requested resources are in the snapshot respond immediately. If not we'll fall back to
// version comparisons.
if (snapshot.resources(request.getTypeUrl())
if (snapshot.resources(requestResourceType)
.keySet()
.stream()
.anyMatch(newResourceHints::contains)) {
respond(watch, snapshot, group);

return watch;
}
} else if (hasClusterChanged && request.getTypeUrl().equals(Resources.ENDPOINT_TYPE_URL)) {
} else if (hasClusterChanged && requestResourceType.equals(Resources.ResourceType.ENDPOINT)) {
respond(watch, snapshot, group);

return watch;
Expand Down Expand Up @@ -187,9 +199,10 @@ public Watch createWatch(

/**
* {@inheritDoc}
* @return
*/
@Override
public Snapshot getSnapshot(T group) {
public U getSnapshot(T group) {
readLock.lock();

try {
Expand All @@ -214,9 +227,9 @@ public Collection<T> groups() {
* It can be called concurrently for different groups.
*/
@Override
public void setSnapshot(T group, Snapshot snapshot) {
public void setSnapshot(T group, U snapshot) {
// we take a writeLock to prevent watches from being created while we update the snapshot
CacheStatusInfo<T> status;
ConcurrentMap<Resources.ResourceType, CacheStatusInfo<T>> status;
writeLock.lock();
try {
// Update the existing snapshot entry.
Expand All @@ -242,20 +255,27 @@ public StatusInfo statusInfo(T group) {
readLock.lock();

try {
return statuses.get(group);
ConcurrentMap<Resources.ResourceType, CacheStatusInfo<T>> statusMap = statuses.get(group);
if (statusMap == null || statusMap.isEmpty()) {
return null;
}

return new GroupCacheStatusInfo<>(statusMap.values());
} finally {
readLock.unlock();
}
}

@VisibleForTesting
protected void respondWithSpecificOrder(T group, Snapshot snapshot, CacheStatusInfo<T> status) {
for (String typeUrl : Resources.TYPE_URLS) {
protected void respondWithSpecificOrder(T group, U snapshot, ConcurrentMap<Resources.ResourceType, CacheStatusInfo<T>> statusMap) {
for (Resources.ResourceType resourceType : RESOURCE_TYPES_IN_ORDER) {
CacheStatusInfo<T> status = statusMap.get(resourceType);
if (status == null) continue; // todo: why this happens?
status.watchesRemoveIf((id, watch) -> {
if (!watch.request().getTypeUrl().equals(typeUrl)) {
if (!watch.request().getResourceType().equals(resourceType)) {
return false;
}
String version = snapshot.version(watch.request().getTypeUrl(), watch.request().getResourceNamesList());
String version = snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList());

if (!watch.request().getVersionInfo().equals(version)) {
if (LOGGER.isDebugEnabled()) {
Expand All @@ -277,7 +297,7 @@ protected void respondWithSpecificOrder(T group, Snapshot snapshot, CacheStatusI
}
}

private Response createResponse(DiscoveryRequest request, Map<String, ? extends Message> resources, String version) {
private Response createResponse(XdsRequest request, Map<String, ? extends Message> resources, String version) {
Collection<? extends Message> filtered = request.getResourceNamesList().isEmpty()
? resources.values()
: request.getResourceNamesList().stream()
Expand All @@ -288,8 +308,8 @@ private Response createResponse(DiscoveryRequest request, Map<String, ? extends
return Response.create(request, filtered, version);
}

private boolean respond(Watch watch, Snapshot snapshot, T group) {
Map<String, ? extends Message> snapshotResources = snapshot.resources(watch.request().getTypeUrl());
private boolean respond(Watch watch, U snapshot, T group) {
Map<String, ? extends Message> snapshotResources = snapshot.resources(watch.request().getResourceType());
Map<String, ClusterLoadAssignment> snapshotForMissingResources = Collections.emptyMap();

if (!watch.request().getResourceNamesList().isEmpty() && watch.ads()) {
Expand All @@ -309,12 +329,12 @@ private boolean respond(Watch watch, Snapshot snapshot, T group) {
// If shouldSendMissingEndpoints is set to true, we will respond to such request anyway, to prevent
// such problems with Envoy.
if (shouldSendMissingEndpoints
&& watch.request().getTypeUrl().equals(Resources.ENDPOINT_TYPE_URL)) {
&& watch.request().getResourceType().equals(Resources.ResourceType.ENDPOINT)) {
LOGGER.info("adding missing resources [{}] to response for {} in ADS mode from node {} at version {}",
String.join(", ", missingNames),
watch.request().getTypeUrl(),
group,
snapshot.version(watch.request().getTypeUrl(), watch.request().getResourceNamesList())
snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList())
);
snapshotForMissingResources = new HashMap<>(missingNames.size());
for (String missingName : missingNames) {
Expand All @@ -328,7 +348,7 @@ private boolean respond(Watch watch, Snapshot snapshot, T group) {
"not responding in ADS mode for {} from node {} at version {} for request [{}] since [{}] not in snapshot",
watch.request().getTypeUrl(),
group,
snapshot.version(watch.request().getTypeUrl(), watch.request().getResourceNamesList()),
snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()),
String.join(", ", watch.request().getResourceNamesList()),
String.join(", ", missingNames));

Expand All @@ -337,7 +357,7 @@ private boolean respond(Watch watch, Snapshot snapshot, T group) {
}
}

String version = snapshot.version(watch.request().getTypeUrl(), watch.request().getResourceNamesList());
String version = snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList());

LOGGER.debug("responding for {} from node {} at version {} with version {}",
watch.request().getTypeUrl(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pl.allegro.tech.servicemesh.envoycontrol.v2;

import io.envoyproxy.controlplane.cache.NodeGroup;
import io.envoyproxy.controlplane.cache.v2.Snapshot;

public class SimpleCache<T> extends pl.allegro.tech.servicemesh.envoycontrol.SimpleCache<T, Snapshot> {
public SimpleCache(NodeGroup<T> nodeGroup, Boolean shouldSendMissingEndpoints) {
super(nodeGroup, shouldSendMissingEndpoints);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pl.allegro.tech.servicemesh.envoycontrol.v3;

import io.envoyproxy.controlplane.cache.NodeGroup;
import io.envoyproxy.controlplane.cache.v3.Snapshot;

public class SimpleCache<T> extends pl.allegro.tech.servicemesh.envoycontrol.SimpleCache<T, Snapshot> {
public SimpleCache(NodeGroup<T> nodeGroup, Boolean shouldSendMissingEndpoints) {
super(nodeGroup, shouldSendMissingEndpoints);
}
}
Loading

0 comments on commit e73204a

Please sign in to comment.