Skip to content

Commit

Permalink
Add StartUpTask timeout errors to pipeline logs
Browse files Browse the repository at this point in the history
  • Loading branch information
dj-smart committed Apr 24, 2024
1 parent f9bd603 commit c8aae28
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.cdap.cdap.common.lang.ClassLoaders;
import io.cdap.cdap.common.logging.LoggingContext;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.logging.LoggingContextAccessor.LoggingContextRestorer;
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.service.RetryStrategy;
Expand Down Expand Up @@ -632,21 +633,29 @@ public TwillController create(@Nullable Callable<Void> startupTask, long timeout

// If the startup task failed, publish failure state and delete the program running state
startupTaskCompletion.whenComplete((res, throwable) -> {
if (throwable == null) {
LOG.debug("Startup task completed for program run {}", programRunId);
} else {
LOG.error("Fail to start program run {}", programRunId, throwable);
// The startup task completion can fail in multiple scenarios.
// It can be caused by the startup task failure.
// It can also be due to cancellation from the controller, or a start up timeout.
// In either case, always cancel the startup task. If the task has already completed, cancel is a no-op.
startupTaskFuture.cancel(true);
try {
// Attempt to force kill the remote process. If there is no such process found, it won't throw.
processController.kill(RuntimeJobStatus.RUNNING);
} catch (Exception e) {
LOG.warn("Force termination of remote process for {} failed", programRunId, e);
Map<String, String> systemArgs = programOpts.getArguments().asMap();
LoggingContext loggingContext = LoggingContextHelper.getLoggingContextWithRunId(
programRunId, systemArgs);
try (LoggingContextRestorer restoreContext = LoggingContextAccessor.setLoggingContext(loggingContext)) {
if (throwable == null) {
LOG.debug("Startup task completed for program run {}", programRunId);
} else {
LOG.error("Fail to start program run {}", programRunId, throwable);
// The startup task completion can fail in multiple scenarios.
// It can be caused by the startup task failure.
// It can also be due to cancellation from the controller, or a start up timeout.
// In either case, always cancel the startup task. If the task has already completed, cancel is a no-op.
startupTaskFuture.cancel(true);
try {
// Attempt to force kill the remote process. If there is no such process found, it won't throw.
processController.kill(RuntimeJobStatus.RUNNING);
} catch (Exception e) {
LOG.warn("Force termination of remote process for {} failed", programRunId, e);
}
programStateWriter.error(programRunId, throwable);
}
} catch (Exception e) {
LOG.error("Exception caught while setting logging context for program run {}", programRunId, e);
programStateWriter.error(programRunId, throwable);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2014 Cask Data, Inc.
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand All @@ -14,6 +14,7 @@
* the License.
*/


package io.cdap.cdap.common.logging;

import java.util.Collections;
Expand All @@ -34,6 +35,9 @@ public class LoggingContextAccessor {
private static final InheritableThreadLocal<LoggingContext> loggingContext =
new InheritableThreadLocal<>();

public interface LoggingContextRestorer extends Cancellable, AutoCloseable {

Check warning on line 38 in cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocTypeCheck

Missing a Javadoc comment.
// The AutoCloseable interface implicitly provides the close() method
}
/**
* Sets the logging context.
* <p>
Expand All @@ -46,7 +50,7 @@ public class LoggingContextAccessor {
* @return a {@link LoggingContextRestorer} that can be used to revert the logging context and MDC Map
* to its original value, either by cancelling it or by closing it (e.g. via try-with-resources)
*/
public static Cancellable setLoggingContext(LoggingContext context) {
public static LoggingContextRestorer setLoggingContext(LoggingContext context) {

Check warning on line 53 in cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.EmptyLineSeparatorCheck

'METHOD_DEF' should be separated from previous line.
final LoggingContext saveLoggingContext = loggingContext.get();
final Map saveContextMap = MDC.getCopyOfContextMap();
final Thread saveCurrentThread = Thread.currentThread();
Expand All @@ -59,17 +63,29 @@ public static Cancellable setLoggingContext(LoggingContext context) {
// MDC will throw this if there is no valid binding. Normally this shouldn't happen, but in case it does,
// we'll just ignore it as it doesn't affect platform logic at all as we always use loggingContext.
}
return new Cancellable() {
return new LoggingContextRestorer() {
private boolean cancelled;

@Override
/**
 * Reverts the logging context and the MDC map to the values captured when the context was set.
 *
 * <p>The call is ignored when it is made from a thread other than the one that captured the
 * saved state, or when a previous call has already performed the restore.
 */
@Override // From Cancellable
public void cancel() {
  // Only the capturing thread may restore, and only once.
  if (Thread.currentThread() != saveCurrentThread || cancelled) {
    return;
  }
  // A null snapshot means no MDC map was captured; restore an empty map in that case.
  MDC.setContextMap(saveContextMap != null ? saveContextMap : Collections.emptyMap());
  loggingContext.set(saveLoggingContext);
  cancelled = true;
}

/**
 * Restores the previous logging context by delegating to {@link #cancel()}, so that this
 * object can be used as a try-with-resources resource.
 *
 * <p>No checked exception is declared: the only work done here is the call to
 * {@code cancel()}, which declares none, so the broad {@code throws Exception} inherited
 * from {@link AutoCloseable#close()} is narrowed away.
 */
@Override // From AutoCloseable
public void close() {
  cancel();
}
};
}

Expand Down

0 comments on commit c8aae28

Please sign in to comment.