Skip to content

Commit

Permalink
Merge pull request #15615 from cdapio/CDAP-20980-cherrypick
Browse files Browse the repository at this point in the history
[🍒][CDAP-20980] Do not retry service exception in task workers
  • Loading branch information
itsankit-google authored Apr 25, 2024
2 parents 3d75c90 + 7c63d21 commit d1f8cd6
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
import java.util.zip.DeflaterInputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Helper class for executing a {@link RunnableTaskRequest} on a remote worker.
Expand All @@ -65,13 +63,14 @@ public class RemoteTaskExecutor {
private static final Gson GSON = new Gson();
private static final String TASK_WORKER_URL = "/worker/run";
private static final String SYSTEM_WORKER_URL = "/system/run";
private static final Predicate<Throwable> RETRYABLE_PREDICATE = throwable -> {
return (throwable instanceof RetryableException) || (throwable instanceof ServiceException);
};
private static final Logger LOG = LoggerFactory.getLogger(RemoteTaskExecutor.class);
private static final Predicate<Throwable> RETRYABLE_PREDICATE_SYSTEM_WORKER = throwable ->
(throwable instanceof RetryableException) || (throwable instanceof ServiceException);
private static final Predicate<Throwable> RETRYABLE_PREDICATE_TASK_WORKER = throwable ->
(throwable instanceof RetryableException);
private final boolean compression;
private final RemoteClient remoteClient;
private final RetryStrategy retryStrategy;
private final Predicate<Throwable> retryablePredicate;
private final MetricsCollectionService metricsCollectionService;
private final String workerUrl;

Expand All @@ -95,10 +94,12 @@ public RemoteTaskExecutor(CConfiguration cConf, MetricsCollectionService metrics
this.workerUrl = TASK_WORKER_URL;
this.retryStrategy = RetryStrategies.fromConfiguration(cConf,
Constants.Service.TASK_WORKER + ".");
this.retryablePredicate = RETRYABLE_PREDICATE_TASK_WORKER;
} else {
this.workerUrl = SYSTEM_WORKER_URL;
this.retryStrategy = RetryStrategies.fromConfiguration(cConf,
Constants.Service.SYSTEM_WORKER + ".");
this.retryablePredicate = RETRYABLE_PREDICATE_SYSTEM_WORKER;
}
}

Expand Down Expand Up @@ -147,7 +148,7 @@ public byte[] runTask(RunnableTaskRequest runnableTaskRequest) throws Exception
String.format("Received exception %s for %s", e.getMessage(),
runnableTaskRequest.getClassName()));
}
}, retryStrategy, RETRYABLE_PREDICATE);
}, retryStrategy, retryablePredicate);
} catch (ServiceException se) {
Exception ex = getTaskException(se);
//emit metrics with failed result
Expand Down

0 comments on commit d1f8cd6

Please sign in to comment.