Merge pull request #202 from cloudsufi/bugfix/PLUGIN-1594
PLUGIN-1594: Add initial offsets to the PartitionOffsets map
sgarg-CS authored May 4, 2023
2 parents ce3ddc0 + 07ea548 commit 931866e
Showing 2 changed files with 60 additions and 1 deletion.
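
The one-line fix below replaces Map.computeIfPresent with Map.put in KafkaPartitionOffsets.setPartitionOffset. The distinction matters because computeIfPresent only remaps keys that already exist, so an initial offset for a partition the map had never seen was silently dropped. A minimal standalone sketch of the difference (illustration only, not part of the commit):

import java.util.HashMap;
import java.util.Map;

public class OffsetMapDemo {
  public static void main(String[] args) {
    Map<Integer, Long> offsets = new HashMap<>();

    // Pre-fix behavior: partition 0 is not in the map, so nothing is stored.
    offsets.computeIfPresent(0, (partition, oldOffset) -> 2L);
    System.out.println(offsets); // {}

    // Post-fix behavior: put inserts the entry unconditionally.
    offsets.put(0, 2L);
    System.out.println(offsets); // {0=2}
  }
}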
AbstractKafkaBatchSourceTest.java

@@ -95,6 +95,7 @@ public abstract class AbstractKafkaBatchSourceTest extends HydratorTestBase {
 
   @Before
   public void setup() throws Exception {
+    clear();
     ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion());
 
     // add the data-pipeline artifact and mock plugins
@@ -227,6 +228,64 @@ public void testKafkaSource() throws Exception {
     ));
   }
 
+  @Test
+  public void testKafkaSourceWithInitialPartitionOffsets() throws Exception {
+
+    Schema schema = Schema.recordOf(
+      "user",
+      Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
+      Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("last", Schema.of(Schema.Type.STRING)));
+
+    // create the pipeline config
+    String outputName = "sourceTestOutput";
+
+    Map<String, String> sourceProperties = new HashMap<>();
+    sourceProperties.put("kafkaBrokers", "localhost:" + kafkaPort);
+    sourceProperties.put("referenceName", "kafkaTest");
+    sourceProperties.put("topic", "users");
+    sourceProperties.put("schema", schema.toString());
+    sourceProperties.put("format", "csv");
+    sourceProperties.put("initialPartitionOffsets", "${initialPartitionOffsets}");
+    ETLStage source = new ETLStage("source", new ETLPlugin(getKafkaBatchSourceName(),
+                                                           BatchSource.PLUGIN_TYPE, sourceProperties, null));
+    ETLStage sink = new ETLStage("sink", MockSink.getPlugin(outputName));
+
+    ETLBatchConfig pipelineConfig = ETLBatchConfig.builder()
+      .addStage(source)
+      .addStage(sink)
+      .addConnection(source.getName(), sink.getName())
+      .build();
+
+    // create the pipeline
+    ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSource");
+    ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig));
+
+    Map<String, String> messages = new HashMap<>();
+    messages.put("a", "0,samuel,jackson");
+    messages.put("b", "1,dwayne,johnson");
+    messages.put("c", "2,christopher,walken");
+    messages.put("d", "3,michael,jackson");
+    sendKafkaMessage("users", messages);
+
+    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    workflowManager.startAndWaitForRun(Collections.singletonMap("initialPartitionOffsets", "0:2"),
+                                       ProgramRunStatus.COMPLETED, 2, TimeUnit.MINUTES);
+
+    DataSetManager<Table> outputManager = getDataset(outputName);
+    Set<StructuredRecord> outputRecords = new HashSet<>(MockSink.readOutput(outputManager));
+    Map<Long, String> actual = new HashMap<>();
+    for (StructuredRecord outputRecord : outputRecords) {
+      actual.put(outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last"));
+    }
+
+    Map<Long, String> expected = ImmutableMap.of(
+      2L, "christopher walken",
+      3L, "michael jackson"
+    );
+    Assert.assertEquals(expected, actual);
+  }
+
   /**
    * Validates the test result and offsets after the testing pipeline was executed.
    *
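The test above feeds four CSV messages into the users topic and starts the pipeline with the runtime argument initialPartitionOffsets set to 0:2, i.e. begin reading partition 0 at offset 2, so the first two records are skipped and only christopher walken and michael jackson reach the sink. A hypothetical sketch of turning such a partition:offset list into the offsets map, assuming a comma-separated pair format (the plugin's actual parsing code is not part of this diff):

import java.util.HashMap;
import java.util.Map;

public class InitialOffsetsParser {
  // Hypothetical helper: parses "p1:o1,p2:o2" into a partition-to-offset map.
  public static Map<Integer, Long> parse(String spec) {
    Map<Integer, Long> offsets = new HashMap<>();
    if (spec == null || spec.trim().isEmpty()) {
      return offsets;
    }
    for (String pair : spec.split(",")) {
      String[] parts = pair.split(":");
      offsets.put(Integer.parseInt(parts[0].trim()), Long.parseLong(parts[1].trim()));
    }
    return offsets;
  }

  public static void main(String[] args) {
    System.out.println(parse("0:2")); // {0=2}
  }
}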
KafkaPartitionOffsets.java

@@ -47,7 +47,7 @@ public KafkaPartitionOffsets(Map<Integer, Long> partitionOffsets) {
   }
 
   public void setPartitionOffset(int partition, long offset) {
-    partitionOffsets.computeIfPresent(partition, (k, v) -> offset);
+    partitionOffsets.put(partition, offset);
   }
 
   public long getPartitionOffset(int partition, long defaultValue) {
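With put in place, seeding a freshly constructed KafkaPartitionOffsets works even when the backing map starts empty. A short usage sketch, based only on the constructor and methods visible in this hunk:

Map<Integer, Long> backing = new HashMap<>();
KafkaPartitionOffsets offsets = new KafkaPartitionOffsets(backing);

// Before this commit, this call was a no-op for an unseen partition;
// now it stores the entry.
offsets.setPartitionOffset(0, 2L);

// Returns 2 rather than falling back to the default value.
long start = offsets.getPartitionOffset(0, -1L);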
