[🍒][CDAP-20683] Propagate all exception details from system worker to appfabric when starting pipelines #15624

Merged 1 commit on Apr 30, 2024
cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java
@@ -32,6 +32,7 @@
 import io.cdap.cdap.common.lang.ClassLoaders;
 import io.cdap.cdap.common.logging.LoggingContext;
 import io.cdap.cdap.common.logging.LoggingContextAccessor;
+import io.cdap.cdap.common.logging.LoggingContextAccessor.LoggingContextRestorer;
 import io.cdap.cdap.common.service.Retries;
 import io.cdap.cdap.common.service.RetryStrategies;
 import io.cdap.cdap.common.service.RetryStrategy;
@@ -518,7 +519,7 @@
     return twillController != null;
   }

   public TwillController createTwillControllerFromRunRecord(RunRecordDetail runRecordDetail) {

Check warning on line 522 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java
GitHub Actions / Checkstyle: com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck: Missing a Javadoc comment.

     Map<String, String> systemArgs = runRecordDetail.getSystemArgs();
     try {
       ClusterMode clusterMode = ClusterMode.valueOf(
@@ -632,21 +633,29 @@

       // If the startup task failed, publish failure state and delete the program running state
       startupTaskCompletion.whenComplete((res, throwable) -> {
-        if (throwable == null) {
-          LOG.debug("Startup task completed for program run {}", programRunId);
-        } else {
-          LOG.error("Fail to start program run {}", programRunId, throwable);
-          // The startup task completion can be failed in multiple scenarios.
-          // It can be caused by the startup task failure.
-          // It can also be due to cancellation from the controller, or a start up timeout.
-          // In either case, always cancel the startup task. If the task is already completed, there is no effect.
-          startupTaskFuture.cancel(true);
-          try {
-            // Attempt to force kill the remote process. If there is no such process found, it won't throw.
-            processController.kill(RuntimeJobStatus.RUNNING);
-          } catch (Exception e) {
-            LOG.warn("Force termination of remote process for {} failed", programRunId, e);
+        Map<String, String> systemArgs = programOpts.getArguments().asMap();
+        LoggingContext loggingContext = LoggingContextHelper.getLoggingContextWithRunId(
+            programRunId, systemArgs);
+        try (LoggingContextRestorer restoreContext = LoggingContextAccessor.setLoggingContext(loggingContext)) {
+          if (throwable == null) {
+            LOG.debug("Startup task completed for program run {}", programRunId);
+          } else {
+            LOG.error("Fail to start program run {}", programRunId, throwable);
+            // The startup task completion can be failed in multiple scenarios.
+            // It can be caused by the startup task failure.
+            // It can also be due to cancellation from the controller, or a start up timeout.
+            // In either case, always cancel the startup task. If the task is already completed, there is no effect.
+            startupTaskFuture.cancel(true);
+            try {
+              // Attempt to force kill the remote process. If there is no such process found, it won't throw.
+              processController.kill(RuntimeJobStatus.RUNNING);
+            } catch (Exception e) {
+              LOG.warn("Force termination of remote process for {} failed", programRunId, e);
             }
             programStateWriter.error(programRunId, throwable);
           }
+        } catch (Exception e) {
+          LOG.error("Exception caught while setting logging context for program run {}", programRunId, e);
+          programStateWriter.error(programRunId, throwable);
+        }
       });
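
The hunk above wraps the completion callback in try-with-resources so that the run's logging context is active while the outcome is logged and is put back afterwards. A minimal, self-contained sketch of that pattern is shown here. It is not CDAP code: `ScopedContextSketch`, `CONTEXT`, `Restorer`, `setContext`, and `observe` are hypothetical stand-ins, and the sketch assumes a restorer whose `close()` declares no checked exception, which is why it needs no outer `catch`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch only: scopes a thread-local context to a completion callback. */
class ScopedContextSketch {

  // Hypothetical stand-in for the thread-local held by LoggingContextAccessor.
  static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

  /** Hypothetical restorer; close() is narrowed so it throws no checked exception. */
  interface Restorer extends AutoCloseable {
    @Override
    void close();
  }

  /** Installs a new context and returns a handle that puts the previous one back. */
  static Restorer setContext(String newContext) {
    final String previous = CONTEXT.get();
    CONTEXT.set(newContext);
    return () -> CONTEXT.set(previous);
  }

  /** Registers a callback that records the outcome while the run's context is active. */
  static List<String> observe(CompletableFuture<Void> startup, String runId) {
    List<String> events = new ArrayList<>();
    startup.whenComplete((res, throwable) -> {
      try (Restorer ignored = setContext("run:" + runId)) {
        if (throwable == null) {
          events.add(CONTEXT.get() + " started");
        } else {
          events.add(CONTEXT.get() + " failed: " + throwable.getMessage());
        }
      }
      // The previous context (none, in this sketch) is back in place here.
      events.add("after: " + CONTEXT.get());
    });
    return events;
  }

  public static void main(String[] args) {
    CompletableFuture<Void> startup = new CompletableFuture<>();
    List<String> events = observe(startup, "r1");
    // Completing on this thread runs the callback synchronously on this thread.
    startup.completeExceptionally(new IllegalStateException("boom"));
    System.out.println(events); // prints [run:r1 failed: boom, after: null]
  }
}
```

In the merged hunk, `close()` is declared `throws Exception`, so the compiler requires the outer `catch`. One detail that may be worth a second look there: that `catch` passes `throwable` to `programStateWriter.error`, and `throwable` is null on the success path, whereas the caught exception is `e`.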
@@ -789,7 +798,7 @@
     return cluster;
   }

   SSHConfig getSSHConfig() {

Check warning on line 801 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java
GitHub Actions / Checkstyle: com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck: Abbreviation in name 'getSSHConfig' must contain no more than '1' consecutive capital letters.

     return sshConfig;
   }

@@ -810,7 +819,7 @@
    * Creates a {@link SSHKeyPair} by loading keys from the given location and {@link Cluster}
    * properties.
    */
   private SSHKeyPair createSSHKeyPair(Location keysDir, Cluster cluster) {

Check warning on line 822 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java
GitHub Actions / Checkstyle: com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck: Abbreviation in name 'createSSHKeyPair' must contain no more than '1' consecutive capital letters.

     String sshUser = cluster.getProperties().get(Constants.RuntimeMonitor.SSH_USER);
     if (sshUser == null) {
       throw new IllegalStateException("Missing SSH user");
@@ -829,7 +838,7 @@
    * @param keysDir the {@link Location} that contains the ssh keys
    * @return a {@link SSHConfig}
    */
   private SSHConfig createSSHConfig(Cluster cluster, Location keysDir) {

Check warning on line 841 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java
GitHub Actions / Checkstyle: com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck: Abbreviation in name 'createSSHConfig' must contain no more than '1' consecutive capital letters.

     // Loads the SSH keys
     SSHKeyPair sshKeyPair = createSSHKeyPair(keysDir, cluster);

cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java
@@ -1,5 +1,5 @@
 /*
- * Copyright © 2014 Cask Data, Inc.
+ * Copyright © 2024 Cask Data, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
@@ -14,6 +14,7 @@
  * the License.
  */

+
 package io.cdap.cdap.common.logging;

 import java.util.Collections;
@@ -34,6 +35,9 @@
   private static final InheritableThreadLocal<LoggingContext> loggingContext =
       new InheritableThreadLocal<>();

+  public interface LoggingContextRestorer extends Cancellable, AutoCloseable {

Check warning on line 38 in cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java
GitHub Actions / Checkstyle: com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocTypeCheck: Missing a Javadoc comment.

+    // The AutoCloseable interface implicitly provides the close() method
+  }
   /**
    * Sets the logging context.
    * <p>
@@ -46,7 +50,7 @@
    * @return Cancellable that can be used to revert the logging context and MDC Map to its original
    *     value
    */
-  public static Cancellable setLoggingContext(LoggingContext context) {
+  public static LoggingContextRestorer setLoggingContext(LoggingContext context) {

Check warning on line 53 in cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java
GitHub Actions / Checkstyle: com.puppycrawl.tools.checkstyle.checks.whitespace.EmptyLineSeparatorCheck: 'METHOD_DEF' should be separated from previous line.

     final LoggingContext saveLoggingContext = loggingContext.get();
     final Map saveContextMap = MDC.getCopyOfContextMap();
     final Thread saveCurrentThread = Thread.currentThread();
@@ -59,21 +63,33 @@
       // MDC will throw this if there is no valid binding. Normally this shouldn't happen, but in case it does,
       // we'll just ignore it as it doesn't affect platform logic at all as we always use loggingContext.
     }
-    return new Cancellable() {
+    return new LoggingContextRestorer() {
       private boolean cancelled;

-      @Override
+      /**
+       * Cancels the current logging context change and restores the previous context.
+       */
+      @Override // From Cancellable
       public void cancel() {
         if (Thread.currentThread() == saveCurrentThread && !cancelled) {
           MDC.setContextMap(saveContextMap == null ? Collections.emptyMap() : saveContextMap);
           loggingContext.set(saveLoggingContext);
           cancelled = true;
         }
       }
+
+      /**
+       * AutoCloseable implementation to ensure restoration of context.
+       * Delegates to the cancel() method.
+       */
+      @Override // From AutoCloseable
+      public void close() throws Exception {
+        cancel(); // Calls the cancel logic
+      }
     };
   }
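
The anonymous class above makes `close()` delegate to `cancel()`, which restores the saved context at most once and only on the thread that set it. The sketch below isolates that behavior so it can be run on its own. `RestorerSketch`, `ContextRestorer`, `CONTEXT`, and the local `Cancellable` interface are hypothetical stand-ins (the real code also restores the MDC map and uses an `InheritableThreadLocal`), and narrowing `close()` to drop `throws Exception` is an assumption of this sketch, not something this PR does.

```java
/** Sketch only: an idempotent, thread-confined restorer whose close() delegates to cancel(). */
class RestorerSketch {

  /** Hypothetical stand-in for the Cancellable type used in the diff. */
  interface Cancellable {
    void cancel();
  }

  /** Hypothetical restorer; close() is redeclared without a checked exception. */
  interface ContextRestorer extends Cancellable, AutoCloseable {
    @Override
    void close();
  }

  // Hypothetical stand-in for the accessor's thread-local logging context.
  static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

  /** Installs newContext and returns a handle that restores the value seen at call time. */
  static ContextRestorer setContext(String newContext) {
    final String saved = CONTEXT.get();
    final Thread owner = Thread.currentThread();
    CONTEXT.set(newContext);
    return new ContextRestorer() {
      private boolean cancelled;

      @Override
      public void cancel() {
        // Restore only once, and only on the thread that installed the context.
        if (Thread.currentThread() == owner && !cancelled) {
          CONTEXT.set(saved);
          cancelled = true;
        }
      }

      @Override
      public void close() {
        cancel();
      }
    };
  }

  public static void main(String[] args) throws InterruptedException {
    ContextRestorer outer = setContext("outer");
    try (ContextRestorer inner = setContext("inner")) {
      System.out.println(CONTEXT.get()); // prints "inner"
    }
    System.out.println(CONTEXT.get()); // prints "outer"

    // Cancelling from a different thread is a no-op because of the owner check.
    Thread other = new Thread(outer::cancel);
    other.start();
    other.join();
    System.out.println(CONTEXT.get()); // prints "outer"

    outer.cancel();
    outer.cancel(); // second call does nothing
    System.out.println(CONTEXT.get()); // prints "null"
  }
}
```

With `close()` narrowed this way, a caller can use try-with-resources without a `catch (Exception e)` clause. Whether that trade-off suits `LoggingContextAccessor` is a design decision for the maintainers.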

   /**

Check warning on line 92 in cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java
GitHub Actions / Checkstyle: com.puppycrawl.tools.checkstyle.checks.javadoc.SummaryJavadocCheck: Summary javadoc is missing.

    * @return LoggingContext if it was set. Returns null otherwise.
    */
   public static LoggingContext getLoggingContext() {