Skip to content

Commit

Permalink
Add StartUpTask timeout errors to pipeline logs
Browse files Browse the repository at this point in the history
  • Loading branch information
dj-smart committed Apr 24, 2024
1 parent f9bd603 commit ef671b0
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.cdap.cdap.common.lang.ClassLoaders;
import io.cdap.cdap.common.logging.LoggingContext;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.logging.LoggingContextAccessor.LoggingContextRestorer;
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.service.RetryStrategy;
Expand Down Expand Up @@ -632,21 +633,29 @@ public TwillController create(@Nullable Callable<Void> startupTask, long timeout

// If the startup task failed, publish failure state and delete the program running state
startupTaskCompletion.whenComplete((res, throwable) -> {
if (throwable == null) {
LOG.debug("Startup task completed for program run {}", programRunId);
} else {
LOG.error("Fail to start program run {}", programRunId, throwable);
// The startup task completion can be failed in multiple scenarios.
// It can be caused by the startup task failure.
// It can also be due to cancellation from the controller, or a start up timeout.
// In either case, always cancel the startup task. If the task is already completed, there is no.
startupTaskFuture.cancel(true);
try {
// Attempt to force kill the remote process. If there is no such process found, it won't throw.
processController.kill(RuntimeJobStatus.RUNNING);
} catch (Exception e) {
LOG.warn("Force termination of remote process for {} failed", programRunId, e);
Map<String, String> systemArgs = programOpts.getArguments().asMap();
LoggingContext loggingContext = LoggingContextHelper.getLoggingContextWithRunId(
programRunId, systemArgs);
try (LoggingContextRestorer restoreContext = LoggingContextAccessor.setLoggingContext(loggingContext)) {
if (throwable == null) {
LOG.debug("Startup task completed for program run {}", programRunId);
} else {
LOG.error("Fail to start program run {}", programRunId, throwable);
// The startup task completion can be failed in multiple scenarios.
// It can be caused by the startup task failure.
// It can also be due to cancellation from the controller, or a start up timeout.
// In either case, always cancel the startup task. If the task is already completed, there is no.
startupTaskFuture.cancel(true);
try {
// Attempt to force kill the remote process. If there is no such process found, it won't throw.
processController.kill(RuntimeJobStatus.RUNNING);
} catch (Exception e) {
LOG.warn("Force termination of remote process for {} failed", programRunId, e);
}
programStateWriter.error(programRunId, throwable);
}
} catch (Exception e) {
LOG.error("Exception caught while setting logging context for program run {}", programRunId, e);
programStateWriter.error(programRunId, throwable);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
/*
* Copyright © 2014 Cask Data, Inc.
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

Check failure on line 4 in cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.sizes.LineLengthCheck

Line is longer than 120 characters (found 171).
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Check failure on line 8 in cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.sizes.LineLengthCheck

Line is longer than 120 characters (found 308).
*/

package io.cdap.cdap.common.logging;
Expand All @@ -34,6 +28,9 @@ public class LoggingContextAccessor {
private static final InheritableThreadLocal<LoggingContext> loggingContext =
new InheritableThreadLocal<>();

public interface LoggingContextRestorer extends Cancellable, AutoCloseable {

Check warning on line 31 in cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocTypeCheck

Missing a Javadoc comment.
// The AutoCloseable interface implicitly provides the close() method
}
/**
* Sets the logging context.
* <p>
Expand All @@ -46,7 +43,7 @@ public class LoggingContextAccessor {
* @return Cancellable that can be used to revert the logging context and MDC Map to its original
* value
*/
public static Cancellable setLoggingContext(LoggingContext context) {
public static LoggingContextRestorer setLoggingContext(LoggingContext context) {

Check warning on line 46 in cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.EmptyLineSeparatorCheck

'METHOD_DEF' should be separated from previous line.
final LoggingContext saveLoggingContext = loggingContext.get();
final Map saveContextMap = MDC.getCopyOfContextMap();
final Thread saveCurrentThread = Thread.currentThread();
Expand All @@ -59,17 +56,29 @@ public static Cancellable setLoggingContext(LoggingContext context) {
// MDC will throw this if there is no valid binding. Normally this shouldn't happen, but in case it does,
// we'll just ignore it as it doesn't affect platform logic at all as we always use loggingContext.
}
return new Cancellable() {
return new LoggingContextRestorer() {
private boolean cancelled;

@Override
/**
* Cancels the current logging context change and restores the previous context.
*/
@Override // From Cancellable
public void cancel() {
if (Thread.currentThread() == saveCurrentThread && !cancelled) {
MDC.setContextMap(saveContextMap == null ? Collections.emptyMap() : saveContextMap);
loggingContext.set(saveLoggingContext);
cancelled = true;
}
}

/**
* AutoCloseable implementation to ensure context restoration.
* Delegates to the cancel() method.
*/
@Override // From AutoCloseable
public void close() throws Exception {
cancel(); // Calls the cancel logic
}
};
}

Expand Down

0 comments on commit ef671b0

Please sign in to comment.