Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
support task domains in the worker spring configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Mar 27, 2023
1 parent 8c65ec1 commit 9465a46
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ public int getThreadCount(String taskName) {
String key = "conductor.worker." + taskName + ".threadCount";
return environment.getProperty(key, Integer.class, 0);
}

@Override
public String getDomain(String taskName) {
String key = "conductor.worker." + taskName + ".domain";
return environment.getProperty(key, String.class, null);
}
}
3 changes: 2 additions & 1 deletion client-spring/src/test/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
conductor.client.rootUri=http://localhost:8080/api/
conductor.worker.hello.threadCount=100
conductor.worker.hello.threadCount=100
conductor.worker.hello_again.domain=test
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.lang.reflect.Method;
import java.util.*;

import com.google.common.base.Strings;
import com.netflix.conductor.common.utils.EnvUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -42,6 +44,8 @@ public class AnnotatedWorkerExecutor {

private Map<String, Integer> workerToPollingInterval = new HashMap<>();

private Map<String, String> workerDomains = new HashMap<>();

private Map<String, Object> workerClassObjs = new HashMap<>();

private static Set<String> scannedPackages = new HashSet<>();
Expand Down Expand Up @@ -163,6 +167,14 @@ private void addMethod(WorkerTask annotation, Method method, Object bean) {
}
workerToPollingInterval.put(name, pollingInterval);

String domain = workerConfiguration.getDomain(name);
if(Strings.isNullOrEmpty(domain)) {
domain = annotation.domain();
}
if(!Strings.isNullOrEmpty(domain)) {
workerDomains.put(name, domain);
}

workerClassObjs.put(name, bean);
workerExecutors.put(name, method);
LOGGER.info(
Expand All @@ -187,10 +199,12 @@ public void startPolling() {
}

LOGGER.info("Starting workers with threadCount {}", workerToThreadCount);
LOGGER.info("Worker domains {}", workerDomains);

taskRunner =
new TaskRunnerConfigurer.Builder(taskClient, executors)
.withTaskThreadCount(workerToThreadCount)
.withTaskToDomain(workerDomains)
.build();

taskRunner.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ public int getPollingInterval(String taskName) {
public int getThreadCount(String taskName) {
return 0;
}

public String getDomain(String taskName) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@
int threadCount() default 1;

int pollingInterval() default 100;

String domain() default "";
}

0 comments on commit 9465a46

Please sign in to comment.