Skip to content

Commit

Permalink
[#9, #10] Support declarative partition configuration and long runnin…
Browse files Browse the repository at this point in the history
…g jobs
  • Loading branch information
beikov committed Jun 2, 2020
1 parent 7a05965 commit 588a2d2
Show file tree
Hide file tree
Showing 28 changed files with 1,975 additions and 310 deletions.
280 changes: 204 additions & 76 deletions core/api/src/main/java/com/blazebit/job/JobContext.java

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions core/api/src/main/java/com/blazebit/job/JobInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,32 @@ default Object getLastProcessed() {
*/
void onChunkSuccess(JobInstanceProcessingContext<?> processingContext);

/**
* Returns if this job instance is long running.
* A long running job instance will transition to the {@link JobInstanceState#RUNNING} state via {@link #markRunning(JobInstanceProcessingContext)}
* and will be executed on a cluster node. If the cluster topology changes, the state of jobs will be queried through cluster events.
*
* @return Whether the job instance is long running
*/
default boolean isLongRunning() {
return false;
}

/**
* Returns the job configuration.
*
* @return the job configuration
*/
JobConfiguration getJobConfiguration();

/**
* Marks the given job instance as running which happens when {@link JobInstance#isLongRunning()} is <code>true</code>.
* After this method, {@link #getState()} should return {@link JobInstanceState#RUNNING}.
*
* @param processingContext The processing context
*/
void markRunning(JobInstanceProcessingContext<?> processingContext);

/**
* Marks the given job instance as done and passes the last job instance processor execution result.
* After this method, {@link #getState()} should return {@link JobInstanceState#DONE}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public enum JobInstanceState {
/**
* A job instance with that state is going to be remove during an update.
*/
REMOVED;
REMOVED,
/**
* A job instance with that state is a long running job that is currently running.
*/
RUNNING;

}
10 changes: 10 additions & 0 deletions core/api/src/main/java/com/blazebit/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ public interface JobManager {
*/
List<JobInstance<?>> getJobInstancesToProcess(int partition, int partitionCount, int limit, PartitionKey partitionKey, Set<JobInstance<?>> jobInstancesToInclude);

/**
* Returns a list of job instances that have the status {@link JobInstanceState#RUNNING} for the given partition.
*
* @param partition The partition number
* @param partitionCount The partition count
* @param partitionKey The partition key
* @return The list of job instances
*/
List<JobInstance<?>> getRunningJobInstances(int partition, int partitionCount, PartitionKey partitionKey);

/**
* Returns the next schedule at which the given partition must process job instances.
*
Expand Down
91 changes: 91 additions & 0 deletions core/api/src/main/java/com/blazebit/job/Partition.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2018 - 2020 Blazebit.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.blazebit.job;

import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* {@link PartitionKey} configuration for a {@link JobInstance} class.
*
* @author Christian Beikov
* @since 1.0.0
*/
@Repeatable(Partitions.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Partition {

/**
* The partition name. If empty, uses the class name of the annotated {@link JobInstance}.
*
* @return The partition name
*/
String name() default "";

/**
* Defines the partition predicate to use for fetching jobs for the partition.
* The string <code>{alias}</code> is replaced with the job instance alias.
* The string <code>{partition}</code> is replaced with the zero-based partition number.
*
* @return The partition predicate
*/
String predicate() default "";

/**
* The number of jobs to schedule in parallel within one scheduler transaction.
*
* @return The number of job to schedule
*/
int processCount() default 1;

/**
* Defines how many partitions should be created. If the value is greater than 1,
* it will create partitions with a name that is suffixed by the partition number, starting at 0.
*
* @return The number of partitions ot create
*/
int partitionCount() default 1;

/**
* The transaction timeout for job processing of the partition.
* When -1, the default transaction timeout is used.
*
* @return The transaction timeout
*/
int transactionTimeoutMillis() default -1;

/**
* The amount of seconds to backoff when a job processor throws a {@link JobTemporaryException}.
* When -1, the default temporary error backoff is used.
*
* @return The temporary error backoff
*/
int temporaryErrorBackoffSeconds() default -1;

/**
* The amount of seconds to backoff when a job processor throws a {@link JobRateLimitException}.
* When -1, the default rate limit backoff is used.
*
* @return The rate limit backoff
*/
int rateLimitBackoffSeconds() default -1;

}
53 changes: 51 additions & 2 deletions core/api/src/main/java/com/blazebit/job/PartitionKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.blazebit.job;

import java.util.Collections;
import java.util.Set;

/**
* A testable description of a subset of job instances.
*
Expand All @@ -24,13 +27,29 @@
*/
public interface PartitionKey {

/**
* Returns the name of the partition key that is unique and used for configuration of the scheduler.
*
* @return the name of the partition key
*/
String getName();

/**
* Returns the number of jobs to schedule in parallel within one scheduler transaction.
*
* @return The number of job to schedule
*/
default int getProcessCount() {
return 1;
}

/**
* Returns the concrete job instance type that is described by this partition key.
*
* @return the concrete job instance type
*/
default Class<? extends JobInstance<?>> getJobInstanceType() {
return (Class<? extends JobInstance<?>>) (Class<?>) JobInstance.class;
default Set<Class<? extends JobInstance<?>>> getJobInstanceTypes() {
return (Set<Class<? extends JobInstance<?>>>) (Set<?>) Collections.singleton(JobInstance.class);
}

/**
Expand All @@ -41,4 +60,34 @@ default Class<? extends JobInstance<?>> getJobInstanceType() {
*/
boolean matches(JobInstance<?> jobInstance);

/**
* The transaction timeout for job processing of the partition.
* When -1, the default transaction timeout is used.
*
* @return The transaction timeout
*/
default int getTransactionTimeoutMillis() {
return -1;
}

/**
* The amount of seconds to backoff when a job processor throws a {@link JobTemporaryException}.
* When -1, the default temporary error backoff is used.
*
* @return The temporary error backoff
*/
default int getTemporaryErrorBackoffSeconds() {
return -1;
}

/**
* The amount of seconds to backoff when a job processor throws a {@link JobRateLimitException}.
* When -1, the default rate limit backoff is used.
*
* @return The rate limit backoff
*/
default int getRateLimitBackoffSeconds() {
return -1;
}

}
41 changes: 41 additions & 0 deletions core/api/src/main/java/com/blazebit/job/Partitions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2018 - 2020 Blazebit.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.blazebit.job;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Container for defining multiple partitions for a {@link JobInstance}.
*
* @author Christian Beikov
* @since 1.0.0
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Partitions {

/**
* The partitions for this job instance.
*
* @return The partitions
*/
Partition[] value() default {};

}
23 changes: 23 additions & 0 deletions core/api/src/main/java/com/blazebit/job/spi/JobScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,29 @@ default void refreshSchedules() {
*/
void reschedule(JobInstance<?> jobInstance);

/**
* Returns the cluster position where the given long running job instance is running, or <code>-1</code>.
*
* @param jobInstance The long running job instance
* @return The cluster position
*/
int getClusterPosition(JobInstance<?> jobInstance);

/**
* Returns the thread stack trace of the given long running job instance if it is still running, or <code>null</code>.
*
* @param jobInstance The long running job instance
* @return The trace of the job instance processor
*/
String getTrace(JobInstance<?> jobInstance);

/**
* Cancels the given long running job instance if it is still running.
*
* @param jobInstance The long running job instance to cancel
*/
void cancel(JobInstance<?> jobInstance);

/**
* Stops the job scheduler.
* After this method finished no further jobs are scheduled but there may still be running jobs.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2018 - 2020 Blazebit.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.blazebit.job.impl;

import java.io.Serializable;

/**
* @author Christian Beikov
* @since 1.0.0
*/
public class JobSchedulerCancelEvent implements Serializable {

private final Serializable jobInstanceId;

public JobSchedulerCancelEvent(Serializable jobInstanceId) {
this.jobInstanceId = jobInstanceId;
}

public Serializable getJobInstanceId() {
return jobInstanceId;
}

}
Loading

0 comments on commit 588a2d2

Please sign in to comment.