Skip to content

Commit

Permalink
CDAP-20759: Wait for labels to be present on cluster before returning
Browse files Browse the repository at this point in the history
  • Loading branch information
tivv committed Aug 3, 2023
1 parent ac0951a commit 6d00c47
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -581,12 +583,14 @@ void updateClusterLabels(String clusterName,
* @param clusterName name of the cluster
* @param labelsToSet Key/Value pairs to set on the Dataproc cluster.
* @param labelsToRemove collection of labels to remove from the Dataproc cluster.
*
* @return future that would complete after label update is finished
*/
void updateClusterLabels(String clusterName,
Future<?> updateClusterLabels(String clusterName,
Map<String, String> labelsToSet,
Collection<String> labelsToRemove) throws RetryableProvisionException, InterruptedException {
if (labelsToSet.isEmpty() && labelsToRemove.isEmpty()) {
return;
return CompletableFuture.completedFuture(null);
}
try {
Cluster cluster = getDataprocCluster(clusterName)
Expand All @@ -601,7 +605,7 @@ void updateClusterLabels(String clusterName,
.allMatch(e -> Objects.equals(e.getValue(), existingLabels.get(e.getKey())))
&& labelsToRemove.stream().noneMatch(existingLabels::containsKey)
) {
return;
return CompletableFuture.completedFuture(null);
}
Map<String, String> newLabels = new HashMap<>(existingLabels);
newLabels.keySet().removeAll(labelsToRemove);
Expand All @@ -624,6 +628,7 @@ void updateClusterLabels(String clusterName,
LOG.warn("Encountered {} warning {} while setting labels on cluster:\n{}",
numWarnings, numWarnings > 1 ? "s" : "", String.join("\n", metadata.getWarningsList()));
}
return operationFuture;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof ApiException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

Check warning on line 44 in cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisioner.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.UnusedImportsCheck

Unused import - java.util.concurrent.ExecutionException.
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -339,10 +342,13 @@ private Cluster tryReuseCluster(DataprocClient client, ProvisionerContext contex
LOG.info("Found cluster to reuse: {}", clusterName);
// Add cdap-reuse-for to find cluster later if needed
// And remove reuseUntil to indicate the cluster is taken
client.updateClusterLabels(clusterName,
Collections.singletonMap(LABEL_RUN_KEY, clusterKey),
Map<String, String> runLabels = Collections.singletonMap(LABEL_RUN_KEY, clusterKey);
Future<?> updateLabelsFuture = client.updateClusterLabels(clusterName,
runLabels,
Collections.singleton(LABEL_REUSE_UNTIL)
);
// Ensure that label update happened - it may still be going
waitForLabelsUpdateToApply(client, conf, updateLabelsFuture, clusterName, runLabels);
return cluster.get();
}
} catch (Exception e) {
Expand All @@ -369,6 +375,41 @@ private Cluster tryReuseCluster(DataprocClient client, ProvisionerContext contex
}
}

/**
* Waits for the cluster to be found by the run label. The operation may take some time.
* Note that we don't want to wait for the whole operation to finish, we jsut need to be sure
* that cluster can be found by the labels.
* @param client dataproc client

Check warning on line 382 in cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisioner.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.RequireEmptyLineBeforeBlockTagGroupCheck

Javadoc tag '@param' should be preceded with an empty line.
* @param conf provisioner configuration
* @param updateLabelsFuture future for the cluster update operation
* @param clusterName cluster name
* @param runLabels map with labels to look for the cluster
* @throws Exception if wait failed, interrupted or cluster can't be found even after operation
* finished.

Check warning on line 388 in cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisioner.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocTagContinuationIndentationCheck

Line continuation have incorrect indentation level, expected level should be 4.
*/
private static void waitForLabelsUpdateToApply(DataprocClient client, DataprocConf conf,
Future<?> updateLabelsFuture, String clusterName,
Map<String, String> runLabels)
throws Exception {
boolean wasDone = false;
while (client.getClusters(runLabels).count() == 0) {
if (wasDone) {
// Future was competed before getClusters, but we still can't find the cluster
// Something's wrong here, let's try reuse again
throw new IllegalStateException("Label update was not reflected on cluster "
+ clusterName);
}
try {
updateLabelsFuture.get(conf.getClusterReuseRetryDelayMs(), TimeUnit.MILLISECONDS);
wasDone = true;
} catch (TimeoutException e) {
LOG.trace("Label update did not finish in {} ms, retry the check",
conf.getClusterReuseRetryDelayMs());
}

}
}

private boolean isReuseSupported(DataprocConf conf) {
return conf.isClusterReuseEnabled() && conf.isSkipDelete()
&& (conf.getIdleTtlMinutes() <= 0
Expand Down

0 comments on commit 6d00c47

Please sign in to comment.