diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java index 3c77c8bc216e..97b8b9ff0829 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillRunnerService.java @@ -32,6 +32,7 @@ import io.cdap.cdap.common.lang.ClassLoaders; import io.cdap.cdap.common.logging.LoggingContext; import io.cdap.cdap.common.logging.LoggingContextAccessor; +import io.cdap.cdap.common.logging.LoggingContextAccessor.LoggingContextRestorer; import io.cdap.cdap.common.service.Retries; import io.cdap.cdap.common.service.RetryStrategies; import io.cdap.cdap.common.service.RetryStrategy; @@ -632,21 +633,29 @@ public TwillController create(@Nullable Callable startupTask, long timeout // If the startup task failed, publish failure state and delete the program running state startupTaskCompletion.whenComplete((res, throwable) -> { - if (throwable == null) { - LOG.debug("Startup task completed for program run {}", programRunId); - } else { - LOG.error("Fail to start program run {}", programRunId, throwable); - // The startup task completion can be failed in multiple scenarios. - // It can be caused by the startup task failure. - // It can also be due to cancellation from the controller, or a start up timeout. - // In either case, always cancel the startup task. If the task is already completed, there is no. - startupTaskFuture.cancel(true); - try { - // Attempt to force kill the remote process. If there is no such process found, it won't throw. - processController.kill(RuntimeJobStatus.RUNNING); - } catch (Exception e) { - LOG.warn("Force termination of remote process for {} failed", programRunId, e); + Map systemArgs = programOpts.getArguments().asMap(); + LoggingContext loggingContext = LoggingContextHelper.getLoggingContextWithRunId( + programRunId, systemArgs); + try (LoggingContextRestorer restoreContext = LoggingContextAccessor.setLoggingContext(loggingContext)) { + if (throwable == null) { + LOG.debug("Startup task completed for program run {}", programRunId); + } else { + LOG.error("Fail to start program run {}", programRunId, throwable); + // The startup task completion can be failed in multiple scenarios. + // It can be caused by the startup task failure. + // It can also be due to cancellation from the controller, or a start up timeout. + // In either case, always cancel the startup task. If the task is already completed, there is no. + startupTaskFuture.cancel(true); + try { + // Attempt to force kill the remote process. If there is no such process found, it won't throw. + processController.kill(RuntimeJobStatus.RUNNING); + } catch (Exception e) { + LOG.warn("Force termination of remote process for {} failed", programRunId, e); + } + programStateWriter.error(programRunId, throwable); } + } catch (Exception e) { + LOG.error("Exception caught while setting logging context for program run {}", programRunId, e); programStateWriter.error(programRunId, throwable); } }); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java b/cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java index 97ba7e35b458..10eb995a1af9 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/logging/LoggingContextAccessor.java @@ -1,17 +1,11 @@ /* - * Copyright © 2014 Cask Data, Inc. + * Copyright © 2024 Cask Data, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package io.cdap.cdap.common.logging; @@ -34,6 +28,9 @@ public class LoggingContextAccessor { private static final InheritableThreadLocal loggingContext = new InheritableThreadLocal<>(); + public interface LoggingContextRestorer extends Cancellable, AutoCloseable { + // The AutoCloseable interface implicitly provides the close() method + } /** * Sets the logging context. *

@@ -46,7 +43,7 @@ public class LoggingContextAccessor { * @return Cancellable that can be used to revert the logging context and MDC Map to its original * value */ - public static Cancellable setLoggingContext(LoggingContext context) { + public static LoggingContextRestorer setLoggingContext(LoggingContext context) { final LoggingContext saveLoggingContext = loggingContext.get(); final Map saveContextMap = MDC.getCopyOfContextMap(); final Thread saveCurrentThread = Thread.currentThread(); @@ -59,10 +56,13 @@ public static Cancellable setLoggingContext(LoggingContext context) { // MDC will throw this if there is no valid binding. Normally this shouldn't happen, but in case it does, // we'll just ignore it as it doesn't affect platform logic at all as we always use loggingContext. } - return new Cancellable() { + return new LoggingContextRestorer() { private boolean cancelled; - @Override + /** + * Cancels the current logging context change and restores the previous context. + */ + @Override // From Cancellable public void cancel() { if (Thread.currentThread() == saveCurrentThread && !cancelled) { MDC.setContextMap(saveContextMap == null ? Collections.emptyMap() : saveContextMap); @@ -70,6 +70,15 @@ public void cancel() { cancelled = true; } } + + /** + * AutoCloseable implementation to ensure context restoration. + * Delegates to the cancel() method. + */ + @Override // From AutoCloseable + public void close() throws Exception { + cancel(); // Calls the cancel logic + } }; }