From 588a2d26cbab767e10e6d3fdc126c76083e99985 Mon Sep 17 00:00:00 2001 From: Christian Beikov Date: Thu, 28 May 2020 17:18:42 +0200 Subject: [PATCH] [#9, #10] Support declarative partition configuration and long running jobs --- .../java/com/blazebit/job/JobContext.java | 280 +++++++++---- .../java/com/blazebit/job/JobInstance.java | 19 + .../com/blazebit/job/JobInstanceState.java | 6 +- .../java/com/blazebit/job/JobManager.java | 10 + .../main/java/com/blazebit/job/Partition.java | 91 +++++ .../java/com/blazebit/job/PartitionKey.java | 53 ++- .../java/com/blazebit/job/Partitions.java | 41 ++ .../com/blazebit/job/spi/JobScheduler.java | 23 ++ .../job/impl/JobSchedulerCancelEvent.java | 37 ++ .../blazebit/job/impl/JobSchedulerImpl.java | 377 +++++++++++++++-- .../job/impl/JobSchedulerStatusEvent.java | 48 +++ .../job/impl/JobSchedulerTraceEvent.java | 48 +++ .../blazebit/job/view/model/AbstractJob.java | 7 - .../job/view/model/AbstractJobInstance.java | 5 + .../view/model/EntityViewPartitionKey.java | 151 ++++++- .../job/view/model/JobConfigurationView.java | 2 +- .../view/storage/EntityViewJobManager.java | 28 +- .../EntityViewPartitionKeyProvider.java | 382 ++++++++++++------ .../job/jpa/model/AbstractJobInstance.java | 5 + .../job/jpa/model/JpaPartitionKey.java | 137 ++++++- .../job/jpa/storage/JpaJobManager.java | 40 +- .../jpa/storage/JpaPartitionKeyProvider.java | 284 +++++++++++-- .../job/memory/model/AbstractJobInstance.java | 5 + .../job/memory/storage/MemoryJobManager.java | 11 + .../storage/MemoryPartitionKeyProvider.java | 20 +- parent/pom.xml | 4 +- .../job/testsuite/SimpleJobInstance.java | 11 + .../job/testsuite/JobInstanceTest.java | 160 +++++++- 28 files changed, 1975 insertions(+), 310 deletions(-) create mode 100644 core/api/src/main/java/com/blazebit/job/Partition.java create mode 100644 core/api/src/main/java/com/blazebit/job/Partitions.java create mode 100644 core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerCancelEvent.java create mode 100644 core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerStatusEvent.java create mode 100644 core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerTraceEvent.java diff --git a/core/api/src/main/java/com/blazebit/job/JobContext.java b/core/api/src/main/java/com/blazebit/job/JobContext.java index 19e7a58..6d262e1 100644 --- a/core/api/src/main/java/com/blazebit/job/JobContext.java +++ b/core/api/src/main/java/com/blazebit/job/JobContext.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -93,9 +92,9 @@ public interface JobContext extends ServiceProvider, ConfigurationSource { /** * Returns all partition keys. * - * @return The list of all partition keys + * @return The partition keys by name */ - Collection getPartitionKeys(); + Map getPartitionKeys(); /** * Returns the matching partition keys for the given job instance. @@ -105,6 +104,29 @@ public interface JobContext extends ServiceProvider, ConfigurationSource { */ Collection getPartitionKeys(JobInstance jobInstance); + /** + * Returns the cluster position where the given long running job instance is running, or -1. + * + * @param jobInstance The long running job instance + * @return The cluster position + */ + int getClusterPosition(JobInstance jobInstance); + + /** + * Returns the thread stack trace of the given long running job instance if it is still running, or null. 
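+ * <p>For illustration, a monitoring endpoint could use it roughly like this (the {@code jobContext} and the
+ * fetched {@code jobInstance} are assumed to exist):</p>
+ * <pre>{@code
+ * String trace = jobContext.getTrace(jobInstance);
+ * if (trace != null) {
+ *     LOGGER.info("Long running job instance is alive:\n" + trace);
+ * }
+ * }</pre>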
+ * + * @param jobInstance The long running job instance + * @return The trace of the job instance processor + */ + String getTrace(JobInstance jobInstance); + + /** + * Cancels the given long running job instance if it is still running. + * + * @param jobInstance The long running job instance to cancel + */ + void cancel(JobInstance jobInstance); + /** * Refreshes the job instance schedules for the given job instance. * @@ -157,6 +179,27 @@ public interface JobContext extends ServiceProvider, ConfigurationSource { */ void stop(long timeout, TimeUnit unit) throws InterruptedException; + /** + * The default transaction timeout for job processing. + * + * @return The default transaction timeout + */ + int getTransactionTimeoutMillis(); + + /** + * The default amount of seconds to back off when a job processor throws a {@link JobTemporaryException}. + * + * @return The default temporary error backoff + */ + int getTemporaryErrorBackoffSeconds(); + + /** + * The default amount of seconds to back off when a job processor throws a {@link JobRateLimitException}. + * + * @return The default rate limit backoff + */ + int getRateLimitBackoffSeconds(); + /** * Returns a builder for a job context. * @@ -196,7 +239,10 @@ class BuilderBase> { private PartitionKeyProviderFactory partitionKeyProviderFactory; private PartitionKeyProvider partitionKeyProvider; private boolean scheduleRefreshedOnly; - private final Map partitionKeys = new HashMap<>(); + private int transactionTimeoutMillis = -1; + private int temporaryErrorBackoffSeconds = -1; + private int rateLimitBackoffSeconds = -1; + private final Map partitionKeys = new HashMap<>(); private final List jobTriggerListeners = new ArrayList<>(); private final List jobInstanceListeners = new ArrayList<>(); private final Map properties = new HashMap<>(); @@ -290,22 +336,7 @@ protected void checkCreateContext() { */ public JobContext createContext() { checkCreateContext(); - return new DefaultJobContext( - transactionSupport, - getJobManagerFactory(), - getOrCreateActorContext(), - getScheduleFactory(), - getJobSchedulerFactory(), - getJobProcessorFactory(), - getJobInstanceProcessorFactory(), - getPartitionKeyMap(), - getPartitionKeyProvider(), - getJobTriggerListeners(), - getJobInstanceListeners(), - properties, - serviceMap, - isScheduleRefreshedOnly() - ); + return new DefaultJobContext(this); } /** @@ -495,16 +526,7 @@ public T withJobSchedulerFactory(JobSchedulerFactory jobSchedulerFactory) { * * @return the configured partition keys */ - public Set getPartitionKeys() { - return partitionKeys.keySet(); - } - - /** - * Returns the configured partition key map. - * - * @return the configured partition key map - */ - protected Map getPartitionKeyMap() { + public Map getPartitionKeys() { return partitionKeys; } @@ -512,11 +534,10 @@ protected Map getPartitionKeyMap() { - * Adds the given partition key and sets the amount of elements that should be processed at once. + * Adds the given partition key. The amount of elements to process at once is taken from {@link PartitionKey#getProcessCount()}.
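+ * <p>A minimal configuration sketch ({@code myPartitionKey} stands for an existing {@link PartitionKey}
+ * implementation and the remaining factories required by {@code createContext()} are omitted):</p>
+ * <pre>{@code
+ * JobContext jobContext = JobContext.builder()
+ *     .withPartitionKey(myPartitionKey)
+ *     .withTransactionTimeoutMillis(120_000)
+ *     .createContext();
+ * }</pre>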
* * @param partitionKey The partition key - * @param processingCount The amount of elements to process at once for the partition * @return this for chaining */ - public T withPartitionKey(PartitionKey partitionKey, int processingCount) { - this.partitionKeys.put(partitionKey, processingCount); + public T withPartitionKey(PartitionKey partitionKey) { + this.partitionKeys.put(partitionKey.getName(), partitionKey); return (T) this; } @@ -643,6 +664,66 @@ public T withScheduleRefreshedOnly(boolean scheduleRefreshedOnly) { return (T) this; } + /** + * Returns the default transaction timeout. + * + * @return the default transaction timeout + */ + public int getTransactionTimeoutMillis() { + return transactionTimeoutMillis; + } + + /** + * Sets the default transaction timeout. + * + * @param transactionTimeoutMillis The default transaction timeout in milliseconds + * @return this for chaining + */ + public T withTransactionTimeoutMillis(int transactionTimeoutMillis) { + this.transactionTimeoutMillis = transactionTimeoutMillis; + return (T) this; + } + + /** + * Returns the default temporary error backoff. + * + * @return the default temporary error backoff + */ + public int getTemporaryErrorBackoffSeconds() { + return temporaryErrorBackoffSeconds; + } + + /** + * Sets the default temporary error backoff. + * + * @param temporaryErrorBackoffSeconds The default temporary error backoff in seconds + * @return this for chaining + */ + public T withTemporaryErrorBackoffSeconds(int temporaryErrorBackoffSeconds) { + this.temporaryErrorBackoffSeconds = temporaryErrorBackoffSeconds; + return (T) this; + } + + /** + * Returns the default rate limit backoff. + * + * @return the default rate limit backoff + */ + public int getRateLimitBackoffSeconds() { + return rateLimitBackoffSeconds; + } + + /** + * Sets the default rate limit backoff. + * + * @param rateLimitBackoffSeconds The default rate limit backoff in seconds + * @return this for chaining + */ + public T withRateLimitBackoffSeconds(int rateLimitBackoffSeconds) { + this.rateLimitBackoffSeconds = rateLimitBackoffSeconds; + return (T) this; + } + /** * Returns the configured properties. * @@ -724,77 +805,75 @@ public T withService(Class serviceClass, X service) { */ protected static class DefaultJobContext implements JobContext { private static final String DEFAULT_JOB_INSTANCE_ACTOR_NAME = "jobInstanceScheduler"; - private static final int DEFAULT_JOB_INSTANCE_PROCESS_COUNT = 1; private static final String DEFAULT_JOB_TRIGGER_ACTOR_NAME = "jobTriggerScheduler"; - private static final int DEFAULT_JOB_TRIGGER_PROCESS_COUNT = 1; private final TransactionSupport transactionSupport; private final JobManager jobManager; private final ScheduleFactory scheduleFactory; private final JobProcessorFactory jobProcessorFactory; private final JobInstanceProcessorFactory jobInstanceProcessorFactory; - private final PartitionKeyProvider partitionKeyProvider; private final Map jobSchedulers; + private final Map partitionKeys; private final Map, List> jobInstanceClassToPartitionKeysMapping = new ConcurrentHashMap<>(); private final JobInstanceListener[] jobInstanceListeners; private final Map properties; private final Map, Object> serviceMap; private final boolean scheduleRefreshedOnly; + private final int transactionTimeoutMillis; + private final int temporaryErrorBackoffSeconds; + private final int rateLimitBackoffSeconds; /** * Creates a job context from the given configuration.
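+ * Negative builder values fall back to the defaults assigned below: a transaction timeout of 60 seconds and
+ * a backoff of 10 seconds for both temporary errors and rate limits.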
* - * @param transactionSupport The transaction support - * @param jobManagerFactory The job manager factory - * @param actorContext The actor context - * @param scheduleFactory The schedule factory - * @param jobSchedulerFactory The job scheduler factory - * @param jobProcessorFactory The job processor factory - * @param jobInstanceProcessorFactory The job instance processor factory - * @param partitionKeyEntries The partition key entries - * @param partitionKeyProvider The partition key provider - * @param jobTriggerListeners The job trigger listeners - * @param jobInstanceListeners The job instance listeners - * @param properties The properties - * @param serviceMap The service map - * @param scheduleRefreshedOnly Whether to schedule only refreshed job instances + * @param builderBase The builder */ - protected DefaultJobContext(TransactionSupport transactionSupport, JobManagerFactory jobManagerFactory, ActorContext actorContext, ScheduleFactory scheduleFactory, - JobSchedulerFactory jobSchedulerFactory, JobProcessorFactory jobProcessorFactory, JobInstanceProcessorFactory jobInstanceProcessorFactory, - Map partitionKeyEntries, PartitionKeyProvider partitionKeyProvider, List jobTriggerListeners, List jobInstanceListeners, - Map properties, Map, Object> serviceMap, boolean scheduleRefreshedOnly) { - this.transactionSupport = transactionSupport; - this.scheduleFactory = scheduleFactory; - this.jobProcessorFactory = jobProcessorFactory; - this.jobInstanceProcessorFactory = jobInstanceProcessorFactory; - this.scheduleRefreshedOnly = scheduleRefreshedOnly; - this.properties = new HashMap<>(properties); - this.serviceMap = new HashMap<>(serviceMap); - this.jobManager = jobManagerFactory.createJobManager(this); + protected DefaultJobContext(BuilderBase builderBase) { + ActorContext actorContext = builderBase.getOrCreateActorContext(); + JobSchedulerFactory jobSchedulerFactory = builderBase.getJobSchedulerFactory(); + Map partitionKeys = builderBase.getPartitionKeys(); + PartitionKeyProvider partitionKeyProvider = builderBase.getPartitionKeyProvider(); + List jobTriggerListeners = builderBase.getJobTriggerListeners(); + List jobInstanceListeners = builderBase.getJobInstanceListeners(); + this.transactionSupport = builderBase.getTransactionSupport(); + this.scheduleFactory = builderBase.getScheduleFactory(); + this.jobProcessorFactory = builderBase.getJobProcessorFactory(); + this.jobInstanceProcessorFactory = builderBase.getJobInstanceProcessorFactory(); + this.scheduleRefreshedOnly = builderBase.isScheduleRefreshedOnly(); + this.transactionTimeoutMillis = builderBase.getTransactionTimeoutMillis() < 0 ? 60_000 : builderBase.getTransactionTimeoutMillis(); + this.temporaryErrorBackoffSeconds = builderBase.getTemporaryErrorBackoffSeconds() < 0 ? 10 : builderBase.getTemporaryErrorBackoffSeconds(); + this.rateLimitBackoffSeconds = builderBase.getRateLimitBackoffSeconds() < 0 ? 
10 : builderBase.getRateLimitBackoffSeconds(); + this.properties = new HashMap<>(builderBase.getProperties()); + this.serviceMap = new HashMap<>(builderBase.getServiceMap()); + + this.jobManager = builderBase.getJobManagerFactory().createJobManager(this); if (partitionKeyProvider == null) { throw new JobException("No PartitionKeyProvider given!"); - } else { - this.partitionKeyProvider = partitionKeyProvider; } - Collection defaultTriggerPartitionKeys = this.partitionKeyProvider.getDefaultTriggerPartitionKeys(); - if (partitionKeyEntries.isEmpty()) { - Collection instancePartitionKeys = this.partitionKeyProvider.getDefaultJobInstancePartitionKeys(); + Map partitionKeyMap = new HashMap<>(); + Collection defaultTriggerPartitionKeys = partitionKeyProvider.getDefaultTriggerPartitionKeys(); + if (partitionKeys.isEmpty()) { + Collection instancePartitionKeys = partitionKeyProvider.getDefaultJobInstancePartitionKeys(); this.jobSchedulers = new HashMap<>(defaultTriggerPartitionKeys.size() + instancePartitionKeys.size()); for (PartitionKey instancePartitionKey : instancePartitionKeys) { - JobScheduler jobInstanceScheduler = jobSchedulerFactory.createJobScheduler(this, actorContext, DEFAULT_JOB_INSTANCE_ACTOR_NAME + "/" + instancePartitionKey, DEFAULT_JOB_INSTANCE_PROCESS_COUNT, instancePartitionKey); + JobScheduler jobInstanceScheduler = jobSchedulerFactory.createJobScheduler(this, actorContext, DEFAULT_JOB_INSTANCE_ACTOR_NAME + "/" + instancePartitionKey.getName(), instancePartitionKey.getProcessCount(), instancePartitionKey); jobSchedulers.put(instancePartitionKey, jobInstanceScheduler); + partitionKeyMap.put(instancePartitionKey.getName(), instancePartitionKey); } } else { - this.jobSchedulers = new HashMap<>(defaultTriggerPartitionKeys.size() + partitionKeyEntries.size()); - for (Map.Entry entry : partitionKeyEntries.entrySet()) { - jobSchedulers.put(entry.getKey(), jobSchedulerFactory.createJobScheduler(this, actorContext, DEFAULT_JOB_INSTANCE_ACTOR_NAME + "/" + entry.getKey(), entry.getValue(), entry.getKey())); + this.jobSchedulers = new HashMap<>(defaultTriggerPartitionKeys.size() + partitionKeys.size()); + for (PartitionKey partitionKey : partitionKeys.values()) { + jobSchedulers.put(partitionKey, jobSchedulerFactory.createJobScheduler(this, actorContext, DEFAULT_JOB_INSTANCE_ACTOR_NAME + "/" + partitionKey.getName(), partitionKey.getProcessCount(), partitionKey)); + partitionKeyMap.put(partitionKey.getName(), partitionKey); } } for (PartitionKey jobTriggerPartitionKey : defaultTriggerPartitionKeys) { - jobSchedulers.put(jobTriggerPartitionKey, jobSchedulerFactory.createJobScheduler(this, actorContext, DEFAULT_JOB_TRIGGER_ACTOR_NAME, DEFAULT_JOB_TRIGGER_PROCESS_COUNT, jobTriggerPartitionKey)); + jobSchedulers.put(jobTriggerPartitionKey, jobSchedulerFactory.createJobScheduler(this, actorContext, DEFAULT_JOB_TRIGGER_ACTOR_NAME, jobTriggerPartitionKey.getProcessCount(), jobTriggerPartitionKey)); + partitionKeyMap.put(jobTriggerPartitionKey.getName(), jobTriggerPartitionKey); } + this.partitionKeys = Collections.unmodifiableMap(partitionKeyMap); jobInstanceListeners.addAll(jobTriggerListeners); this.jobInstanceListeners = jobInstanceListeners.toArray(new JobInstanceListener[jobInstanceListeners.size()]); afterConstruct(); @@ -821,6 +900,21 @@ public boolean isScheduleRefreshedOnly() { return scheduleRefreshedOnly; } + @Override + public int getTransactionTimeoutMillis() { + return transactionTimeoutMillis; + } + + @Override + public int getTemporaryErrorBackoffSeconds() { + return 
temporaryErrorBackoffSeconds; + } + + @Override + public int getRateLimitBackoffSeconds() { + return rateLimitBackoffSeconds; + } + @Override public Object getProperty(String property) { return properties.get(property); @@ -879,8 +973,8 @@ public void refreshJobInstanceSchedules(JobInstance jobInstance) { } @Override - public Collection getPartitionKeys() { - return Collections.unmodifiableSet(jobSchedulers.keySet()); + public Map getPartitionKeys() { + return partitionKeys; } @Override @@ -888,14 +982,48 @@ public List getPartitionKeys(JobInstance jobInstance) { return jobInstanceClassToPartitionKeysMapping.computeIfAbsent(jobInstance.getClass(), (k) -> { List v = new ArrayList<>(jobSchedulers.keySet().size()); for (PartitionKey partitionKey : jobSchedulers.keySet()) { - if (partitionKey.getJobInstanceType().isAssignableFrom(k)) { - v.add(partitionKey); + for (Class> type : partitionKey.getJobInstanceTypes()) { + if (type.isAssignableFrom(k)) { + v.add(partitionKey); + break; + } } } return v; }); } + @Override + public int getClusterPosition(JobInstance jobInstance) { + for (PartitionKey partitionKey : getPartitionKeys(jobInstance)) { + int clusterPosition = jobSchedulers.get(partitionKey).getClusterPosition(jobInstance); + if (clusterPosition != -1) { + return clusterPosition; + } + } + + return -1; + } + + @Override + public String getTrace(JobInstance jobInstance) { + for (PartitionKey partitionKey : getPartitionKeys(jobInstance)) { + String trace = jobSchedulers.get(partitionKey).getTrace(jobInstance); + if (trace != null) { + return trace; + } + } + + return null; + } + + @Override + public void cancel(JobInstance jobInstance) { + for (PartitionKey partitionKey : getPartitionKeys(jobInstance)) { + jobSchedulers.get(partitionKey).cancel(jobInstance); + } + } + @Override public void refreshJobInstanceSchedules(long earliestNewSchedule) { for (JobScheduler jobScheduler : jobSchedulers.values()) { diff --git a/core/api/src/main/java/com/blazebit/job/JobInstance.java b/core/api/src/main/java/com/blazebit/job/JobInstance.java index 996829f..115f4cf 100644 --- a/core/api/src/main/java/com/blazebit/job/JobInstance.java +++ b/core/api/src/main/java/com/blazebit/job/JobInstance.java @@ -124,6 +124,17 @@ default Object getLastProcessed() { */ void onChunkSuccess(JobInstanceProcessingContext processingContext); + /** + * Returns if this job instance is long running. + * A long running job instance will transition to the {@link JobInstanceState#RUNNING} state via {@link #markRunning(JobInstanceProcessingContext)} + * and will be executed on a cluster node. If the cluster topology changes, the state of jobs will be queried through cluster events. + * + * @return Whether the job instance is long running + */ + default boolean isLongRunning() { + return false; + } + /** * Returns the job configuration. * @@ -131,6 +142,14 @@ default Object getLastProcessed() { */ JobConfiguration getJobConfiguration(); + /** + * Marks the given job instance as running which happens when {@link JobInstance#isLongRunning()} is true. + * After this method, {@link #getState()} should return {@link JobInstanceState#RUNNING}. + * + * @param processingContext The processing context + */ + void markRunning(JobInstanceProcessingContext processingContext); + /** * Marks the given job instance as done and passes the last job instance processor execution result. * After this method, {@link #getState()} should return {@link JobInstanceState#DONE}. 
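For illustration only, not part of the patch: a job instance opts into both new features roughly as follows. The class name, predicate and values are invented, and AbstractJobInstance stands in for one of the abstract base classes of the jpa, memory or entity-view modules.

    @Partition(name = "mailJobs", predicate = "{alias}.channel = 'MAIL' AND MOD({alias}.id, 2) = {partition}", partitionCount = 2, processCount = 10)
    public class MailJobInstance extends AbstractJobInstance {

        @Override
        public boolean isLongRunning() {
            // Marked RUNNING via markRunning(...) before execution and tracked through cluster events
            return true;
        }

        // remaining members omitted
    }

With partitionCount = 2 this declares the partitions mailJobs0 and mailJobs1, each fetching only the job instances that match its predicate.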
diff --git a/core/api/src/main/java/com/blazebit/job/JobInstanceState.java b/core/api/src/main/java/com/blazebit/job/JobInstanceState.java index 7dcd5b4..e9cfd79 100644 --- a/core/api/src/main/java/com/blazebit/job/JobInstanceState.java +++ b/core/api/src/main/java/com/blazebit/job/JobInstanceState.java @@ -47,6 +47,10 @@ public enum JobInstanceState { /** * A job instance with that state is going to be remove during an update. */ - REMOVED; + REMOVED, + /** + * A job instance with that state is a long running job that is currently running. + */ + RUNNING; } diff --git a/core/api/src/main/java/com/blazebit/job/JobManager.java b/core/api/src/main/java/com/blazebit/job/JobManager.java index 1931318..336164d 100644 --- a/core/api/src/main/java/com/blazebit/job/JobManager.java +++ b/core/api/src/main/java/com/blazebit/job/JobManager.java @@ -71,6 +71,16 @@ public interface JobManager { */ List> getJobInstancesToProcess(int partition, int partitionCount, int limit, PartitionKey partitionKey, Set> jobInstancesToInclude); + /** + * Returns a list of job instances that have the status {@link JobInstanceState#RUNNING} for the given partition. + * + * @param partition The partition number + * @param partitionCount The partition count + * @param partitionKey The partition key + * @return The list of job instances + */ + List> getRunningJobInstances(int partition, int partitionCount, PartitionKey partitionKey); + /** * Returns the next schedule at which the given partition must process job instances. * diff --git a/core/api/src/main/java/com/blazebit/job/Partition.java b/core/api/src/main/java/com/blazebit/job/Partition.java new file mode 100644 index 0000000..9d36e2c --- /dev/null +++ b/core/api/src/main/java/com/blazebit/job/Partition.java @@ -0,0 +1,91 @@ +/* + * Copyright 2018 - 2020 Blazebit. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.blazebit.job; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Repeatable; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * {@link PartitionKey} configuration for a {@link JobInstance} class. + * + * @author Christian Beikov + * @since 1.0.0 + */ +@Repeatable(Partitions.class) +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface Partition { + + /** + * The partition name. If empty, uses the class name of the annotated {@link JobInstance}. + * + * @return The partition name + */ + String name() default ""; + + /** + * Defines the partition predicate to use for fetching jobs for the partition. + * The string {alias} is replaced with the job instance alias. + * The string {partition} is replaced with the zero-based partition number. + * + * @return The partition predicate + */ + String predicate() default ""; + + /** + * The number of jobs to schedule in parallel within one scheduler transaction. 
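+ * <p>For example (hypothetical values), {@code @Partition(name = "mails", processCount = 10)} lets the
+ * scheduler fetch and schedule up to ten job instances of this partition per scheduler transaction.</p>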
+ * + * @return The number of jobs to schedule + */ + int processCount() default 1; + + /** + * Defines how many partitions should be created. If the value is greater than 1, + * it will create partitions with a name that is suffixed by the partition number, starting at 0. + * + * @return The number of partitions to create + */ + int partitionCount() default 1; + + /** + * The transaction timeout for job processing of the partition. + * When -1, the default transaction timeout is used. + * + * @return The transaction timeout + */ + int transactionTimeoutMillis() default -1; + + /** + * The amount of seconds to back off when a job processor throws a {@link JobTemporaryException}. + * When -1, the default temporary error backoff is used. + * + * @return The temporary error backoff + */ + int temporaryErrorBackoffSeconds() default -1; + + /** + * The amount of seconds to back off when a job processor throws a {@link JobRateLimitException}. + * When -1, the default rate limit backoff is used. + * + * @return The rate limit backoff + */ + int rateLimitBackoffSeconds() default -1; + +} diff --git a/core/api/src/main/java/com/blazebit/job/PartitionKey.java b/core/api/src/main/java/com/blazebit/job/PartitionKey.java index 9fb6591..8d7ac24 100644 --- a/core/api/src/main/java/com/blazebit/job/PartitionKey.java +++ b/core/api/src/main/java/com/blazebit/job/PartitionKey.java @@ -16,6 +16,9 @@ package com.blazebit.job; +import java.util.Collections; +import java.util.Set; + /** * A testable description of a subset of job instances. * @@ -24,13 +27,29 @@ */ public interface PartitionKey { + /** + * Returns the unique name of the partition key that is used for the configuration of the scheduler. + * + * @return the name of the partition key + */ + String getName(); + + /** + * Returns the number of jobs to schedule in parallel within one scheduler transaction. + * + * @return The number of jobs to schedule + */ + default int getProcessCount() { + return 1; + } + /** - * Returns the concrete job instance type that is described by this partition key. + * Returns the concrete job instance types that are described by this partition key. * - * @return the concrete job instance type + * @return the concrete job instance types */ - default Class> getJobInstanceType() { - return (Class>) (Class) JobInstance.class; + default Set>> getJobInstanceTypes() { + return (Set>>) (Set) Collections.singleton(JobInstance.class); } /** @@ -41,4 +60,34 @@ default Class> getJobInstanceType() { */ boolean matches(JobInstance jobInstance); + /** + * The transaction timeout for job processing of the partition. + * When -1, the default transaction timeout is used. + * + * @return The transaction timeout + */ + default int getTransactionTimeoutMillis() { + return -1; + } + + /** + * The amount of seconds to back off when a job processor throws a {@link JobTemporaryException}. + * When -1, the default temporary error backoff is used. + * + * @return The temporary error backoff + */ + default int getTemporaryErrorBackoffSeconds() { + return -1; + } + + /** + * The amount of seconds to back off when a job processor throws a {@link JobRateLimitException}. + * When -1, the default rate limit backoff is used. + * + * @return The rate limit backoff + */ + default int getRateLimitBackoffSeconds() { + return -1; + } + } diff --git a/core/api/src/main/java/com/blazebit/job/Partitions.java b/core/api/src/main/java/com/blazebit/job/Partitions.java new file mode 100644 index 0000000..e1063fa --- /dev/null +++ b/core/api/src/main/java/com/blazebit/job/Partitions.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018 - 2020 Blazebit.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.blazebit.job; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Container for defining multiple partitions for a {@link JobInstance}. + * + * @author Christian Beikov + * @since 1.0.0 + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface Partitions { + + /** + * The partitions for this job instance. + * + * @return The partitions + */ + Partition[] value() default {}; + +} diff --git a/core/api/src/main/java/com/blazebit/job/spi/JobScheduler.java b/core/api/src/main/java/com/blazebit/job/spi/JobScheduler.java index d7088a0..0539e6a 100644 --- a/core/api/src/main/java/com/blazebit/job/spi/JobScheduler.java +++ b/core/api/src/main/java/com/blazebit/job/spi/JobScheduler.java @@ -56,6 +56,29 @@ default void refreshSchedules() { */ void reschedule(JobInstance jobInstance); + /** + * Returns the cluster position where the given long running job instance is running, or -1. + * + * @param jobInstance The long running job instance + * @return The cluster position + */ + int getClusterPosition(JobInstance jobInstance); + + /** + * Returns the thread stack trace of the given long running job instance if it is still running, or null. + * + * @param jobInstance The long running job instance + * @return The trace of the job instance processor + */ + String getTrace(JobInstance jobInstance); + + /** + * Cancels the given long running job instance if it is still running. + * + * @param jobInstance The long running job instance to cancel + */ + void cancel(JobInstance jobInstance); + /** * Stops the job scheduler. * After this method finished no further jobs are scheduled but there may still be running jobs. diff --git a/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerCancelEvent.java b/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerCancelEvent.java new file mode 100644 index 0000000..2a7f3fc --- /dev/null +++ b/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerCancelEvent.java @@ -0,0 +1,37 @@ +/* + * Copyright 2018 - 2020 Blazebit. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.blazebit.job.impl; + +import java.io.Serializable; + +/** + * @author Christian Beikov + * @since 1.0.0 + */ +public class JobSchedulerCancelEvent implements Serializable { + + private final Serializable jobInstanceId; + + public JobSchedulerCancelEvent(Serializable jobInstanceId) { + this.jobInstanceId = jobInstanceId; + } + + public Serializable getJobInstanceId() { + return jobInstanceId; + } + +} diff --git a/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerImpl.java b/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerImpl.java index 9d93066..3672f5b 100644 --- a/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerImpl.java +++ b/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerImpl.java @@ -21,9 +21,11 @@ import com.blazebit.actor.spi.ClusterNodeInfo; import com.blazebit.actor.spi.ClusterStateListener; import com.blazebit.actor.spi.ClusterStateManager; +import com.blazebit.actor.spi.LockService; import com.blazebit.actor.spi.Scheduler; import com.blazebit.actor.spi.SchedulerFactory; import com.blazebit.job.JobContext; +import com.blazebit.job.JobException; import com.blazebit.job.JobInstance; import com.blazebit.job.JobInstanceListener; import com.blazebit.job.JobInstanceProcessingContext; @@ -38,11 +40,14 @@ import com.blazebit.job.spi.JobScheduler; import com.blazebit.job.spi.TransactionSupport; +import java.io.Serializable; import java.time.Clock; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -52,6 +57,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -63,6 +69,7 @@ public class JobSchedulerImpl implements JobScheduler, ClusterStateListener { private static final Logger LOG = Logger.getLogger(JobSchedulerImpl.class.getName()); + private static final long COMPLETION_TX_TIMEOUT = 10_000L; private final JobContext jobContext; private final ActorContext actorContext; @@ -73,8 +80,12 @@ public class JobSchedulerImpl implements JobScheduler, ClusterStateListener { private final String actorName; private final PartitionKey partitionKey; private final int processCount; + private final long transactionTimeout; + private final long temporaryErrorDeferSeconds; + private final long rateLimitDeferSeconds; private final AtomicLong earliestKnownSchedule = new AtomicLong(Long.MAX_VALUE); private final ConcurrentMap, Boolean> jobInstancesToSchedule = new ConcurrentHashMap<>(); + private final ConcurrentMap longRunningJobInstances = new ConcurrentHashMap<>(); private volatile ClusterNodeInfo clusterNodeInfo; private volatile boolean closed; @@ -88,12 +99,51 @@ public JobSchedulerImpl(JobContext jobContext, ActorContext actorContext, Schedu this.actorName = actorName; this.processCount = processCount; this.partitionKey = partitionKey; + this.transactionTimeout = partitionKey.getTransactionTimeoutMillis() < 0 ? jobContext.getTransactionTimeoutMillis() : partitionKey.getTransactionTimeoutMillis(); + this.temporaryErrorDeferSeconds = partitionKey.getTemporaryErrorBackoffSeconds() < 0 ? 
jobContext.getTemporaryErrorBackoffSeconds() : partitionKey.getTemporaryErrorBackoffSeconds(); + this.rateLimitDeferSeconds = partitionKey.getRateLimitBackoffSeconds() < 0 ? jobContext.getRateLimitBackoffSeconds() : partitionKey.getRateLimitBackoffSeconds(); } @Override public void start() { actorContext.getActorManager().registerSuspendedActor(actorName, runner); - actorContext.getService(ClusterStateManager.class).registerListener(this); + ClusterStateManager clusterStateManager = actorContext.getService(ClusterStateManager.class); + clusterStateManager.registerListener(this); + clusterStateManager.registerListener(JobSchedulerCancelEvent.class, e -> { + JobInstanceExecution execution = longRunningJobInstances.get(e.getJobInstanceId()); + if (execution != null) { + execution.future.cancel(true); + } + }); + clusterStateManager.registerListener(JobSchedulerStatusEvent.class, e -> { + Serializable[] jobInstanceIds = e.getJobInstanceIds(); + int[] clusterPositions = new int[jobInstanceIds.length]; + int clusterPosition = clusterNodeInfo.getClusterPosition(); + for (int i = 0; i < jobInstanceIds.length; i++) { + if (longRunningJobInstances.containsKey(jobInstanceIds[i])) { + clusterPositions[i] = clusterPosition; + } else { + clusterPositions[i] = -1; + } + } + + e.setClusterPositions(clusterPositions); + }); + clusterStateManager.registerListener(JobSchedulerTraceEvent.class, e -> { + JobInstanceExecution execution = longRunningJobInstances.get(e.getJobInstanceId()); + if (execution != null) { + Thread thread = execution.thread; + if (thread != null) { + StackTraceElement[] stackTrace = thread.getStackTrace(); + StringBuilder sb = new StringBuilder(); + for (StackTraceElement stackTraceElement : stackTrace) { + sb.append(stackTraceElement).append('\n'); + } + + e.setTrace(sb.toString()); + } + } + }); } @Override @@ -106,9 +156,68 @@ public void onClusterStateChanged(ClusterNodeInfo clusterNodeInfo) { } else { refreshSchedules(nextSchedule.toEpochMilli()); } + List> runningJobInstances = jobManager.getRunningJobInstances(clusterNodeInfo.getClusterPosition(), clusterNodeInfo.getClusterSize(), partitionKey); + ClusterStateManager clusterStateManager = actorContext.getService(ClusterStateManager.class); + List jobInstanceIds = new ArrayList<>(runningJobInstances.size()); + for (Iterator> iterator = runningJobInstances.iterator(); iterator.hasNext(); ) { + JobInstance runningJobInstance = iterator.next(); + JobInstanceExecution execution = longRunningJobInstances.get(runningJobInstance.getId()); + if (execution == null) { + jobInstanceIds.add((Serializable) runningJobInstance.getId()); + } else { + iterator.remove(); + } + } + if (!jobInstanceIds.isEmpty()) { + LockService lockService = clusterStateManager.getLockService(); + JobSchedulerStatusEvent statusEvent = new JobSchedulerStatusEvent(jobInstanceIds.toArray(new Serializable[0])); + Map> futureMap = clusterStateManager.fireEventExcludeSelf(statusEvent); + if (futureMap.isEmpty()) { + for (JobInstance runningJobInstance : runningJobInstances) { + scheduleLongRunning(lockService, runningJobInstance); + } + } else { + try { + for (Future future : futureMap.values()) { + int[] clusterPositions = future.get(); + for (int i = 0; i < clusterPositions.length; i++) { + if (clusterPositions[i] == -1) { + // To the best of our knowledge, the job instance does not run in the cluster, so we have to schedule it + scheduleLongRunning(lockService, runningJobInstances.get(i)); + } + } + } + } catch (Exception ex) { + throw new JobException("Could not get the cluster position state for running job instances.", ex); + } + } + } } } + private void scheduleLongRunning(LockService lockService, JobInstance jobInstance) { + Instant now = clock.instant(); + MutableJobInstanceProcessingContext jobProcessingContext = new MutableJobInstanceProcessingContext(jobContext, partitionKey, processCount); + jobProcessingContext.setPartitionCount(clusterNodeInfo.getClusterSize()); + jobProcessingContext.setPartitionId(clusterNodeInfo.getClusterPosition()); + jobProcessingContext.setLastProcessed(jobInstance.getLastProcessed()); + MutableScheduleContext scheduleContext = new MutableScheduleContext(); + Instant lastExecutionTime = jobInstance.getLastExecutionTime(); + if (lastExecutionTime == null) { + lastExecutionTime = now; + } + scheduleContext.setLastScheduleTime(jobInstance.getScheduleTime().toEpochMilli()); + scheduleContext.setLastExecutionTime(lastExecutionTime.toEpochMilli()); + JobInstanceProcessor jobInstanceProcessor = jobContext.getJobInstanceProcessor(jobInstance); + JobInstanceExecution execution = new JobInstanceExecution(jobInstance, jobInstance.getDeferCount(), scheduleContext, jobProcessingContext, null); + jobInstance.setLastExecutionTime(Instant.now()); + jobInstance.markRunning(jobProcessingContext); + jobManager.updateJobInstance(jobInstance); + longRunningJobInstances.put(jobInstance.getId(), execution); + Lock lock = lockService.getLock("jobInstance/" + jobInstance.getId()); + execution.future = scheduler.submit(new NotifyingSpecialThrowingCallable(jobInstanceProcessor, execution, lock)); + } + @Override public void refreshSchedules(long earliestNewSchedule) { long delayMillis = rescan(earliestNewSchedule); @@ -119,7 +228,9 @@ public void refreshSchedules(long earliestNewSchedule) { @Override public void reschedule(JobInstance jobInstance) { - jobInstancesToSchedule.put(jobInstance, Boolean.TRUE); + if (jobContext.isScheduleRefreshedOnly()) { + jobInstancesToSchedule.put(jobInstance, Boolean.TRUE); + } actorContext.getActorManager().rescheduleActor(actorName, 0); } @@ -169,6 +280,20 @@ private boolean updateEarliestKnownSchedule(long oldValue, long newValue) { return false; } + private boolean updateEarliestKnownSchedule(long newValue) { + long oldValue = earliestKnownSchedule.get(); + // We use lower or equal because a re-schedule event could be caused from within a processor + while (oldValue <= newValue) { + if (earliestKnownSchedule.compareAndSet(oldValue, newValue)) { + return true; + } + + oldValue = earliestKnownSchedule.get(); + } + + return false; + } + private void resetEarliestKnownSchedule() { long earliestKnownSchedule = this.earliestKnownSchedule.get(); // Only reset the value if the currently known earliest schedule is in the past @@ -177,6 +302,80 @@ private void resetEarliestKnownSchedule() { } } + @Override + public int getClusterPosition(JobInstance jobInstance) { + if (!jobInstance.isLongRunning()) { + return -1; + } + JobInstanceExecution execution = longRunningJobInstances.get(jobInstance.getId()); + if (execution == null) { + JobSchedulerStatusEvent event = new JobSchedulerStatusEvent(new Serializable[] { (Serializable) jobInstance.getId() }); + Map> result = actorContext.getService(ClusterStateManager.class).fireEventExcludeSelf(event); + try { + for (Map.Entry> entry : result.entrySet()) { + int position = entry.getValue().get()[0]; + if (position != -1) { + return position; + } + } + } catch (Exception e) { + throw new JobException("Could not retrieve cluster position for job instance: " + jobInstance, e); + } +
return -1; + } else { + return clusterNodeInfo.getClusterPosition(); + } + } + + @Override + public String getTrace(JobInstance jobInstance) { + if (!jobInstance.isLongRunning()) { + return null; + } + JobInstanceExecution execution = longRunningJobInstances.get(jobInstance.getId()); + if (execution == null) { + JobSchedulerTraceEvent event = new JobSchedulerTraceEvent((Serializable) jobInstance.getId()); + Map> result = actorContext.getService(ClusterStateManager.class).fireEventExcludeSelf(event); + try { + for (Map.Entry> entry : result.entrySet()) { + String trace = entry.getValue().get(); + if (trace != null) { + return trace; + } + } + } catch (Exception e) { + throw new JobException("Could not retrieve trace for job instance: " + jobInstance , e); + } + return null; + } else { + Thread thread = execution.thread; + if (thread != null) { + StackTraceElement[] stackTrace = thread.getStackTrace(); + StringBuilder sb = new StringBuilder(); + for (StackTraceElement stackTraceElement : stackTrace) { + sb.append(stackTraceElement).append('\n'); + } + + return sb.toString(); + } + return null; + } + } + + @Override + public void cancel(JobInstance jobInstance) { + if (!jobInstance.isLongRunning()) { + return; + } + JobInstanceExecution execution = longRunningJobInstances.get(jobInstance.getId()); + if (execution == null) { + JobSchedulerCancelEvent event = new JobSchedulerCancelEvent((Serializable) jobInstance.getId()); + actorContext.getService(ClusterStateManager.class).fireEventExcludeSelf(event, false); + } else { + execution.future.cancel(true); + } + } + @Override public void stop() { closed = true; @@ -193,13 +392,12 @@ private class JobInstanceRunner implements ScheduledActor, Callable> jobInstancesToProcess = jobManager.getJobInstancesToProcess(clusterNodeInfo.getClusterPosition(), clusterNodeInfo.getClusterSize(), processCount, partitionKey, jobContext.isScheduleRefreshedOnly() ? 
jobInstancesToSchedule.keySet() : null); int size = jobInstancesToProcess.size(); @@ -237,11 +435,23 @@ public ActorRunResult call() throws Exception { // By default we execute transactional job instance processors synchronously within our transaction if (jobInstanceProcessor.isTransactional()) { f = new SyncJobInstanceProcessorFuture(jobInstanceProcessor, jobInstance, jobProcessingContext); + jobInstanceExecutions.add(new JobInstanceExecution(jobInstance, deferCount, scheduleContext, jobProcessingContext, f)); } else { jobInstance.setLastExecutionTime(Instant.now()); - f = scheduler.submit(new SpecialThrowingCallable(jobInstanceProcessor, jobInstance, jobProcessingContext)); + // A long running job instance has to be marked as "running" early to avoid rescheduling + if (jobInstance.isLongRunning()) { + jobInstance.markRunning(jobProcessingContext); + jobManager.updateJobInstance(jobInstance); + JobInstanceExecution execution = new JobInstanceExecution(jobInstance, deferCount, scheduleContext, jobProcessingContext, null); + // If the long running job is not yet done, we have to register a watcher + longRunningJobInstances.put(jobInstance.getId(), execution); + Lock lock = lockService.getLock("jobInstance/" + jobInstance.getId()); + execution.future = scheduler.submit(new NotifyingSpecialThrowingCallable(jobInstanceProcessor, execution, lock)); + } else { + f = scheduler.submit(new SpecialThrowingCallable(jobInstanceProcessor, jobInstance, jobProcessingContext)); + jobInstanceExecutions.add(new JobInstanceExecution(jobInstance, deferCount, scheduleContext, jobProcessingContext, f)); + } } - jobInstanceExecutions.add(new JobInstanceExecution(jobInstance, deferCount, scheduleContext, jobProcessingContext, f)); future = true; } else { Instant nextSchedule = TimeFrame.getNearestTimeFrameSchedule(executionTimeFrames, now); @@ -274,10 +484,10 @@ public ActorRunResult call() throws Exception { jobContext.forEachJobInstanceListeners(new JobInstanceErrorListenerConsumer(jobInstance, jobProcessingContext)); } finally { if (!future) { - jobManager.updateJobInstance(jobInstance); if (jobContext.isScheduleRefreshedOnly() && jobInstance.getState() != JobInstanceState.NEW) { jobInstancesToSchedule.remove(jobInstance); } + jobManager.updateJobInstance(jobInstance); } } } @@ -285,16 +495,16 @@ public ActorRunResult call() throws Exception { Instant rescheduleRateLimitTime = null; for (int i = 0; i < jobInstanceExecutions.size(); i++) { JobInstanceExecution execution = jobInstanceExecutions.get(i); + JobInstance jobInstance = execution.jobInstance; MutableJobInstanceProcessingContext jobProcessingContext = execution.jobProcessingContext; MutableScheduleContext scheduleContext = execution.scheduleContext; - JobInstance jobInstance = execution.jobInstance; int deferCount = execution.deferCount; Future future = execution.future; boolean success = true; try { Object lastProcessed = future.get(); jobProcessingContext.setLastProcessed(lastProcessed); - scheduleContext.setLastCompletionTime(System.currentTimeMillis()); + scheduleContext.setLastCompletionTime(clock.millis()); if (jobInstance.getState() == JobInstanceState.NEW) { Instant nextSchedule = jobInstance.nextSchedule(jobContext, scheduleContext); @@ -363,9 +573,7 @@ public ActorRunResult call() throws Exception { LOG.log(Level.SEVERE, "An error occurred in the job instance processor", t); success = false; TransactionSupport transactionSupport = jobContext.getTransactionSupport(); - // TODO: make configurable - long transactionTimeout = 60_000L; - 
transactionSupport.transactional(jobContext, transactionTimeout, true, () -> { + transactionSupport.transactional(jobContext, COMPLETION_TX_TIMEOUT, true, () -> { jobContext.forEachJobInstanceListeners(new JobInstanceErrorListenerConsumer(jobInstance, jobProcessingContext)); jobInstance.markFailed(jobProcessingContext, t); jobManager.updateJobInstance(jobInstance); @@ -375,12 +583,12 @@ public ActorRunResult call() throws Exception { }); } } finally { - if (success) { - jobManager.updateJobInstance(jobInstance); - } if (jobContext.isScheduleRefreshedOnly() && jobInstance.getState() != JobInstanceState.NEW) { jobInstancesToSchedule.remove(jobInstance); } + if (success) { + jobManager.updateJobInstance(jobInstance); + } } } @@ -402,8 +610,6 @@ public ActorRunResult work() { return ActorRunResult.done(); } TransactionSupport transactionSupport = jobContext.getTransactionSupport(); - // TODO: make configurable - long transactionTimeout = 60_000L; long earliestKnownNotificationSchedule = JobSchedulerImpl.this.earliestKnownSchedule.get(); ActorRunResult result = transactionSupport.transactional( jobContext, @@ -418,9 +624,7 @@ public ActorRunResult work() { } else if (result == null) { // An error occurred like e.g. a TX timeout or a temporary DB issue. We do exponential back-off long delay = getWaitTime(maxBackOff, baseBackOff, retryAttempt++); - if (LOG.isLoggable(Level.FINEST)) { - LOG.log(Level.FINEST, "Rescheduling due to error in: {0}", delay); - } + LOG.log(Level.INFO, "Rescheduling due to error in: {0}", delay); return ActorRunResult.rescheduleIn(delay); } @@ -438,6 +642,8 @@ public ActorRunResult work() { } return ActorRunResult.rescheduleIn(delayMillis); } + } else { + updateEarliestKnownSchedule(earliestKnownNotificationSchedule, clock.millis() + result.getDelayMillis()); } // NOTE: we don't need to update earliestKnownNotificationSchedule when rescheduling immediately return result; @@ -445,9 +651,9 @@ public ActorRunResult work() { } private static class SpecialThrowingCallable implements Callable { - private final JobInstanceProcessor jobInstanceProcessor; - private final JobInstance jobInstance; - private final MutableJobInstanceProcessingContext jobProcessingContext; + final JobInstanceProcessor jobInstanceProcessor; + final JobInstance jobInstance; + final MutableJobInstanceProcessingContext jobProcessingContext; public SpecialThrowingCallable(JobInstanceProcessor jobInstanceProcessor, JobInstance jobInstance, MutableJobInstanceProcessingContext jobProcessingContext) { this.jobInstanceProcessor = jobInstanceProcessor; @@ -466,6 +672,128 @@ public Object call() throws Exception { } } + private class NotifyingSpecialThrowingCallable implements Callable { + + private final JobInstanceProcessor jobInstanceProcessor; + private final JobInstanceExecution execution; + private final Lock lock; + + public NotifyingSpecialThrowingCallable(JobInstanceProcessor jobInstanceProcessor, JobInstanceExecution execution, Lock lock) { + this.jobInstanceProcessor = jobInstanceProcessor; + this.execution = execution; + this.lock = lock; + } + + @Override + public Object call() throws Exception { + JobInstance jobInstance = execution.jobInstance; + if (!lock.tryLock()) { + // Apparently the job is already running on a different node, so we can skip it here + longRunningJobInstances.remove(jobInstance.getId()); + return null; + } + execution.thread = Thread.currentThread(); + MutableScheduleContext scheduleContext = execution.scheduleContext; + int deferCount = execution.deferCount; + 
MutableJobInstanceProcessingContext jobProcessingContext = execution.jobProcessingContext; + try { + Object lastProcessed = jobInstanceProcessor.process(jobInstance, jobProcessingContext); + TransactionSupport transactionSupport = jobContext.getTransactionSupport(); + // Flush the completion status + transactionSupport.transactional(jobContext, COMPLETION_TX_TIMEOUT, false, () -> { + try { + jobProcessingContext.setLastProcessed(lastProcessed); + scheduleContext.setLastCompletionTime(clock.millis()); + + if (jobInstance.getState() == JobInstanceState.NEW) { + Instant nextSchedule = jobInstance.nextSchedule(jobContext, scheduleContext); + // This is essential for limited time or fixed time schedules. When these are done, they always return a nextSchedule equal to getLastScheduledExecutionTime() + if (nextSchedule.toEpochMilli() != scheduleContext.getLastScheduleTime()) { + // This is a recurring job that needs rescheduling + if (jobInstance.getDeferCount() == deferCount) { + jobInstance.onChunkSuccess(jobProcessingContext); + jobContext.forEachJobInstanceListeners(new JobInstanceChunkSuccessListenerConsumer(jobInstance, jobProcessingContext)); + } + jobInstance.setScheduleTime(nextSchedule); + updateEarliestKnownSchedule(jobInstance.getScheduleTime().toEpochMilli()); + return null; + } else if (lastProcessed != null) { + // Chunk processing + if (jobInstance.getDeferCount() == deferCount) { + jobInstance.onChunkSuccess(jobProcessingContext); + jobContext.forEachJobInstanceListeners(new JobInstanceChunkSuccessListenerConsumer(jobInstance, jobProcessingContext)); + } + updateEarliestKnownSchedule(jobInstance.getScheduleTime().toEpochMilli()); + return null; + } + } + + if (jobInstance.getState() != JobInstanceState.DONE) { + jobInstance.markDone(jobProcessingContext, lastProcessed); + } + jobContext.forEachJobInstanceListeners(new JobInstanceSuccessListenerConsumer(jobInstance, jobProcessingContext)); + return null; + } catch (Throwable t) { + jobInstance.markFailed(jobProcessingContext, t); + sneakyThrow(t); + return null; + } finally { + if (jobContext.isScheduleRefreshedOnly() && jobInstance.getState() != JobInstanceState.NEW) { + jobInstancesToSchedule.remove(jobInstance); + } + jobManager.updateJobInstance(jobInstance); + } + }, t2 -> { + LOG.log(Level.SEVERE, "An error occurred in the long running job instance completion handler", t2); + }); + return lastProcessed; + } catch (Throwable t) { + TransactionSupport transactionSupport = jobContext.getTransactionSupport(); + transactionSupport.transactional(jobContext, COMPLETION_TX_TIMEOUT, false, () -> { + if (t instanceof JobRateLimitException) { + JobRateLimitException e = (JobRateLimitException) t; + LOG.log(Level.FINEST, "Deferring job instance due to rate limit", e); + Instant rescheduleRateLimitTime; + if (e.getDeferMillis() != -1) { + rescheduleRateLimitTime = clock.instant().plus(e.getDeferMillis(), ChronoUnit.MILLIS); + } else { + rescheduleRateLimitTime = clock.instant().plus(rateLimitDeferSeconds, ChronoUnit.SECONDS); + } + jobInstance.setScheduleTime(rescheduleRateLimitTime); + updateEarliestKnownSchedule(jobInstance.getScheduleTime().toEpochMilli()); + } else if (t instanceof JobTemporaryException) { + JobTemporaryException e = (JobTemporaryException) t; + LOG.log(Level.FINEST, "Deferring job instance due to temporary error", e); + if (e.getDeferMillis() != -1) { + jobInstance.setScheduleTime(clock.instant().plus(e.getDeferMillis(), ChronoUnit.MILLIS)); + } else { + 
jobInstance.setScheduleTime(clock.instant().plus(temporaryErrorDeferSeconds, ChronoUnit.SECONDS)); + } + updateEarliestKnownSchedule(jobInstance.getScheduleTime().toEpochMilli()); + } else { + LOG.log(Level.SEVERE, "An error occurred in the job instance processor", t); + jobContext.forEachJobInstanceListeners(new JobInstanceErrorListenerConsumer(jobInstance, jobProcessingContext)); + jobInstance.markFailed(jobProcessingContext, t); + jobManager.updateJobInstance(jobInstance); + } + return null; + }, t2 -> { + LOG.log(Level.SEVERE, "An error occurred in the long running job instance error handler", t2); + }); + CallableThrowable.doThrow(t); + return null; + } finally { + longRunningJobInstances.remove(jobInstance.getId()); + execution.thread = null; + lock.unlock(); + } + } + } + + static void sneakyThrow(Throwable e) throws T { + throw (T) e; + } + // We need this special Throwable wrapper for exceptions, // as scheduled callables that throw exceptions are handled differently in some containers(Wildfly) // Wildfly will just log out the error and throw a different exception rather than propagating the error correctly to the Future @@ -591,7 +919,8 @@ private static class JobInstanceExecution { private final int deferCount; private final MutableScheduleContext scheduleContext; private final MutableJobInstanceProcessingContext jobProcessingContext; - private final Future future; + private volatile Thread thread; + private Future future; public JobInstanceExecution(JobInstance jobInstance, int deferCount, MutableScheduleContext scheduleContext, MutableJobInstanceProcessingContext jobProcessingContext, Future future) { this.jobInstance = jobInstance; diff --git a/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerStatusEvent.java b/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerStatusEvent.java new file mode 100644 index 0000000..2f5b9fc --- /dev/null +++ b/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerStatusEvent.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018 - 2020 Blazebit. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.blazebit.job.impl; + +import com.blazebit.actor.spi.StateReturningEvent; + +import java.io.Serializable; + +/** + * @author Christian Beikov + * @since 1.0.0 + */ +public class JobSchedulerStatusEvent implements StateReturningEvent { + + private final Serializable[] jobInstanceIds; + private int[] clusterPositions; + + public JobSchedulerStatusEvent(Serializable[] jobInstanceIds) { + this.jobInstanceIds = jobInstanceIds; + } + + public Serializable[] getJobInstanceIds() { + return jobInstanceIds; + } + + @Override + public int[] getResult() { + return clusterPositions; + } + + public void setClusterPositions(int[] clusterPositions) { + this.clusterPositions = clusterPositions; + } +} diff --git a/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerTraceEvent.java b/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerTraceEvent.java new file mode 100644 index 0000000..a677ad8 --- /dev/null +++ b/core/impl/src/main/java/com/blazebit/job/impl/JobSchedulerTraceEvent.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018 - 2020 Blazebit. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.blazebit.job.impl; + +import com.blazebit.actor.spi.StateReturningEvent; + +import java.io.Serializable; + +/** + * @author Christian Beikov + * @since 1.0.0 + */ +public class JobSchedulerTraceEvent implements StateReturningEvent { + + private final Serializable jobInstanceId; + private String trace; + + public JobSchedulerTraceEvent(Serializable jobInstanceId) { + this.jobInstanceId = jobInstanceId; + } + + public Serializable getJobInstanceId() { + return jobInstanceId; + } + + @Override + public String getResult() { + return trace; + } + + public void setTrace(String trace) { + this.trace = trace; + } +} diff --git a/entity-view/model/src/main/java/com/blazebit/job/view/model/AbstractJob.java b/entity-view/model/src/main/java/com/blazebit/job/view/model/AbstractJob.java index 172eb0c..cf8a1dd 100644 --- a/entity-view/model/src/main/java/com/blazebit/job/view/model/AbstractJob.java +++ b/entity-view/model/src/main/java/com/blazebit/job/view/model/AbstractJob.java @@ -38,13 +38,6 @@ public abstract class AbstractJob implements Job, IdHolderView { */ public abstract void setName(String name); - /** - * Sets the given job configuration. - * - * @param jobConfiguration The job configuration - */ - public abstract void setJobConfiguration(JobConfigurationView jobConfiguration); - /** * Sets the given creation time. 
* diff --git a/entity-view/model/src/main/java/com/blazebit/job/view/model/AbstractJobInstance.java b/entity-view/model/src/main/java/com/blazebit/job/view/model/AbstractJobInstance.java index e9ebad4..c6c939d 100644 --- a/entity-view/model/src/main/java/com/blazebit/job/view/model/AbstractJobInstance.java +++ b/entity-view/model/src/main/java/com/blazebit/job/view/model/AbstractJobInstance.java @@ -49,6 +49,11 @@ public void markDropped(JobInstanceProcessingContext jobProcessingContext) { setState(JobInstanceState.DROPPED); } + @Override + public void markRunning(JobInstanceProcessingContext processingContext) { + setState(JobInstanceState.RUNNING); + } + @Override public void markDone(JobInstanceProcessingContext jobProcessingContext, Object result) { setState(JobInstanceState.DONE); diff --git a/entity-view/model/src/main/java/com/blazebit/job/view/model/EntityViewPartitionKey.java b/entity-view/model/src/main/java/com/blazebit/job/view/model/EntityViewPartitionKey.java index eb273b1..c8fea00 100644 --- a/entity-view/model/src/main/java/com/blazebit/job/view/model/EntityViewPartitionKey.java +++ b/entity-view/model/src/main/java/com/blazebit/job/view/model/EntityViewPartitionKey.java @@ -21,7 +21,9 @@ import com.blazebit.job.PartitionKey; import com.blazebit.persistence.WhereBuilder; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.Function; /** @@ -32,16 +34,23 @@ */ public interface EntityViewPartitionKey extends PartitionKey { + /** + * Returns the entity view class to use for fetching jobs with this partition key. + * + * @return the entity view class + */ + Class> getEntityView(); + /** * Returns the entity class for the job instance type. * * @return the entity class */ - Class getEntityClass(); + Class> getEntityClass(); /** * A JPQL predicate for filtering for this partition. - * The job alias refers to an entity of the type as given in {@link #getJobInstanceType()}. + * The job alias refers to an entity of the type as given in {@link #getEntityClass()}. * * @param jobAlias The FROM clause alias for the job * @return The partition JPQL predicate or an empty string @@ -49,28 +58,28 @@ public interface EntityViewPartitionKey extends PartitionKey { String getPartitionPredicate(String jobAlias); /** - * Returns the attribute name of the identifier attribute of the entity type as given in {@link #getJobInstanceType()}. + * Returns the attribute name of the identifier attribute of the entity type as given in {@link #getEntityClass()}. * * @return the attribute name of the identifier attribute */ String getIdAttributeName(); /** - * Returns the attribute name of the schedule attribute of the entity type as given in {@link #getJobInstanceType()}. + * Returns the attribute name of the schedule attribute of the entity type as given in {@link #getEntityClass()}. * * @return the attribute name of the schedule attribute */ String getScheduleAttributeName(); /** - * Returns the attribute name of the last execution attribute of the entity type as given in {@link #getJobInstanceType()}. + * Returns the attribute name of the last execution attribute of the entity type as given in {@link #getEntityClass()}. * * @return the attribute name of the last execution attribute */ String getLastExecutionAttributeName(); /** - * Returns the attribute name of the partition key attribute of the entity type as given in {@link #getJobInstanceType()}. 
+ * Returns the attribute name of the partition key attribute of the entity type as given in {@link #getEntityClass()}. * * @return the attribute name of the partition key attribute */ @@ -120,13 +129,53 @@ interface EntityViewPartitionKeyBuilder { */ EntityViewPartitionKeyBuilder withName(String name); + /** + * Sets the number of jobs to schedule in parallel within one scheduler transaction. + * + * @param processCount The number of jobs to process + * @return this for chaining + */ + EntityViewPartitionKeyBuilder withProcessCount(int processCount); + + /** + * Sets the given transaction timeout. + * + * @param transactionTimeoutMillis The transaction timeout in milliseconds + * @return this for chaining + */ + EntityViewPartitionKeyBuilder withTransactionTimeoutMillis(int transactionTimeoutMillis); + + /** + * Sets the given temporary error backoff. + * + * @param temporaryErrorBackoffSeconds The temporary error backoff in seconds + * @return this for chaining + */ + EntityViewPartitionKeyBuilder withTemporaryErrorBackoffSeconds(int temporaryErrorBackoffSeconds); + + /** + * Sets the given rate limit backoff. + * + * @param rateLimitBackoffSeconds The rate limit backoff in seconds + * @return this for chaining + */ + EntityViewPartitionKeyBuilder withRateLimitBackoffSeconds(int rateLimitBackoffSeconds); + + /** + * Sets the given class as entity view class. + * + * @param entityViewClass The entity view class + * @return this for chaining + */ + EntityViewPartitionKeyBuilder withEntityView(Class> entityViewClass); + /** * Sets the given class as entity class. * * @param entityClass The entity class * @return this for chaining */ - EntityViewPartitionKeyBuilder withEntityClass(Class entityClass); + EntityViewPartitionKeyBuilder withEntityClass(Class> entityClass); /** * Sets the given job instance type.
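Taken together, the new builder methods allow configuring a partition key programmatically instead of via annotations. A minimal sketch of such a configuration follows; SimpleJobInstanceView, SimpleJobInstanceEntity and the attribute names are hypothetical placeholders, not types introduced by this patch:

    EntityViewPartitionKey partitionKey = EntityViewPartitionKey.builder()
        .withName("simpleJobInstance")
        .withProcessCount(10)                  // schedule up to 10 jobs within one scheduler transaction
        .withTransactionTimeoutMillis(60000)   // -1 appears to defer to the JobContext-wide default
        .withTemporaryErrorBackoffSeconds(30)
        .withRateLimitBackoffSeconds(10)
        .withEntityView(SimpleJobInstanceView.class)      // view used for fetching jobs
        .withEntityClass(SimpleJobInstanceEntity.class)   // entity type used for querying
        .withJobInstanceType(SimpleJobInstanceView.class)
        .withIdAttributeName("id")
        .withScheduleAttributeName("scheduleTime")
        .withLastExecutionAttributeName("lastExecutionTime")
        .withPartitionKeyAttributeName("id")
        .withStateAttributeName("state")
        .withStateValueMappingFunction(state -> ((JobInstanceState) state).ordinal()) // map states to the stored column value
        .build();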
@@ -208,8 +257,13 @@ interface EntityViewPartitionKeyBuilder { static EntityViewPartitionKeyBuilder builder() { return new EntityViewPartitionKeyBuilder() { String name0; - Class entityClass0; - Class> jobInstanceType0; + int processCount0 = 1; + int transactionTimeoutMillis0 = -1; + int temporaryErrorBackoffSeconds0 = -1; + int rateLimitBackoffSeconds0 = -1; + Class> entityView0; + Class> entityClass0; + Set>> jobInstanceTypes0 = new HashSet<>(); Function partitionPredicateProvider0; String idAttributeName0; String scheduleAttributeName0; @@ -225,14 +279,44 @@ public EntityViewPartitionKeyBuilder withName(String name) { } @Override - public EntityViewPartitionKeyBuilder withEntityClass(Class entityClass) { + public EntityViewPartitionKeyBuilder withProcessCount(int processCount) { + this.processCount0 = processCount; + return this; + } + + @Override + public EntityViewPartitionKeyBuilder withTransactionTimeoutMillis(int transactionTimeoutMillis) { + this.transactionTimeoutMillis0 = transactionTimeoutMillis; + return this; + } + + @Override + public EntityViewPartitionKeyBuilder withTemporaryErrorBackoffSeconds(int temporaryErrorBackoffSeconds) { + this.temporaryErrorBackoffSeconds0 = temporaryErrorBackoffSeconds; + return this; + } + + @Override + public EntityViewPartitionKeyBuilder withRateLimitBackoffSeconds(int rateLimitBackoffSeconds) { + this.rateLimitBackoffSeconds0 = rateLimitBackoffSeconds; + return this; + } + + @Override + public EntityViewPartitionKeyBuilder withEntityView(Class> entityViewClass) { + this.entityView0 = entityViewClass; + return this; + } + + @Override + public EntityViewPartitionKeyBuilder withEntityClass(Class> entityClass) { this.entityClass0 = entityClass; return this; } @Override public EntityViewPartitionKeyBuilder withJobInstanceType(Class> jobInstanceType) { - this.jobInstanceType0 = jobInstanceType; + this.jobInstanceTypes0.add(jobInstanceType); return this; } @@ -282,8 +366,13 @@ public EntityViewPartitionKeyBuilder withStateValueMappingFunction(Function entityClass = entityClass0; - private final Class> jobInstanceType = jobInstanceType0; + private final int processCount = processCount0; + private final int transactionTimeoutMillis = transactionTimeoutMillis0; + private final int temporaryErrorBackoffSeconds = temporaryErrorBackoffSeconds0; + private final int rateLimitBackoffSeconds = rateLimitBackoffSeconds0; + private final Class> entityView = entityView0; + private final Class> entityClass = entityClass0; + private final Set>> jobInstanceTypes = new HashSet<>(jobInstanceTypes0); private final Function partitionPredicateProvider = partitionPredicateProvider0; private final String idAttributeName = idAttributeName0; private final String scheduleAttributeName = scheduleAttributeName0; @@ -293,12 +382,42 @@ public EntityViewPartitionKey build() { private final Function stateValueMappingFunction = stateValueMappingFunction0; @Override - public Class> getJobInstanceType() { - return jobInstanceType; + public String getName() { + return name; + } + + @Override + public int getProcessCount() { + return processCount; + } + + @Override + public Class> getEntityView() { + return entityView; + } + + @Override + public Set>> getJobInstanceTypes() { + return jobInstanceTypes; + } + + @Override + public int getTransactionTimeoutMillis() { + return transactionTimeoutMillis; + } + + @Override + public int getTemporaryErrorBackoffSeconds() { + return temporaryErrorBackoffSeconds; + } + + @Override + public int getRateLimitBackoffSeconds() { + return 
rateLimitBackoffSeconds; } @Override - public Class getEntityClass() { + public Class> getEntityClass() { return entityClass; } diff --git a/entity-view/model/src/main/java/com/blazebit/job/view/model/JobConfigurationView.java b/entity-view/model/src/main/java/com/blazebit/job/view/model/JobConfigurationView.java index 3fc8b9e..914b52a 100644 --- a/entity-view/model/src/main/java/com/blazebit/job/view/model/JobConfigurationView.java +++ b/entity-view/model/src/main/java/com/blazebit/job/view/model/JobConfigurationView.java @@ -40,7 +40,7 @@ @EntityView(JobConfiguration.class) public abstract class JobConfigurationView implements com.blazebit.job.JobConfiguration, Serializable { - private static final Serializable EMPTY = new Serializable() {}; + private static final Serializable EMPTY = new Serializable() { }; private final DirtyMarkingSet executionTimeFrames; private final DirtyMarkingMap parameters; diff --git a/entity-view/storage/src/main/java/com/blazebit/job/view/storage/EntityViewJobManager.java b/entity-view/storage/src/main/java/com/blazebit/job/view/storage/EntityViewJobManager.java index 094e410..78c348e 100644 --- a/entity-view/storage/src/main/java/com/blazebit/job/view/storage/EntityViewJobManager.java +++ b/entity-view/storage/src/main/java/com/blazebit/job/view/storage/EntityViewJobManager.java @@ -150,11 +150,10 @@ public List> getJobInstancesToProcess(int partition, int partitio if (!(partitionKey instanceof EntityViewPartitionKey)) { throw new IllegalArgumentException("The given partition key does not implement EntityViewPartitionKey: " + partitionKey); } - Class> jobInstanceType = partitionKey.getJobInstanceType(); EntityViewPartitionKey entityViewPartitionKey = (EntityViewPartitionKey) partitionKey; Instant now = clock.instant(); CriteriaBuilder criteriaBuilder = createCriteriaBuilder(now, partition, partitionCount, entityViewPartitionKey, ids); - CriteriaBuilder> cb = entityViewManager.applySetting(EntityViewSetting.create((Class>) jobInstanceType), criteriaBuilder); + CriteriaBuilder> cb = entityViewManager.applySetting(EntityViewSetting.create((Class>) entityViewPartitionKey.getEntityView()), criteriaBuilder); List> jobInstances = cb.getQuery() .setHint("org.hibernate.lockMode.e", "UPGRADE_SKIPLOCKED") .setMaxResults(limit) @@ -163,6 +162,31 @@ public List> getJobInstancesToProcess(int partition, int partitio return jobInstances; } + @Override + public List> getRunningJobInstances(int partition, int partitionCount, PartitionKey partitionKey) { + if (!(partitionKey instanceof EntityViewPartitionKey)) { + throw new IllegalArgumentException("The given partition key does not implement EntityViewPartitionKey: " + partitionKey); + } + EntityViewPartitionKey entityViewPartitionKey = (EntityViewPartitionKey) partitionKey; + String partitionPredicate = entityViewPartitionKey.getPartitionPredicate("e"); + String partitionKeyAttributeName = entityViewPartitionKey.getPartitionKeyAttributeName(); + + CriteriaBuilder criteriaBuilder = criteriaBuilderFactory.create(entityManager, Object.class) + .from(entityViewPartitionKey.getEntityClass(), "e"); + if (!partitionPredicate.isEmpty()) { + criteriaBuilder.whereExpression(partitionPredicate); + } + if (partitionCount > 1) { + criteriaBuilder.where("MOD(e." 
+ partitionKeyAttributeName + ", " + partitionCount + ")").eqLiteral(partition); + } + criteriaBuilder.where(entityViewPartitionKey.getStateExpression("e")).eqLiteral(entityViewPartitionKey.getStateValueMappingFunction().apply(JobInstanceState.RUNNING)); + CriteriaBuilder> cb = entityViewManager.applySetting(EntityViewSetting.create((Class>) entityViewPartitionKey.getEntityView()), criteriaBuilder); + List> jobInstances = cb.getQuery() + .getResultList(); + + return jobInstances; + } + @Override public Instant getNextSchedule(int partition, int partitionCount, PartitionKey partitionKey, Set> jobInstancesToInclude) { List ids = new ArrayList<>(); diff --git a/entity-view/storage/src/main/java/com/blazebit/job/view/storage/EntityViewPartitionKeyProvider.java b/entity-view/storage/src/main/java/com/blazebit/job/view/storage/EntityViewPartitionKeyProvider.java index f32a491..34be359 100644 --- a/entity-view/storage/src/main/java/com/blazebit/job/view/storage/EntityViewPartitionKeyProvider.java +++ b/entity-view/storage/src/main/java/com/blazebit/job/view/storage/EntityViewPartitionKeyProvider.java @@ -20,23 +20,25 @@ import com.blazebit.job.JobInstance; import com.blazebit.job.JobInstanceState; import com.blazebit.job.JobTrigger; +import com.blazebit.job.Partition; import com.blazebit.job.PartitionKey; +import com.blazebit.job.Partitions; import com.blazebit.job.ServiceProvider; import com.blazebit.job.spi.PartitionKeyProvider; -import com.blazebit.job.view.model.AbstractJobInstance; import com.blazebit.job.view.model.EntityViewPartitionKey; import com.blazebit.persistence.CriteriaBuilderFactory; import com.blazebit.persistence.view.EntityViewManager; +import com.blazebit.persistence.view.metamodel.ManagedViewType; import com.blazebit.persistence.view.metamodel.ViewType; import javax.persistence.metamodel.EntityType; +import javax.persistence.metamodel.IdentifiableType; import javax.persistence.metamodel.Metamodel; -import java.lang.reflect.Modifier; -import java.util.ArrayList; +import java.lang.annotation.Annotation; import java.util.Collection; -import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import java.util.function.Function; /** @@ -103,8 +105,8 @@ public class EntityViewPartitionKeyProvider implements PartitionKeyProvider { */ public static final String JOB_INSTANCE_STATE_VALUE_MAPPING_FUNCTION_PROPERTY = "job.view.storage.job_instance_state_value_mapping_function"; - private final Collection jobTriggerPartitionKeys; - private final Collection jobInstancePartitionKeys; + private final Map jobTriggerPartitionKeys; + private final Map jobInstancePartitionKeys; /** * Creates a new partition key provider that makes use of the service provider and configuration source to determine the {@link EntityViewManager} and attribute names. 
@@ -152,144 +154,280 @@ public EntityViewPartitionKeyProvider(CriteriaBuilderFactory criteriaBuilderFact if (entityViewManager == null) { throw new JobException("No entity view manager given!"); } - Collection jobTriggerPartitionKeys = new ArrayList<>(); - Collection jobInstancePartitionKeys = new ArrayList<>(); - Map, List>> entitySubtypeMap = new HashMap<>(); + Map jobTriggerPartitionKeys = new TreeMap<>(); + Map jobInstancePartitionKeys = new TreeMap<>(); Metamodel metamodel = criteriaBuilderFactory.getService(Metamodel.class); - for (EntityType entity : metamodel.getEntities()) { - Class javaType = entity.getJavaType(); - // We only query non-abstract entity types - if (javaType != null && !Modifier.isAbstract(javaType.getModifiers())) { - if (JobTrigger.class.isAssignableFrom(javaType) || JobInstance.class.isAssignableFrom(javaType)) { - List> subtypes = new ArrayList<>(); - entitySubtypeMap.put(entity, subtypes); - while (entity.getSupertype() instanceof EntityType) { - EntityType supertype = (EntityType) entity.getSupertype(); - Class supertypeJavaType = supertype.getJavaType(); - if (supertypeJavaType != null && !Modifier.isAbstract(supertypeJavaType.getModifiers())) { - List> superSubtypes = entitySubtypeMap.compute(supertype, (e, list) -> list == null ? new ArrayList<>() : list); - superSubtypes.add(entity); - if (subtypes != null) { - superSubtypes.addAll(subtypes); - } - entity = supertype; - subtypes = entitySubtypeMap.get(entity); - } else { - entity = supertype; - if (subtypes.isEmpty()) { - subtypes = entitySubtypeMap.get(entity); - } else { - // We propagate all subtypes up to non-abstract supertypes - subtypes = new ArrayList<>(subtypes); - subtypes.addAll(entitySubtypeMap.get(entity)); - } - } + StringBuilder errors = new StringBuilder(); + for (ViewType viewType : entityViewManager.getMetamodel().getViews()) { + Class> viewJavaType = (Class>) viewType.getJavaType(); + if (JobInstance.class.isAssignableFrom(viewJavaType)) { + Class> entityClass = (Class>) viewType.getEntityClass(); + String inheritanceMapping = viewType.getInheritanceMapping(); + Function partitionKeyPredicateProvider; + if (inheritanceMapping == null) { + Set> inheritanceSubtypes = (Set>) viewType.getInheritanceSubtypes(); + // An inheritance enabled base view is not considered as partition key, only the subtypes + if (inheritanceSubtypes.size() > 1 || !inheritanceSubtypes.contains(viewType)) { + continue; } - } - } - } - - Map, List>> eligibleViewTypes = new HashMap<>(); - List> possibleDefaultViewTypes = new ArrayList<>(); - - for (ViewType view : entityViewManager.getMetamodel().getViews()) { - if (view.isUpdatable() && AbstractJobInstance.class.isAssignableFrom(view.getJavaType())) { - if (entitySubtypeMap.containsKey(metamodel.entity(view.getEntityClass()))) { - eligibleViewTypes.computeIfAbsent(view.getEntityClass(), k -> new ArrayList<>()).add(view); + partitionKeyPredicateProvider = null; } else { - possibleDefaultViewTypes.add(view); + String[] parts = inheritanceMapping.split("this"); + partitionKeyPredicateProvider = new PartsRenderingFunction(parts); } - } - } - List errors = new ArrayList<>(); - for (Map.Entry, List>> entry : entitySubtypeMap.entrySet()) { - EntityType entity = entry.getKey(); - Class entityJavaType = entity.getJavaType(); - Function partitionKeyPredicateProvider; - ViewType viewType; - List> viewTypes = eligibleViewTypes.get(entityJavaType); - if (viewTypes == null || viewTypes.size() > 1) { - if (possibleDefaultViewTypes.size() == 0) { - if (viewTypes != null && 
viewTypes.size() > 1) { - errors.add("Could not determine the entity view for the job instance entity type '" + entityJavaType.getName() + "' because there are multiple no possible default view types but multiple possible view types: " + viewTypes); - } else { - errors.add("Could not determine the entity view for the job instance entity type '" + entityJavaType.getName() + "' because there are multiple no possible default view types!"); - } - continue; - } - if (possibleDefaultViewTypes.size() > 1) { - if (viewTypes != null && viewTypes.size() > 1) { - errors.add("Could not determine the entity view for the job instance entity type '" + entityJavaType.getName() + "' because there are multiple possible default view types: " + possibleDefaultViewTypes + " and multiple possible view types: " + viewTypes); + Partition[] partitions; + Partitions annotation = viewJavaType.getAnnotation(Partitions.class); + if (annotation == null) { + Partition partition = viewJavaType.getAnnotation(Partition.class); + if (partition == null) { + partitions = PartitionLiteral.INSTANCE; } else { - errors.add("Could not determine the entity view for the job instance entity type '" + entityJavaType.getName() + "' because there are multiple possible default view types: " + possibleDefaultViewTypes); + partitions = new Partition[]{ partition }; } - continue; - } - viewType = possibleDefaultViewTypes.get(0); - partitionKeyPredicateProvider = alias -> "TYPE(" + alias + ") = " + entity.getName(); - } else { - viewType = viewTypes.get(0); - if (entry.getValue().isEmpty()) { - partitionKeyPredicateProvider = null; } else { - partitionKeyPredicateProvider = alias -> "TYPE(" + alias + ") = " + entity.getName(); + partitions = annotation.value(); } - } - if (JobTrigger.class.isAssignableFrom(entityJavaType)) { - jobTriggerPartitionKeys.add( - EntityViewPartitionKey.builder() - .withName(entity.getName()) - .withEntityClass(entityJavaType) - .withJobInstanceType((Class>) viewType.getJavaType()) - .withPartitionPredicateProvider(partitionKeyPredicateProvider) - .withIdAttributeName(jobTriggerIdAttributeName) - .withScheduleAttributeName(jobTriggerScheduleAttributeName) - .withLastExecutionAttributeName(jobTriggerLastExecutionAttributeName) - .withPartitionKeyAttributeName(jobTriggerIdAttributeName) - .withStateAttributeName(jobTriggerStateAttributeName) - .withStateValueMappingFunction(jobTriggerStateValueMapper) - .build() - ); - } else if (JobInstance.class.isAssignableFrom(entityJavaType)) { - jobInstancePartitionKeys.add( - EntityViewPartitionKey.builder() - .withName(entity.getName()) - .withEntityClass(entityJavaType) - .withJobInstanceType((Class>) viewType.getJavaType()) - .withPartitionPredicateProvider(partitionKeyPredicateProvider) - .withIdAttributeName(jobInstanceIdAttributeName) - .withScheduleAttributeName(jobInstanceScheduleAttributeName) - .withLastExecutionAttributeName(jobInstanceLastExecutionAttributeName) - .withPartitionKeyAttributeName(jobInstancePartitionKeyAttributeName) - .withStateAttributeName(jobInstanceStateAttributeName) - .withStateValueMappingFunction(jobInstanceStateValueMapper) - .build() - ); + for (Partition partition : partitions) { + String partitionNameBase = viewJavaType.getName(); + if (!partition.name().isEmpty()) { + partitionNameBase = partition.name(); + } + int processCount = partition.processCount(); + int transactionTimeoutMillis = partition.transactionTimeoutMillis(); + int temporaryErrorBackoffSeconds = partition.temporaryErrorBackoffSeconds(); + int rateLimitBackoffSeconds = 
partition.rateLimitBackoffSeconds(); + + for (int i = 0; i < partition.partitionCount(); i++) { + String partitionName; + if (partition.partitionCount() > 1) { + partitionName = partitionNameBase + "-" + i; + } else { + partitionName = partitionNameBase; + } + EntityViewPartitionKey existingPartitionKey; + if (JobTrigger.class.isAssignableFrom(entityClass)) { + existingPartitionKey = (EntityViewPartitionKey) jobTriggerPartitionKeys.get(partitionName); + } else { + existingPartitionKey = (EntityViewPartitionKey) jobInstancePartitionKeys.get(partitionName); + } + if (existingPartitionKey != null) { + processCount = Math.max(processCount, existingPartitionKey.getProcessCount()); + transactionTimeoutMillis = Math.max(transactionTimeoutMillis, existingPartitionKey.getTransactionTimeoutMillis()); + temporaryErrorBackoffSeconds = Math.max(temporaryErrorBackoffSeconds, existingPartitionKey.getTemporaryErrorBackoffSeconds()); + rateLimitBackoffSeconds = Math.max(rateLimitBackoffSeconds, existingPartitionKey.getRateLimitBackoffSeconds()); + EntityType entity = metamodel.entity(entityClass); + EntityType existingEntity = metamodel.entity(existingPartitionKey.getEntityClass()); + entityClass = (Class>) getCommonSuperclass(entity, existingEntity); + if (entityClass == null) { + errors.append("\n * The entity view type ").append(existingPartitionKey.getEntityView().getName()).append(" and ").append(viewJavaType.getName()).append(" use the same partition name '").append(partitionName).append("' but have no common entity super type which is necessary for querying!"); + continue; + } + if (existingPartitionKey.getEntityView() != viewJavaType) { + errors.append("\n * The entity view type ").append(existingPartitionKey.getEntityView().getName()).append(" and ").append(viewJavaType.getName()).append(" use the same partition name '").append(partitionName).append("' which is disallowed!"); + continue; + } + if (partitionKeyPredicateProvider == null) { + partitionKeyPredicateProvider = existingPartitionKey::getPartitionPredicate; + } else { + String existingPredicate = existingPartitionKey.getPartitionPredicate("e"); + if (existingPredicate != null) { + if (!existingPredicate.contains(partitionKeyPredicateProvider.apply("e"))) { + Function oldPartitionKeyPredicateProvider = partitionKeyPredicateProvider; + partitionKeyPredicateProvider = alias -> oldPartitionKeyPredicateProvider.apply(alias) + " OR " + existingPartitionKey.getPartitionPredicate(alias); + } + } + } + if (!partition.predicate().isEmpty()) { + String[] parts = partition.predicate().replace("{partition}", "" + i).split("\\{alias}"); + PartsRenderingFunction additionalPartitionKeyPredicateProvider = new PartsRenderingFunction(parts); + + String existingPredicate = partitionKeyPredicateProvider.apply("e"); + if (existingPredicate != null && !existingPredicate.isEmpty()) { + if (!existingPredicate.contains(additionalPartitionKeyPredicateProvider.apply("e"))) { + Function oldPartitionKeyPredicateProvider = partitionKeyPredicateProvider; + partitionKeyPredicateProvider = alias -> "(" + oldPartitionKeyPredicateProvider.apply(alias) + ") AND " + additionalPartitionKeyPredicateProvider.apply(alias); + } + } else { + partitionKeyPredicateProvider = additionalPartitionKeyPredicateProvider; + } + } + } + + if (JobTrigger.class.isAssignableFrom(viewJavaType)) { + jobTriggerPartitionKeys.put(partitionName, + EntityViewPartitionKey.builder() + .withName(viewJavaType.getName()) + .withProcessCount(processCount) + .withTransactionTimeoutMillis(transactionTimeoutMillis) +
.withTemporaryErrorBackoffSeconds(temporaryErrorBackoffSeconds) + .withRateLimitBackoffSeconds(rateLimitBackoffSeconds) + .withEntityClass(entityClass) + .withEntityView(viewJavaType) + .withJobInstanceType(viewJavaType) + .withPartitionPredicateProvider(partitionKeyPredicateProvider) + .withIdAttributeName(jobTriggerIdAttributeName) + .withScheduleAttributeName(jobTriggerScheduleAttributeName) + .withLastExecutionAttributeName(jobTriggerLastExecutionAttributeName) + .withPartitionKeyAttributeName(jobTriggerIdAttributeName) + .withStateAttributeName(jobTriggerStateAttributeName) + .withStateValueMappingFunction(jobTriggerStateValueMapper) + .build() + ); + } else { + jobInstancePartitionKeys.put(partitionName, + EntityViewPartitionKey.builder() + .withName(viewJavaType.getName()) + .withProcessCount(processCount) + .withTransactionTimeoutMillis(transactionTimeoutMillis) + .withTemporaryErrorBackoffSeconds(temporaryErrorBackoffSeconds) + .withRateLimitBackoffSeconds(rateLimitBackoffSeconds) + .withEntityClass(entityClass) + .withEntityView(viewJavaType) + .withJobInstanceType(viewJavaType) + .withPartitionPredicateProvider(partitionKeyPredicateProvider) + .withIdAttributeName(jobInstanceIdAttributeName) + .withScheduleAttributeName(jobInstanceScheduleAttributeName) + .withLastExecutionAttributeName(jobInstanceLastExecutionAttributeName) + .withPartitionKeyAttributeName(jobInstancePartitionKeyAttributeName) + .withStateAttributeName(jobInstanceStateAttributeName) + .withStateValueMappingFunction(jobInstanceStateValueMapper) + .build() + ); + } + } + } } } - if (!errors.isEmpty()) { - StringBuilder sb = new StringBuilder(); - sb.append("There were errors while determining the partition keys for the entity view model:"); - for (String error : errors) { - sb.append("\n\t- ").append(error); - } - - throw new JobException(sb.toString()); + if (errors.length() != 0) { + errors.insert(0, "There are errors in the job instance partition configuration:"); + throw new JobException(errors.toString()); } + this.jobTriggerPartitionKeys = jobTriggerPartitionKeys; this.jobInstancePartitionKeys = jobInstancePartitionKeys; } + private static Class getCommonSuperclass(IdentifiableType t1, IdentifiableType t2) { + Class class1 = t1.getJavaType(); + if (t1 == t2) { + return class1; + } + Class class2 = t2.getJavaType(); + if (class2.isAssignableFrom(class1)) { + return class2; + } else if (class1.isAssignableFrom(class2)) { + return class1; + } else { + Class c = null; + if (t1.getSupertype() != null) { + c = getCommonSuperclass(t1.getSupertype(), t2); + } + if (c == null && t2.getSupertype() != null) { + c = getCommonSuperclass(t1, t2.getSupertype()); + } + return c; + } + } + @Override public Collection getDefaultTriggerPartitionKeys() { - return jobTriggerPartitionKeys; + return jobTriggerPartitionKeys.values(); } @Override public Collection getDefaultJobInstancePartitionKeys() { - return jobInstancePartitionKeys; + return jobInstancePartitionKeys.values(); + } + + /** + * A partition literal. 
+ * + * @author Christian Beikov + * @since 1.0.0 + */ + private static class PartitionLiteral implements Partition { + + public static final Partition[] INSTANCE = new Partition[]{ new PartitionLiteral() }; + + private PartitionLiteral() { + } + + @Override + public String name() { + return ""; + } + + @Override + public int processCount() { + return 1; + } + + @Override + public String predicate() { + return ""; + } + + @Override + public int partitionCount() { + return 1; + } + + @Override + public int transactionTimeoutMillis() { + return -1; + } + + @Override + public int temporaryErrorBackoffSeconds() { + return -1; + } + + @Override + public int rateLimitBackoffSeconds() { + return -1; + } + + @Override + public Class annotationType() { + return Partition.class; + } + } + + /** + * A part rendering function. + * + * @author Christian Beikov + * @since 1.0.0 + */ + private static class PartsRenderingFunction implements Function { + + private final String[] parts; + + /** + * Creates the rendering function. + * + * @param parts The parts + */ + public PartsRenderingFunction(String[] parts) { + this.parts = parts; + } + + @Override + public String apply(String alias) { + int length = parts.length; + if (length == 1) { + return parts[0]; + } else { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length - 1; i++) { + sb.append(parts[i]).append(alias); + } + sb.append(parts[length - 1]); + return sb.toString(); + } + } } } diff --git a/jpa/model/src/main/java/com/blazebit/job/jpa/model/AbstractJobInstance.java b/jpa/model/src/main/java/com/blazebit/job/jpa/model/AbstractJobInstance.java index 04a13ff..cc1c9c7 100644 --- a/jpa/model/src/main/java/com/blazebit/job/jpa/model/AbstractJobInstance.java +++ b/jpa/model/src/main/java/com/blazebit/job/jpa/model/AbstractJobInstance.java @@ -78,6 +78,11 @@ public void markDropped(JobInstanceProcessingContext jobProcessingContext) { setState(JobInstanceState.DROPPED); } + @Override + public void markRunning(JobInstanceProcessingContext processingContext) { + setState(JobInstanceState.RUNNING); + } + @Override public void markDone(JobInstanceProcessingContext jobProcessingContext, Object result) { setState(JobInstanceState.DONE); diff --git a/jpa/model/src/main/java/com/blazebit/job/jpa/model/JpaPartitionKey.java b/jpa/model/src/main/java/com/blazebit/job/jpa/model/JpaPartitionKey.java index b94f11e..f4663e9 100644 --- a/jpa/model/src/main/java/com/blazebit/job/jpa/model/JpaPartitionKey.java +++ b/jpa/model/src/main/java/com/blazebit/job/jpa/model/JpaPartitionKey.java @@ -22,7 +22,9 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.Function; /** @@ -33,9 +35,16 @@ */ public interface JpaPartitionKey extends PartitionKey { + /** + * Returns the entity class to use for fetching jobs with this partition key. + * + * @return the entity class + */ + Class> getEntityClass(); + /** * A JPQL predicate for filtering for this partition. - * The job alias refers to an entity of the type as given in {@link #getJobInstanceType()}. + * The job alias refers to an entity of the type as given in {@link #getEntityClass()}. 
* * @param jobAlias The FROM clause alias for the job * @return The partition JPQL predicate or an empty string */ String getPartitionPredicate(String jobAlias); /** - * Returns the attribute name of the identifier attribute of the entity type as given in {@link #getJobInstanceType()}. + * Returns the attribute name of the identifier attribute of the entity type as given in {@link #getEntityClass()}. * * @return the attribute name of the identifier attribute */ String getIdAttributeName(); /** - * Returns the attribute name of the schedule attribute of the entity type as given in {@link #getJobInstanceType()}. + * Returns the attribute name of the schedule attribute of the entity type as given in {@link #getEntityClass()}. * * @return the attribute name of the schedule attribute */ String getScheduleAttributeName(); /** - * Returns the attribute name of the last execution attribute of the entity type as given in {@link #getJobInstanceType()}. + * Returns the attribute name of the last execution attribute of the entity type as given in {@link #getEntityClass()}. * * @return the attribute name of the last execution attribute */ String getLastExecutionAttributeName(); /** - * Returns the attribute name of the partition key attribute of the entity type as given in {@link #getJobInstanceType()}. + * Returns the attribute name of the partition key attribute of the entity type as given in {@link #getEntityClass()}. * * @return the attribute name of the partition key attribute */ @@ -122,6 +131,22 @@ interface JpaPartitionKeyBuilder { */ JpaPartitionKeyBuilder withName(String name); + /** + * Sets the number of jobs to schedule in parallel within one scheduler transaction. + * + * @param processCount The number of jobs to process + * @return this for chaining + */ + JpaPartitionKeyBuilder withProcessCount(int processCount); + + /** + * Sets the given class as entity class. + * + * @param entityClass The entity class + * @return this for chaining + */ + JpaPartitionKeyBuilder withEntityClass(Class> entityClass); + /** * Sets the given job instance type. * * @param jobInstanceType The job instance type * @return this for chaining */ JpaPartitionKeyBuilder withJobInstanceType(Class> jobInstanceType); + /** + * Sets the given transaction timeout. + * + * @param transactionTimeoutMillis The transaction timeout in milliseconds + * @return this for chaining + */ + JpaPartitionKeyBuilder withTransactionTimeoutMillis(int transactionTimeoutMillis); + + /** + * Sets the given temporary error backoff. + * + * @param temporaryErrorBackoffSeconds The temporary error backoff in seconds + * @return this for chaining + */ + JpaPartitionKeyBuilder withTemporaryErrorBackoffSeconds(int temporaryErrorBackoffSeconds); + + /** + * Sets the given rate limit backoff. + * + * @param rateLimitBackoffSeconds The rate limit backoff in seconds + * @return this for chaining + */ + JpaPartitionKeyBuilder withRateLimitBackoffSeconds(int rateLimitBackoffSeconds); + /** * Sets the given partition predicate provider.
* @@ -210,7 +259,12 @@ interface JpaPartitionKeyBuilder { static JpaPartitionKeyBuilder builder() { return new JpaPartitionKeyBuilder() { String name0; - Class> jobInstanceType0; + int processCount0 = 1; + int transactionTimeoutMillis0 = -1; + int temporaryErrorBackoffSeconds0 = -1; + int rateLimitBackoffSeconds0 = -1; + Class> entityClass0; + Set>> jobInstanceType0 = new HashSet<>(); Function partitionPredicateProvider0; String idAttributeName0; String scheduleAttributeName0; @@ -226,9 +280,39 @@ public JpaPartitionKeyBuilder withName(String name) { return this; } + @Override + public JpaPartitionKeyBuilder withProcessCount(int processCount) { + this.processCount0 = processCount; + return this; + } + + @Override + public JpaPartitionKeyBuilder withTransactionTimeoutMillis(int transactionTimeoutMillis) { + this.transactionTimeoutMillis0 = transactionTimeoutMillis; + return this; + } + + @Override + public JpaPartitionKeyBuilder withTemporaryErrorBackoffSeconds(int temporaryErrorBackoffSeconds) { + this.temporaryErrorBackoffSeconds0 = temporaryErrorBackoffSeconds; + return this; + } + + @Override + public JpaPartitionKeyBuilder withRateLimitBackoffSeconds(int rateLimitBackoffSeconds) { + this.rateLimitBackoffSeconds0 = rateLimitBackoffSeconds; + return this; + } + + @Override + public JpaPartitionKeyBuilder withEntityClass(Class> entityClass) { + this.entityClass0 = entityClass; + return this; + } + @Override public JpaPartitionKeyBuilder withJobInstanceType(Class> jobInstanceType) { - this.jobInstanceType0 = jobInstanceType; + this.jobInstanceType0.add(jobInstanceType); return this; } @@ -284,7 +368,12 @@ public JpaPartitionKeyBuilder withJoinFetches(String... fetches) { public JpaPartitionKey build() { return new JpaPartitionKey() { private final String name = name0; - private final Class> jobInstanceType = jobInstanceType0; + private final int processCount = processCount0; + private final Class> entityClass = entityClass0; + private final int transactionTimeoutMillis = transactionTimeoutMillis0; + private final int temporaryErrorBackoffSeconds = temporaryErrorBackoffSeconds0; + private final int rateLimitBackoffSeconds = rateLimitBackoffSeconds0; + private final Set>> jobInstanceType = new HashSet<>(jobInstanceType0); private final Function partitionPredicateProvider = partitionPredicateProvider0; private final String idAttributeName = idAttributeName0; private final String scheduleAttributeName = scheduleAttributeName0; @@ -295,10 +384,40 @@ public JpaPartitionKey build() { private final String[] fetches = fetches0.toArray(new String[fetches0.size()]); @Override - public Class> getJobInstanceType() { + public String getName() { + return name; + } + + @Override + public int getProcessCount() { + return processCount; + } + + @Override + public Class> getEntityClass() { + return entityClass; + } + + @Override + public Set>> getJobInstanceTypes() { return jobInstanceType; } + @Override + public int getTransactionTimeoutMillis() { + return transactionTimeoutMillis; + } + + @Override + public int getTemporaryErrorBackoffSeconds() { + return temporaryErrorBackoffSeconds; + } + + @Override + public int getRateLimitBackoffSeconds() { + return rateLimitBackoffSeconds; + } + @Override public String getPartitionPredicate(String jobAlias) { return partitionPredicateProvider == null ? 
"" : partitionPredicateProvider.apply(jobAlias); diff --git a/jpa/storage/src/main/java/com/blazebit/job/jpa/storage/JpaJobManager.java b/jpa/storage/src/main/java/com/blazebit/job/jpa/storage/JpaJobManager.java index d3dd5e6..c1c8229 100644 --- a/jpa/storage/src/main/java/com/blazebit/job/jpa/storage/JpaJobManager.java +++ b/jpa/storage/src/main/java/com/blazebit/job/jpa/storage/JpaJobManager.java @@ -269,7 +269,6 @@ public List> getJobInstancesToProcess(int partition, int partitio if (!(partitionKey instanceof JpaPartitionKey)) { throw new IllegalArgumentException("The given partition key does not implement JpaPartitionKey: " + partitionKey); } - Class> jobInstanceType = partitionKey.getJobInstanceType(); JpaPartitionKey jpaPartitionKey = (JpaPartitionKey) partitionKey; String partitionPredicate = jpaPartitionKey.getPartitionPredicate("e"); String idAttributeName = jpaPartitionKey.getIdAttributeName(); @@ -280,7 +279,7 @@ public List> getJobInstancesToProcess(int partition, int partitio String joinFetches = jpaPartitionKey.getJoinFetches("e"); Instant now = clock.instant(); TypedQuery> typedQuery = entityManager.createQuery( - "SELECT e FROM " + jobInstanceType.getName() + " e " + + "SELECT e FROM " + jpaPartitionKey.getEntityClass().getName() + " e " + joinFetches + " " + "WHERE e." + scheduleAttributeName + " <= :now " + (partitionPredicate.isEmpty() ? "" : "AND " + partitionPredicate + " ") + @@ -288,7 +287,7 @@ public List> getJobInstancesToProcess(int partition, int partitio (statePredicate == null || statePredicate.isEmpty() ? "" : "AND " + statePredicate + " ") + (ids.isEmpty() ? "" : "AND e." + idAttributeName + " IN :ids ") + "ORDER BY e." + scheduleAttributeName + " ASC, e." + idAttributeName + " ASC", - jobInstanceType + jpaPartitionKey.getEntityClass() ); typedQuery.setParameter("now", now); if (stateValueMappingFunction != null) { @@ -313,6 +312,36 @@ public List> getJobInstancesToProcess(int partition, int partitio return jobInstances; } + @Override + public List> getRunningJobInstances(int partition, int partitionCount, PartitionKey partitionKey) { + if (!(partitionKey instanceof JpaPartitionKey)) { + throw new IllegalArgumentException("The given partition key does not implement JpaPartitionKey: " + partitionKey); + } + JpaPartitionKey jpaPartitionKey = (JpaPartitionKey) partitionKey; + String partitionPredicate = jpaPartitionKey.getPartitionPredicate("e"); + String partitionKeyAttributeName = jpaPartitionKey.getPartitionKeyAttributeName(); + String statePredicate = jpaPartitionKey.getStatePredicate("e"); + Function stateValueMappingFunction = jpaPartitionKey.getStateValueMappingFunction(); + String joinFetches = jpaPartitionKey.getJoinFetches("e"); + TypedQuery> typedQuery = entityManager.createQuery( + "SELECT e FROM " + jpaPartitionKey.getEntityClass().getName() + " e " + + joinFetches + " " + + "WHERE " + statePredicate + + (partitionPredicate.isEmpty() ? "" : "AND " + partitionPredicate + " ") + + (partitionCount > 1 ? "AND MOD(e." 
+ partitionKeyAttributeName + ", " + partitionCount + ") = " + partition + " " : ""), + jpaPartitionKey.getEntityClass() + ); + typedQuery.setParameter("readyState", stateValueMappingFunction.apply(JobInstanceState.RUNNING)); + List> jobInstances = (List>) (List) typedQuery.getResultList(); + + // Detach the job instances to avoid accidental flushes due to changes +// for (int i = 0; i < jobInstances.size(); i++) { +// entityManager.detach(jobInstances.get(i)); +// } + + return jobInstances; + } + @Override public Instant getNextSchedule(int partition, int partitionCount, PartitionKey partitionKey, Set> jobInstancesToInclude) { List ids = new ArrayList<>(); @@ -327,7 +356,6 @@ public Instant getNextSchedule(int partition, int partitionCount, PartitionKey p if (!(partitionKey instanceof JpaPartitionKey)) { throw new IllegalArgumentException("The given partition key does not implement JpaPartitionKey: " + partitionKey); } - Class> jobInstanceType = partitionKey.getJobInstanceType(); JpaPartitionKey jpaPartitionKey = (JpaPartitionKey) partitionKey; String partitionPredicate = jpaPartitionKey.getPartitionPredicate("e"); String idAttributeName = jpaPartitionKey.getIdAttributeName(); @@ -337,7 +365,7 @@ public Instant getNextSchedule(int partition, int partitionCount, PartitionKey p Function stateValueMappingFunction = jpaPartitionKey.getStateValueMappingFunction(); TypedQuery typedQuery = entityManager.createQuery( - "SELECT e." + scheduleAttributeName + " FROM " + jobInstanceType.getName() + " e " + + "SELECT e." + scheduleAttributeName + " FROM " + jpaPartitionKey.getEntityClass().getName() + " e " + "WHERE 1=1 " + (statePredicate == null || statePredicate.isEmpty() ? "" : "AND " + statePredicate + " ") + (partitionPredicate.isEmpty() ? "" : "AND " + partitionPredicate + " ") + @@ -394,7 +422,7 @@ public int removeJobInstances(Set states, Instant executionTim String stateExpression = jpaPartitionKey.getStateExpression("i"); if (stateExpression != null && !stateExpression.isEmpty() && !states.isEmpty()) { StringBuilder sb = new StringBuilder(); - sb.append("DELETE FROM ").append(partitionKey.getJobInstanceType().getName()).append(" i ") + sb.append("DELETE FROM ").append(jpaPartitionKey.getEntityClass().getName()).append(" i ") .append("WHERE ").append(stateExpression).append(" IN ("); int i = 0; int size = states.size(); diff --git a/jpa/storage/src/main/java/com/blazebit/job/jpa/storage/JpaPartitionKeyProvider.java b/jpa/storage/src/main/java/com/blazebit/job/jpa/storage/JpaPartitionKeyProvider.java index f8ee73d..d1fb6de 100644 --- a/jpa/storage/src/main/java/com/blazebit/job/jpa/storage/JpaPartitionKeyProvider.java +++ b/jpa/storage/src/main/java/com/blazebit/job/jpa/storage/JpaPartitionKeyProvider.java @@ -20,19 +20,25 @@ import com.blazebit.job.JobInstance; import com.blazebit.job.JobInstanceState; import com.blazebit.job.JobTrigger; +import com.blazebit.job.Partition; import com.blazebit.job.PartitionKey; +import com.blazebit.job.Partitions; import com.blazebit.job.ServiceProvider; import com.blazebit.job.jpa.model.JpaPartitionKey; import com.blazebit.job.spi.PartitionKeyProvider; import javax.persistence.EntityManager; import javax.persistence.metamodel.EntityType; +import javax.persistence.metamodel.IdentifiableType; +import javax.persistence.metamodel.Metamodel; +import java.lang.annotation.Annotation; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import 
java.util.TreeMap; import java.util.function.Function; /** @@ -99,8 +105,8 @@ public class JpaPartitionKeyProvider implements PartitionKeyProvider { */ public static final String JOB_INSTANCE_STATE_VALUE_MAPPING_FUNCTION_PROPERTY = "job.jpa.storage.job_instance_state_value_mapping_function"; - private final Collection jobTriggerPartitionKeys; - private final Collection jobInstancePartitionKeys; + private final Map jobTriggerPartitionKeys; + private final Map jobInstancePartitionKeys; /** * Creates a new partition key provider that makes use of the service provider and configuration source to determine the {@link EntityManager} and attribute names. @@ -147,10 +153,11 @@ public JpaPartitionKeyProvider(EntityManager entityManager, String jobTriggerIdA throw new JobException("No entity manager given!"); } - Collection jobTriggerPartitionKeys = new ArrayList<>(); - Collection jobInstancePartitionKeys = new ArrayList<>(); + Map jobTriggerPartitionKeys = new TreeMap<>(); + Map jobInstancePartitionKeys = new TreeMap<>(); Map, List>> entitySubtypeMap = new HashMap<>(); - for (EntityType entity : entityManager.getMetamodel().getEntities()) { + Metamodel metamodel = entityManager.getMetamodel(); + for (EntityType entity : metamodel.getEntities()) { Class javaType = entity.getJavaType(); // We only query non-abstract entity types if (javaType != null && !Modifier.isAbstract(javaType.getModifiers())) { @@ -184,9 +191,10 @@ public JpaPartitionKeyProvider(EntityManager entityManager, String jobTriggerIdA } } + StringBuilder errors = new StringBuilder(); for (Map.Entry, List>> entry : entitySubtypeMap.entrySet()) { EntityType entity = entry.getKey(); - Class javaType = entity.getJavaType(); + Class> javaType = (Class>) entity.getJavaType(); Function partitionKeyPredicateProvider; if (entry.getValue().isEmpty()) { partitionKeyPredicateProvider = null; @@ -194,47 +202,251 @@ public JpaPartitionKeyProvider(EntityManager entityManager, String jobTriggerIdA partitionKeyPredicateProvider = alias -> "TYPE(" + alias + ") = " + entity.getName(); } - if (JobTrigger.class.isAssignableFrom(javaType)) { - jobTriggerPartitionKeys.add( - JpaPartitionKey.builder() - .withName(entity.getName()) - .withJobInstanceType((Class>) javaType) - .withPartitionPredicateProvider(partitionKeyPredicateProvider) - .withIdAttributeName(jobTriggerIdAttributeName) - .withScheduleAttributeName(jobTriggerScheduleAttributeName) - .withLastExecutionAttributeName(jobTriggerLastExecutionAttributeName) - .withPartitionKeyAttributeName(jobTriggerIdAttributeName) - .withStateAttributeName(jobTriggerStateAttributeName) - .withStateValueMappingFunction(jobTriggerStateValueMapper) - .build() - ); - } else if (JobInstance.class.isAssignableFrom(javaType)) { - jobInstancePartitionKeys.add( - JpaPartitionKey.builder() - .withName(entity.getName()) - .withJobInstanceType((Class>) javaType) - .withPartitionPredicateProvider(partitionKeyPredicateProvider) - .withIdAttributeName(jobInstanceIdAttributeName) - .withScheduleAttributeName(jobInstanceScheduleAttributeName) - .withLastExecutionAttributeName(jobInstanceLastExecutionAttributeName) - .withPartitionKeyAttributeName(jobInstancePartitionKeyAttributeName) - .withStateAttributeName(jobInstanceStateAttributeName) - .withStateValueMappingFunction(jobInstanceStateValueMapper) - .build() - ); + + Partition[] partitions; + Partitions annotation = javaType.getAnnotation(Partitions.class); + if (annotation == null) { + Partition partition = javaType.getAnnotation(Partition.class); + if (partition == null) { + 
partitions = PartitionLiteral.INSTANCE; + } else { + partitions = new Partition[]{ partition }; + } + } else { + partitions = annotation.value(); + } + + for (Partition partition : partitions) { + String partitionNameBase = javaType.getName(); + if (!partition.name().isEmpty()) { + partitionNameBase = partition.name(); + } + int processCount = partition.processCount(); + int transactionTimeoutMillis = partition.transactionTimeoutMillis(); + int temporaryErrorBackoffSeconds = partition.temporaryErrorBackoffSeconds(); + int rateLimitBackoffSeconds = partition.rateLimitBackoffSeconds(); + + for (int i = 0; i < partition.partitionCount(); i++) { + String partitionName; + if (partition.partitionCount() > 1) { + partitionName = partitionNameBase + "-" + i; + } else { + partitionName = partitionNameBase; + } + + JpaPartitionKey existingPartitionKey; + if (JobTrigger.class.isAssignableFrom(javaType)) { + existingPartitionKey = (JpaPartitionKey) jobTriggerPartitionKeys.get(partitionName); + } else { + existingPartitionKey = (JpaPartitionKey) jobInstancePartitionKeys.get(partitionName); + } + if (existingPartitionKey != null) { + processCount = Math.max(processCount, existingPartitionKey.getProcessCount()); + transactionTimeoutMillis = Math.max(transactionTimeoutMillis, existingPartitionKey.getTransactionTimeoutMillis()); + temporaryErrorBackoffSeconds = Math.max(temporaryErrorBackoffSeconds, existingPartitionKey.getTemporaryErrorBackoffSeconds()); + rateLimitBackoffSeconds = Math.max(rateLimitBackoffSeconds, existingPartitionKey.getRateLimitBackoffSeconds()); + EntityType existingEntity = metamodel.entity(existingPartitionKey.getEntityClass()); + javaType = (Class>) getCommonSuperclass(entity, existingEntity); + if (javaType == null) { + errors.append("\n * The entity type " + existingEntity.getName() + " and " + entity.getName() + " use the same partition name '" + partitionName + "' but have no common super type which is necessary for querying!"); + continue; + } + if (partitionKeyPredicateProvider == null) { + partitionKeyPredicateProvider = existingPartitionKey::getPartitionPredicate; + } else { + String existingPredicate = existingPartitionKey.getPartitionPredicate("e"); + if (existingPredicate != null) { + if (!existingPredicate.contains(partitionKeyPredicateProvider.apply("e"))) { + Function oldPartitionKeyPredicateProvider = partitionKeyPredicateProvider; + partitionKeyPredicateProvider = alias -> oldPartitionKeyPredicateProvider.apply(alias) + " OR " + existingPartitionKey.getPartitionPredicate(alias); + } + } + } + if (!partition.predicate().isEmpty()) { + String[] parts = partition.predicate().replace("{partition}", "" + i).split("\\{alias}"); + PartsRenderingFunction additionalPartitionKeyPredicateProvider = new PartsRenderingFunction(parts); + + String existingPredicate = partitionKeyPredicateProvider.apply("e"); + if (existingPredicate != null && !existingPredicate.isEmpty()) { + if (!existingPredicate.contains(additionalPartitionKeyPredicateProvider.apply("e"))) { + Function oldPartitionKeyPredicateProvider = partitionKeyPredicateProvider; + partitionKeyPredicateProvider = alias -> "(" + oldPartitionKeyPredicateProvider.apply(alias) + ") AND " + additionalPartitionKeyPredicateProvider.apply(alias); + } + } else { + partitionKeyPredicateProvider = additionalPartitionKeyPredicateProvider; + } + } + } + + if (JobTrigger.class.isAssignableFrom(javaType)) { + jobTriggerPartitionKeys.put(partitionName, + JpaPartitionKey.builder() + .withName(partitionName) + .withEntityClass(javaType) +
.withProcessCount(processCount) + .withTransactionTimeoutMillis(transactionTimeoutMillis) + .withTemporaryErrorBackoffSeconds(temporaryErrorBackoffSeconds) + .withRateLimitBackoffSeconds(rateLimitBackoffSeconds) + .withJobInstanceType(javaType) + .withPartitionPredicateProvider(partitionKeyPredicateProvider) + .withIdAttributeName(jobTriggerIdAttributeName) + .withScheduleAttributeName(jobTriggerScheduleAttributeName) + .withLastExecutionAttributeName(jobTriggerLastExecutionAttributeName) + .withPartitionKeyAttributeName(jobTriggerIdAttributeName) + .withStateAttributeName(jobTriggerStateAttributeName) + .withStateValueMappingFunction(jobTriggerStateValueMapper) + .build() + ); + } else if (JobInstance.class.isAssignableFrom(javaType)) { + jobInstancePartitionKeys.put(partitionName, + JpaPartitionKey.builder() + .withName(partitionName) + .withEntityClass(javaType) + .withProcessCount(processCount) + .withTransactionTimeoutMillis(transactionTimeoutMillis) + .withTemporaryErrorBackoffSeconds(temporaryErrorBackoffSeconds) + .withRateLimitBackoffSeconds(rateLimitBackoffSeconds) + .withJobInstanceType(javaType) + .withPartitionPredicateProvider(partitionKeyPredicateProvider) + .withIdAttributeName(jobInstanceIdAttributeName) + .withScheduleAttributeName(jobInstanceScheduleAttributeName) + .withLastExecutionAttributeName(jobInstanceLastExecutionAttributeName) + .withPartitionKeyAttributeName(jobInstancePartitionKeyAttributeName) + .withStateAttributeName(jobInstanceStateAttributeName) + .withStateValueMappingFunction(jobInstanceStateValueMapper) + .build() + ); + } + } } } + if (errors.length() != 0) { + errors.insert(0, "There are errors in the job instance partition configuration:"); + throw new JobException(errors.toString()); + } + this.jobTriggerPartitionKeys = jobTriggerPartitionKeys; this.jobInstancePartitionKeys = jobInstancePartitionKeys; } + private static Class getCommonSuperclass(IdentifiableType t1, IdentifiableType t2) { + Class class1 = t1.getJavaType(); + if (t1 == t2) { + return class1; + } + Class class2 = t2.getJavaType(); + if (class2.isAssignableFrom(class1)) { + return class2; + } else if (class1.isAssignableFrom(class2)) { + return class1; + } else { + Class c = null; + if (t1.getSupertype() != null) { + c = getCommonSuperclass(t1.getSupertype(), t2); + } + if (c == null && t2.getSupertype() != null) { + c = getCommonSuperclass(t1, t2.getSupertype()); + } + return c; + } + } + @Override public Collection getDefaultTriggerPartitionKeys() { - return jobTriggerPartitionKeys; + return jobTriggerPartitionKeys.values(); } @Override public Collection getDefaultJobInstancePartitionKeys() { - return jobInstancePartitionKeys; + return jobInstancePartitionKeys.values(); + } + + /** + * A partition literal. 
+ * + * @author Christian Beikov + * @since 1.0.0 + */ + private static class PartitionLiteral implements Partition { + + public static final Partition[] INSTANCE = new Partition[]{ new PartitionLiteral() }; + + private PartitionLiteral() { + } + + @Override + public String name() { + return ""; + } + + @Override + public int processCount() { + return 1; + } + + @Override + public String predicate() { + return ""; + } + + @Override + public int partitionCount() { + return 1; + } + + @Override + public int transactionTimeoutMillis() { + return -1; + } + + @Override + public int temporaryErrorBackoffSeconds() { + return -1; + } + + @Override + public int rateLimitBackoffSeconds() { + return -1; + } + + @Override + public Class annotationType() { + return Partition.class; + } + } + + /** + * A part rendering function. + * + * @author Christian Beikov + * @since 1.0.0 + */ + private static class PartsRenderingFunction implements Function { + + private final String[] parts; + + /** + * Creates the rendering function. + * + * @param parts The parts + */ + public PartsRenderingFunction(String[] parts) { + this.parts = parts; + } + + @Override + public String apply(String alias) { + int length = parts.length; + if (length == 1) { + return parts[0]; + } else { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length - 1; i++) { + sb.append(parts[i]).append(alias); + } + sb.append(parts[length - 1]); + return sb.toString(); + } + } } } diff --git a/memory/model/src/main/java/com/blazebit/job/memory/model/AbstractJobInstance.java b/memory/model/src/main/java/com/blazebit/job/memory/model/AbstractJobInstance.java index 6c1d77d..b505a72 100644 --- a/memory/model/src/main/java/com/blazebit/job/memory/model/AbstractJobInstance.java +++ b/memory/model/src/main/java/com/blazebit/job/memory/model/AbstractJobInstance.java @@ -71,6 +71,11 @@ public void markDropped(JobInstanceProcessingContext jobProcessingContext) { setState(JobInstanceState.DROPPED); } + @Override + public void markRunning(JobInstanceProcessingContext processingContext) { + setState(JobInstanceState.RUNNING); + } + @Override public void markDone(JobInstanceProcessingContext jobProcessingContext, Object result) { setState(JobInstanceState.DONE); diff --git a/memory/storage/src/main/java/com/blazebit/job/memory/storage/MemoryJobManager.java b/memory/storage/src/main/java/com/blazebit/job/memory/storage/MemoryJobManager.java index 2d103a2..046625a 100644 --- a/memory/storage/src/main/java/com/blazebit/job/memory/storage/MemoryJobManager.java +++ b/memory/storage/src/main/java/com/blazebit/job/memory/storage/MemoryJobManager.java @@ -359,6 +359,17 @@ public List> getJobInstancesToProcess(int partition, int partitio } } + @Override + public List> getRunningJobInstances(int partition, int partitionCount, PartitionKey partitionKey) { + return jobInstances.values().stream() + .filter(i -> i.getState() == JobInstanceState.RUNNING + && i.getScheduleTime().toEpochMilli() <= clock.millis() + && (partitionCount == 1 || (i.getPartitionKey() % partitionCount) == partition) + && partitionKey.matches(i) + ) + .collect(Collectors.toList()); + } + private Stream> streamJobInstances(int partition, int partitionCount, PartitionKey partitionKey) { return jobInstances.values().stream() .filter(i -> i.getState() == JobInstanceState.NEW diff --git a/memory/storage/src/main/java/com/blazebit/job/memory/storage/MemoryPartitionKeyProvider.java b/memory/storage/src/main/java/com/blazebit/job/memory/storage/MemoryPartitionKeyProvider.java index 
index 04d6b62..eae50ec 100644
--- a/memory/storage/src/main/java/com/blazebit/job/memory/storage/MemoryPartitionKeyProvider.java
+++ b/memory/storage/src/main/java/com/blazebit/job/memory/storage/MemoryPartitionKeyProvider.java
@@ -26,6 +26,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Set;
 
 /**
  * A factory as well as {@link PartitionKeyProvider} implementation providing static partition keys.
@@ -37,6 +38,11 @@
 public class MemoryPartitionKeyProvider implements PartitionKeyProvider, PartitionKeyProviderFactory {
 
     private static final Collection<PartitionKey> NON_TRIGGER = Collections.singletonList(new PartitionKey() {
+        @Override
+        public String getName() {
+            return "jobInstance";
+        }
+
         @Override
         public boolean matches(JobInstance<?> jobInstance) {
             return !(jobInstance instanceof JobTrigger);
@@ -44,13 +50,19 @@ public boolean matches(JobInstance<?> jobInstance) {
 
         @Override
         public String toString() {
-            return "jobInstance";
+            return getName();
         }
     });
 
     private static final Collection<PartitionKey> TRIGGER_ONLY = Collections.singletonList(new PartitionKey() {
         @Override
-        public Class<? extends JobInstance<?>> getJobInstanceType() {
-            return JobTrigger.class;
+        public String getName() {
+            return "jobTrigger";
+        }
+
+        @Override
+        public Set<Class<? extends JobInstance<?>>> getJobInstanceTypes() {
+            return Collections.singleton(JobTrigger.class);
         }
 
         @Override
@@ -60,7 +72,7 @@ public boolean matches(JobInstance<?> jobInstance) {
 
         @Override
         public String toString() {
-            return "jobTrigger";
+            return getName();
         }
     });
diff --git a/parent/pom.xml b/parent/pom.xml
index bd87963..4f1d6a9 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -18,8 +18,8 @@
         Bohrium
         0.1.21
-        1.4.1
-        1.0.0-Alpha3
+        1.5.0-Alpha3
+        1.0.0-Alpha4
         2.4.5.Final
         5.0.7.RELEASE
         4.12
diff --git a/testsuite/src/main/java/com/blazebit/job/testsuite/SimpleJobInstance.java b/testsuite/src/main/java/com/blazebit/job/testsuite/SimpleJobInstance.java
index 094ea1e..e8ba451 100644
--- a/testsuite/src/main/java/com/blazebit/job/testsuite/SimpleJobInstance.java
+++ b/testsuite/src/main/java/com/blazebit/job/testsuite/SimpleJobInstance.java
@@ -25,10 +25,21 @@ public class SimpleJobInstance extends AbstractJobInstance<Long> {
 
     private JobConfiguration jobConfiguration = new JobConfiguration();
+    private boolean longRunning;
 
     public SimpleJobInstance() {
+        this(false);
+    }
+
+    public SimpleJobInstance(boolean longRunning) {
         setCreationTime(Instant.now());
         setScheduleTime(getCreationTime());
+        this.longRunning = longRunning;
+    }
+
+    @Override
+    public boolean isLongRunning() {
+        return longRunning;
     }
 
     @Override
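As the SimpleJobInstance change above shows, an instance opts into long running handling by returning true from isLongRunning(); such instances are marked RUNNING while their processor executes and can then be inspected or cancelled through the JobContext. A minimal usage sketch built on the testsuite classes, using the control calls that the tests below exercise:

    // Schedule a long running instance and control it through the JobContext.
    SimpleJobInstance jobInstance = new SimpleJobInstance(true);
    jobContext.getJobManager().addJobInstance(jobInstance);

    int position = jobContext.getClusterPosition(jobInstance); // cluster position of the executing node
    String trace = jobContext.getTrace(jobInstance);           // stack trace of the processor thread
    jobContext.cancel(jobInstance);                            // request cancellation of the processor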
diff --git a/testsuite/src/test/java/com/blazebit/job/testsuite/JobInstanceTest.java b/testsuite/src/test/java/com/blazebit/job/testsuite/JobInstanceTest.java
index b2d89ac..8ae51d7 100644
--- a/testsuite/src/test/java/com/blazebit/job/testsuite/JobInstanceTest.java
+++ b/testsuite/src/test/java/com/blazebit/job/testsuite/JobInstanceTest.java
@@ -19,6 +19,7 @@
 import com.blazebit.actor.spi.ClusterNodeInfo;
 import com.blazebit.actor.spi.ClusterStateListener;
 import com.blazebit.actor.spi.ClusterStateManager;
+import com.blazebit.actor.spi.LockService;
 import com.blazebit.actor.spi.StateReturningEvent;
 import com.blazebit.job.JobContext;
 import com.blazebit.job.JobInstanceProcessingContext;
@@ -46,15 +47,20 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class JobInstanceTest extends AbstractJobTest {
 
@@ -447,10 +453,108 @@ public void testChangeClusterWhileIdleWorkStealing() throws Exception {
         assertEquals(JobInstanceState.DONE, jobInstance.getState());
     }
 
-    private static class MutableClusterStateManager implements ClusterStateManager, ClusterNodeInfo {
+    @Test
+    public void testLongRunning() throws Exception {
+        // GIVEN
+        MutableClusterStateManager clusterStateManager = new MutableClusterStateManager();
+        CountDownLatch processorEnterLatch = new CountDownLatch(1);
+        CountDownLatch processorLatch = new CountDownLatch(1);
+        this.jobContext = builder()
+            .withJobInstanceProcessorFactory(JobInstanceProcessorFactory.of(((jobInstance, context) -> {
+                processorEnterLatch.countDown();
+                try {
+                    processorLatch.await();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                return null;
+            })))
+            .withActorContextBuilder(ActorContext.builder().withClusterStateManager(clusterStateManager)).createContext();
+        SimpleJobInstance jobInstance = new SimpleJobInstance(true);
+        jobInstance.setState(JobInstanceState.NEW);
+        jobContext.getJobManager().addJobInstance(jobInstance);
+
+        // WHEN
+        processorEnterLatch.await();
+        int clusterPosition = jobContext.getClusterPosition(jobInstance);
+        String trace = jobContext.getTrace(jobInstance);
+        jobContext.cancel(jobInstance);
+
+        // THEN
+        await();
+        jobContext.stop(1, TimeUnit.MINUTES);
+        assertEquals(JobInstanceState.FAILED, jobInstance.getState());
+        assertEquals(0, clusterPosition);
+        assertTrue(trace.contains("java.util.concurrent.CountDownLatch.await"));
+    }
+
+    @Test
+    public void testLongRunningTakeOver() throws Exception {
+        // GIVEN
+        MutableClusterStateManager clusterStateManager = new MutableClusterStateManager();
+        CountDownLatch processorEnterLatch = new CountDownLatch(1);
+        CountDownLatch processorLatch = new CountDownLatch(1);
+        this.jobContext = builder()
+            .withJobInstanceProcessorFactory(JobInstanceProcessorFactory.of(((jobInstance, context) -> {
+                processorEnterLatch.countDown();
+                try {
+                    processorLatch.await();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                return null;
+            })))
+            .withActorContextBuilder(ActorContext.builder().withClusterStateManager(clusterStateManager)).createContext();
+        SimpleJobInstance jobInstance = new SimpleJobInstance(true);
+        jobInstance.setState(JobInstanceState.RUNNING);
+        jobContext.getJobManager().addJobInstance(jobInstance);
+
+        // WHEN
+        clusterStateManager.fireClusterStateChanged();
+        processorEnterLatch.await();
+        int clusterPosition = jobContext.getClusterPosition(jobInstance);
+        String trace = jobContext.getTrace(jobInstance);
+        jobContext.cancel(jobInstance);
+
+        // THEN
+        await();
+        jobContext.stop(1, TimeUnit.MINUTES);
+        assertEquals(JobInstanceState.FAILED, jobInstance.getState());
+        assertEquals(0, clusterPosition);
+        assertTrue(trace.contains("java.util.concurrent.CountDownLatch.await"));
+    }
+
+    @Test
+    public void testLongRunningSkipAlreadyRunning() throws Exception {
+        // GIVEN
+        MutableClusterStateManager clusterStateManager = new MutableClusterStateManager();
+        CountDownLatch processorEnterLatch = new CountDownLatch(1);
+        this.jobContext = builder()
+            .withJobInstanceProcessorFactory(JobInstanceProcessorFactory.of(((jobInstance, context) -> {
+                processorEnterLatch.countDown();
+                return null;
+            })))
+            .withActorContextBuilder(ActorContext.builder().withClusterStateManager(clusterStateManager)).createContext();
+        SimpleJobInstance jobInstance = new SimpleJobInstance(true);
+        jobInstance.setState(JobInstanceState.RUNNING);
+        jobContext.getJobManager().addJobInstance(jobInstance);
+
+        // WHEN
+        clusterStateManager.eventFunction = event -> Collections.singletonMap(null, new SimpleFuture<>(new int[]{ 1 }));
+        clusterStateManager.fireClusterStateChanged();
+
+        // THEN
+        jobContext.stop(1, TimeUnit.MINUTES);
+        assertEquals(JobInstanceState.RUNNING, jobInstance.getState());
+        assertEquals(1, processorEnterLatch.getCount());
+    }
+
+    private static class MutableClusterStateManager implements ClusterStateManager, ClusterNodeInfo, LockService {
 
         private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
+        private final ConcurrentMap<String, Lock> locks = new ConcurrentHashMap<>();
         private final Map<Class<?>, List<java.util.function.Consumer<?>>> listeners = new ConcurrentHashMap<>();
+        private Function<StateReturningEvent<?>, Map<ClusterNodeInfo, Future<?>>> eventFunction = e -> Collections.emptyMap();
         private boolean isCoordinator = true;
         private long clusterVersion = 0L;
         private int clusterPosition = 0;
@@ -476,6 +580,16 @@ public <E> void registerListener(Class<E> eventClass, java.util.function.Consumer<E> listener) {
             listeners.computeIfAbsent(eventClass, k -> new CopyOnWriteArrayList<>()).add((java.util.function.Consumer) listener);
         }
 
+        @Override
+        public Lock getLock(String name) {
+            return locks.computeIfAbsent(name, k -> new ReentrantLock());
+        }
+
+        @Override
+        public LockService getLockService() {
+            return this;
+        }
+
         @Override
         public boolean isStandalone() {
             return true;
@@ -537,7 +651,15 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
 
         @Override
         public <T> Map<ClusterNodeInfo, Future<T>> fireEventExcludeSelf(StateReturningEvent<T> event) {
-            return null;
+            return (Map) eventFunction.apply(event);
+        }
+
+        public Function<StateReturningEvent<?>, Map<ClusterNodeInfo, Future<?>>> getEventFunction() {
+            return eventFunction;
+        }
+
+        public void setEventFunction(Function<StateReturningEvent<?>, Map<ClusterNodeInfo, Future<?>>> eventFunction) {
+            this.eventFunction = eventFunction;
         }
 
         private void visitInterfaces(java.util.function.Consumer<Class<?>> consumer, Class<?> clazz, Set<Class<?>> visitedClasses) {
@@ -587,4 +709,38 @@ public void setClusterSize(int clusterSize) {
             this.clusterSize = clusterSize;
         }
     }
+
+    private static class SimpleFuture<T> implements Future<T> {
+
+        private final T result;
+
+        public SimpleFuture(T result) {
+            this.result = result;
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return true;
+        }
+
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+            return result;
+        }
+
+        @Override
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return result;
+        }
+    }
 }
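For reference, the pattern the extended test double enables: fireEventExcludeSelf() can be stubbed so the scheduler believes another cluster node answered, which is how testLongRunningSkipAlreadyRunning simulates an instance that is already running elsewhere. A condensed sketch using the test classes above (the meaning of the int[] payload is taken from that test, not from a documented contract):

    // Pretend a remote node answers the state-returning event; the local
    // scheduler then leaves the RUNNING instance alone instead of taking it over.
    MutableClusterStateManager clusterStateManager = new MutableClusterStateManager();
    clusterStateManager.setEventFunction(event ->
        Collections.singletonMap(null, new SimpleFuture<>(new int[]{ 1 })));
    clusterStateManager.fireClusterStateChanged();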