From a6ca11238ba32f62572cba9552724e906648bb5b Mon Sep 17 00:00:00 2001 From: fmendezh Date: Mon, 23 Sep 2024 15:44:07 +0200 Subject: [PATCH] https://github.com/gbif/pipelines/issues/1078 adding balancer handler --- .../messaging/DataWarehouseMessage.java} | 8 ++-- .../tasks/balancer/BalancerCallback.java | 15 ++----- .../PipelinesWarehouseMessageHandler.java | 42 +++++++++++++++++++ .../warehouse/DataWarehouseCallback.java | 9 ++-- 4 files changed, 55 insertions(+), 19 deletions(-) rename gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/{tasks/occurrences/warehouse/DataWarehouseFinishMessage.java => common/messaging/DataWarehouseMessage.java} (73%) create mode 100644 gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/handler/PipelinesWarehouseMessageHandler.java diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/warehouse/DataWarehouseFinishMessage.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/common/messaging/DataWarehouseMessage.java similarity index 73% rename from gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/warehouse/DataWarehouseFinishMessage.java rename to gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/common/messaging/DataWarehouseMessage.java index 71a7555016..7622fd9623 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/warehouse/DataWarehouseFinishMessage.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/common/messaging/DataWarehouseMessage.java @@ -1,4 +1,4 @@ -package org.gbif.pipelines.tasks.occurrences.warehouse; +package org.gbif.pipelines.common.messaging; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -6,12 +6,12 @@ import java.util.UUID; import org.gbif.common.messaging.api.messages.PipelinesHdfsViewMessage; -public class DataWarehouseFinishMessage extends PipelinesHdfsViewMessage { +public class DataWarehouseMessage extends PipelinesHdfsViewMessage { - public DataWarehouseFinishMessage() {} + public DataWarehouseMessage() {} @JsonCreator - public DataWarehouseFinishMessage( + public DataWarehouseMessage( @JsonProperty("datasetUuid") UUID datasetUuid, @JsonProperty("attempt") int attempt, @JsonProperty("pipelineSteps") Set pipelineSteps, diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/BalancerCallback.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/BalancerCallback.java index 7191278214..c4f6054d11 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/BalancerCallback.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/BalancerCallback.java @@ -18,17 +18,8 @@ import org.gbif.common.messaging.api.messages.PipelinesInterpretedMessage; import org.gbif.common.messaging.api.messages.PipelinesVerbatimMessage; import org.gbif.common.messaging.api.messages.PipelinesXmlMessage; -import org.gbif.pipelines.tasks.balancer.handler.EventsIndexedMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.EventsInterpretedMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.InterpretedMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.PipelinesAbcdMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.PipelinesDwcaMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.PipelinesEventsHdfsViewMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.PipelinesFragmenterMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.PipelinesHdfsViewMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.PipelinesIndexedMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.PipelinesXmlMessageHandler; -import org.gbif.pipelines.tasks.balancer.handler.VerbatimMessageHandler; +import org.gbif.pipelines.common.messaging.DataWarehouseMessage; +import org.gbif.pipelines.tasks.balancer.handler.*; /** * Callback which is called when the {@link PipelinesBalancerMessage} is received. @@ -76,6 +67,8 @@ public void handleMessage(PipelinesBalancerMessage message) { EventsIndexedMessageHandler.handle(publisher, message); } else if (PipelinesEventsHdfsViewMessage.class.getSimpleName().equals(className)) { PipelinesEventsHdfsViewMessageHandler.handle(publisher, message); + } else if (DataWarehouseMessage.class.getSimpleName().equals(className)) { + PipelinesWarehouseMessageHandler.handle(publisher, message); } else { log.error("Handler for {} wasn't found!", className); } diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/handler/PipelinesWarehouseMessageHandler.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/handler/PipelinesWarehouseMessageHandler.java new file mode 100644 index 0000000000..0895b619b5 --- /dev/null +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/handler/PipelinesWarehouseMessageHandler.java @@ -0,0 +1,42 @@ +package org.gbif.pipelines.tasks.balancer.handler; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.gbif.common.messaging.api.MessagePublisher; +import org.gbif.common.messaging.api.messages.PipelinesBalancerMessage; +import org.gbif.pipelines.common.messaging.DataWarehouseMessage; + +/** + * Populates and sends the {@link Pipelines} message, the main method is {@link + * DataWarehouseMessage#handle} + */ +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class PipelinesWarehouseMessageHandler { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /** Main handler, basically computes the runner type and sends to the same consumer */ + public static void handle(MessagePublisher publisher, PipelinesBalancerMessage message) + throws IOException { + + log.info("Process DataWarehouseMessage - {}", message); + + DataWarehouseMessage m = MAPPER.readValue(message.getPayload(), DataWarehouseMessage.class); + + DataWarehouseMessage outputMessage = + new DataWarehouseMessage( + m.getDatasetUuid(), + m.getAttempt(), + m.getPipelineSteps(), + m.getRunner(), + m.getExecutionId()); + + publisher.send(outputMessage); + + log.info("The message has been sent - {}", outputMessage); + } +} diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/warehouse/DataWarehouseCallback.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/warehouse/DataWarehouseCallback.java index ecf04fac65..6a8214a5c5 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/warehouse/DataWarehouseCallback.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/warehouse/DataWarehouseCallback.java @@ -10,6 +10,7 @@ import org.gbif.common.messaging.api.messages.PipelinesInterpretationMessage; import org.gbif.pipelines.common.PipelinesVariables; import org.gbif.pipelines.common.airflow.AppName; +import org.gbif.pipelines.common.messaging.DataWarehouseMessage; import org.gbif.pipelines.common.process.AirflowSparkLauncher; import org.gbif.pipelines.common.process.BeamParametersBuilder; import org.gbif.pipelines.common.process.RecordCountReader; @@ -26,7 +27,7 @@ @Slf4j @Builder public class DataWarehouseCallback extends AbstractMessageCallback - implements StepHandler { + implements StepHandler { protected final HdfsViewConfiguration config; private final MessagePublisher publisher; @@ -36,7 +37,7 @@ public class DataWarehouseCallback extends AbstractMessageCallbackbuilder() + PipelinesCallback.builder() .historyClient(historyClient) .datasetClient(datasetClient) .config(config) @@ -99,8 +100,8 @@ private void runDistributed( } @Override - public DataWarehouseFinishMessage createOutgoingMessage(PipelinesHdfsViewMessage message) { - return new DataWarehouseFinishMessage( + public DataWarehouseMessage createOutgoingMessage(PipelinesHdfsViewMessage message) { + return new DataWarehouseMessage( message.getDatasetUuid(), message.getAttempt(), message.getPipelineSteps(), null, null); }