diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutor.java b/cdap-common/src/main/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutor.java index adfa423ee070..7ae94e2e965c 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutor.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutor.java @@ -54,8 +54,6 @@ import java.util.zip.DeflaterInputStream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Helper class for executing a {@link RunnableTaskRequest} on a remote worker. @@ -65,13 +63,14 @@ public class RemoteTaskExecutor { private static final Gson GSON = new Gson(); private static final String TASK_WORKER_URL = "/worker/run"; private static final String SYSTEM_WORKER_URL = "/system/run"; - private static final Predicate RETRYABLE_PREDICATE = throwable -> { - return (throwable instanceof RetryableException) || (throwable instanceof ServiceException); - }; - private static final Logger LOG = LoggerFactory.getLogger(RemoteTaskExecutor.class); + private static final Predicate RETRYABLE_PREDICATE_SYSTEM_WORKER = throwable -> + (throwable instanceof RetryableException) || (throwable instanceof ServiceException); + private static final Predicate RETRYABLE_PREDICATE_TASK_WORKER = throwable -> + (throwable instanceof RetryableException); private final boolean compression; private final RemoteClient remoteClient; private final RetryStrategy retryStrategy; + private final Predicate retryablePredicate; private final MetricsCollectionService metricsCollectionService; private final String workerUrl; @@ -95,10 +94,12 @@ public RemoteTaskExecutor(CConfiguration cConf, MetricsCollectionService metrics this.workerUrl = TASK_WORKER_URL; this.retryStrategy = RetryStrategies.fromConfiguration(cConf, Constants.Service.TASK_WORKER + "."); + this.retryablePredicate = RETRYABLE_PREDICATE_TASK_WORKER; } else { this.workerUrl = SYSTEM_WORKER_URL; this.retryStrategy = RetryStrategies.fromConfiguration(cConf, Constants.Service.SYSTEM_WORKER + "."); + this.retryablePredicate = RETRYABLE_PREDICATE_SYSTEM_WORKER; } } @@ -147,7 +148,7 @@ public byte[] runTask(RunnableTaskRequest runnableTaskRequest) throws Exception String.format("Received exception %s for %s", e.getMessage(), runnableTaskRequest.getClassName())); } - }, retryStrategy, RETRYABLE_PREDICATE); + }, retryStrategy, retryablePredicate); } catch (ServiceException se) { Exception ex = getTaskException(se); //emit metrics with failed result