diff --git a/roda-common/roda-common-data/src/main/java/org/roda/core/data/common/RodaConstants.java b/roda-common/roda-common-data/src/main/java/org/roda/core/data/common/RodaConstants.java index 31530b3a62..771de9025b 100644 --- a/roda-common/roda-common-data/src/main/java/org/roda/core/data/common/RodaConstants.java +++ b/roda-common/roda-common-data/src/main/java/org/roda/core/data/common/RodaConstants.java @@ -765,10 +765,10 @@ public enum DistributedModeType { public static final String DEFAULT_ENVIRONMENT_COLLECT_VERSION = "true"; public enum OrchestratorType { - AKKA, AKKA_DISTRIBUTED + PEKKO, PEKKO_DISTRIBUTED } - public static final OrchestratorType DEFAULT_ORCHESTRATOR_TYPE = OrchestratorType.AKKA; + public static final OrchestratorType DEFAULT_ORCHESTRATOR_TYPE = OrchestratorType.PEKKO; public static final String ORCHESTRATOR_TYPE_PROPERTY = "core.orchestrator.type"; public static final String CORE_ORCHESTRATOR_PREFIX = "core.orchestrator"; public static final String CORE_ORCHESTRATOR_PROP_INTERNAL_JOBS_PRIORITY = "internal_jobs_priority"; @@ -830,7 +830,7 @@ public enum OrchestratorType { "org.roda.core.data.v2.ip.TransferredResource", "org.roda.core.data.v2.user.User", "org.roda.core.data.v2.user.Group", "org.roda.core.data.v2.user.RODAMember", "org.roda.core.data.v2.ip.disposal.DisposalConfirmation", "org.roda.core.data.v2.user.RodaPrincipal", - "org.roda.core.data.v2.ip.AIP", "org.roda.core.data.v2.risks.Risk", "org.roda.core.events.akka.CRDTWrapper", + "org.roda.core.data.v2.ip.AIP", "org.roda.core.data.v2.risks.Risk", "org.roda.core.events.pekko.CRDTWrapper", "org.roda.core.data.v2.ip.DIP", "org.roda.core.data.v2.ip.metadata.DescriptiveMetadata", "org.roda.core.data.v2.ip.disposal.DisposalConfirmationAIPEntry", "org.roda.core.data.v2.ip.disposal.aipMetadata.DisposalConfirmationAIPMetadata", diff --git a/roda-core/roda-core-tests/src/main/java/org/roda/core/plugins/base/PluginThatTestsLocking.java b/roda-core/roda-core-tests/src/main/java/org/roda/core/plugins/base/PluginThatTestsLocking.java index 2712eeea2b..c946533884 100644 --- a/roda-core/roda-core-tests/src/main/java/org/roda/core/plugins/base/PluginThatTestsLocking.java +++ b/roda-core/roda-core-tests/src/main/java/org/roda/core/plugins/base/PluginThatTestsLocking.java @@ -31,7 +31,7 @@ import org.roda.core.plugins.PluginHelper; import org.roda.core.plugins.RODAObjectsProcessingLogic; import org.roda.core.plugins.orchestrate.JobPluginInfo; -import org.roda.core.plugins.orchestrate.akka.AkkaJobsManager; +import org.roda.core.plugins.orchestrate.pekko.PekkoJobsManager; import org.roda.core.storage.StorageService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,7 +145,7 @@ public void process(IndexService index, ModelService model, StorageService stora Report reportItem = PluginHelper.initPluginReportItem(plugin, aip.getId(), AIP.class); addDetails(reportItem, new Date().toString()); for (Entry entry : RodaCoreFactory.getMetrics().getCounters().entrySet()) { - if (entry.getKey().endsWith(AkkaJobsManager.LOCK_REQUESTS_WAITING_TO_ACQUIRE_LOCK) + if (entry.getKey().endsWith(PekkoJobsManager.LOCK_REQUESTS_WAITING_TO_ACQUIRE_LOCK) && entry.getValue().getCount() > 0) { addDetails(reportItem, PLUGIN_DETAILS_AT_LEAST_ONE_LOCK_REQUEST_WAITING); } diff --git a/roda-core/roda-core-tests/src/main/resources/logback-test.xml b/roda-core/roda-core-tests/src/main/resources/logback-test.xml index f1ea41f11e..c8b454a790 100644 --- a/roda-core/roda-core-tests/src/main/resources/logback-test.xml +++ b/roda-core/roda-core-tests/src/main/resources/logback-test.xml @@ -9,14 +9,14 @@ - + - + - + diff --git a/roda-core/roda-core/src/main/java/org/roda/core/RodaCoreFactory.java b/roda-core/roda-core/src/main/java/org/roda/core/RodaCoreFactory.java index e35ffb9138..42054a829c 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/RodaCoreFactory.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/RodaCoreFactory.java @@ -145,7 +145,7 @@ import org.roda.core.plugins.PluginManager; import org.roda.core.plugins.PluginManagerException; import org.roda.core.plugins.PluginOrchestrator; -import org.roda.core.plugins.orchestrate.AkkaEmbeddedPluginOrchestrator; +import org.roda.core.plugins.orchestrate.PekkoEmbeddedPluginOrchestrator; import org.roda.core.protocols.Protocol; import org.roda.core.protocols.ProtocolManager; import org.roda.core.protocols.ProtocolManagerException; @@ -1568,8 +1568,8 @@ private static void instantiateDistributedMode() { private static void instantiateOrchestrator() { OrchestratorType orchestratorType = getOrchestratorType(); - if (orchestratorType == OrchestratorType.AKKA) { - pluginOrchestrator = new AkkaEmbeddedPluginOrchestrator(); + if (orchestratorType == OrchestratorType.PEKKO) { + pluginOrchestrator = new PekkoEmbeddedPluginOrchestrator(); } else { LOGGER.error("Orchestrator type '{}' is invalid or not supported. No plugin orchestrator will be started!", orchestratorType); diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/akka/Messages.java b/roda-core/roda-core/src/main/java/org/roda/core/common/akka/Messages.java deleted file mode 100644 index 9919df623c..0000000000 --- a/roda-core/roda-core/src/main/java/org/roda/core/common/akka/Messages.java +++ /dev/null @@ -1,925 +0,0 @@ -/** - * The contents of this file are subject to the license and copyright - * detailed in the LICENSE file at the root of the source - * tree and available online at - * - * https://github.com/keeps/roda - */ -package org.roda.core.common.akka; - -import java.io.Serializable; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import org.apache.pekko.actor.ActorRef; -import org.roda.core.data.v2.IsRODAObject; -import org.roda.core.data.v2.LiteOptionalWithCause; -import org.roda.core.data.v2.SerializableOptional; -import org.roda.core.data.v2.jobs.Job.JOB_STATE; -import org.roda.core.data.v2.jobs.JobParallelism; -import org.roda.core.data.v2.jobs.JobPriority; -import org.roda.core.data.v2.jobs.JobStats; -import org.roda.core.data.v2.jobs.PluginType; -import org.roda.core.data.v2.user.Group; -import org.roda.core.data.v2.user.User; -import org.roda.core.plugins.Plugin; -import org.roda.core.plugins.orchestrate.JobPluginInfo; -import org.roda.core.util.IdUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Messages { - private static final Logger LOGGER = LoggerFactory.getLogger(Messages.class); - - private static final Messages INSTANCE = new Messages(); - - private Messages() { - // do nothing - } - - public abstract class AbstractMessage implements Serializable { - private static final long serialVersionUID = 1898368418865765060L; - private String uuid; - private long creationTime; - private JobPriority jobPriority; - private JobParallelism jobParallelism; - - private AbstractMessage() { - if (LOGGER.isTraceEnabled()) { - uuid = IdUtils.createUUID(); - LOGGER.trace("{} Created message {}", uuid, getClass().getSimpleName()); - } - - creationTime = System.currentTimeMillis(); - } - - public void logProcessingStarted() { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("{} Started processing message {} [{}]", uuid, getClass().getSimpleName(), toString()); - } - } - - public void logProcessingEnded() { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("{} Ended processing message {}", uuid, getClass().getSimpleName()); - } - } - - public long getTimeSinceCreation() { - return System.currentTimeMillis() - creationTime; - } - - public AbstractMessage withJobPriority(JobPriority priority) { - this.jobPriority = priority; - return this; - } - - public AbstractMessage withParallelism(JobParallelism parallelism) { - this.jobParallelism = parallelism; - return this; - } - - public JobPriority getJobPriority() { - return jobPriority; - } - - public JobParallelism getParallelism() { - return jobParallelism; - } - } - - /*-------------------- JOB MANAGER RELATED STATIC CLASSES --------------------*/ - public static JobsManagerTick newJobsManagerTick() { - return INSTANCE.new JobsManagerTick(); - } - - public final class JobsManagerTick extends AbstractMessage { - private static final long serialVersionUID = -2514581679498648676L; - - public JobsManagerTick() { - super(); - } - - @Override - public String toString() { - return "JobManagerTick []"; - } - } - - public static JobsManagerJobEnded newJobsManagerJobEnded(String jobId, String plugin, PluginType type, long duration, - JobStats jobStats, JobParallelism jobParallelism) { - return INSTANCE.new JobsManagerJobEnded(jobId, plugin, type, duration, jobStats, jobParallelism); - } - - public final class JobsManagerJobEnded extends AbstractMessage { - private static final long serialVersionUID = -2514581679498648676L; - - private final String jobId; - private final String plugin; - private final PluginType pluginType; - private final long duration; - private final JobStats jobStats; - private final JobParallelism parallelism; - - public JobsManagerJobEnded(String jobId, String plugin, PluginType pluginType, long duration, JobStats jobStats, - JobParallelism parallelism) { - super(); - this.jobId = jobId; - this.plugin = plugin; - this.pluginType = pluginType; - this.duration = duration; - this.jobStats = jobStats; - this.parallelism = parallelism; - } - - public String getJobId() { - return jobId; - } - - public String getPlugin() { - return plugin; - } - - public PluginType getPluginType() { - return pluginType; - } - - public long getDuration() { - return duration; - } - - public JobStats getJobStats() { - return jobStats; - } - - public JobParallelism getJobParallelism() { - return parallelism; - } - - @Override - public String toString() { - return "JobsManagerJobEnded [jobId=" + jobId + ", plugin=" + plugin + ", pluginType=" + pluginType - + ", jobParallelism=" + parallelism + "]"; - - } - } - - public static JobsManagerAcquireLock newJobsManagerAcquireLock(List lites, boolean waitForLockIfLocked, - int secondsToExpire, String requestUuid) { - return INSTANCE.new JobsManagerAcquireLock(lites, waitForLockIfLocked, secondsToExpire, requestUuid); - } - - public final class JobsManagerAcquireLock extends AbstractMessage { - private static final long serialVersionUID = 4924002662559968741L; - - private List lites; - private boolean waitForLockIfLocked; - private Date expireDate; - private ActorRef sender; - private String requestUuid; - - public JobsManagerAcquireLock(List lites, boolean waitForLockIfLocked, int secondsToExpire, - String requestUuid) { - super(); - this.lites = lites; - this.waitForLockIfLocked = waitForLockIfLocked; - this.expireDate = new Date(new Date().getTime() + (secondsToExpire * 1000L)); - this.requestUuid = requestUuid; - } - - public List getLites() { - return lites; - } - - public ActorRef getSender() { - return sender; - } - - public JobsManagerAcquireLock setSender(ActorRef sender) { - this.sender = sender; - return this; - } - - public boolean isWaitForLockIfLocked() { - return waitForLockIfLocked; - } - - public Date getExpireDate() { - return expireDate; - } - - public String getRequestUuid() { - return requestUuid; - } - - @Override - public String toString() { - return "JobsManagerAcquireLock [lites=" + lites + ", waitForLockIfLocked=" + waitForLockIfLocked + ", expireDate=" - + expireDate + ", requestUuid=" + requestUuid + "]"; - } - - } - - public static JobsManagerReleaseLock newJobsManagerReleaseLock(List lites, String requestUuid) { - return INSTANCE.new JobsManagerReleaseLock(lites, requestUuid); - } - - public final class JobsManagerReleaseLock extends AbstractMessage { - private static final long serialVersionUID = 4924002662559968741L; - - private List lites; - private String requestUuid; - - public JobsManagerReleaseLock(List lites, String requestUuid) { - super(); - this.lites = lites; - this.requestUuid = requestUuid; - } - - public List getLites() { - return lites; - } - - public String getRequestUuid() { - return requestUuid; - } - - @Override - public String toString() { - return "JobsManagerReleaseLock [lites=" + lites + ", requestUuid=" + requestUuid + "]"; - } - - } - - public static JobsManagerReleaseAllLocks newJobsManagerReleaseAllLocks() { - return INSTANCE.new JobsManagerReleaseAllLocks(); - } - - public final class JobsManagerReleaseAllLocks extends AbstractMessage { - private static final long serialVersionUID = -2842420964416530808L; - - public JobsManagerReleaseAllLocks() { - super(); - } - - @Override - public String toString() { - return "JobsManagerReleaseAllLocks []"; - } - } - - public static JobsManagerReplyToAcquireLock newJobsManagerReplyToAcquireLock(List lites) { - return INSTANCE.new JobsManagerReplyToAcquireLock(lites); - } - - public final class JobsManagerReplyToAcquireLock extends AbstractMessage { - private static final long serialVersionUID = 4924002662559968741L; - - private List lites; - - public JobsManagerReplyToAcquireLock(List lites) { - super(); - this.lites = lites; - } - - public List getLites() { - return lites; - } - - @Override - public String toString() { - return "JobsManagerReplyToAcquireLock [lites=" + lites + "]"; - } - } - - public static JobsManagerReplyToReleaseLock newJobsManagerReplyToReleaseLock(List lites) { - return INSTANCE.new JobsManagerReplyToReleaseLock(lites); - } - - public final class JobsManagerReplyToReleaseLock extends AbstractMessage { - private static final long serialVersionUID = 4924002662559968741L; - - private List lites; - - public JobsManagerReplyToReleaseLock(List lites) { - super(); - this.lites = lites; - } - - public List getLites() { - return lites; - } - - @Override - public String toString() { - return "JobsManagerReplyToReleaseLock [lites=" + lites + "]"; - } - } - - public static JobsManagerNotLockableAtTheTime newJobsManagerNotLockableAtTheTime() { - return INSTANCE.new JobsManagerNotLockableAtTheTime(); - } - - public static JobsManagerNotLockableAtTheTime newJobsManagerNotLockableAtTheTime(String msg) { - return INSTANCE.new JobsManagerNotLockableAtTheTime(msg); - } - - public final class JobsManagerNotLockableAtTheTime extends AbstractMessage { - private static final long serialVersionUID = -2313831907910175641L; - - private String msg; - - public JobsManagerNotLockableAtTheTime() { - super(); - this.msg = ""; - } - - public JobsManagerNotLockableAtTheTime(String msg) { - super(); - this.msg = msg; - } - - @Override - public String toString() { - return "JobsManagerUnlockableAtTheTime [msg=" + msg + "]"; - } - - } - - /*-------------------- JOB STATE RELATED STATIC CLASSES --------------------*/ - public static JobInfoUpdated newJobInfoUpdated(Plugin plugin, JobPluginInfo jobPluginInfo) { - return INSTANCE.new JobInfoUpdated(plugin, jobPluginInfo); - } - - public final class JobInfoUpdated extends AbstractMessage { - private static final long serialVersionUID = -6918015956027259760L; - - private Plugin plugin; - private JobPluginInfo jobPluginInfo; - - public JobInfoUpdated(Plugin plugin, JobPluginInfo jobPluginInfo) { - super(); - this.plugin = plugin; - this.jobPluginInfo = jobPluginInfo; - } - - public Plugin getPlugin() { - return plugin; - } - - public JobPluginInfo getJobPluginInfo() { - return jobPluginInfo; - } - - @Override - public String toString() { - return "JobInfoUpdated [plugin=" + plugin + ", jobPluginInfo=" + jobPluginInfo + "]"; - } - } - - public abstract class JobPartialUpdate extends AbstractMessage { - private static final long serialVersionUID = 4722216970884172260L; - - @Override - public String toString() { - return "JobPartialUpdate []"; - } - } - - public static JobSourceObjectsUpdated newJobSourceObjectsUpdated(Map oldToNewIds) { - return INSTANCE.new JobSourceObjectsUpdated(oldToNewIds); - } - - public class JobSourceObjectsUpdated extends JobPartialUpdate { - private static final long serialVersionUID = -8395563279621159731L; - - private Map oldToNewIds; - - public JobSourceObjectsUpdated(Map oldToNewIds) { - super(); - this.oldToNewIds = oldToNewIds; - } - - public Map getOldToNewIds() { - return oldToNewIds; - } - - @Override - public String toString() { - return "JobSourceObjectsUpdated [oldToNewIds=" + oldToNewIds + "]"; - } - } - - public static JobStateDetailsUpdated newJobStateDetailsUpdated(Plugin plugin, Optional stateDatails) { - return INSTANCE.new JobStateDetailsUpdated(plugin, stateDatails); - } - - public class JobStateDetailsUpdated extends JobPartialUpdate { - private static final long serialVersionUID = 1946036502369851214L; - - private Plugin plugin; - private SerializableOptional stateDetails; - - public JobStateDetailsUpdated(Plugin plugin, Optional stateDetails) { - super(); - this.plugin = plugin; - this.stateDetails = SerializableOptional.setOptional(stateDetails); - } - - public JobStateDetailsUpdated(Plugin plugin, Throwable throwable) { - this(plugin, Optional.of(throwable.getClass().getName() + ": " + throwable.getMessage())); - } - - public Plugin getPlugin() { - return plugin; - } - - public Optional getStateDetails() { - return stateDetails.getOptional(); - } - - @Override - public String toString() { - return "JobStateDetailsUpdated [plugin=" + plugin + ", stateDatails=" + stateDetails + "]"; - } - } - - public static JobStateUpdated newJobStateUpdated(Plugin plugin, JOB_STATE state) { - return INSTANCE.new JobStateUpdated(plugin, state); - } - - public static JobStateUpdated newJobStateUpdated(Plugin plugin, JOB_STATE state, Optional stateDetails) { - return INSTANCE.new JobStateUpdated(plugin, state, stateDetails); - } - - public static JobStateUpdated newJobStateUpdated(Plugin plugin, JOB_STATE state, Throwable throwable) { - return INSTANCE.new JobStateUpdated(plugin, state, throwable); - } - - public class JobStateUpdated extends JobStateDetailsUpdated { - private static final long serialVersionUID = 1946036502369851214L; - - private JOB_STATE state; - - public JobStateUpdated(Plugin plugin, JOB_STATE state) { - this(plugin, state, Optional.empty()); - } - - public JobStateUpdated(Plugin plugin, JOB_STATE state, Optional stateDatails) { - super(plugin, stateDatails); - this.state = state; - } - - public JobStateUpdated(Plugin plugin, JOB_STATE state, Throwable throwable) { - this(plugin, state, Optional.of(throwable.getClass().getName() + ": " + throwable.getMessage())); - } - - public JOB_STATE getState() { - return state; - } - - @Override - public String toString() { - return "JobStateUpdated [plugin=" + getPlugin() + ", state=" + state + ", stateDatails=" + getStateDetails() - + "]"; - } - } - - public static JobInitEnded newJobInitEnded(JobPluginInfo jobPluginInfo, boolean noObjectsOrchestrated) { - return INSTANCE.new JobInitEnded(jobPluginInfo, noObjectsOrchestrated); - } - - public final class JobInitEnded extends AbstractMessage { - private static final long serialVersionUID = 5040958276243865900L; - - private boolean noObjectsOrchestrated; - private JobPluginInfo jobPluginInfo; - - public JobInitEnded(JobPluginInfo jobPluginInfo, boolean noObjectsOrchestrated) { - super(); - this.jobPluginInfo = jobPluginInfo; - this.noObjectsOrchestrated = noObjectsOrchestrated; - } - - public JobPluginInfo getJobPluginInfo() { - return jobPluginInfo; - } - - public boolean isNoObjectsOrchestrated() { - return noObjectsOrchestrated; - } - - @Override - public String toString() { - return "JobInitEnded [noObjectsOrchestrated=" + noObjectsOrchestrated + ", jobPluginInfo=" + jobPluginInfo + "]"; - } - } - - public static JobCleanup newJobCleanup() { - return INSTANCE.new JobCleanup(); - } - - public class JobCleanup extends AbstractMessage { - private static final long serialVersionUID = -5175825019027462407L; - - public JobCleanup() { - super(); - } - - @Override - public String toString() { - return "JobCleanup []"; - } - } - - public static JobStop newJobStop() { - return INSTANCE.new JobStop(); - } - - public class JobStop extends AbstractMessage { - private static final long serialVersionUID = -8806029242967727412L; - - public JobStop() { - super(); - } - - @Override - public String toString() { - return "JobStop []"; - } - } - - /*-------------------- PLUGIN STATE TRANSITIONS RELATED STATIC CLASSES --------------------*/ - - private class PluginMethodIsReady extends AbstractMessage { - private static final long serialVersionUID = -5214600055070295410L; - - private Plugin plugin; - - public PluginMethodIsReady(Plugin plugin) { - super(); - this.plugin = plugin; - } - - public Plugin getPlugin() { - return plugin; - } - - @Override - public String toString() { - return "PluginMethodIsReady [plugin=" + plugin + "]"; - } - } - - private class PluginMethodIsDone extends AbstractMessage { - private static final long serialVersionUID = -8701179264086005994L; - - private Plugin plugin; - private boolean withError; - private String errorMessage = ""; - - public PluginMethodIsDone(Plugin plugin, boolean withError) { - super(); - this.plugin = plugin; - this.withError = withError; - } - - public PluginMethodIsDone(Plugin plugin, boolean withError, String errorMessage) { - super(); - this.plugin = plugin; - this.withError = withError; - this.errorMessage = errorMessage; - } - - public Plugin getPlugin() { - return plugin; - } - - public boolean isWithError() { - return withError; - } - - public String getErrorMessage() { - return errorMessage; - } - - @Override - public String toString() { - return "PluginMethodIsDone [plugin=" + plugin + ", withError=" + withError + ", errorMessage=" + errorMessage - + "]"; - } - } - - public static PluginBeforeAllExecuteIsReady newPluginBeforeAllExecuteIsReady( - Plugin plugin) { - return INSTANCE.new PluginBeforeAllExecuteIsReady(plugin); - } - - public class PluginBeforeAllExecuteIsReady extends PluginMethodIsReady { - private static final long serialVersionUID = -7730727049162062388L; - - public PluginBeforeAllExecuteIsReady(Plugin plugin) { - super(plugin); - } - - @Override - public String toString() { - return "PluginBeforeAllExecuteIsReady [getPlugin()=" + getPlugin() + "]"; - } - } - - public static PluginBeforeAllExecuteIsDone newPluginBeforeAllExecuteIsDone(Plugin plugin, boolean withError) { - return INSTANCE.new PluginBeforeAllExecuteIsDone(plugin, withError); - } - - public class PluginBeforeAllExecuteIsDone extends PluginMethodIsDone { - private static final long serialVersionUID = 7449486178368177015L; - - public PluginBeforeAllExecuteIsDone(Plugin plugin, boolean withError) { - super(plugin, withError); - } - - @Override - public String toString() { - return "PluginBeforeAllExecuteIsDone [getPlugin()=" + getPlugin() + ", isWithError()=" + isWithError() + "]"; - } - } - - public static PluginExecuteIsReady newPluginExecuteIsReady(Plugin plugin, - List list) { - return INSTANCE.new PluginExecuteIsReady(plugin, list); - } - - public class PluginExecuteIsReady extends PluginMethodIsReady { - private static final long serialVersionUID = 1821489252490235130L; - - private List list; - private boolean hasBeenForwarded = false; - - public PluginExecuteIsReady(Plugin plugin, List list) { - super(plugin); - this.list = list; - } - - public List getList() { - return list; - } - - public void setHasBeenForwarded() { - this.hasBeenForwarded = true; - } - - @Override - public String toString() { - return "PluginExecuteIsReady [list=" + list + ", hasBeenForwarded=" + hasBeenForwarded + ", getPlugin()=" - + getPlugin() + "]"; - } - } - - public static PluginExecuteIsDone newPluginExecuteIsDone(Plugin plugin, boolean withError) { - return INSTANCE.new PluginExecuteIsDone(plugin, withError); - } - - public static PluginExecuteIsDone newPluginExecuteIsDone(Plugin plugin, boolean withError, String errorMessage) { - return INSTANCE.new PluginExecuteIsDone(plugin, withError, errorMessage); - } - - public class PluginExecuteIsDone extends PluginMethodIsDone { - private static final long serialVersionUID = -5136014936634139026L; - - public PluginExecuteIsDone(Plugin plugin, boolean withError) { - super(plugin, withError); - } - - public PluginExecuteIsDone(Plugin plugin, boolean withError, String errorMessage) { - super(plugin, withError, errorMessage); - } - - @Override - public String toString() { - return "PluginExecuteIsDone [getPlugin()=" + getPlugin() + ", isWithError()=" + isWithError() - + ", getErrorMessage()=" + getErrorMessage() + "]"; - } - } - - public static PluginAfterAllExecuteIsReady newPluginAfterAllExecuteIsReady( - Plugin plugin) { - return INSTANCE.new PluginAfterAllExecuteIsReady(plugin); - } - - public class PluginAfterAllExecuteIsReady extends PluginMethodIsReady { - private static final long serialVersionUID = 8852688692792086166L; - - public PluginAfterAllExecuteIsReady(Plugin plugin) { - super(plugin); - } - - @Override - public String toString() { - return "PluginAfterAllExecuteIsReady [plugin=" + getPlugin() + "]"; - } - } - - public static PluginAfterAllExecuteIsDone newPluginAfterAllExecuteIsDone(Plugin plugin, boolean withError) { - return INSTANCE.new PluginAfterAllExecuteIsDone(plugin, withError); - } - - public class PluginAfterAllExecuteIsDone extends PluginMethodIsDone { - private static final long serialVersionUID = -5136014936634139026L; - - public PluginAfterAllExecuteIsDone(Plugin plugin, boolean withError) { - super(plugin, withError); - } - - @Override - public String toString() { - return "PluginAfterAllExecuteIsDone [getPlugin()=" + getPlugin() + ", isWithError()=" + isWithError() + "]"; - } - } - - /*-------------------- EVENTS RELATED STATIC CLASSES --------------------*/ - public abstract class AbstractEventMessage extends AbstractMessage { - private static final long serialVersionUID = -2517455273875624115L; - - private String senderId; - - public AbstractEventMessage(String senderId) { - super(); - this.senderId = senderId; - } - - public String getSenderId() { - return senderId; - } - - @Override - public String toString() { - return "AbstractEventMessage [senderId=" + senderId + "]"; - } - } - - public static EventUserCreated newEventUserCreated(User user, String senderId) { - return INSTANCE.new EventUserCreated(user, senderId); - } - - public final class EventUserCreated extends AbstractEventMessage { - private static final long serialVersionUID = -2517455273875624115L; - - private User user; - private String password; - - public EventUserCreated(User user, String senderId) { - super(senderId); - this.user = user; - } - - public User getUser() { - return user; - } - - public String getPassword() { - return password; - } - - @Override - public String toString() { - return "EventUserCreated [user=" + user + ", getSenderId()=" + getSenderId() + "]"; - } - } - - public final static EventUserUpdated newEventUserUpdated(User user, boolean myUser, - String senderId) { - return INSTANCE.new EventUserUpdated(user, myUser, senderId); - } - - public final class EventUserUpdated extends AbstractEventMessage { - private static final long serialVersionUID = -2517455273875624115L; - - private User user; - private String password; - private boolean myUser; - - public EventUserUpdated(User user, boolean myUser, String senderId) { - super(senderId); - this.user = user; - this.myUser = myUser; - } - - public User getUser() { - return user; - } - - public String getPassword() { - return password; - } - - public boolean isMyUser() { - return myUser; - } - - @Override - public String toString() { - return "EventUserUpdated [user=" + user + ", password=" + password + ", myUser=" + myUser + ", getSenderId()=" - + getSenderId() + "]"; - } - } - - public static EventUserDeleted newEventUserDeleted(String id, String senderId) { - return INSTANCE.new EventUserDeleted(id, senderId); - } - - public final class EventUserDeleted extends AbstractEventMessage { - private static final long serialVersionUID = -7862917122791858311L; - - private String id; - - public EventUserDeleted(String id, String senderId) { - super(senderId); - this.id = id; - } - - public String getId() { - return id; - } - - @Override - public String toString() { - return "EventUserDeleted [id=" + id + ", getSenderId()=" + getSenderId() + "]"; - } - } - - public static EventGroupCreated newEventGroupCreated(Group group, String senderId) { - return INSTANCE.new EventGroupCreated(group, senderId); - } - - public final class EventGroupCreated extends AbstractEventMessage { - private static final long serialVersionUID = -51380983717488740L; - - private Group group; - - public EventGroupCreated(Group group, String senderId) { - super(senderId); - this.group = group; - } - - public Group getGroup() { - return group; - } - - @Override - public String toString() { - return "EventGroupCreated [group=" + group + ", getSenderId()=" + getSenderId() + "]"; - } - } - - public static EventGroupUpdated newEventGroupUpdated(Group group, String senderId) { - return INSTANCE.new EventGroupUpdated(group, senderId); - } - - public final class EventGroupUpdated extends AbstractEventMessage { - private static final long serialVersionUID = -51380983717488740L; - - private Group group; - - public EventGroupUpdated(Group group, String senderId) { - super(senderId); - this.group = group; - } - - public Group getGroup() { - return group; - } - - @Override - public String toString() { - return "EventGroupUpdated [group=" + group + ", getSenderId()=" + getSenderId() + "]"; - } - } - - public static EventGroupDeleted newEventGroupDeleted(String id, String senderId) { - return INSTANCE.new EventGroupDeleted(id, senderId); - } - - public final class EventGroupDeleted extends AbstractEventMessage { - private static final long serialVersionUID = -7862917122791858311L; - - private String id; - - public EventGroupDeleted(String id, String senderId) { - super(senderId); - this.id = id; - } - - public String getId() { - return id; - } - - @Override - public String toString() { - return "EventGroupDeleted [id=" + id + ", getSenderId()=" + getSenderId() + "]"; - } - } - -} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/akka/DeadLetterActor.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/DeadLetterActor.java similarity index 95% rename from roda-core/roda-core/src/main/java/org/roda/core/common/akka/DeadLetterActor.java rename to roda-core/roda-core/src/main/java/org/roda/core/common/pekko/DeadLetterActor.java index 4cb4e52da2..4d67416b44 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/common/akka/DeadLetterActor.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/DeadLetterActor.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.common.akka; +package org.roda.core.common.pekko; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/Messages.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/Messages.java new file mode 100644 index 0000000000..c8c177d485 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/Messages.java @@ -0,0 +1,186 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE file at the root of the source + * tree and available online at + * + * https://github.com/keeps/roda + */ +package org.roda.core.common.pekko; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.roda.core.common.pekko.messages.events.EventGroupCreated; +import org.roda.core.common.pekko.messages.events.EventGroupDeleted; +import org.roda.core.common.pekko.messages.events.EventGroupUpdated; +import org.roda.core.common.pekko.messages.events.EventUserCreated; +import org.roda.core.common.pekko.messages.events.EventUserDeleted; +import org.roda.core.common.pekko.messages.events.EventUserUpdated; +import org.roda.core.common.pekko.messages.jobs.JobCleanup; +import org.roda.core.common.pekko.messages.jobs.JobInfoUpdated; +import org.roda.core.common.pekko.messages.jobs.JobInitEnded; +import org.roda.core.common.pekko.messages.jobs.JobSourceObjectsUpdated; +import org.roda.core.common.pekko.messages.jobs.JobStateDetailsUpdated; +import org.roda.core.common.pekko.messages.jobs.JobStateUpdated; +import org.roda.core.common.pekko.messages.jobs.JobStop; +import org.roda.core.common.pekko.messages.jobs.JobsManagerAcquireLock; +import org.roda.core.common.pekko.messages.jobs.JobsManagerJobEnded; +import org.roda.core.common.pekko.messages.jobs.JobsManagerNotLockableAtTheTime; +import org.roda.core.common.pekko.messages.jobs.JobsManagerReleaseAllLocks; +import org.roda.core.common.pekko.messages.jobs.JobsManagerReleaseLock; +import org.roda.core.common.pekko.messages.jobs.JobsManagerReplyToAcquireLock; +import org.roda.core.common.pekko.messages.jobs.JobsManagerReplyToReleaseLock; +import org.roda.core.common.pekko.messages.jobs.JobsManagerTick; +import org.roda.core.common.pekko.messages.plugins.PluginAfterAllExecuteIsDone; +import org.roda.core.common.pekko.messages.plugins.PluginAfterAllExecuteIsReady; +import org.roda.core.common.pekko.messages.plugins.PluginBeforeAllExecuteIsDone; +import org.roda.core.common.pekko.messages.plugins.PluginBeforeAllExecuteIsReady; +import org.roda.core.common.pekko.messages.plugins.PluginExecuteIsDone; +import org.roda.core.common.pekko.messages.plugins.PluginExecuteIsReady; +import org.roda.core.data.v2.IsRODAObject; +import org.roda.core.data.v2.LiteOptionalWithCause; +import org.roda.core.data.v2.jobs.Job.JOB_STATE; +import org.roda.core.data.v2.jobs.JobParallelism; +import org.roda.core.data.v2.jobs.JobStats; +import org.roda.core.data.v2.jobs.PluginType; +import org.roda.core.data.v2.user.Group; +import org.roda.core.data.v2.user.User; +import org.roda.core.plugins.Plugin; +import org.roda.core.plugins.orchestrate.JobPluginInfo; + +public class Messages { + private Messages() { + // do nothing + } + + public static PluginBeforeAllExecuteIsReady newPluginBeforeAllExecuteIsReady( + Plugin plugin) { + return new PluginBeforeAllExecuteIsReady<>(plugin); + } + + /*-------------------- JOB MANAGER RELATED STATIC CLASSES --------------------*/ + public static JobsManagerTick newJobsManagerTick() { + return new JobsManagerTick(); + } + + public static PluginExecuteIsReady newPluginExecuteIsReady(Plugin plugin, + List list) { + return new PluginExecuteIsReady<>(plugin, list); + } + + public static JobsManagerJobEnded newJobsManagerJobEnded(String jobId, String plugin, PluginType type, long duration, + JobStats jobStats, JobParallelism jobParallelism) { + return new JobsManagerJobEnded(jobId, plugin, type, duration, jobStats, jobParallelism); + } + + public static JobsManagerAcquireLock newJobsManagerAcquireLock(List lites, boolean waitForLockIfLocked, + int secondsToExpire, String requestUuid) { + return new JobsManagerAcquireLock(lites, waitForLockIfLocked, secondsToExpire, requestUuid); + } + + public static JobsManagerReleaseLock newJobsManagerReleaseLock(List lites, String requestUuid) { + return new JobsManagerReleaseLock(lites, requestUuid); + } + + public static JobsManagerReleaseAllLocks newJobsManagerReleaseAllLocks() { + return new JobsManagerReleaseAllLocks(); + } + + public static JobsManagerReplyToAcquireLock newJobsManagerReplyToAcquireLock(List lites) { + return new JobsManagerReplyToAcquireLock(lites); + } + + public static JobsManagerReplyToReleaseLock newJobsManagerReplyToReleaseLock(List lites) { + return new JobsManagerReplyToReleaseLock(lites); + } + + public static JobsManagerNotLockableAtTheTime newJobsManagerNotLockableAtTheTime() { + return new JobsManagerNotLockableAtTheTime(); + } + + public static JobsManagerNotLockableAtTheTime newJobsManagerNotLockableAtTheTime(String msg) { + return new JobsManagerNotLockableAtTheTime(msg); + } + + public static JobInfoUpdated newJobInfoUpdated(Plugin plugin, JobPluginInfo jobPluginInfo) { + return new JobInfoUpdated(plugin, jobPluginInfo); + } + + public static JobSourceObjectsUpdated newJobSourceObjectsUpdated(Map oldToNewIds) { + return new JobSourceObjectsUpdated(oldToNewIds); + } + + public static JobStateDetailsUpdated newJobStateDetailsUpdated(Plugin plugin, Optional stateDatails) { + return new JobStateDetailsUpdated(plugin, stateDatails); + } + + public static JobStateUpdated newJobStateUpdated(Plugin plugin, JOB_STATE state) { + return new JobStateUpdated(plugin, state); + } + + public static JobStateUpdated newJobStateUpdated(Plugin plugin, JOB_STATE state, Optional stateDetails) { + return new JobStateUpdated(plugin, state, stateDetails); + } + + public static JobStateUpdated newJobStateUpdated(Plugin plugin, JOB_STATE state, Throwable throwable) { + return new JobStateUpdated(plugin, state, throwable); + } + + public static JobInitEnded newJobInitEnded(JobPluginInfo jobPluginInfo, boolean noObjectsOrchestrated) { + return new JobInitEnded(jobPluginInfo, noObjectsOrchestrated); + } + + public static JobCleanup newJobCleanup() { + return new JobCleanup(); + } + + public static JobStop newJobStop() { + return new JobStop(); + } + + public static PluginBeforeAllExecuteIsDone newPluginBeforeAllExecuteIsDone(Plugin plugin, boolean withError) { + return new PluginBeforeAllExecuteIsDone(plugin, withError); + } + + public static PluginAfterAllExecuteIsReady newPluginAfterAllExecuteIsReady( + Plugin plugin) { + return new PluginAfterAllExecuteIsReady<>(plugin); + } + + public static EventUserUpdated newEventUserUpdated(User user, boolean myUser, String senderId) { + return new EventUserUpdated(user, myUser, senderId); + } + + public static PluginExecuteIsDone newPluginExecuteIsDone(Plugin plugin, boolean withError) { + return new PluginExecuteIsDone(plugin, withError); + } + + public static PluginExecuteIsDone newPluginExecuteIsDone(Plugin plugin, boolean withError, String errorMessage) { + return new PluginExecuteIsDone(plugin, withError, errorMessage); + } + + public static PluginAfterAllExecuteIsDone newPluginAfterAllExecuteIsDone(Plugin plugin, boolean withError) { + return new PluginAfterAllExecuteIsDone(plugin, withError); + } + + public static EventUserCreated newEventUserCreated(User user, String senderId) { + return new EventUserCreated(user, senderId); + } + + public static EventUserDeleted newEventUserDeleted(String id, String senderId) { + return new EventUserDeleted(id, senderId); + } + + public static EventGroupCreated newEventGroupCreated(Group group, String senderId) { + return new EventGroupCreated(group, senderId); + } + + public static EventGroupUpdated newEventGroupUpdated(Group group, String senderId) { + return new EventGroupUpdated(group, senderId); + } + + public static EventGroupDeleted newEventGroupDeleted(String id, String senderId) { + return new EventGroupDeleted(id, senderId); + } +} \ No newline at end of file diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/akka/AkkaBaseActor.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/PekkoBaseActor.java similarity index 82% rename from roda-core/roda-core/src/main/java/org/roda/core/common/akka/AkkaBaseActor.java rename to roda-core/roda-core/src/main/java/org/roda/core/common/pekko/PekkoBaseActor.java index da8e38d82f..019f6cb6fd 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/common/akka/AkkaBaseActor.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/PekkoBaseActor.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.common.akka; +package org.roda.core.common.pekko; import org.roda.core.RodaCoreFactory; import org.roda.core.index.IndexService; @@ -18,9 +18,9 @@ import org.apache.pekko.actor.UntypedAbstractActor; -public abstract class AkkaBaseActor extends UntypedAbstractActor { +public abstract class PekkoBaseActor extends UntypedAbstractActor { - public AkkaBaseActor() { + public PekkoBaseActor() { setup(); } @@ -33,8 +33,8 @@ private void setup() { } public void setup(Object msg) throws Exception { - org.slf4j.MDC.put("akkaSourceActor", self().path().toString()); - org.slf4j.MDC.put("akkaSourceThread", Thread.currentThread().getName()); + org.slf4j.MDC.put("pekkoSourceActor", self().path().toString()); + org.slf4j.MDC.put("pekkoSourceThread", Thread.currentThread().getName()); } public StorageService getStorage() { diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/akka/AkkaUtils.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/PekkoUtils.java similarity index 66% rename from roda-core/roda-core/src/main/java/org/roda/core/common/akka/AkkaUtils.java rename to roda-core/roda-core/src/main/java/org/roda/core/common/pekko/PekkoUtils.java index 8e244c684e..2d356a8bf8 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/common/akka/AkkaUtils.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/PekkoUtils.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.common.akka; +package org.roda.core.common.pekko; import java.io.IOException; import java.io.InputStream; @@ -20,25 +20,25 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -public final class AkkaUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaUtils.class); +public final class PekkoUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoUtils.class); - private AkkaUtils() { + private PekkoUtils() { // do nothing } - public static Config getAkkaConfiguration(String configFilename) { - Config akkaConfig = null; + public static Config getPekkoConfiguration(String configFilename) { + Config pekkoConfig = null; try (InputStream originStream = RodaCoreFactory .getConfigurationFileAsStream(RodaConstants.CORE_ORCHESTRATOR_FOLDER + "/" + configFilename)) { String configAsString = IOUtils.toString(originStream, StandardCharsets.UTF_8); - akkaConfig = ConfigFactory.parseString(configAsString); + pekkoConfig = ConfigFactory.parseString(configAsString); } catch (IOException e) { - LOGGER.error("Could not load Akka configuration '{}'", configFilename, e); + LOGGER.error("Could not load Pekko configuration '{}'", configFilename, e); } - return akkaConfig; + return pekkoConfig; } } diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/AbstractMessage.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/AbstractMessage.java new file mode 100644 index 0000000000..c083d8ec23 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/AbstractMessage.java @@ -0,0 +1,67 @@ +package org.roda.core.common.pekko.messages; + +import java.io.Serial; +import java.io.Serializable; + +import org.roda.core.data.v2.jobs.JobParallelism; +import org.roda.core.data.v2.jobs.JobPriority; +import org.roda.core.util.IdUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Miguel Guimarães + */ +public abstract class AbstractMessage implements Serializable { + @Serial + private static final long serialVersionUID = 1898368418865765060L; + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessage.class); + + private String uuid; + private final long creationTime; + private JobPriority jobPriority; + private JobParallelism jobParallelism; + + protected AbstractMessage() { + if (LOGGER.isTraceEnabled()) { + uuid = IdUtils.createUUID(); + LOGGER.trace("{} Created message {}", uuid, getClass().getSimpleName()); + } + + creationTime = System.currentTimeMillis(); + } + + public void logProcessingStarted() { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("{} Started processing message {} [{}]", uuid, getClass().getSimpleName(), toString()); + } + } + + public void logProcessingEnded() { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("{} Ended processing message {}", uuid, getClass().getSimpleName()); + } + } + + public long getTimeSinceCreation() { + return System.currentTimeMillis() - creationTime; + } + + public AbstractMessage withJobPriority(JobPriority priority) { + this.jobPriority = priority; + return this; + } + + public AbstractMessage withParallelism(JobParallelism parallelism) { + this.jobParallelism = parallelism; + return this; + } + + public JobPriority getJobPriority() { + return jobPriority; + } + + public JobParallelism getParallelism() { + return jobParallelism; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/AbstractEventMessage.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/AbstractEventMessage.java new file mode 100644 index 0000000000..dafe075c9b --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/AbstractEventMessage.java @@ -0,0 +1,29 @@ +package org.roda.core.common.pekko.messages.events; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public abstract class AbstractEventMessage extends AbstractMessage { + @Serial + private static final long serialVersionUID = -2517455273875624115L; + + private final String senderId; + + protected AbstractEventMessage(String senderId) { + super(); + this.senderId = senderId; + } + + public String getSenderId() { + return senderId; + } + + @Override + public String toString() { + return "AbstractEventMessage [senderId=" + senderId + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventGroupCreated.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventGroupCreated.java new file mode 100644 index 0000000000..394531cf64 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventGroupCreated.java @@ -0,0 +1,29 @@ +package org.roda.core.common.pekko.messages.events; + +import org.roda.core.data.v2.user.Group; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public final class EventGroupCreated extends AbstractEventMessage { + @Serial + private static final long serialVersionUID = -51380983717488740L; + + private final Group group; + + public EventGroupCreated(Group group, String senderId) { + super(senderId); + this.group = group; + } + + public Group getGroup() { + return group; + } + + @Override + public String toString() { + return "EventGroupCreated [group=" + group + ", getSenderId()=" + getSenderId() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventGroupDeleted.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventGroupDeleted.java new file mode 100644 index 0000000000..ea2cfff79a --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventGroupDeleted.java @@ -0,0 +1,27 @@ +package org.roda.core.common.pekko.messages.events; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public final class EventGroupDeleted extends AbstractEventMessage { + @Serial + private static final long serialVersionUID = -7862917122791858311L; + + private final String id; + + public EventGroupDeleted(String id, String senderId) { + super(senderId); + this.id = id; + } + + public String getId() { + return id; + } + + @Override + public String toString() { + return "EventGroupDeleted [id=" + id + ", getSenderId()=" + getSenderId() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventGroupUpdated.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventGroupUpdated.java new file mode 100644 index 0000000000..8610ab4f5f --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventGroupUpdated.java @@ -0,0 +1,29 @@ +package org.roda.core.common.pekko.messages.events; + +import org.roda.core.data.v2.user.Group; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public final class EventGroupUpdated extends AbstractEventMessage { + @Serial + private static final long serialVersionUID = -51380983717488740L; + + private final Group group; + + public EventGroupUpdated(Group group, String senderId) { + super(senderId); + this.group = group; + } + + public Group getGroup() { + return group; + } + + @Override + public String toString() { + return "EventGroupUpdated [group=" + group + ", getSenderId()=" + getSenderId() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventUserCreated.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventUserCreated.java new file mode 100644 index 0000000000..9c74066d63 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventUserCreated.java @@ -0,0 +1,34 @@ +package org.roda.core.common.pekko.messages.events; + +import org.roda.core.data.v2.user.User; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public final class EventUserCreated extends AbstractEventMessage { + @Serial + private static final long serialVersionUID = -2517455273875624115L; + + private User user; + private String password; + + public EventUserCreated(User user, String senderId) { + super(senderId); + this.user = user; + } + + public User getUser() { + return user; + } + + public String getPassword() { + return password; + } + + @Override + public String toString() { + return "EventUserCreated [user=" + user + ", getSenderId()=" + getSenderId() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventUserDeleted.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventUserDeleted.java new file mode 100644 index 0000000000..289dc050cb --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventUserDeleted.java @@ -0,0 +1,27 @@ +package org.roda.core.common.pekko.messages.events; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public final class EventUserDeleted extends AbstractEventMessage { + @Serial + private static final long serialVersionUID = -7862917122791858311L; + + private final String id; + + public EventUserDeleted(String id, String senderId) { + super(senderId); + this.id = id; + } + + public String getId() { + return id; + } + + @Override + public String toString() { + return "EventUserDeleted [id=" + id + ", getSenderId()=" + getSenderId() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventUserUpdated.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventUserUpdated.java new file mode 100644 index 0000000000..8f42f3f730 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/events/EventUserUpdated.java @@ -0,0 +1,41 @@ +package org.roda.core.common.pekko.messages.events; + +import org.roda.core.data.v2.user.User; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public final class EventUserUpdated extends AbstractEventMessage { + @Serial + private static final long serialVersionUID = -2517455273875624115L; + + private final User user; + private String password; + private final boolean myUser; + + public EventUserUpdated(User user, boolean myUser, String senderId) { + super(senderId); + this.user = user; + this.myUser = myUser; + } + + public User getUser() { + return user; + } + + public String getPassword() { + return password; + } + + public boolean isMyUser() { + return myUser; + } + + @Override + public String toString() { + return "EventUserUpdated [user=" + user + ", password=" + password + ", myUser=" + myUser + ", getSenderId()=" + + getSenderId() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobCleanup.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobCleanup.java new file mode 100644 index 0000000000..f3c9a0587d --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobCleanup.java @@ -0,0 +1,22 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class JobCleanup extends AbstractMessage { + @Serial + private static final long serialVersionUID = -5175825019027462407L; + + public JobCleanup() { + super(); + } + + @Override + public String toString() { + return "JobCleanup []"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobInfoUpdated.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobInfoUpdated.java new file mode 100644 index 0000000000..c48f5a8334 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobInfoUpdated.java @@ -0,0 +1,37 @@ +package org.roda.core.common.pekko.messages.jobs; + +import java.io.Serial; + +import org.roda.core.common.pekko.messages.AbstractMessage; +import org.roda.core.plugins.Plugin; +import org.roda.core.plugins.orchestrate.JobPluginInfo; + +/** + * @author Miguel Guimarães + */ +public class JobInfoUpdated extends AbstractMessage { + @Serial + private static final long serialVersionUID = -6918015956027259760L; + + private final Plugin plugin; + private final JobPluginInfo jobPluginInfo; + + public JobInfoUpdated(Plugin plugin, JobPluginInfo jobPluginInfo) { + super(); + this.plugin = plugin; + this.jobPluginInfo = jobPluginInfo; + } + + public Plugin getPlugin() { + return plugin; + } + + public JobPluginInfo getJobPluginInfo() { + return jobPluginInfo; + } + + @Override + public String toString() { + return "JobInfoUpdated [plugin=" + plugin + ", jobPluginInfo=" + jobPluginInfo + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobInitEnded.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobInitEnded.java new file mode 100644 index 0000000000..909539acaf --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobInitEnded.java @@ -0,0 +1,36 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; +import org.roda.core.plugins.orchestrate.JobPluginInfo; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class JobInitEnded extends AbstractMessage { + @Serial + private static final long serialVersionUID = 5040958276243865900L; + + private final boolean noObjectsOrchestrated; + private final JobPluginInfo jobPluginInfo; + + public JobInitEnded(JobPluginInfo jobPluginInfo, boolean noObjectsOrchestrated) { + super(); + this.jobPluginInfo = jobPluginInfo; + this.noObjectsOrchestrated = noObjectsOrchestrated; + } + + public JobPluginInfo getJobPluginInfo() { + return jobPluginInfo; + } + + public boolean isNoObjectsOrchestrated() { + return noObjectsOrchestrated; + } + + @Override + public String toString() { + return "JobInitEnded [noObjectsOrchestrated=" + noObjectsOrchestrated + ", jobPluginInfo=" + jobPluginInfo + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobPartialUpdate.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobPartialUpdate.java new file mode 100644 index 0000000000..306d72b703 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobPartialUpdate.java @@ -0,0 +1,18 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public abstract class JobPartialUpdate extends AbstractMessage { + @Serial + private static final long serialVersionUID = 4722216970884172260L; + + @Override + public String toString() { + return "JobPartialUpdate []"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobSourceObjectsUpdated.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobSourceObjectsUpdated.java new file mode 100644 index 0000000000..583ea3a3f2 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobSourceObjectsUpdated.java @@ -0,0 +1,28 @@ +package org.roda.core.common.pekko.messages.jobs; + +import java.io.Serial; +import java.util.Map; + +/** + * @author Miguel Guimarães + */ +public class JobSourceObjectsUpdated extends JobPartialUpdate { + @Serial + private static final long serialVersionUID = -8395563279621159731L; + + private final Map oldToNewIds; + + public JobSourceObjectsUpdated(Map oldToNewIds) { + super(); + this.oldToNewIds = oldToNewIds; + } + + public Map getOldToNewIds() { + return oldToNewIds; + } + + @Override + public String toString() { + return "JobSourceObjectsUpdated [oldToNewIds=" + oldToNewIds + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobStateDetailsUpdated.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobStateDetailsUpdated.java new file mode 100644 index 0000000000..fdf50cf6c3 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobStateDetailsUpdated.java @@ -0,0 +1,41 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.data.v2.SerializableOptional; +import org.roda.core.plugins.Plugin; + +import java.io.Serial; +import java.util.Optional; + +/** + * @author Miguel Guimarães + */ +public class JobStateDetailsUpdated extends JobPartialUpdate { + @Serial + private static final long serialVersionUID = 1946036502369851214L; + + private final Plugin plugin; + private final SerializableOptional stateDetails; + + public JobStateDetailsUpdated(Plugin plugin, Optional stateDetails) { + super(); + this.plugin = plugin; + this.stateDetails = SerializableOptional.setOptional(stateDetails); + } + + public JobStateDetailsUpdated(Plugin plugin, Throwable throwable) { + this(plugin, Optional.of(throwable.getClass().getName() + ": " + throwable.getMessage())); + } + + public Plugin getPlugin() { + return plugin; + } + + public Optional getStateDetails() { + return stateDetails.getOptional(); + } + + @Override + public String toString() { + return "JobStateDetailsUpdated [plugin=" + plugin + ", stateDetails=" + stateDetails + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobStateUpdated.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobStateUpdated.java new file mode 100644 index 0000000000..b664ec7492 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobStateUpdated.java @@ -0,0 +1,40 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.data.v2.jobs.Job; +import org.roda.core.plugins.Plugin; + +import java.io.Serial; +import java.util.Optional; + +/** + * @author Miguel Guimarães + */ +public class JobStateUpdated extends JobStateDetailsUpdated { + @Serial + private static final long serialVersionUID = 1946036502369851214L; + + private final Job.JOB_STATE state; + + public JobStateUpdated(Plugin plugin, Job.JOB_STATE state) { + this(plugin, state, Optional.empty()); + } + + public JobStateUpdated(Plugin plugin, Job.JOB_STATE state, Optional stateDetails) { + super(plugin, stateDetails); + this.state = state; + } + + public JobStateUpdated(Plugin plugin, Job.JOB_STATE state, Throwable throwable) { + this(plugin, state, Optional.of(throwable.getClass().getName() + ": " + throwable.getMessage())); + } + + public Job.JOB_STATE getState() { + return state; + } + + @Override + public String toString() { + return "JobStateUpdated [plugin=" + getPlugin() + ", state=" + state + ", stateDetails=" + getStateDetails() + + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobStop.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobStop.java new file mode 100644 index 0000000000..2f2a36165c --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobStop.java @@ -0,0 +1,22 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class JobStop extends AbstractMessage { + @Serial + private static final long serialVersionUID = -8806029242967727412L; + + public JobStop() { + super(); + } + + @Override + public String toString() { + return "JobStop []"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerAcquireLock.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerAcquireLock.java new file mode 100644 index 0000000000..ebea9f04f8 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerAcquireLock.java @@ -0,0 +1,62 @@ +package org.roda.core.common.pekko.messages.jobs; + +import java.io.Serial; +import java.util.Date; +import java.util.List; + +import org.apache.pekko.actor.ActorRef; +import org.roda.core.common.pekko.messages.AbstractMessage; + +/** + * @author Miguel Guimarães + */ +public class JobsManagerAcquireLock extends AbstractMessage { + @Serial + private static final long serialVersionUID = 4924002662559968741L; + + private final List lites; + private final boolean waitForLockIfLocked; + private final Date expireDate; + private final String requestUuid; + private ActorRef sender; + + public JobsManagerAcquireLock(List lites, boolean waitForLockIfLocked, int secondsToExpire, + String requestUuid) { + super(); + this.lites = lites; + this.waitForLockIfLocked = waitForLockIfLocked; + this.expireDate = new Date(new Date().getTime() + (secondsToExpire * 1000L)); + this.requestUuid = requestUuid; + } + + public List getLites() { + return lites; + } + + public ActorRef getSender() { + return sender; + } + + public JobsManagerAcquireLock setSender(ActorRef sender) { + this.sender = sender; + return this; + } + + public boolean isWaitForLockIfLocked() { + return waitForLockIfLocked; + } + + public Date getExpireDate() { + return expireDate; + } + + public String getRequestUuid() { + return requestUuid; + } + + @Override + public String toString() { + return "JobsManagerAcquireLock [lites=" + lites + ", waitForLockIfLocked=" + waitForLockIfLocked + ", expireDate=" + + expireDate + ", requestUuid=" + requestUuid + "]"; + } +} \ No newline at end of file diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerJobEnded.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerJobEnded.java new file mode 100644 index 0000000000..0f00531099 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerJobEnded.java @@ -0,0 +1,65 @@ +package org.roda.core.common.pekko.messages.jobs; + +import java.io.Serial; + +import org.roda.core.common.pekko.messages.AbstractMessage; +import org.roda.core.data.v2.jobs.JobParallelism; +import org.roda.core.data.v2.jobs.JobStats; +import org.roda.core.data.v2.jobs.PluginType; + +/** + * @author Miguel Guimarães + */ +public class JobsManagerJobEnded extends AbstractMessage { + @Serial + private static final long serialVersionUID = -2514581679498648676L; + + private final String jobId; + private final String plugin; + private final PluginType pluginType; + private final long duration; + private final JobStats jobStats; + private final JobParallelism parallelism; + + public JobsManagerJobEnded(String jobId, String plugin, PluginType pluginType, long duration, JobStats jobStats, + JobParallelism parallelism) { + super(); + this.jobId = jobId; + this.plugin = plugin; + this.pluginType = pluginType; + this.duration = duration; + this.jobStats = jobStats; + this.parallelism = parallelism; + } + + public String getJobId() { + return jobId; + } + + public String getPlugin() { + return plugin; + } + + public PluginType getPluginType() { + return pluginType; + } + + public long getDuration() { + return duration; + } + + public JobStats getJobStats() { + return jobStats; + } + + public JobParallelism getJobParallelism() { + return parallelism; + } + + @Override + public String toString() { + return "JobsManagerJobEnded [jobId=" + jobId + ", plugin=" + plugin + ", pluginType=" + pluginType + + ", jobParallelism=" + parallelism + "]"; + + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerNotLockableAtTheTime.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerNotLockableAtTheTime.java new file mode 100644 index 0000000000..adfcbfb2ad --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerNotLockableAtTheTime.java @@ -0,0 +1,30 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class JobsManagerNotLockableAtTheTime extends AbstractMessage { + @Serial + private static final long serialVersionUID = -2313831907910175641L; + + private final String msg; + + public JobsManagerNotLockableAtTheTime() { + super(); + this.msg = ""; + } + + public JobsManagerNotLockableAtTheTime(String msg) { + super(); + this.msg = msg; + } + + @Override + public String toString() { + return "JobsManagerUnlockableAtTheTime [msg=" + msg + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReleaseAllLocks.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReleaseAllLocks.java new file mode 100644 index 0000000000..9b4dfc675c --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReleaseAllLocks.java @@ -0,0 +1,22 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class JobsManagerReleaseAllLocks extends AbstractMessage { + @Serial + private static final long serialVersionUID = -2842420964416530808L; + + public JobsManagerReleaseAllLocks() { + super(); + } + + @Override + public String toString() { + return "JobsManagerReleaseAllLocks []"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReleaseLock.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReleaseLock.java new file mode 100644 index 0000000000..640be08bca --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReleaseLock.java @@ -0,0 +1,37 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; +import java.util.List; + +/** + * @author Miguel Guimarães + */ +public class JobsManagerReleaseLock extends AbstractMessage { + @Serial + private static final long serialVersionUID = 4924002662559968741L; + + private final List lites; + private final String requestUuid; + + public JobsManagerReleaseLock(List lites, String requestUuid) { + super(); + this.lites = lites; + this.requestUuid = requestUuid; + } + + public List getLites() { + return lites; + } + + public String getRequestUuid() { + return requestUuid; + } + + @Override + public String toString() { + return "JobsManagerReleaseLock [lites=" + lites + ", requestUuid=" + requestUuid + "]"; + } + +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReplyToAcquireLock.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReplyToAcquireLock.java new file mode 100644 index 0000000000..bdcd8e9c45 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReplyToAcquireLock.java @@ -0,0 +1,30 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; +import java.util.List; + +/** + * @author Miguel Guimarães + */ +public class JobsManagerReplyToAcquireLock extends AbstractMessage { + @Serial + private static final long serialVersionUID = 4924002662559968741L; + + private final List lites; + + public JobsManagerReplyToAcquireLock(List lites) { + super(); + this.lites = lites; + } + + public List getLites() { + return lites; + } + + @Override + public String toString() { + return "JobsManagerReplyToAcquireLock [lites=" + lites + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReplyToReleaseLock.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReplyToReleaseLock.java new file mode 100644 index 0000000000..db0ed1e24c --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerReplyToReleaseLock.java @@ -0,0 +1,30 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; +import java.util.List; + +/** + * @author Miguel Guimarães + */ +public class JobsManagerReplyToReleaseLock extends AbstractMessage { + @Serial + private static final long serialVersionUID = 4924002662559968741L; + + private final List lites; + + public JobsManagerReplyToReleaseLock(List lites) { + super(); + this.lites = lites; + } + + public List getLites() { + return lites; + } + + @Override + public String toString() { + return "JobsManagerReplyToReleaseLock [lites=" + lites + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerTick.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerTick.java new file mode 100644 index 0000000000..6f2d38e74c --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/jobs/JobsManagerTick.java @@ -0,0 +1,22 @@ +package org.roda.core.common.pekko.messages.jobs; + +import org.roda.core.common.pekko.messages.AbstractMessage; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class JobsManagerTick extends AbstractMessage { + @Serial + private static final long serialVersionUID = -2514581679498648676L; + + public JobsManagerTick() { + super(); + } + + @Override + public String toString() { + return "JobManagerTick []"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginAfterAllExecuteIsDone.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginAfterAllExecuteIsDone.java new file mode 100644 index 0000000000..6486ea6fbb --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginAfterAllExecuteIsDone.java @@ -0,0 +1,22 @@ +package org.roda.core.common.pekko.messages.plugins; + +import org.roda.core.plugins.Plugin; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class PluginAfterAllExecuteIsDone extends PluginMethodIsDone { + @Serial + private static final long serialVersionUID = -5136014936634139026L; + + public PluginAfterAllExecuteIsDone(Plugin plugin, boolean withError) { + super(plugin, withError); + } + + @Override + public String toString() { + return "PluginAfterAllExecuteIsDone [getPlugin()=" + getPlugin() + ", isWithError()=" + isWithError() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginAfterAllExecuteIsReady.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginAfterAllExecuteIsReady.java new file mode 100644 index 0000000000..89e1c78303 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginAfterAllExecuteIsReady.java @@ -0,0 +1,23 @@ +package org.roda.core.common.pekko.messages.plugins; + +import org.roda.core.data.v2.IsRODAObject; +import org.roda.core.plugins.Plugin; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class PluginAfterAllExecuteIsReady extends PluginMethodIsReady { + @Serial + private static final long serialVersionUID = 8852688692792086166L; + + public PluginAfterAllExecuteIsReady(Plugin plugin) { + super(plugin); + } + + @Override + public String toString() { + return "PluginAfterAllExecuteIsReady [plugin=" + getPlugin() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginBeforeAllExecuteIsDone.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginBeforeAllExecuteIsDone.java new file mode 100644 index 0000000000..ef2aaac1fd --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginBeforeAllExecuteIsDone.java @@ -0,0 +1,22 @@ +package org.roda.core.common.pekko.messages.plugins; + +import org.roda.core.plugins.Plugin; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class PluginBeforeAllExecuteIsDone extends PluginMethodIsDone { + @Serial + private static final long serialVersionUID = 7449486178368177015L; + + public PluginBeforeAllExecuteIsDone(Plugin plugin, boolean withError) { + super(plugin, withError); + } + + @Override + public String toString() { + return "PluginBeforeAllExecuteIsDone [getPlugin()=" + getPlugin() + ", isWithError()=" + isWithError() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginBeforeAllExecuteIsReady.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginBeforeAllExecuteIsReady.java new file mode 100644 index 0000000000..644cf68f81 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginBeforeAllExecuteIsReady.java @@ -0,0 +1,23 @@ +package org.roda.core.common.pekko.messages.plugins; + +import org.roda.core.data.v2.IsRODAObject; +import org.roda.core.plugins.Plugin; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class PluginBeforeAllExecuteIsReady extends PluginMethodIsReady { + @Serial + private static final long serialVersionUID = -7730727049162062388L; + + public PluginBeforeAllExecuteIsReady(Plugin plugin) { + super(plugin); + } + + @Override + public String toString() { + return "PluginBeforeAllExecuteIsReady [getPlugin()=" + getPlugin() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginExecuteIsDone.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginExecuteIsDone.java new file mode 100644 index 0000000000..f173fa79f1 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginExecuteIsDone.java @@ -0,0 +1,27 @@ +package org.roda.core.common.pekko.messages.plugins; + +import org.roda.core.plugins.Plugin; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class PluginExecuteIsDone extends PluginMethodIsDone { + @Serial + private static final long serialVersionUID = -5136014936634139026L; + + public PluginExecuteIsDone(Plugin plugin, boolean withError) { + super(plugin, withError); + } + + public PluginExecuteIsDone(Plugin plugin, boolean withError, String errorMessage) { + super(plugin, withError, errorMessage); + } + + @Override + public String toString() { + return "PluginExecuteIsDone [getPlugin()=" + getPlugin() + ", isWithError()=" + isWithError() + + ", getErrorMessage()=" + getErrorMessage() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginExecuteIsReady.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginExecuteIsReady.java new file mode 100644 index 0000000000..5b5469c154 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginExecuteIsReady.java @@ -0,0 +1,38 @@ +package org.roda.core.common.pekko.messages.plugins; + +import org.roda.core.data.v2.IsRODAObject; +import org.roda.core.data.v2.LiteOptionalWithCause; +import org.roda.core.plugins.Plugin; + +import java.io.Serial; +import java.util.List; + +/** + * @author Miguel Guimarães + */ +public class PluginExecuteIsReady extends PluginMethodIsReady { + @Serial + private static final long serialVersionUID = 1821489252490235130L; + + private final List list; + private boolean hasBeenForwarded = false; + + public PluginExecuteIsReady(Plugin plugin, List list) { + super(plugin); + this.list = list; + } + + public List getList() { + return list; + } + + public void setHasBeenForwarded() { + this.hasBeenForwarded = true; + } + + @Override + public String toString() { + return "PluginExecuteIsReady [list=" + list + ", hasBeenForwarded=" + hasBeenForwarded + ", getPlugin()=" + + getPlugin() + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginMethodIsDone.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginMethodIsDone.java new file mode 100644 index 0000000000..a3e271226c --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginMethodIsDone.java @@ -0,0 +1,49 @@ +package org.roda.core.common.pekko.messages.plugins; + +import org.roda.core.common.pekko.messages.AbstractMessage; +import org.roda.core.plugins.Plugin; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class PluginMethodIsDone extends AbstractMessage { + @Serial + private static final long serialVersionUID = -8701179264086005994L; + + private final Plugin plugin; + private final boolean withError; + private String errorMessage = ""; + + public PluginMethodIsDone(Plugin plugin, boolean withError) { + super(); + this.plugin = plugin; + this.withError = withError; + } + + public PluginMethodIsDone(Plugin plugin, boolean withError, String errorMessage) { + super(); + this.plugin = plugin; + this.withError = withError; + this.errorMessage = errorMessage; + } + + public Plugin getPlugin() { + return plugin; + } + + public boolean isWithError() { + return withError; + } + + public String getErrorMessage() { + return errorMessage; + } + + @Override + public String toString() { + return "PluginMethodIsDone [plugin=" + plugin + ", withError=" + withError + ", errorMessage=" + errorMessage + + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginMethodIsReady.java b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginMethodIsReady.java new file mode 100644 index 0000000000..cef9ba7595 --- /dev/null +++ b/roda-core/roda-core/src/main/java/org/roda/core/common/pekko/messages/plugins/PluginMethodIsReady.java @@ -0,0 +1,31 @@ +package org.roda.core.common.pekko.messages.plugins; + +import org.roda.core.common.pekko.messages.AbstractMessage; +import org.roda.core.data.v2.IsRODAObject; +import org.roda.core.plugins.Plugin; + +import java.io.Serial; + +/** + * @author Miguel Guimarães + */ +public class PluginMethodIsReady extends AbstractMessage { + @Serial + private static final long serialVersionUID = -5214600055070295410L; + + private Plugin plugin; + + public PluginMethodIsReady(Plugin plugin) { + super(); + this.plugin = plugin; + } + + public Plugin getPlugin() { + return plugin; + } + + @Override + public String toString() { + return "PluginMethodIsReady [plugin=" + plugin + "]"; + } +} diff --git a/roda-core/roda-core/src/main/java/org/roda/core/events/AbstractEventsHandler.java b/roda-core/roda-core/src/main/java/org/roda/core/events/AbstractEventsHandler.java index d934aff806..a965f21dcb 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/events/AbstractEventsHandler.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/events/AbstractEventsHandler.java @@ -21,7 +21,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serial; + public abstract class AbstractEventsHandler implements EventsHandler { + @Serial private static final long serialVersionUID = -1284727831525932207L; private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEventsHandler.class); diff --git a/roda-core/roda-core/src/main/java/org/roda/core/events/EventsManager.java b/roda-core/roda-core/src/main/java/org/roda/core/events/EventsManager.java index 0fa99060ff..532c7b8b2c 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/events/EventsManager.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/events/EventsManager.java @@ -12,7 +12,10 @@ import org.roda.core.data.v2.user.User; import org.roda.core.model.ModelService; +import java.io.Serial; + public class EventsManager implements EventsNotifier { + @Serial private static final long serialVersionUID = 3733394744862836327L; private EventsNotifier eventsNotifier; diff --git a/roda-core/roda-core/src/main/java/org/roda/core/events/akka/CRDTSerializer.java b/roda-core/roda-core/src/main/java/org/roda/core/events/pekko/CRDTSerializer.java similarity index 98% rename from roda-core/roda-core/src/main/java/org/roda/core/events/akka/CRDTSerializer.java rename to roda-core/roda-core/src/main/java/org/roda/core/events/pekko/CRDTSerializer.java index 1cec459cc3..8206e71e68 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/events/akka/CRDTSerializer.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/events/pekko/CRDTSerializer.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.events.akka; +package org.roda.core.events.pekko; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; diff --git a/roda-core/roda-core/src/main/java/org/roda/core/events/akka/CRDTWrapper.java b/roda-core/roda-core/src/main/java/org/roda/core/events/pekko/CRDTWrapper.java similarity index 98% rename from roda-core/roda-core/src/main/java/org/roda/core/events/akka/CRDTWrapper.java rename to roda-core/roda-core/src/main/java/org/roda/core/events/pekko/CRDTWrapper.java index d9901ed308..4cb873f4db 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/events/akka/CRDTWrapper.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/events/pekko/CRDTWrapper.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.events.akka; +package org.roda.core.events.pekko; import java.io.Serial; import java.util.HashMap; diff --git a/roda-core/roda-core/src/main/java/org/roda/core/events/akka/AkkaEventsHandlerAndNotifier.java b/roda-core/roda-core/src/main/java/org/roda/core/events/pekko/PekkoEventsHandlerAndNotifier.java similarity index 86% rename from roda-core/roda-core/src/main/java/org/roda/core/events/akka/AkkaEventsHandlerAndNotifier.java rename to roda-core/roda-core/src/main/java/org/roda/core/events/pekko/PekkoEventsHandlerAndNotifier.java index 0fe8eb8840..a5e2a92dc6 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/events/akka/AkkaEventsHandlerAndNotifier.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/events/pekko/PekkoEventsHandlerAndNotifier.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.events.akka; +package org.roda.core.events.pekko; import java.io.IOException; import java.io.Serial; @@ -27,8 +27,8 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.roda.core.RodaCoreFactory; -import org.roda.core.common.akka.AkkaUtils; -import org.roda.core.common.akka.Messages; +import org.roda.core.common.pekko.PekkoUtils; +import org.roda.core.common.pekko.Messages; import org.roda.core.data.common.RodaConstants; import org.roda.core.data.v2.user.Group; import org.roda.core.data.v2.user.User; @@ -46,10 +46,10 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; -public class AkkaEventsHandlerAndNotifier extends AbstractEventsHandler implements EventsNotifier { +public class PekkoEventsHandlerAndNotifier extends AbstractEventsHandler implements EventsNotifier { @Serial private static final long serialVersionUID = 919188071375009042L; - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaEventsHandlerAndNotifier.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoEventsHandlerAndNotifier.class); private static final String EVENTS_SYSTEM = "EventsSystem"; @@ -58,9 +58,9 @@ public class AkkaEventsHandlerAndNotifier extends AbstractEventsHandler implemen private String instanceSenderId; private boolean shuttingDown = false; - public AkkaEventsHandlerAndNotifier() { - Config akkaConfig = AkkaUtils.getAkkaConfiguration("events.conf"); - eventsSystem = ActorSystem.create(EVENTS_SYSTEM, akkaConfig); + public PekkoEventsHandlerAndNotifier() { + Config pekkoConfig = PekkoUtils.getPekkoConfiguration("events.conf"); + eventsSystem = ActorSystem.create(EVENTS_SYSTEM, pekkoConfig); List
seedNodesAddresses = getSeedNodesAddresses(); if (seedNodesAddresses.isEmpty()) { @@ -74,10 +74,10 @@ public AkkaEventsHandlerAndNotifier() { } private ActorRef instantiateEventsNotifierAndHandlerActor() { - String writeConsistency = RodaCoreFactory.getProperty("core.events.akka.writeConsistency", ""); + String writeConsistency = RodaCoreFactory.getProperty("core.events.pekko.writeConsistency", ""); int writeConsistencyTimeoutInSeconds = RodaCoreFactory - .getProperty("core.events.akka.writeConsistencyTimeoutInSeconds", 3); - return eventsSystem.actorOf(Props.create(AkkaEventsHandlerAndNotifierActor.class, (EventsHandler) this, + .getProperty("core.events.pekko.writeConsistencyTimeoutInSeconds", 3); + return eventsSystem.actorOf(Props.create(PekkoEventsHandlerAndNotifierActor.class, (EventsHandler) this, writeConsistency, writeConsistencyTimeoutInSeconds), "eventsNotifierAndHandlerActor"); } @@ -161,10 +161,10 @@ public void onComplete(Throwable failure, Terminated result) { private List
getSeedNodesAddresses() { List
seedNodes = new ArrayList<>(); - if (RodaCoreFactory.getProperty("core.events.akka.seeds_via_list", false)) { + if (RodaCoreFactory.getProperty("core.events.pekko.seeds_via_list", false)) { int i = 1; while (i != -1) { - String seed = RodaCoreFactory.getProperty("core.events.akka.seeds." + i, null); + String seed = RodaCoreFactory.getProperty("core.events.pekko.seeds." + i, null); if (seed != null) { processAndAddSeedNode(seedNodes, seed); i++; @@ -176,7 +176,7 @@ private List
getSeedNodesAddresses() { ZooKeeper zkClient; try { String connectString = RodaCoreFactory.getProperty(RodaConstants.CORE_SOLR_CLOUD_URLS, "localhost:2181"); - String zkSeedsNode = RodaCoreFactory.getProperty("core.events.akka.zk.seeds_path", "/akka/nodes"); + String zkSeedsNode = RodaCoreFactory.getProperty("core.events.pekko.zk.seeds_path", "/akka/nodes"); String chRootPath = connectString + zkSeedsNode; ZkController.checkChrootPath(chRootPath, true); @@ -188,7 +188,7 @@ private List
getSeedNodesAddresses() { processAndAddSeedNode(seedNodes, seed); }); - String separator = RodaCoreFactory.getProperty("core.events.akka.address.separator", ":"); + String separator = RodaCoreFactory.getProperty("core.events.pekko.address.separator", ":"); String hostName = InetAddress.getLocalHost().getHostAddress() + separator + "2552"; zkClient.create(zkSeedsNode + "/" + hostName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); @@ -212,9 +212,9 @@ private void processAndAddSeedNode(List
seedNodes, String node) { return; } try { - String separator = RodaCoreFactory.getProperty("core.events.akka.address.separator", ":"); + String separator = RodaCoreFactory.getProperty("core.events.pekko.address.separator", ":"); String[] nodeParts = node.split(separator, 2); - seedNodes.add(new Address("akka.tcp", EVENTS_SYSTEM, nodeParts[0], Integer.parseInt(nodeParts[1]))); + seedNodes.add(new Address("pekko.tcp", EVENTS_SYSTEM, nodeParts[0], Integer.parseInt(nodeParts[1]))); } catch (NumberFormatException | IndexOutOfBoundsException e) { // do nothing and carry on } diff --git a/roda-core/roda-core/src/main/java/org/roda/core/events/akka/AkkaEventsHandlerAndNotifierActor.java b/roda-core/roda-core/src/main/java/org/roda/core/events/pekko/PekkoEventsHandlerAndNotifierActor.java similarity index 87% rename from roda-core/roda-core/src/main/java/org/roda/core/events/akka/AkkaEventsHandlerAndNotifierActor.java rename to roda-core/roda-core/src/main/java/org/roda/core/events/pekko/PekkoEventsHandlerAndNotifierActor.java index d9d62fc5f3..9d5d1d1e1d 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/events/akka/AkkaEventsHandlerAndNotifierActor.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/events/pekko/PekkoEventsHandlerAndNotifierActor.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.events.akka; +package org.roda.core.events.pekko; import java.io.Serial; import java.util.Collections; @@ -28,12 +28,12 @@ import org.apache.pekko.cluster.ddata.Replicator.Changed; import org.apache.pekko.cluster.ddata.Replicator.Update; import org.roda.core.RodaCoreFactory; -import org.roda.core.common.akka.Messages.EventGroupCreated; -import org.roda.core.common.akka.Messages.EventGroupDeleted; -import org.roda.core.common.akka.Messages.EventGroupUpdated; -import org.roda.core.common.akka.Messages.EventUserCreated; -import org.roda.core.common.akka.Messages.EventUserDeleted; -import org.roda.core.common.akka.Messages.EventUserUpdated; +import org.roda.core.common.pekko.messages.events.EventGroupCreated; +import org.roda.core.common.pekko.messages.events.EventGroupDeleted; +import org.roda.core.common.pekko.messages.events.EventGroupUpdated; +import org.roda.core.common.pekko.messages.events.EventUserCreated; +import org.roda.core.common.pekko.messages.events.EventUserDeleted; +import org.roda.core.common.pekko.messages.events.EventUserUpdated; import org.roda.core.data.common.SecureString; import org.roda.core.data.v2.user.Group; import org.roda.core.data.v2.user.User; @@ -44,8 +44,8 @@ import scala.Option; import scala.concurrent.duration.Duration; -public class AkkaEventsHandlerAndNotifierActor extends AbstractActor { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaEventsHandlerAndNotifierActor.class); +public class PekkoEventsHandlerAndNotifierActor extends AbstractActor { + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoEventsHandlerAndNotifierActor.class); private static final String CACHE_PREFIX = "cache-"; private static final String USER_KEY_PREFIX = "user-"; @@ -54,17 +54,14 @@ public class AkkaEventsHandlerAndNotifierActor extends AbstractActor { private final ActorRef replicator = DistributedData.get(context().system()).replicator(); private final Cluster cluster = Cluster.get(context().system()); - + private final Key> objectKeysKey = GSetKey.create("objectKeys"); + private final Replicator.WriteConsistency writeConsistency; private EventsHandler eventsHandler; private String instanceSenderId; - - private final Key> objectKeysKey = GSetKey.create("objectKeys"); private Set objectKeys = new HashSet<>(); - private final Replicator.WriteConsistency writeConsistency; - - public AkkaEventsHandlerAndNotifierActor(final EventsHandler eventsHandler, final String writeConsistency, - final int writeConsistencyTimeoutInSeconds) { + public PekkoEventsHandlerAndNotifierActor(final EventsHandler eventsHandler, final String writeConsistency, + final int writeConsistencyTimeoutInSeconds) { this.eventsHandler = eventsHandler; this.instanceSenderId = self().toString(); this.writeConsistency = instantiateWriteConsistency(writeConsistency, writeConsistencyTimeoutInSeconds); @@ -94,8 +91,7 @@ public Receive createReceive() { .match(EventGroupUpdated.class, e -> handleGroupUpdated(e)) .match(EventGroupDeleted.class, e -> handleGroupDeleted(e)).match(Replicator.Changed.class, c -> handleChanged(c)) .match(Replicator.UpdateSuccess.class, e -> handleUpdateSuccess(e)) - .match(Replicator.UpdateFailure.class, e -> handleUpdateFailure(e)) - .matchAny(msg -> { + .match(Replicator.UpdateFailure.class, e -> handleUpdateFailure(e)).matchAny(msg -> { LOGGER.warn("Received unknown message '{}'", msg); }).build(); } @@ -140,13 +136,14 @@ private void handleObjectChanged(Replicator.Changed> CRDTWrapper wrapper = (CRDTWrapper) option.get(); if (!wrapper.getInstanceId().equals(instanceSenderId)) { if (objectId.startsWith(USER_KEY_PREFIX)) { - try(SecureString password = new SecureString(getUserPasswordFromRodaUserOtherInfoMap(wrapper).toCharArray())) { + try ( + SecureString password = new SecureString(getUserPasswordFromRodaUserOtherInfoMap(wrapper).toCharArray())) { if (!wrapper.isUpdate()) { eventsHandler.handleUserCreated(RodaCoreFactory.getModelService(), (User) wrapper.getRodaObject(), - password); + password); } else { eventsHandler.handleUserUpdated(RodaCoreFactory.getModelService(), (User) wrapper.getRodaObject(), - password); + password); } } } else if (objectId.startsWith(GROUP_KEY_PREFIX)) { @@ -171,8 +168,8 @@ private void handleObjectChanged(Replicator.Changed> private void handleUserCreated(EventUserCreated e) { String key = USER_KEY_PREFIX + e.getUser().getId(); Map rodaObjectOtherInfo = new HashMap<>(); - putObjectInCache(key, new CRDTWrapper(e.getUser(), rodaObjectOtherInfo, - false, instanceSenderId, new Date().getTime())); + putObjectInCache(key, + new CRDTWrapper(e.getUser(), rodaObjectOtherInfo, false, instanceSenderId, new Date().getTime())); } private String getUserPasswordFromRodaUserOtherInfoMap(CRDTWrapper wrapper) { @@ -188,8 +185,8 @@ private Map createRodaUserOtherInfoMapWithUserPassword(String pa private void handleUserUpdated(EventUserUpdated e) { String key = USER_KEY_PREFIX + e.getUser().getId(); Map rodaObjectOtherInfo = new HashMap<>(); - putObjectInCache(key, new CRDTWrapper(e.getUser(), rodaObjectOtherInfo, - true, instanceSenderId, new Date().getTime())); + putObjectInCache(key, + new CRDTWrapper(e.getUser(), rodaObjectOtherInfo, true, instanceSenderId, new Date().getTime())); } private void handleUserDeleted(EventUserDeleted e) { diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/PluginHelper.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/PluginHelper.java index c80a841f93..38ba136dcb 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/PluginHelper.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/PluginHelper.java @@ -31,7 +31,8 @@ import org.reflections.Reflections; import org.roda.core.RodaCoreFactory; import org.roda.core.common.PremisV3Utils; -import org.roda.core.common.akka.Messages; +import org.roda.core.common.pekko.Messages; +import org.roda.core.common.pekko.messages.jobs.JobPartialUpdate; import org.roda.core.data.common.RodaConstants; import org.roda.core.data.common.RodaConstants.RODA_TYPE; import org.roda.core.data.exceptions.AlreadyExistsException; @@ -1375,7 +1376,7 @@ private static void updateJobAfterMovingSIPsAsync(Plugi Job job = getJob(plugin, index); SelectedItems sourceObjects = job.getSourceObjects(); if (sourceObjects instanceof SelectedItemsList) { - RodaCoreFactory.getPluginOrchestrator().updateJobAsync(plugin, (Messages.JobPartialUpdate) Messages + RodaCoreFactory.getPluginOrchestrator().updateJobAsync(plugin, (JobPartialUpdate) Messages .newJobSourceObjectsUpdated(oldToNewTransferredResourceIds).withJobPriority(job.getPriority())); } } catch (NotFoundException | GenericException | RequestNotValidException e) { diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/PluginOrchestrator.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/PluginOrchestrator.java index d8090e9530..1f866c96ce 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/PluginOrchestrator.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/PluginOrchestrator.java @@ -9,7 +9,7 @@ import java.util.List; -import org.roda.core.common.akka.Messages.JobPartialUpdate; +import org.roda.core.common.pekko.messages.jobs.JobPartialUpdate; import org.roda.core.data.exceptions.AuthorizationDeniedException; import org.roda.core.data.exceptions.GenericException; import org.roda.core.data.exceptions.JobAlreadyStartedException; diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/JobsHelper.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/JobsHelper.java index ba64a40bb8..01938a3ded 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/JobsHelper.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/JobsHelper.java @@ -21,7 +21,8 @@ import java.util.stream.Collectors; import org.roda.core.RodaCoreFactory; -import org.roda.core.common.akka.Messages; +import org.roda.core.common.pekko.Messages; +import org.roda.core.common.pekko.messages.jobs.JobPartialUpdate; import org.roda.core.data.common.RodaConstants; import org.roda.core.data.common.RodaConstants.NodeType; import org.roda.core.data.exceptions.AlreadyExistsException; @@ -254,7 +255,7 @@ private static Job setJobCounters(Job job, JobPluginInfo jobPluginInfo) { public static void updateJobStateAsync(Plugin plugin, JobPriority priority, JOB_STATE state, Optional stateDetails) { RodaCoreFactory.getPluginOrchestrator().updateJobAsync(plugin, - (Messages.JobPartialUpdate) Messages.newJobStateUpdated(plugin, state, stateDetails).withJobPriority(priority)); + (JobPartialUpdate) Messages.newJobStateUpdated(plugin, state, stateDetails).withJobPriority(priority)); } public static void updateJobStateAsync(Plugin plugin, JobPriority priority, diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/AkkaEmbeddedPluginOrchestrator.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/PekkoEmbeddedPluginOrchestrator.java similarity index 96% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/AkkaEmbeddedPluginOrchestrator.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/PekkoEmbeddedPluginOrchestrator.java index a36e960dad..d3fae01ae8 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/AkkaEmbeddedPluginOrchestrator.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/PekkoEmbeddedPluginOrchestrator.java @@ -26,12 +26,13 @@ import org.apache.pekko.pattern.Patterns; import org.apache.pekko.util.Timeout; import org.roda.core.RodaCoreFactory; -import org.roda.core.common.akka.AkkaUtils; -import org.roda.core.common.akka.DeadLetterActor; -import org.roda.core.common.akka.Messages; -import org.roda.core.common.akka.Messages.JobPartialUpdate; -import org.roda.core.common.akka.Messages.JobStateUpdated; +import org.roda.core.common.pekko.PekkoUtils; +import org.roda.core.common.pekko.DeadLetterActor; +import org.roda.core.common.pekko.Messages; import org.roda.core.common.iterables.CloseableIterable; +import org.roda.core.common.pekko.messages.jobs.JobPartialUpdate; +import org.roda.core.common.pekko.messages.jobs.JobStateUpdated; +import org.roda.core.common.pekko.messages.jobs.JobsManagerNotLockableAtTheTime; import org.roda.core.data.common.RodaConstants; import org.roda.core.data.exceptions.AcquireLockTimeoutException; import org.roda.core.data.exceptions.AuthorizationDeniedException; @@ -69,7 +70,7 @@ import org.roda.core.plugins.PluginHelper; import org.roda.core.plugins.PluginOrchestrator; import org.roda.core.plugins.base.maintenance.CleanUnfinishedJobsPlugin; -import org.roda.core.plugins.orchestrate.akka.AkkaJobsManager; +import org.roda.core.plugins.orchestrate.pekko.PekkoJobsManager; import org.roda.core.util.IdUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,8 +86,8 @@ * http://www.lightbend.com/activator/template/akka-monitoring-kamon-statsd) * * */ -public class AkkaEmbeddedPluginOrchestrator implements PluginOrchestrator { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaEmbeddedPluginOrchestrator.class); +public class PekkoEmbeddedPluginOrchestrator implements PluginOrchestrator { + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoEmbeddedPluginOrchestrator.class); private final IndexService index; private final ModelService model; @@ -101,7 +102,7 @@ public class AkkaEmbeddedPluginOrchestrator implements PluginOrchestrator { // List private List inErrorJobs; - public AkkaEmbeddedPluginOrchestrator() { + public PekkoEmbeddedPluginOrchestrator() { int maxNumberOfJobsInParallel = JobsHelper.getMaxNumberOfJobsInParallel(); int maxNumberOfLimitedJobsInParallel = JobsHelper.getMaxNumberOfLimitedJobsInParallel(); @@ -112,12 +113,12 @@ public AkkaEmbeddedPluginOrchestrator() { stoppingJobs = new ArrayList<>(); inErrorJobs = new ArrayList<>(); - Config akkaConfig = AkkaUtils.getAkkaConfiguration("application.conf"); - jobsSystem = ActorSystem.create("JobsSystem", akkaConfig); + Config pekkoConfig = PekkoUtils.getPekkoConfiguration("application.conf"); + jobsSystem = ActorSystem.create("JobsSystem", pekkoConfig); // 20170105 hsilva: subscribe all dead letter so they are logged jobsSystem.eventStream().subscribe(jobsSystem.actorOf(Props.create(DeadLetterActor.class)), AllDeadLetters.class); - jobsManager = jobsSystem.actorOf(Props.create(AkkaJobsManager.class, maxNumberOfJobsInParallel, maxNumberOfLimitedJobsInParallel), "jobsManager"); + jobsManager = jobsSystem.actorOf(Props.create(PekkoJobsManager.class, maxNumberOfJobsInParallel, maxNumberOfLimitedJobsInParallel), "jobsManager"); } @@ -617,7 +618,7 @@ public void acquireObjectLock(List lites, int timeoutInSeconds, boolean throw new AcquireLockTimeoutException("Unable to acquire locks for the objects being processed '" + lites + "'"); } - if (result != null && result instanceof Messages.JobsManagerNotLockableAtTheTime) { + if (result != null && result instanceof JobsManagerNotLockableAtTheTime) { throw new NotLockableAtTheTimeException( "Not lockable at the time due to requester not willing to await to obtain the lock!"); } diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaBackgroundJobActor.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoBackgroundJobActor.java similarity index 92% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaBackgroundJobActor.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoBackgroundJobActor.java index 77c900368f..fb9e63521c 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaBackgroundJobActor.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoBackgroundJobActor.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.plugins.orchestrate.akka; +package org.roda.core.plugins.orchestrate.pekko; import java.util.Optional; @@ -14,8 +14,8 @@ import org.apache.pekko.actor.Props; import org.apache.pekko.actor.SupervisorStrategy; import org.apache.pekko.japi.pf.DeciderBuilder; -import org.roda.core.common.akka.AkkaBaseActor; -import org.roda.core.common.akka.Messages; +import org.roda.core.common.pekko.PekkoBaseActor; +import org.roda.core.common.pekko.Messages; import org.roda.core.data.exceptions.GenericException; import org.roda.core.data.exceptions.RequestNotValidException; import org.roda.core.data.v2.IsRODAObject; @@ -37,14 +37,14 @@ /** * @author Miguel Guimarães */ -public class AkkaBackgroundJobActor extends AkkaBaseActor { +public class PekkoBackgroundJobActor extends PekkoBaseActor { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaBackgroundJobActor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoBackgroundJobActor.class); private ActorRef jobsManager; private SupervisorStrategy strategy = new OneForOneStrategy(false, DeciderBuilder.matchAny(e -> { - LOGGER.error("A child actor of {} has thrown an exception", AkkaBackgroundJobActor.class.getSimpleName(), e); + LOGGER.error("A child actor of {} has thrown an exception", PekkoBackgroundJobActor.class.getSimpleName(), e); for (ActorRef actorRef : getContext().getChildren()) { actorRef.tell(Messages.newJobStateUpdated(null, JOB_STATE.FAILED_TO_COMPLETE, e), ActorRef.noSender()); } @@ -52,7 +52,7 @@ public class AkkaBackgroundJobActor extends AkkaBaseActor { }).build()); /** Public constructor */ - public AkkaBackgroundJobActor(ActorRef jobsManager) { + public PekkoBackgroundJobActor(ActorRef jobsManager) { super(); this.jobsManager = jobsManager; } @@ -75,7 +75,7 @@ public void onReceive(Object msg) throws Throwable { String jobId = job.getId(); JobPriority jobPriority = job.getPriority(); JobParallelism jobParallelism = job.getParallelism(); - ActorRef jobStateInfoActor = getContext().actorOf(Props.create(AkkaJobStateInfoActor.class, plugin, getSender(), + ActorRef jobStateInfoActor = getContext().actorOf(Props.create(PekkoJobStateInfoActor.class, plugin, getSender(), jobsManager, jobId, JobsHelper.getNumberOfJobsWorkers(), JobsHelper.getNumberOfLimitedJobsWorkers()), jobId); super.getPluginOrchestrator().setJobContextInformation(jobId, jobStateInfoActor); diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaBackgroundWorkerActor.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoBackgroundWorkerActor.java similarity index 85% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaBackgroundWorkerActor.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoBackgroundWorkerActor.java index 2f1c5d696f..86a3730080 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaBackgroundWorkerActor.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoBackgroundWorkerActor.java @@ -5,12 +5,14 @@ * * https://github.com/keeps/roda */ -package org.roda.core.plugins.orchestrate.akka; +package org.roda.core.plugins.orchestrate.pekko; import java.util.List; -import org.roda.core.common.akka.AkkaBaseActor; -import org.roda.core.common.akka.Messages; +import org.roda.core.common.pekko.PekkoBaseActor; +import org.roda.core.common.pekko.Messages; +import org.roda.core.common.pekko.messages.plugins.PluginAfterAllExecuteIsReady; +import org.roda.core.common.pekko.messages.plugins.PluginExecuteIsReady; import org.roda.core.data.exceptions.AuthorizationDeniedException; import org.roda.core.data.exceptions.GenericException; import org.roda.core.data.exceptions.NotFoundException; @@ -31,14 +33,14 @@ /** * @author Miguel Guimarães */ -public class AkkaBackgroundWorkerActor extends AkkaBaseActor { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaBackgroundWorkerActor.class); +public class PekkoBackgroundWorkerActor extends PekkoBaseActor { + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoBackgroundWorkerActor.class); private final IndexService index; private final ModelService model; private final StorageService storage; - public AkkaBackgroundWorkerActor() { + public PekkoBackgroundWorkerActor() { super(); this.storage = getStorage(); this.model = getModel(); @@ -48,9 +50,9 @@ public AkkaBackgroundWorkerActor() { @Override public void onReceive(Object msg) throws Throwable { super.setup(msg); - if (msg instanceof Messages.PluginExecuteIsReady) { + if (msg instanceof PluginExecuteIsReady) { handlePluginExecuteIsReady(msg); - } else if (msg instanceof Messages.PluginAfterAllExecuteIsReady) { + } else if (msg instanceof PluginAfterAllExecuteIsReady) { handlePluginAfterAllExecuteIsReady(msg); } else { LOGGER.error("Received a message that it doesn't know how to process ({})...", msg.getClass().getName()); @@ -59,7 +61,7 @@ public void onReceive(Object msg) throws Throwable { } private void handlePluginExecuteIsReady(Object msg) { - Messages.PluginExecuteIsReady message = (Messages.PluginExecuteIsReady) msg; + PluginExecuteIsReady message = (PluginExecuteIsReady) msg; List objectsToBeProcessed = message.getList(); message.logProcessingStarted(); Plugin messagePlugin = message.getPlugin(); @@ -90,7 +92,7 @@ private String getErrorMessage(Throwable e) { } private void handlePluginAfterAllExecuteIsReady(Object msg) { - Messages.PluginAfterAllExecuteIsReady message = (Messages.PluginAfterAllExecuteIsReady) msg; + PluginAfterAllExecuteIsReady message = (PluginAfterAllExecuteIsReady) msg; message.logProcessingStarted(); Plugin plugin = message.getPlugin(); try { diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaJobActor.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoJobActor.java similarity index 93% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaJobActor.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoJobActor.java index 3051cfe070..91b05769f1 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaJobActor.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoJobActor.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.plugins.orchestrate.akka; +package org.roda.core.plugins.orchestrate.pekko; import java.util.Optional; @@ -14,8 +14,8 @@ import org.apache.pekko.actor.Props; import org.apache.pekko.actor.SupervisorStrategy; import org.apache.pekko.japi.pf.DeciderBuilder; -import org.roda.core.common.akka.AkkaBaseActor; -import org.roda.core.common.akka.Messages; +import org.roda.core.common.pekko.PekkoBaseActor; +import org.roda.core.common.pekko.Messages; import org.roda.core.data.exceptions.GenericException; import org.roda.core.data.exceptions.RequestNotValidException; import org.roda.core.data.v2.IsRODAObject; @@ -34,13 +34,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AkkaJobActor extends AkkaBaseActor { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaJobActor.class); +public class PekkoJobActor extends PekkoBaseActor { + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoJobActor.class); private ActorRef jobsManager; private SupervisorStrategy strategy = new OneForOneStrategy(false, DeciderBuilder.matchAny(e -> { - LOGGER.error("A child actor of {} has thrown an exception", AkkaJobActor.class.getSimpleName(), e); + LOGGER.error("A child actor of {} has thrown an exception", PekkoJobActor.class.getSimpleName(), e); for (ActorRef actorRef : getContext().getChildren()) { actorRef.tell(Messages.newJobStateUpdated(null, JOB_STATE.FAILED_TO_COMPLETE, e), ActorRef.noSender()); } @@ -48,7 +48,7 @@ public class AkkaJobActor extends AkkaBaseActor { }).build()); /** Public constructor */ - public AkkaJobActor(ActorRef jobsManager) { + public PekkoJobActor(ActorRef jobsManager) { super(); this.jobsManager = jobsManager; } @@ -71,7 +71,7 @@ public void onReceive(Object msg) throws Exception { String jobId = job.getId(); JobPriority jobPriority = job.getPriority(); JobParallelism jobParallelism = job.getParallelism(); - ActorRef jobStateInfoActor = getContext().actorOf(Props.create(AkkaJobStateInfoActor.class, plugin, getSender(), + ActorRef jobStateInfoActor = getContext().actorOf(Props.create(PekkoJobStateInfoActor.class, plugin, getSender(), jobsManager, jobId, JobsHelper.getNumberOfJobsWorkers(), JobsHelper.getNumberOfLimitedJobsWorkers()), jobId); super.getPluginOrchestrator().setJobContextInformation(jobId, jobStateInfoActor); diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaJobStateInfoActor.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoJobStateInfoActor.java similarity index 81% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaJobStateInfoActor.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoJobStateInfoActor.java index 0515e18ad9..afe7c6457b 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaJobStateInfoActor.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoJobStateInfoActor.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.plugins.orchestrate.akka; +package org.roda.core.plugins.orchestrate.pekko; import java.util.ArrayList; import java.util.Optional; @@ -14,8 +14,20 @@ import org.apache.pekko.actor.Props; import org.apache.pekko.actor.Terminated; import org.apache.pekko.routing.RoundRobinPool; -import org.roda.core.common.akka.AkkaBaseActor; -import org.roda.core.common.akka.Messages; +import org.roda.core.common.pekko.PekkoBaseActor; +import org.roda.core.common.pekko.Messages; +import org.roda.core.common.pekko.messages.AbstractMessage; +import org.roda.core.common.pekko.messages.jobs.JobCleanup; +import org.roda.core.common.pekko.messages.jobs.JobInfoUpdated; +import org.roda.core.common.pekko.messages.jobs.JobInitEnded; +import org.roda.core.common.pekko.messages.jobs.JobSourceObjectsUpdated; +import org.roda.core.common.pekko.messages.jobs.JobStateDetailsUpdated; +import org.roda.core.common.pekko.messages.jobs.JobStateUpdated; +import org.roda.core.common.pekko.messages.jobs.JobStop; +import org.roda.core.common.pekko.messages.plugins.PluginAfterAllExecuteIsDone; +import org.roda.core.common.pekko.messages.plugins.PluginBeforeAllExecuteIsReady; +import org.roda.core.common.pekko.messages.plugins.PluginExecuteIsDone; +import org.roda.core.common.pekko.messages.plugins.PluginExecuteIsReady; import org.roda.core.data.exceptions.AuthorizationDeniedException; import org.roda.core.data.exceptions.GenericException; import org.roda.core.data.exceptions.JobException; @@ -43,8 +55,8 @@ import com.codahale.metrics.MetricRegistry; import com.google.common.collect.Iterables; -public class AkkaJobStateInfoActor extends AkkaBaseActor { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaJobStateInfoActor.class); +public class PekkoJobStateInfoActor extends PekkoBaseActor { + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoJobStateInfoActor.class); private JobInfo jobInfo; private Plugin plugin; @@ -60,8 +72,8 @@ public class AkkaJobStateInfoActor extends AkkaBaseActor { // private Map stateMessagesMetrics; private Histogram stateMessagesMetricsHistogram; - public AkkaJobStateInfoActor(Plugin plugin, ActorRef jobCreator, ActorRef jobsManager, String jobId, - int numberOfJobsWorkers, int numberOfLimitedJobsWorkers) { + public PekkoJobStateInfoActor(Plugin plugin, ActorRef jobCreator, ActorRef jobsManager, String jobId, + int numberOfJobsWorkers, int numberOfLimitedJobsWorkers) { super(); jobInfo = new JobInfo(); this.plugin = plugin; @@ -69,12 +81,12 @@ public AkkaJobStateInfoActor(Plugin plugin, ActorRef jobCreator, ActorRef job this.jobsManager = jobsManager; this.jobId = jobId; - LOGGER.debug("Starting AkkaJobStateInfoActor router with {} actors", numberOfJobsWorkers); - Props workersProps = new RoundRobinPool(numberOfJobsWorkers).props(Props.create(AkkaWorkerActor.class)); + LOGGER.debug("Starting PekkoJobStateInfoActor router with {} actors", numberOfJobsWorkers); + Props workersProps = new RoundRobinPool(numberOfJobsWorkers).props(Props.create(PekkoWorkerActor.class)); workersRouter = getContext().actorOf(workersProps, "WorkersRouter"); LOGGER.debug("Starting background workers router with {} actors", numberOfLimitedJobsWorkers); - Props props = new RoundRobinPool(numberOfLimitedJobsWorkers).props(Props.create(AkkaBackgroundWorkerActor.class)); + Props props = new RoundRobinPool(numberOfLimitedJobsWorkers).props(Props.create(PekkoBackgroundWorkerActor.class)); backgroundWorkersRouter = getContext().actorOf(props, "BackgroundWorkersRouter"); // 20160914 hsilva: watch child events, so when they stop we can react getContext().watch(workersRouter); @@ -82,7 +94,7 @@ public AkkaJobStateInfoActor(Plugin plugin, ActorRef jobCreator, ActorRef job JobsHelper.createJobWorkingDirectory(jobId); - String className = AkkaJobStateInfoActor.class.getSimpleName(); + String className = PekkoJobStateInfoActor.class.getSimpleName(); // stateMessagesMetrics = new HashMap<>(); stateMessagesMetricsHistogram = getMetricRegistry() .histogram(MetricRegistry.name(className, "msgCreationToProcessingStartedInMilis")); @@ -91,29 +103,29 @@ public AkkaJobStateInfoActor(Plugin plugin, ActorRef jobCreator, ActorRef job @Override public void onReceive(Object msg) throws Exception { super.setup(msg); - if (msg instanceof Messages.JobStateUpdated) { - handleJobStateUpdated((Messages.JobStateUpdated) msg); - } else if (msg instanceof Messages.JobStateDetailsUpdated) { - handleJobStateDetailsUpdated((Messages.JobStateDetailsUpdated) msg); - } else if (msg instanceof Messages.JobSourceObjectsUpdated) { + if (msg instanceof JobStateUpdated) { + handleJobStateUpdated((JobStateUpdated) msg); + } else if (msg instanceof JobStateDetailsUpdated) { + handleJobStateDetailsUpdated((JobStateDetailsUpdated) msg); + } else if (msg instanceof JobSourceObjectsUpdated) { handleJobSourceObjectsUpdated(msg); - } else if (msg instanceof Messages.JobInfoUpdated) { + } else if (msg instanceof JobInfoUpdated) { handleJobInfoUpdated(msg); - } else if (msg instanceof Messages.JobStop) { + } else if (msg instanceof JobStop) { handleJobStop(msg); } else if (msg instanceof Terminated) { handleTerminated(msg); - } else if (msg instanceof Messages.PluginExecuteIsReady) { + } else if (msg instanceof PluginExecuteIsReady) { handleExecuteIsReady(msg); - } else if (msg instanceof Messages.JobInitEnded) { + } else if (msg instanceof JobInitEnded) { handleJobInitEnded(msg); - } else if (msg instanceof Messages.PluginBeforeAllExecuteIsReady) { + } else if (msg instanceof PluginBeforeAllExecuteIsReady) { handleBeforeAllExecuteIsReady(msg); - } else if (msg instanceof Messages.PluginExecuteIsDone) { + } else if (msg instanceof PluginExecuteIsDone) { handleExecuteIsDone(msg); - } else if (msg instanceof Messages.PluginAfterAllExecuteIsDone) { + } else if (msg instanceof PluginAfterAllExecuteIsDone) { handleAfterAllExecuteIsDone(msg); - } else if (msg instanceof Messages.JobCleanup) { + } else if (msg instanceof JobCleanup) { handleJobCleanup(msg); } else { LOGGER.error("Received a message that don't know how to process ({})...", msg.getClass().getName()); @@ -121,7 +133,7 @@ public void onReceive(Object msg) throws Exception { } } - private void handleJobStateUpdated(Messages.JobStateUpdated message) { + private void handleJobStateUpdated(JobStateUpdated message) { markMessageProcessingAsStarted(message); Plugin p = message.getPlugin() == null ? this.plugin : message.getPlugin(); JobParallelism parallelism = null; @@ -149,7 +161,7 @@ private void handleJobStateUpdated(Messages.JobStateUpdated message) { markMessageProcessingAsEnded(message); } - private void handleJobStateDetailsUpdated(Messages.JobStateDetailsUpdated message) { + private void handleJobStateDetailsUpdated(JobStateDetailsUpdated message) { markMessageProcessingAsStarted(message); Plugin p = message.getPlugin() == null ? this.plugin : message.getPlugin(); JobsHelper.updateJobStateDetails(p, getModel(), message.getStateDetails()); @@ -157,7 +169,7 @@ private void handleJobStateDetailsUpdated(Messages.JobStateDetailsUpdated messag } private void handleJobSourceObjectsUpdated(Object msg) { - Messages.JobSourceObjectsUpdated message = (Messages.JobSourceObjectsUpdated) msg; + JobSourceObjectsUpdated message = (JobSourceObjectsUpdated) msg; markMessageProcessingAsStarted(message); try { Job job = PluginHelper.getJob(plugin, getModel()); @@ -183,7 +195,7 @@ private void handleJobSourceObjectsUpdated(Object msg) } private void handleJobInfoUpdated(Object msg) { - Messages.JobInfoUpdated message = (Messages.JobInfoUpdated) msg; + JobInfoUpdated message = (JobInfoUpdated) msg; markMessageProcessingAsStarted(message); jobInfo.put(message.getPlugin(), message.getJobPluginInfo()); JobPluginInfo infoUpdated = message.getJobPluginInfo().processJobPluginInformation(message.getPlugin(), jobInfo); @@ -193,7 +205,7 @@ private void handleJobInfoUpdated(Object msg) { } private void handleJobStop(Object msg) { - Messages.JobStop message = (Messages.JobStop) msg; + JobStop message = (JobStop) msg; markMessageProcessingAsStarted(message); getSelf().tell(Messages.newJobStateUpdated(plugin, JOB_STATE.STOPPING).withJobPriority(JobPriority.HIGH), getSelf()); @@ -217,7 +229,7 @@ private void handleTerminated(Object msg) { private void handleExecuteIsReady(Object msg) { if (!errorDuringBeforeAll) { - Messages.PluginExecuteIsReady message = (Messages.PluginExecuteIsReady) msg; + PluginExecuteIsReady message = (PluginExecuteIsReady) msg; markMessageProcessingAsStarted(message); jobInfo.setStarted(message.getPlugin()); // 20160819 hsilva: the following it's just for debugging purposes @@ -234,7 +246,7 @@ private void handleExecuteIsReady(Object msg) { } private void handleJobInitEnded(Object msg) { - Messages.JobInitEnded message = (Messages.JobInitEnded) msg; + JobInitEnded message = (JobInitEnded) msg; markMessageProcessingAsStarted(message); jobInfo.setInitEnded(true); // INFO 20160630 hsilva: the following test is needed because messages can @@ -260,7 +272,7 @@ private void handleJobInitEnded(Object msg) { } private void handleBeforeAllExecuteIsReady(Object msg) throws PluginException { - Messages.PluginBeforeAllExecuteIsReady message = (Messages.PluginBeforeAllExecuteIsReady) msg; + PluginBeforeAllExecuteIsReady message = (PluginBeforeAllExecuteIsReady) msg; markMessageProcessingAsStarted(message); try { message.getPlugin().beforeAllExecute(getIndex(), getModel(), getStorage()); @@ -281,7 +293,7 @@ private void handleBeforeAllExecuteIsReady(Object msg) throws PluginException { } private void handleExecuteIsDone(Object msg) { - Messages.PluginExecuteIsDone message = (Messages.PluginExecuteIsDone) msg; + PluginExecuteIsDone message = (PluginExecuteIsDone) msg; markMessageProcessingAsStarted(message); jobInfo.setDone(message.getPlugin(), message.isWithError()); @@ -309,7 +321,7 @@ private void handleExecuteIsDone(Object msg) { } private void handleAfterAllExecuteIsDone(Object msg) { - Messages.PluginAfterAllExecuteIsDone message = (Messages.PluginAfterAllExecuteIsDone) msg; + PluginAfterAllExecuteIsDone message = (PluginAfterAllExecuteIsDone) msg; markMessageProcessingAsStarted(message); getSelf().tell( Messages.newJobCleanup().withParallelism(message.getParallelism()).withJobPriority(message.getJobPriority()), @@ -322,7 +334,7 @@ private void handleAfterAllExecuteIsDone(Object msg) { } private void handleJobCleanup(Object msg) { - Messages.JobCleanup message = (Messages.JobCleanup) msg; + JobCleanup message = (JobCleanup) msg; markMessageProcessingAsStarted(message); try { LOGGER.info("Doing job cleanup"); @@ -336,12 +348,12 @@ private void handleJobCleanup(Object msg) { markMessageProcessingAsEnded(message); } - private void markMessageProcessingAsStarted(Messages.AbstractMessage message) { + private void markMessageProcessingAsStarted(AbstractMessage message) { message.logProcessingStarted(); stateMessagesMetricsHistogram.update(message.getTimeSinceCreation()); } - private void markMessageProcessingAsEnded(Messages.AbstractMessage message) { + private void markMessageProcessingAsEnded(AbstractMessage message) { message.logProcessingEnded(); } diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaJobsManager.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoJobsManager.java similarity index 94% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaJobsManager.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoJobsManager.java index 55fd43d682..1008fd2309 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaJobsManager.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoJobsManager.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.plugins.orchestrate.akka; +package org.roda.core.plugins.orchestrate.pekko; import java.util.ArrayList; import java.util.Collections; @@ -22,11 +22,13 @@ import org.apache.pekko.actor.Props; import org.apache.pekko.routing.RoundRobinPool; import org.roda.core.RodaCoreFactory; -import org.roda.core.common.akka.AkkaBaseActor; -import org.roda.core.common.akka.Messages; -import org.roda.core.common.akka.Messages.JobsManagerAcquireLock; -import org.roda.core.common.akka.Messages.JobsManagerReleaseAllLocks; -import org.roda.core.common.akka.Messages.JobsManagerReleaseLock; +import org.roda.core.common.pekko.PekkoBaseActor; +import org.roda.core.common.pekko.Messages; +import org.roda.core.common.pekko.messages.jobs.JobsManagerAcquireLock; +import org.roda.core.common.pekko.messages.jobs.JobsManagerJobEnded; +import org.roda.core.common.pekko.messages.jobs.JobsManagerReleaseAllLocks; +import org.roda.core.common.pekko.messages.jobs.JobsManagerReleaseLock; +import org.roda.core.common.pekko.messages.jobs.JobsManagerTick; import org.roda.core.data.v2.jobs.Job; import org.roda.core.data.v2.jobs.JobParallelism; import org.roda.core.data.v2.jobs.JobStats; @@ -40,8 +42,8 @@ import scala.concurrent.duration.Duration; -public class AkkaJobsManager extends AkkaBaseActor { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaJobsManager.class); +public class PekkoJobsManager extends PekkoBaseActor { + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoJobsManager.class); public static final String LOCK_REQUESTS_WAITING_TO_ACQUIRE_LOCK = "lockRequestsWaitingToAcquireLock"; @@ -95,7 +97,7 @@ public class AkkaJobsManager extends AkkaBaseActor { private boolean nonParallelizableJobIsRunning = false; private int nonParallelizableJobsQueued = 0; - public AkkaJobsManager(int maxNumberOfJobsInParallel, int maxNumberOfLimitedJobsInParallel) { + public PekkoJobsManager(int maxNumberOfJobsInParallel, int maxNumberOfLimitedJobsInParallel) { super(); this.maxNumberOfJobsInParallel = maxNumberOfJobsInParallel; this.maxNumberOfLimitedJobsInParallel = maxNumberOfLimitedJobsInParallel; @@ -108,12 +110,12 @@ public AkkaJobsManager(int maxNumberOfJobsInParallel, int maxNumberOfLimitedJobs this.waitingToAcquireLockRequests = new ArrayList<>(); Props jobsProps = new RoundRobinPool(maxNumberOfJobsInParallel - 2) - .props(Props.create(AkkaJobActor.class, getSelf())); - Props backgroundJobsProps = new RoundRobinPool(2).props(Props.create(AkkaBackgroundJobActor.class, getSelf())); + .props(Props.create(PekkoJobActor.class, getSelf())); + Props backgroundJobsProps = new RoundRobinPool(2).props(Props.create(PekkoBackgroundJobActor.class, getSelf())); jobsRouter = getContext().actorOf(jobsProps, "JobsRouter"); Props limitedJobsProps = new RoundRobinPool(maxNumberOfLimitedJobsInParallel) - .props(Props.create(AkkaLimitedJobActor.class, getSelf())); + .props(Props.create(PekkoLimitedJobActor.class, getSelf())); limitedJobsRouter = getContext().actorOf(limitedJobsProps, "LimitedJobsRouter"); initMetrics(maxNumberOfJobsInParallel); @@ -136,17 +138,17 @@ public void onReceive(Object msg) throws Throwable { try { if (msg instanceof Job) { handleJob((Job) msg); - } else if (msg instanceof Messages.JobsManagerTick) { + } else if (msg instanceof JobsManagerTick) { doPostProcessingTasks = false; handleTick(true); - } else if (msg instanceof Messages.JobsManagerJobEnded) { - handleJobEnded((Messages.JobsManagerJobEnded) msg); - } else if (msg instanceof Messages.JobsManagerAcquireLock) { - handleAcquireLock(((Messages.JobsManagerAcquireLock) msg).setSender(getSender())); - } else if (msg instanceof Messages.JobsManagerReleaseLock) { - handleReleaseLock((Messages.JobsManagerReleaseLock) msg); - } else if (msg instanceof Messages.JobsManagerReleaseAllLocks) { - handleReleaseAllLocks((Messages.JobsManagerReleaseAllLocks) msg); + } else if (msg instanceof JobsManagerJobEnded) { + handleJobEnded((JobsManagerJobEnded) msg); + } else if (msg instanceof JobsManagerAcquireLock) { + handleAcquireLock(((JobsManagerAcquireLock) msg).setSender(getSender())); + } else if (msg instanceof JobsManagerReleaseLock) { + handleReleaseLock((JobsManagerReleaseLock) msg); + } else if (msg instanceof JobsManagerReleaseAllLocks) { + handleReleaseAllLocks((JobsManagerReleaseAllLocks) msg); } else { LOGGER.error("Received a message that don't know how to process ({})...", msg.getClass().getName()); unhandled(msg); @@ -549,7 +551,7 @@ private void processAcquiredLocksTimeout() { } } - private void handleJobEnded(Messages.JobsManagerJobEnded jobEnded) { + private void handleJobEnded(JobsManagerJobEnded jobEnded) { if (jobIsNotParallelizable(jobEnded.getPlugin())) { nonParallelizableJobIsRunning = false; } @@ -605,7 +607,7 @@ private void log(String msg, String jobId) { private void initMetrics(int maxNumberOfJobsInParallel) { MetricRegistry metrics = getMetricRegistry(); - String className = AkkaJobsManager.class.getSimpleName(); + String className = PekkoJobsManager.class.getSimpleName(); // general metrics ticksWaitingToBeProcessed = metrics.counter(MetricRegistry.name(className, "ticksWaitingToBeProcessed")); // jobs related metrics diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaLimitedJobActor.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoLimitedJobActor.java similarity index 92% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaLimitedJobActor.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoLimitedJobActor.java index 30499a05ad..ceafc8a35c 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaLimitedJobActor.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoLimitedJobActor.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.plugins.orchestrate.akka; +package org.roda.core.plugins.orchestrate.pekko; import java.util.Optional; @@ -14,8 +14,8 @@ import org.apache.pekko.actor.Props; import org.apache.pekko.actor.SupervisorStrategy; import org.apache.pekko.japi.pf.DeciderBuilder; -import org.roda.core.common.akka.AkkaBaseActor; -import org.roda.core.common.akka.Messages; +import org.roda.core.common.pekko.PekkoBaseActor; +import org.roda.core.common.pekko.Messages; import org.roda.core.data.exceptions.GenericException; import org.roda.core.data.exceptions.RequestNotValidException; import org.roda.core.data.v2.IsRODAObject; @@ -36,13 +36,13 @@ /** * @author Miguel Guimarãese */ -public class AkkaLimitedJobActor extends AkkaBaseActor { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaLimitedJobActor.class); +public class PekkoLimitedJobActor extends PekkoBaseActor { + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoLimitedJobActor.class); private ActorRef jobsManager; private SupervisorStrategy strategy = new OneForOneStrategy(false, DeciderBuilder.matchAny(e -> { - LOGGER.error("A child actor of {} has thrown an exception", AkkaLimitedJobActor.class.getSimpleName(), e); + LOGGER.error("A child actor of {} has thrown an exception", PekkoLimitedJobActor.class.getSimpleName(), e); for (ActorRef actorRef : getContext().getChildren()) { actorRef.tell(Messages.newJobStateUpdated(null, Job.JOB_STATE.FAILED_TO_COMPLETE, e), ActorRef.noSender()); } @@ -50,7 +50,7 @@ public class AkkaLimitedJobActor extends AkkaBaseActor { }).build()); /** Public constructor */ - public AkkaLimitedJobActor(ActorRef jobsManager) { + public PekkoLimitedJobActor(ActorRef jobsManager) { super(); this.jobsManager = jobsManager; } @@ -74,7 +74,7 @@ public void onReceive(Object msg) throws Exception { String jobId = job.getId(); JobPriority jobPriority = job.getPriority(); JobParallelism jobParallelism = job.getParallelism(); - ActorRef jobStateInfoActor = getContext().actorOf(Props.create(AkkaJobStateInfoActor.class, plugin, getSender(), + ActorRef jobStateInfoActor = getContext().actorOf(Props.create(PekkoJobStateInfoActor.class, plugin, getSender(), jobsManager, jobId, JobsHelper.getNumberOfJobsWorkers(), JobsHelper.getNumberOfLimitedJobsWorkers()), jobId); super.getPluginOrchestrator().setJobContextInformation(jobId, jobStateInfoActor); diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaWorkerActor.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoWorkerActor.java similarity index 85% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaWorkerActor.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoWorkerActor.java index 7ecb435278..0da6560adb 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/AkkaWorkerActor.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/PekkoWorkerActor.java @@ -5,12 +5,14 @@ * * https://github.com/keeps/roda */ -package org.roda.core.plugins.orchestrate.akka; +package org.roda.core.plugins.orchestrate.pekko; import java.util.List; -import org.roda.core.common.akka.AkkaBaseActor; -import org.roda.core.common.akka.Messages; +import org.roda.core.common.pekko.Messages; +import org.roda.core.common.pekko.PekkoBaseActor; +import org.roda.core.common.pekko.messages.plugins.PluginAfterAllExecuteIsReady; +import org.roda.core.common.pekko.messages.plugins.PluginExecuteIsReady; import org.roda.core.data.exceptions.AuthorizationDeniedException; import org.roda.core.data.exceptions.GenericException; import org.roda.core.data.exceptions.NotFoundException; @@ -28,14 +30,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AkkaWorkerActor extends AkkaBaseActor { - private static final Logger LOGGER = LoggerFactory.getLogger(AkkaWorkerActor.class); +public class PekkoWorkerActor extends PekkoBaseActor { + private static final Logger LOGGER = LoggerFactory.getLogger(PekkoWorkerActor.class); private final IndexService index; private final ModelService model; private final StorageService storage; - public AkkaWorkerActor() { + public PekkoWorkerActor() { super(); this.storage = getStorage(); this.model = getModel(); @@ -45,9 +47,9 @@ public AkkaWorkerActor() { @Override public void onReceive(Object msg) throws Exception { super.setup(msg); - if (msg instanceof Messages.PluginExecuteIsReady) { + if (msg instanceof PluginExecuteIsReady) { handlePluginExecuteIsReady(msg); - } else if (msg instanceof Messages.PluginAfterAllExecuteIsReady) { + } else if (msg instanceof PluginAfterAllExecuteIsReady) { handlePluginAfterAllExecuteIsReady(msg); } else { LOGGER.error("Received a message that it doesn't know how to process ({})...", msg.getClass().getName()); @@ -56,7 +58,7 @@ public void onReceive(Object msg) throws Exception { } private void handlePluginExecuteIsReady(Object msg) { - Messages.PluginExecuteIsReady message = (Messages.PluginExecuteIsReady) msg; + PluginExecuteIsReady message = (PluginExecuteIsReady) msg; List objectsToBeProcessed = message.getList(); message.logProcessingStarted(); Plugin messagePlugin = message.getPlugin(); @@ -87,7 +89,7 @@ private String getErrorMessage(Throwable e) { } private void handlePluginAfterAllExecuteIsReady(Object msg) { - Messages.PluginAfterAllExecuteIsReady message = (Messages.PluginAfterAllExecuteIsReady) msg; + PluginAfterAllExecuteIsReady message = (PluginAfterAllExecuteIsReady) msg; message.logProcessingStarted(); Plugin plugin = message.getPlugin(); try { diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/mailbox/PrioritizedMailbox.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/mailbox/PrioritizedMailbox.java similarity index 81% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/mailbox/PrioritizedMailbox.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/mailbox/PrioritizedMailbox.java index 5cfa9b682f..6d03e7e014 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/mailbox/PrioritizedMailbox.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/mailbox/PrioritizedMailbox.java @@ -5,15 +5,14 @@ * * https://github.com/keeps/roda */ -package org.roda.core.plugins.orchestrate.akka.mailbox; - -import org.roda.core.common.akka.Messages; - -import com.typesafe.config.Config; +package org.roda.core.plugins.orchestrate.pekko.mailbox; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.dispatch.PriorityGenerator; import org.apache.pekko.dispatch.UnboundedStablePriorityMailbox; +import org.roda.core.common.pekko.messages.AbstractMessage; + +import com.typesafe.config.Config; /** * @author Miguel Guimarães @@ -24,8 +23,8 @@ public PrioritizedMailbox(ActorSystem.Settings settings, Config config) { super(new PriorityGenerator() { @Override public int gen(Object message) { - if (message instanceof Messages.AbstractMessage) { - switch (((Messages.AbstractMessage) message).getJobPriority()) { + if (message instanceof AbstractMessage) { + switch (((AbstractMessage) message).getJobPriority()) { case LOW: return PrioritizedMessage.LOW; case HIGH: diff --git a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/mailbox/PrioritizedMessage.java b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/mailbox/PrioritizedMessage.java similarity index 93% rename from roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/mailbox/PrioritizedMessage.java rename to roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/mailbox/PrioritizedMessage.java index 29b1bc61b4..295cc896e6 100644 --- a/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/akka/mailbox/PrioritizedMessage.java +++ b/roda-core/roda-core/src/main/java/org/roda/core/plugins/orchestrate/pekko/mailbox/PrioritizedMessage.java @@ -5,7 +5,7 @@ * * https://github.com/keeps/roda */ -package org.roda.core.plugins.orchestrate.akka.mailbox; +package org.roda.core.plugins.orchestrate.pekko.mailbox; /** * @author Miguel Guimarães diff --git a/roda-core/roda-core/src/main/resources/config/logback.xml b/roda-core/roda-core/src/main/resources/config/logback.xml index a53506781b..7058de6fec 100644 --- a/roda-core/roda-core/src/main/resources/config/logback.xml +++ b/roda-core/roda-core/src/main/resources/config/logback.xml @@ -37,27 +37,27 @@ - - ${roda.home}/log/roda-core-akka.log + + ${roda.home}/log/roda-core-pekko.log true - %d [%X{akkaSourceActor} %X{akkaSourceThread}] %-5level %logger{36} - %msg%n + %d [%X{pekkoSourceActor} %X{pekkoSourceThread}] %-5level %logger{36} - %msg%n - ${roda.home}/log/roda-core-akka-%d{yyyy-MM-dd}.%i.log.gz + ${roda.home}/log/roda-core-pekko-%d{yyyy-MM-dd}.%i.log.gz 1GB - - + + - + @@ -95,12 +95,12 @@ - + - + - diff --git a/roda-core/roda-core/src/main/resources/config/orchestrator/application.conf b/roda-core/roda-core/src/main/resources/config/orchestrator/application.conf index c2edbcd32f..ea5bcdbdcb 100644 --- a/roda-core/roda-core/src/main/resources/config/orchestrator/application.conf +++ b/roda-core/roda-core/src/main/resources/config/orchestrator/application.conf @@ -1,5 +1,5 @@ priority-mailbox { - mailbox-type = "org.roda.core.plugins.orchestrate.akka.mailbox.PrioritizedMailbox" + mailbox-type = "org.roda.core.plugins.orchestrate.pekko.mailbox.PrioritizedMailbox" } st-dispatcher { @@ -20,7 +20,7 @@ io-2-dispatcher { throughput = 1 } -akka.actor.deployment { +pekko.actor.deployment { "/jobsManager" { dispatcher = st-dispatcher } @@ -66,15 +66,15 @@ akka.actor.deployment { } } -akka { +pekko { actor.provider = "local" - - loggers = ["akka.event.slf4j.Slf4jLogger"] - logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + + loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"] + logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter" //loglevel = "DEBUG" loglevel = "ERROR" stdout-loglevel = "OFF" //log-config-on-start = on log-dead-letters = off log-dead-letters-during-shutdown = off -} +} \ No newline at end of file diff --git a/roda-core/roda-core/src/main/resources/config/orchestrator/events.conf b/roda-core/roda-core/src/main/resources/config/orchestrator/events.conf index 454a648331..c5d7a0d0c6 100644 --- a/roda-core/roda-core/src/main/resources/config/orchestrator/events.conf +++ b/roda-core/roda-core/src/main/resources/config/orchestrator/events.conf @@ -1,15 +1,15 @@ -akka { +pekko { actor { provider = "cluster" warn-about-java-serializer-usage = true serializers { - proto = "akka.remote.serialization.ProtobufSerializer" - crdt = "org.roda.core.events.akka.CRDTSerializer" + proto = "org.apache.pekko.remote.serialization.ProtobufSerializer" + crdt = "org.roda.core.events.pekko.CRDTSerializer" } serialization-bindings { - "org.roda.core.common.akka.Messages" = proto - "org.roda.core.events.akka.CRDTWrapper" = crdt + "org.roda.core.common.pekko.Messages" = proto + "org.roda.core.events.pekko.CRDTWrapper" = crdt } } remote { @@ -26,7 +26,7 @@ akka { max-delta-elements = 100 } - loggers = ["akka.event.slf4j.Slf4jLogger"] + loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"] //loglevel = "DEBUG" //stdout-loglevel = "DEBUG" loglevel = "ERROR" diff --git a/roda-core/roda-core/src/main/resources/config/orchestrator/worker.conf b/roda-core/roda-core/src/main/resources/config/orchestrator/worker.conf index f1c6b5c060..885feb4a61 100644 --- a/roda-core/roda-core/src/main/resources/config/orchestrator/worker.conf +++ b/roda-core/roda-core/src/main/resources/config/orchestrator/worker.conf @@ -1,14 +1,14 @@ -akka { +pekko { - actor.provider = "akka.remote.RemoteActorRefProvider" + actor.provider = "org.apache.pekko.remote.RemoteActorRefProvider" remote.netty.tcp.port=0 remote.netty.tcp.hostname=127.0.0.1 cluster.client { initial-contacts = [ - "akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist", - "akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist" + "pekko.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist", + "pekko.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist" ] } diff --git a/roda-core/roda-core/src/main/resources/config/roda-core.properties b/roda-core/roda-core/src/main/resources/config/roda-core.properties index b66bc1ac76..57b3ef9b55 100644 --- a/roda-core/roda-core/src/main/resources/config/roda-core.properties +++ b/roda-core/roda-core/src/main/resources/config/roda-core.properties @@ -109,7 +109,7 @@ core.solr.cloud.healthcheck.timeout_ms=10000 # # Usage (prefix core.orchestrator): # -# * type: AKKA +# * type: PEKKO # * max_jobs_in_parallel: positive number of max jobs in parallel, # defaulting to the amount of processors (cpu) plus one # @@ -148,9 +148,9 @@ core.solr.cloud.healthcheck.timeout_ms=10000 # # Status: in use (but not all) ########################################################################## -core.orchestrator.type=AKKA +core.orchestrator.type=PEKKO -#core.orchestrator.type=AKKA_DISTRIBUTED +#core.orchestrator.type=PEKKO_DISTRIBUTED core.orchestrator.max_jobs_in_parallel = 8 core.orchestrator.max_limited_jobs_in_parallel = 2 core.orchestrator.nr_of_jobs_workers = 8 @@ -178,8 +178,8 @@ core.orchestrator.non_parallelizable_plugins = org.roda.core.plugins.base.mainte # once, set this to true, set to false otherwise # * notifier_class: class that implements the notifying logic # * handler_class: class that implements the handling logic -# * akka.writeConsistency: the desired write consistency (WriteMajority | WriteAll) -# * akka.writeConsistencyTimeoutInSeconds: the amount of seconds for a write to +# * pekko.writeConsistency: the desired write consistency (WriteMajority | WriteAll) +# * pekko.writeConsistencyTimeoutInSeconds: the amount of seconds for a write to # timeout, which doesn't mean that the write will not eventually get to the # other nodes (https://doc.akka.io/docs/akka/2.5/distributed-data.html) # @@ -187,10 +187,10 @@ core.orchestrator.non_parallelizable_plugins = org.roda.core.plugins.base.mainte ########################################################################## core.events.enabled = false core.events.notifier_and_handler_are_the_same = true -core.events.notifier_class = org.roda.core.events.akka.AkkaEventsHandlerAndNotifier +core.events.notifier_class = org.roda.core.events.pekko.PekkoEventsHandlerAndNotifier #core.events.handler_class = -core.events.akka.writeConsistency = WriteMajority -core.events.akka.writeConsistencyTimeoutInSeconds = 3 +core.events.pekko.writeConsistency = WriteMajority +core.events.pekko.writeConsistencyTimeoutInSeconds = 3 ##########################################################################