Skip to content

Commit

Permalink
Merge pull request #2054 from keeps/jgomes-dev
Browse files Browse the repository at this point in the history
Roda Central Actions
  • Loading branch information
hmiguim authored May 18, 2022
2 parents 198922d + dffdce1 commit 4e43390
Show file tree
Hide file tree
Showing 18 changed files with 232 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;


/**
* @author Hélder Silva <[email protected]>
*/
Expand Down Expand Up @@ -60,7 +59,7 @@ public enum JOB_STATE {
// job instance id
private String instanceId = null;

private List<JobUserDetails> jobUsersDetails= new ArrayList<>();
private List<JobUserDetails> jobUsersDetails = new ArrayList<>();
private String instanceName = null;

// job statistics (total source objects, etc.)
Expand Down Expand Up @@ -154,7 +153,6 @@ public void setJobUsersDetails(List<JobUserDetails> jobUserDetails) {
this.jobUsersDetails = jobUserDetails;
}


public Date getStartDate() {
return startDate;
}
Expand Down Expand Up @@ -312,7 +310,7 @@ public void setAttachmentsList(List<String> attachmentsList) {

@Override
public String toString() {
return "Job [id=" + id + ", name=" + name + ", username=" + username + ", startDate=" + startDate + ", endDate="
return "Job [id=" + id + ", name=" + name + ", username=" + username + ", startDate=" + startDate + ", endDate="
+ endDate + ", state=" + state + ", stateDetails=" + stateDetails + ", priority=" + priority + ", type="
+ parallelism + ", jobStats=" + jobStats + ", plugin=" + plugin + ", pluginType=" + pluginType
+ ", pluginParameters=" + pluginParameters + ", sourceObjects=" + sourceObjects + ", outcomeObjectsClass="
Expand Down Expand Up @@ -358,4 +356,33 @@ public void setFields(Map<String, Object> fields) {
this.fields = fields;
}

public Job clone() {
final Job newJob = new Job();
newJob.setName(getName());
newJob.setUsername(getUsername());
newJob.setInstanceName(getInstanceName());
newJob.setStartDate(getStartDate());
newJob.setEndDate(getEndDate());
newJob.setJobUsersDetails(getJobUsersDetails());
newJob.setPluginType(getPluginType());
newJob.setPlugin(getPlugin());
newJob.setPluginParameters(getPluginParameters());
newJob.setPriority(getPriority());
newJob.setParallelism(getParallelism());
newJob.setState(getState());
newJob.setJobUsersDetails(getJobUsersDetails());
newJob.setStateDetails(getStateDetails());
newJob.setSourceObjects(getSourceObjects());
newJob.setOutcomeObjectsClass(getOutcomeObjectsClass());
newJob.getJobStats().setCompletionPercentage(getJobStats().getCompletionPercentage());
newJob.getJobStats().setSourceObjectsBeingProcessed(getJobStats().getSourceObjectsBeingProcessed());
newJob.getJobStats().setOutcomeObjectsWithManualIntervention(getJobStats().getOutcomeObjectsWithManualIntervention());
newJob.getJobStats().setSourceObjectsCount(getJobStats().getSourceObjectsCount());
newJob.getJobStats().setSourceObjectsProcessedWithPartialSuccess(getJobStats().getSourceObjectsProcessedWithPartialSuccess());
newJob.getJobStats().setSourceObjectsProcessedWithSkipped(getJobStats().getSourceObjectsProcessedWithSkipped());
newJob.getJobStats().setSourceObjectsProcessedWithSuccess(getJobStats().getSourceObjectsProcessedWithSuccess());
newJob.getJobStats().setSourceObjectsProcessedWithFailure(getJobStats().getSourceObjectsProcessedWithFailure());
newJob.getJobStats().setSourceObjectsWaitingToBeProcessed(getJobStats().getSourceObjectsWaitingToBeProcessed());
return newJob;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ public static <T extends IsRODAObject, T1 extends Plugin<T>> Job executeJob(Clas
job.setSourceObjects(selectedItems);
job.setUsername(user);
try {
RodaCoreFactory.getModelService().createJob(job);
RodaCoreFactory.getPluginOrchestrator().executeJob(job, false);
RodaCoreFactory.getPluginOrchestrator().createAndExecuteJobs(job, false);
} catch (Exception e) {
AssertJUnit.fail("Unable to execute job in test mode: [" + e.getClass().getName() + "] " + e.getMessage());
}
Expand Down Expand Up @@ -145,8 +144,7 @@ public static <T extends IsIndexed, T1 extends Plugin<? extends IsRODAObject>> J
job.setSourceObjects(selectedItems);
job.setUsername(RodaConstants.ADMIN);
try {
RodaCoreFactory.getModelService().createJob(job);
RodaCoreFactory.getPluginOrchestrator().executeJob(job, false);
RodaCoreFactory.getPluginOrchestrator().createAndExecuteJobs(job, false);
} catch (Exception e) {
AssertJUnit.fail("Unable to execute job in test mode: [" + e.getClass().getName() + "] " + e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2423,12 +2423,10 @@ public void createOrUpdateJob(Job job)
if (job.getInstanceId() == null) {
job.setInstanceId(RODAInstanceUtils.getLocalInstanceIdentifier());
}

// create or update job in storage
String jobAsJson = JsonUtils.getJsonFromObject(job);
StoragePath jobPath = ModelUtils.getJobStoragePath(job.getId());
storage.updateBinaryContent(jobPath, new StringContentPayload(jobAsJson), false, true);

// index it
notifyJobCreatedOrUpdated(job, false).failOnError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1492,8 +1492,7 @@ private static void moveChildrenAIPsAndDelete(IndexService index, ModelService m

public static void createAndExecuteJob(Job job) throws GenericException, JobAlreadyStartedException,
RequestNotValidException, NotFoundException, AuthorizationDeniedException {
RodaCoreFactory.getModelService().createJob(job);
RodaCoreFactory.getPluginOrchestrator().executeJob(job, true);
RodaCoreFactory.getPluginOrchestrator().createAndExecuteJobs(job,true);
RodaCoreFactory.getIndexService().commit(Job.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
import java.util.List;

import org.roda.core.common.akka.Messages.JobPartialUpdate;
import org.roda.core.data.exceptions.AuthorizationDeniedException;
import org.roda.core.data.exceptions.GenericException;
import org.roda.core.data.exceptions.JobAlreadyStartedException;
import org.roda.core.data.exceptions.JobException;
import org.roda.core.data.exceptions.LockingException;
import org.roda.core.data.exceptions.NotFoundException;
import org.roda.core.data.exceptions.RequestNotValidException;
import org.roda.core.data.v2.IsRODAObject;
import org.roda.core.data.v2.index.IsIndexed;
import org.roda.core.data.v2.index.filter.Filter;
Expand All @@ -33,13 +37,14 @@ public interface PluginOrchestrator {

public void shutdown();

public <T extends IsRODAObject, T1 extends IsIndexed> void runPluginFromIndex(Object context, Job job, Class<T1> classToActOn,
Filter filter, Boolean justActive ,Plugin<T> plugin);
public <T extends IsRODAObject, T1 extends IsIndexed> void runPluginFromIndex(Object context, Job job,
Class<T1> classToActOn, Filter filter, Boolean justActive, Plugin<T> plugin);

public <T extends IsRODAObject> void runPluginOnObjects(Object context, Job job, Plugin<T> plugin, Class<T> objectClass,
List<String> uuids);
public <T extends IsRODAObject> void runPluginOnObjects(Object context, Job job, Plugin<T> plugin,
Class<T> objectClass, List<String> uuids);

public <T extends IsRODAObject> void runPluginOnAllObjects(Object context, Plugin<T> plugin, Job job, Class<T> objectClass);
public <T extends IsRODAObject> void runPluginOnAllObjects(Object context, Plugin<T> plugin, Job job,
Class<T> objectClass);

public <T extends IsRODAObject> void runPlugin(Object context, Plugin<T> plugin, Job job);

Expand All @@ -50,6 +55,8 @@ public <T extends IsRODAObject> void runPluginOnObjects(Object context, Job job,
/** 201603 hsilva: only tests should invoke this method synchronously */
public void executeJob(Job job, boolean async) throws JobAlreadyStartedException;

public void createAndExecuteJobs(Job job, boolean async) throws JobAlreadyStartedException, AuthorizationDeniedException, RequestNotValidException, NotFoundException, GenericException;

/** 201712 hsilva: this method was known as stopJob */
public void stopJobAsync(Job job);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void process(IndexService index, ModelService model, StorageService stora
}, index, model, storage);
}

private void sendSyncBundle(ModelService model, Report report, JobPluginInfo jobPluginInfo, Job cachedJob) {
private void sendSyncBundle(ModelService model, Report report, JobPluginInfo jobPluginInfo, Job cachedJob) {
Report reportItem = PluginHelper.initPluginReportItem(this, cachedJob.getId(), Job.class);
PluginHelper.updatePartialJobReport(this, model, reportItem, false, cachedJob);
PluginState pluginState = PluginState.SKIPPED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public void shutdown() {
}

@Override
public <T extends IsRODAObject, T1 extends IsIndexed> void runPluginFromIndex(Object context, Job job, Class<T1> classToActOn,
Filter filter, Boolean justActive, Plugin<T> plugin) {
public <T extends IsRODAObject, T1 extends IsIndexed> void runPluginFromIndex(Object context, Job job,
Class<T1> classToActOn, Filter filter, Boolean justActive, Plugin<T> plugin) {
// do nothing
}

Expand All @@ -154,6 +154,11 @@ public void executeJob(Job job, boolean async) throws JobAlreadyStartedException
// do nothing
}

@Override
public void createAndExecuteJobs(Job job, boolean async) throws JobAlreadyStartedException {
// do nothing
}

@Override
public void stopJobAsync(Job job) {
// do nothing
Expand Down Expand Up @@ -181,13 +186,14 @@ public <T extends IsRODAObject> void updateJobAsync(Plugin<T> plugin, JobPartial
}

@Override
public <T extends IsRODAObject> void runPluginOnAllObjects(Object context, Plugin<T> plugin, Job job, Class<T> objectClass) {
public <T extends IsRODAObject> void runPluginOnAllObjects(Object context, Plugin<T> plugin, Job job,
Class<T> objectClass) {
// do nothing
}

@Override
public <T extends IsRODAObject> void runPluginOnObjects(Object context, Job job, Plugin<T> plugin, Class<T> objectClass,
List<String> uuids) {
public <T extends IsRODAObject> void runPluginOnObjects(Object context, Job job, Plugin<T> plugin,
Class<T> objectClass, List<String> uuids) {
// do nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.roda.core.data.v2.common.OptionalWithCause;
import org.roda.core.data.v2.index.IsIndexed;
import org.roda.core.data.v2.index.filter.Filter;
import org.roda.core.data.v2.index.select.SelectedItems;
import org.roda.core.data.v2.index.select.SelectedItemsList;
import org.roda.core.data.v2.jobs.Job;
import org.roda.core.data.v2.jobs.Job.JOB_STATE;
Expand Down Expand Up @@ -150,8 +151,8 @@ public void onComplete(Throwable failure, Terminated result) {
}

@Override
public <T extends IsRODAObject, T1 extends IsIndexed> void runPluginFromIndex(Object context, Job job, Class<T1> classToActOn,
Filter filter, Boolean justActive ,Plugin<T> plugin) {
public <T extends IsRODAObject, T1 extends IsIndexed> void runPluginFromIndex(Object context, Job job,
Class<T1> classToActOn, Filter filter, Boolean justActive, Plugin<T> plugin) {
try {
LOGGER.info("Starting {} (which will be done asynchronously)", plugin.getName());
boolean noObjectsOrchestrated = true;
Expand All @@ -161,12 +162,11 @@ public <T extends IsRODAObject, T1 extends IsIndexed> void runPluginFromIndex(Ob
Plugin<T> innerPlugin;
Class<T> modelClassToActOn = (Class<T>) ModelUtils.giveRespectiveModelClass(classToActOn);

jobStateInfoActor.tell(
Messages.newPluginBeforeAllExecuteIsReady(plugin).withParallelism(job.getParallelism()).withJobPriority(job.getPriority()),
jobActor);
jobStateInfoActor.tell(Messages.newPluginBeforeAllExecuteIsReady(plugin).withParallelism(job.getParallelism())
.withJobPriority(job.getPriority()), jobActor);

List<String> liteFields = SolrUtils.getClassLiteFields(classToActOn);
try (IterableIndexResult<T1> findAll = index.findAll(classToActOn, filter,justActive, liteFields)) {
try (IterableIndexResult<T1> findAll = index.findAll(classToActOn, filter, justActive, liteFields)) {
Iterator<T1> findAllIterator = findAll.iterator();
List<T1> indexObjects = new ArrayList<>();

Expand Down Expand Up @@ -221,7 +221,8 @@ public <T extends IsRODAObject> void runPluginOnObjects(Object context, Job job,
JobPriority priority = job.getPriority();

jobStateInfoActor.tell(
Messages.newPluginBeforeAllExecuteIsReady(plugin).withParallelism(parallelism).withJobPriority(priority), jobActor);
Messages.newPluginBeforeAllExecuteIsReady(plugin).withParallelism(parallelism).withJobPriority(priority),
jobActor);

List<T> block = new ArrayList<>();
while (iter.hasNext()) {
Expand Down Expand Up @@ -272,7 +273,8 @@ public <T extends IsRODAObject> void runPluginOnAllObjects(Object context, Plugi
JobParallelism parallelism = job.getParallelism();
JobPriority priority = job.getPriority();
jobStateInfoActor.tell(
Messages.newPluginBeforeAllExecuteIsReady(plugin).withParallelism(parallelism).withJobPriority(priority), jobActor);
Messages.newPluginBeforeAllExecuteIsReady(plugin).withParallelism(parallelism).withJobPriority(priority),
jobActor);

List<LiteOptionalWithCause> block = new ArrayList<>();
while (iter.hasNext()) {
Expand Down Expand Up @@ -323,12 +325,12 @@ public <T extends IsRODAObject> void runPlugin(Object context, Plugin<T> plugin,

initJobPluginInfo(plugin, job, 0, jobActor);
jobStateInfoActor.tell(
Messages.newPluginBeforeAllExecuteIsReady(plugin).withParallelism(parallelism).withJobPriority(priority), jobActor);
Messages.newPluginBeforeAllExecuteIsReady(plugin).withParallelism(parallelism).withJobPriority(priority),
jobActor);
jobStateInfoActor.tell(Messages.newPluginExecuteIsReady(plugin, Collections.emptyList()).withJobPriority(priority)
.withParallelism(parallelism), jobActor);
jobStateInfoActor.tell(
Messages.newJobInitEnded(getJobPluginInfo(plugin), false).withJobPriority(priority).withParallelism(parallelism),
jobActor);
jobStateInfoActor.tell(Messages.newJobInitEnded(getJobPluginInfo(plugin), false).withJobPriority(priority)
.withParallelism(parallelism), jobActor);

} catch (JobIsStoppingException | JobInErrorException e) {
// do nothing
Expand Down Expand Up @@ -411,25 +413,71 @@ private <T extends IsRODAObject> void initJobPluginInfo(Plugin<T> innerPlugin, J

@Override
public void executeJob(Job job, boolean async) throws JobAlreadyStartedException {
LOGGER.info("Adding job '{}' ({}) to be executed", job.getName(), job.getId());
if (!RodaConstants.DistributedModeType.CENTRAL.equals(RodaCoreFactory.getDistributedModeType())
|| job.getInstanceId() == null) {
LOGGER.info("Adding job '{}' ({}) to be executed", job.getName(), job.getId());
if (runningJobs.containsKey(job.getId())) {
LOGGER.info("Job '{}' ({}) is already queued to be executed", job.getName(), job.getId());
throw new JobAlreadyStartedException();
} else {
if (async) {
jobsManager.tell(job, ActorRef.noSender());
} else {
int timeoutInSeconds = JobsHelper.getSyncTimeout();
Timeout timeout = new Timeout(Duration.create(timeoutInSeconds, "seconds"));
Future<Object> future = Patterns.ask(jobsManager, job, timeout);
try {
Await.result(future, timeout.duration());
} catch (Exception e) {
LOGGER.error("Error executing job synchronously", e);
}
}
LOGGER.info("Success adding job '{}' ({}) to be executed", job.getName(), job.getId());
}
}
}

if (runningJobs.containsKey(job.getId())) {
LOGGER.info("Job '{}' ({}) is already queued to be executed", job.getName(), job.getId());
throw new JobAlreadyStartedException();
@Override
public void createAndExecuteJobs(Job job, boolean async) throws JobAlreadyStartedException,
AuthorizationDeniedException, RequestNotValidException, NotFoundException, GenericException {
if (!RodaCoreFactory.getDistributedModeType().equals(RodaConstants.DistributedModeType.CENTRAL)) {
RodaCoreFactory.getModelService().createJob(job);
RodaCoreFactory.getPluginOrchestrator().executeJob(job, async);
} else {
if (async) {
jobsManager.tell(job, ActorRef.noSender());
List<String> jobIds = new ArrayList<>();
final HashMap<String, SelectedItems<?>> instancesItems = JobsHelper.splitInstancesItems(job.getSourceObjects());
if (instancesItems.keySet().size() == 1 && instancesItems.containsKey(null)) {
RodaCoreFactory.getModelService().createJob(job);
RodaCoreFactory.getPluginOrchestrator().executeJob(job, async);
} else {
int timeoutInSeconds = JobsHelper.getSyncTimeout();
Timeout timeout = new Timeout(Duration.create(timeoutInSeconds, "seconds"));
Future<Object> future = Patterns.ask(jobsManager, job, timeout);
try {
Await.result(future, timeout.duration());
} catch (Exception e) {
LOGGER.error("Error executing job synchronously", e);
for (String instance : instancesItems.keySet()) {
Job newJob = job.clone();
if (instance != null) {
newJob.setId(IdUtils.createUUID());
} else {
newJob.setId(job.getId());
}
newJob.setSourceObjects(instancesItems.get(instance));
newJob.setInstanceId(instance);
if (newJob.getSourceObjects() instanceof SelectedItemsList) {
newJob.getJobStats()
.setSourceObjectsCount(((SelectedItemsList<?>) newJob.getSourceObjects()).getIds().size());
}
RodaCoreFactory.getModelService().createJob(newJob);
RodaCoreFactory.getPluginOrchestrator().executeJob(newJob, async);
jobIds.add(newJob.getId());
}
StringBuilder details = new StringBuilder();
details.append("Created the following jobs");
for (String jobId : jobIds) {
details.append(", ").append(jobId);
}
job.setName("Distributed Jobs");
job.setStateDetails(details.toString());
job.setState(JOB_STATE.COMPLETED);
job.getJobStats().setCompletionPercentage(100);
RodaCoreFactory.getModelService().createJob(job);
}
LOGGER.info("Success adding job '{}' ({}) to be executed", job.getName(), job.getId());
}
}

Expand All @@ -439,7 +487,8 @@ public void stopJobAsync(Job job) {
ActorRef jobStateInfoActor = getJobContextInformation(jobId);
if (jobStateInfoActor != null) {
stoppingJobs.add(jobId);
jobStateInfoActor.tell(Messages.newJobStop().withJobPriority(job.getPriority()).withParallelism(job.getParallelism()),
jobStateInfoActor.tell(
Messages.newJobStop().withJobPriority(job.getPriority()).withParallelism(job.getParallelism()),
ActorRef.noSender());
}
}
Expand All @@ -466,8 +515,7 @@ public void cleanUnfinishedJobsAsync() {
job.setPluginType(PluginType.INTERNAL);
job.setUsername(RodaConstants.ADMIN);

RodaCoreFactory.getModelService().createJob(job);
RodaCoreFactory.getPluginOrchestrator().executeJob(job, true);
RodaCoreFactory.getPluginOrchestrator().createAndExecuteJobs(job, true);
}
} catch (JobAlreadyStartedException | GenericException | RequestNotValidException | NotFoundException
| AuthorizationDeniedException | IOException e) {
Expand Down
Loading

0 comments on commit 4e43390

Please sign in to comment.