diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index a5574d6a16..400df6b0d2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -477,8 +477,8 @@ public void testDuplicateCommitAfterRestore(String metastore) .collect(Collectors.toList()); committer.commit(commitRequests); - // We add a loop for restore 3 times - for (int i = 0; i < 3; i++) { + // We add a loop for restore 6 times + for (int i = 2; i < 9; i++) { // We've two steps in checkpoint: 1. snapshotState(ckp); 2. // notifyCheckpointComplete(ckp). // It's possible that flink job will restore from a checkpoint with only step#1 finished @@ -493,8 +493,8 @@ public void testDuplicateCommitAfterRestore(String metastore) generator.generate( new Object[] { BinaryStringData.fromString( - Integer.toString(i + 2)), - BinaryStringData.fromString(Integer.toString(i + 2)) + Integer.toString(i)), + BinaryStringData.fromString(Integer.toString(i)) }))); Assertions.assertDoesNotThrow( () -> { @@ -504,7 +504,7 @@ public void testDuplicateCommitAfterRestore(String metastore) }); writer.flush(false); //Checkpoint id start from 1 - long checkpointId = i + 2; + long checkpointId = i; committer.commit( writer.prepareCommit().stream() .map( @@ -526,7 +526,8 @@ public void testDuplicateCommitAfterRestore(String metastore) .execute() .collect() .forEachRemaining(result::add); - Assertions.assertEquals(result.size(), 4); + // 8 APPEND and 1 COMPACT + Assertions.assertEquals(result.size(), 9); result.clear(); tEnv.sqlQuery("select * from paimon_catalog.test.`table1`") @@ -538,7 +539,11 @@ public void testDuplicateCommitAfterRestore(String metastore) Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2"), Row.ofKind(RowKind.INSERT, "3", "3"), - Row.ofKind(RowKind.INSERT, "4", "4")), + Row.ofKind(RowKind.INSERT, "4", "4"), + Row.ofKind(RowKind.INSERT, "5", "5"), + Row.ofKind(RowKind.INSERT, "6", "6"), + Row.ofKind(RowKind.INSERT, "7", "7"), + Row.ofKind(RowKind.INSERT, "8", "8")), result); }