diff --git a/kafka-plugins-client/src/test/java/io/cdap/plugin/kafka/AbstractKafkaBatchSourceTest.java b/kafka-plugins-client/src/test/java/io/cdap/plugin/kafka/AbstractKafkaBatchSourceTest.java
index 2c34c63..2d87ce5 100644
--- a/kafka-plugins-client/src/test/java/io/cdap/plugin/kafka/AbstractKafkaBatchSourceTest.java
+++ b/kafka-plugins-client/src/test/java/io/cdap/plugin/kafka/AbstractKafkaBatchSourceTest.java
@@ -95,6 +95,7 @@ public abstract class AbstractKafkaBatchSourceTest extends HydratorTestBase {
 
   @Before
   public void setup() throws Exception {
+    clear();
     ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion());
 
     // add the data-pipeline artifact and mock plugins
@@ -227,6 +228,64 @@ public void testKafkaSource() throws Exception {
     ));
   }
 
+  @Test
+  public void testKafkaSourceWithInitialPartitionOffsets() throws Exception {
+
+    Schema schema = Schema.recordOf(
+      "user",
+      Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
+      Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("last", Schema.of(Schema.Type.STRING)));
+
+    // create the pipeline config
+    String outputName = "sourceTestOutput";
+
+    Map<String, String> sourceProperties = new HashMap<>();
+    sourceProperties.put("kafkaBrokers", "localhost:" + kafkaPort);
+    sourceProperties.put("referenceName", "kafkaTest");
+    sourceProperties.put("topic", "users");
+    sourceProperties.put("schema", schema.toString());
+    sourceProperties.put("format", "csv");
+    sourceProperties.put("initialPartitionOffsets", "${initialPartitionOffsets}");
+    ETLStage source = new ETLStage("source", new ETLPlugin(getKafkaBatchSourceName(),
+                                                           BatchSource.PLUGIN_TYPE, sourceProperties, null));
+    ETLStage sink = new ETLStage("sink", MockSink.getPlugin(outputName));
+
+    ETLBatchConfig pipelineConfig = ETLBatchConfig.builder()
+      .addStage(source)
+      .addStage(sink)
+      .addConnection(source.getName(), sink.getName())
+      .build();
+
+    // create the pipeline
+    ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSource");
+    ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig));
+
+    Map<String, String> messages = new HashMap<>();
+    messages.put("a", "0,samuel,jackson");
+    messages.put("b", "1,dwayne,johnson");
+    messages.put("c", "2,christopher,walken");
+    messages.put("d", "3,michael,jackson");
+    sendKafkaMessage("users", messages);
+
+    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    workflowManager.startAndWaitForRun(Collections.singletonMap("initialPartitionOffsets", "0:2"),
+                                       ProgramRunStatus.COMPLETED, 2, TimeUnit.MINUTES);
+
+    DataSetManager<Table> outputManager = getDataset(outputName);
+    Set<StructuredRecord> outputRecords = new HashSet<>(MockSink.readOutput(outputManager));
+    Map<Long, String> actual = new HashMap<>();
+    for (StructuredRecord outputRecord : outputRecords) {
+      actual.put(outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last"));
+    }
+
+    Map<Long, String> expected = ImmutableMap.of(
+      2L, "christopher walken",
+      3L, "michael jackson"
+    );
+    Assert.assertEquals(expected, actual);
+  }
+
   /**
    * Validates the test result and offsets after the testing pipeline was executed.
    *
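
For context on the test above: initialPartitionOffsets is configured as a macro ("${initialPartitionOffsets}") and resolved from the runtime arguments when the workflow starts. The value "0:2" tells the source to begin reading partition 0 at offset 2, so the first two records ("samuel jackson" and "dwayne johnson") are skipped and only the records at offsets 2 and 3 reach the sink. Below is a minimal, hypothetical sketch of how such a "partition:offset" list could be decoded into per-partition start offsets; the class and method names are illustrative, not the plugin's actual parser.

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only -- not the plugin's real parser.
public class InitialOffsetsSketch {

  // Decodes comma-separated "partition:offset" pairs, e.g. "0:2,1:5".
  static Map<Integer, Long> parse(String value) {
    Map<Integer, Long> offsets = new HashMap<>();
    if (value == null || value.trim().isEmpty()) {
      return offsets;
    }
    for (String pair : value.split(",")) {
      String[] parts = pair.trim().split(":");
      if (parts.length != 2) {
        throw new IllegalArgumentException("Expected 'partition:offset', got: " + pair);
      }
      offsets.put(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
    }
    return offsets;
  }

  public static void main(String[] args) {
    // "0:2" from the test: consume partition 0 starting at offset 2,
    // skipping the messages at offsets 0 and 1.
    System.out.println(parse("0:2")); // prints {0=2}
  }
}
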
diff --git a/kafka-plugins-common/src/main/java/io/cdap/plugin/kafka/batch/source/KafkaPartitionOffsets.java b/kafka-plugins-common/src/main/java/io/cdap/plugin/kafka/batch/source/KafkaPartitionOffsets.java
index 6008ac9..b416d20 100644
--- a/kafka-plugins-common/src/main/java/io/cdap/plugin/kafka/batch/source/KafkaPartitionOffsets.java
+++ b/kafka-plugins-common/src/main/java/io/cdap/plugin/kafka/batch/source/KafkaPartitionOffsets.java
@@ -47,7 +47,7 @@ public KafkaPartitionOffsets(Map<Integer, Long> partitionOffsets) {
   }
 
   public void setPartitionOffset(int partition, long offset) {
-    partitionOffsets.computeIfPresent(partition, (k, v) -> offset);
+    partitionOffsets.put(partition, offset);
   }
 
   public long getPartitionOffset(int partition, long defaultValue) {
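
The one-line change to setPartitionOffset above is the actual bug fix: Map.computeIfPresent only remaps a key that already has an entry, so when a partition was not yet present in the map (for example, before any offsets had been saved for it) the new offset was silently discarded and the stored state never advanced for that partition. Map.put inserts or updates unconditionally. A small standalone demonstration of the difference, assuming nothing beyond the JDK:

import java.util.HashMap;
import java.util.Map;

public class SetPartitionOffsetDemo {
  public static void main(String[] args) {
    Map<Integer, Long> partitionOffsets = new HashMap<>();

    // Old behavior: computeIfPresent is a no-op when the key is absent,
    // so the offset for a previously unseen partition is dropped.
    partitionOffsets.computeIfPresent(0, (k, v) -> 2L);
    System.out.println(partitionOffsets); // prints {}

    // New behavior: put stores the offset whether or not the key exists.
    partitionOffsets.put(0, 2L);
    System.out.println(partitionOffsets); // prints {0=2}
  }
}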