diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java index 3c77c8bc216e..97b8b9ff0829 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java @@ -32,6 +32,7 @@ import io.cdap.cdap.common.lang.ClassLoaders; import io.cdap.cdap.common.logging.LoggingContext; import io.cdap.cdap.common.logging.LoggingContextAccessor; +import io.cdap.cdap.common.logging.LoggingContextAccessor.LoggingContextRestorer; import io.cdap.cdap.common.service.Retries; import io.cdap.cdap.common.service.RetryStrategies; import io.cdap.cdap.common.service.RetryStrategy; @@ -632,21 +633,29 @@ public TwillController create(@Nullable Callable startupTask, long timeout // If the startup task failed, publish failure state and delete the program running state startupTaskCompletion.whenComplete((res, throwable) -> { - if (throwable == null) { - LOG.debug("Startup task completed for program run {}", programRunId); - } else { - LOG.error("Fail to start program run {}", programRunId, throwable); - // The startup task completion can be failed in multiple scenarios. - // It can be caused by the startup task failure. - // It can also be due to cancellation from the controller, or a start up timeout. - // In either case, always cancel the startup task. If the task is already completed, there is no. - startupTaskFuture.cancel(true); - try { - // Attempt to force kill the remote process. If there is no such process found, it won't throw. - processController.kill(RuntimeJobStatus.RUNNING); - } catch (Exception e) { - LOG.warn("Force termination of remote process for {} failed", programRunId, e); + Map systemArgs = programOpts.getArguments().asMap(); + LoggingContext loggingContext = LoggingContextHelper.getLoggingContextWithRunId( + programRunId, systemArgs); + try (LoggingContextRestorer restoreContext = LoggingContextAccessor.setLoggingContext(loggingContext)) { + if (throwable == null) { + LOG.debug("Startup task completed for program run {}", programRunId); + } else { + LOG.error("Fail to start program run {}", programRunId, throwable); + // The startup task completion can be failed in multiple scenarios. + // It can be caused by the startup task failure. + // It can also be due to cancellation from the controller, or a start up timeout. + // In either case, always cancel the startup task. If the task is already completed, there is no. + startupTaskFuture.cancel(true); + try { + // Attempt to force kill the remote process. If there is no such process found, it won't throw. + processController.kill(RuntimeJobStatus.RUNNING); + } catch (Exception e) { + LOG.warn("Force termination of remote process for {} failed", programRunId, e); + } + programStateWriter.error(programRunId, throwable); } + } catch (Exception e) { + LOG.error("Exception caught while setting logging context for program run {}", programRunId, e); programStateWriter.error(programRunId, throwable); } }); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java b/cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java index 97ba7e35b458..adb98b09a10d 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java @@ -1,5 +1,5 @@ /* - * Copyright © 2014 Cask Data, Inc. + * Copyright © 2024 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -14,6 +14,7 @@ * the License. */ + package io.cdap.cdap.common.logging; import java.util.Collections; @@ -34,6 +35,9 @@ public class LoggingContextAccessor { private static final InheritableThreadLocal loggingContext = new InheritableThreadLocal<>(); + public interface LoggingContextRestorer extends Cancellable, AutoCloseable { + // The AutoCloseable interface implicitly provides the close() method + } /** * Sets the logging context. *

@@ -46,7 +50,7 @@ public class LoggingContextAccessor { * @return Cancellable that can be used to revert the logging context and MDC Map to its original * value */ - public static Cancellable setLoggingContext(LoggingContext context) { + public static LoggingContextRestorer setLoggingContext(LoggingContext context) { final LoggingContext saveLoggingContext = loggingContext.get(); final Map saveContextMap = MDC.getCopyOfContextMap(); final Thread saveCurrentThread = Thread.currentThread(); @@ -59,10 +63,13 @@ public static Cancellable setLoggingContext(LoggingContext context) { // MDC will throw this if there is no valid binding. Normally this shouldn't happen, but in case it does, // we'll just ignore it as it doesn't affect platform logic at all as we always use loggingContext. } - return new Cancellable() { + return new LoggingContextRestorer() { private boolean cancelled; - @Override + /** + * Cancels the current logging context change and restores the previous context. + */ + @Override // From Cancellable public void cancel() { if (Thread.currentThread() == saveCurrentThread && !cancelled) { MDC.setContextMap(saveContextMap == null ? Collections.emptyMap() : saveContextMap); @@ -70,6 +77,15 @@ public void cancel() { cancelled = true; } } + + /** + * AutoCloseable implementation to ensure restoration of context. + * Delegates to the cancel() method. + */ + @Override // From AutoCloseable + public void close() throws Exception { + cancel(); // Calls the cancel logic + } }; }