Skip to content

Commit

Permalink
https://github.com/gbif/pipelines/issues/1078
Browse files Browse the repository at this point in the history
adding balancer handler
  • Loading branch information
fmendezh committed Sep 23, 2024
1 parent 1f9acb0 commit a6ca112
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package org.gbif.pipelines.tasks.occurrences.warehouse;
package org.gbif.pipelines.common.messaging;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Set;
import java.util.UUID;
import org.gbif.common.messaging.api.messages.PipelinesHdfsViewMessage;

public class DataWarehouseFinishMessage extends PipelinesHdfsViewMessage {
public class DataWarehouseMessage extends PipelinesHdfsViewMessage {

public DataWarehouseFinishMessage() {}
public DataWarehouseMessage() {}

@JsonCreator
public DataWarehouseFinishMessage(
public DataWarehouseMessage(
@JsonProperty("datasetUuid") UUID datasetUuid,
@JsonProperty("attempt") int attempt,
@JsonProperty("pipelineSteps") Set<String> pipelineSteps,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,8 @@
import org.gbif.common.messaging.api.messages.PipelinesInterpretedMessage;
import org.gbif.common.messaging.api.messages.PipelinesVerbatimMessage;
import org.gbif.common.messaging.api.messages.PipelinesXmlMessage;
import org.gbif.pipelines.tasks.balancer.handler.EventsIndexedMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.EventsInterpretedMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.InterpretedMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.PipelinesAbcdMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.PipelinesDwcaMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.PipelinesEventsHdfsViewMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.PipelinesFragmenterMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.PipelinesHdfsViewMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.PipelinesIndexedMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.PipelinesXmlMessageHandler;
import org.gbif.pipelines.tasks.balancer.handler.VerbatimMessageHandler;
import org.gbif.pipelines.common.messaging.DataWarehouseMessage;
import org.gbif.pipelines.tasks.balancer.handler.*;

/**
* Callback which is called when the {@link PipelinesBalancerMessage} is received.
Expand Down Expand Up @@ -76,6 +67,8 @@ public void handleMessage(PipelinesBalancerMessage message) {
EventsIndexedMessageHandler.handle(publisher, message);
} else if (PipelinesEventsHdfsViewMessage.class.getSimpleName().equals(className)) {
PipelinesEventsHdfsViewMessageHandler.handle(publisher, message);
} else if (DataWarehouseMessage.class.getSimpleName().equals(className)) {
PipelinesWarehouseMessageHandler.handle(publisher, message);
} else {
log.error("Handler for {} wasn't found!", className);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.gbif.pipelines.tasks.balancer.handler;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.gbif.common.messaging.api.MessagePublisher;
import org.gbif.common.messaging.api.messages.PipelinesBalancerMessage;
import org.gbif.pipelines.common.messaging.DataWarehouseMessage;

/**
 * Balancer handler for {@link DataWarehouseMessage}: takes the message carried in the payload of a
 * {@link PipelinesBalancerMessage} and publishes a copy of it. The entry point is {@link
 * PipelinesWarehouseMessageHandler#handle}.
 */
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class PipelinesWarehouseMessageHandler {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  /**
   * Deserialises the balancer payload into a {@link DataWarehouseMessage}, builds a new message
   * with the same field values and sends it through the given publisher.
   *
   * <p>No runner is computed here: the runner found in the payload is forwarded unchanged.
   *
   * @param publisher publisher used to send the rebuilt message
   * @param message balancer message whose payload is read as a {@link DataWarehouseMessage}
   * @throws IOException if the payload cannot be read as a {@link DataWarehouseMessage}; may also
   *     come from the publisher (its contract is not visible in this file)
   */
  public static void handle(MessagePublisher publisher, PipelinesBalancerMessage message)
      throws IOException {

    log.info("Process DataWarehouseMessage - {}", message);

    DataWarehouseMessage incoming =
        MAPPER.readValue(message.getPayload(), DataWarehouseMessage.class);
    DataWarehouseMessage outgoing = copyOf(incoming);

    publisher.send(outgoing);

    log.info("The message has been sent - {}", outgoing);
  }

  /** Creates a new message carrying the same values as the given one, field by field. */
  private static DataWarehouseMessage copyOf(DataWarehouseMessage source) {
    return new DataWarehouseMessage(
        source.getDatasetUuid(),
        source.getAttempt(),
        source.getPipelineSteps(),
        source.getRunner(),
        source.getExecutionId());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.gbif.common.messaging.api.messages.PipelinesInterpretationMessage;
import org.gbif.pipelines.common.PipelinesVariables;
import org.gbif.pipelines.common.airflow.AppName;
import org.gbif.pipelines.common.messaging.DataWarehouseMessage;
import org.gbif.pipelines.common.process.AirflowSparkLauncher;
import org.gbif.pipelines.common.process.BeamParametersBuilder;
import org.gbif.pipelines.common.process.RecordCountReader;
Expand All @@ -26,7 +27,7 @@
@Slf4j
@Builder
public class DataWarehouseCallback extends AbstractMessageCallback<PipelinesHdfsViewMessage>
implements StepHandler<PipelinesHdfsViewMessage, DataWarehouseFinishMessage> {
implements StepHandler<PipelinesHdfsViewMessage, DataWarehouseMessage> {

protected final HdfsViewConfiguration config;
private final MessagePublisher publisher;
Expand All @@ -36,7 +37,7 @@ public class DataWarehouseCallback extends AbstractMessageCallback<PipelinesHdfs

@Override
public void handleMessage(PipelinesHdfsViewMessage message) {
PipelinesCallback.<PipelinesHdfsViewMessage, DataWarehouseFinishMessage>builder()
PipelinesCallback.<PipelinesHdfsViewMessage, DataWarehouseMessage>builder()
.historyClient(historyClient)
.datasetClient(datasetClient)
.config(config)
Expand Down Expand Up @@ -99,8 +100,8 @@ private void runDistributed(
}

@Override
public DataWarehouseFinishMessage createOutgoingMessage(PipelinesHdfsViewMessage message) {
return new DataWarehouseFinishMessage(
public DataWarehouseMessage createOutgoingMessage(PipelinesHdfsViewMessage message) {
return new DataWarehouseMessage(
message.getDatasetUuid(), message.getAttempt(), message.getPipelineSteps(), null, null);
}

Expand Down

0 comments on commit a6ca112

Please sign in to comment.