diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 31eb710ae8c8..6458c825aa14 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -39,6 +39,10 @@ public void add(int x) {
         roaringBitmap.add(x);
     }
 
+    public void or(RoaringBitmap32 other) {
+        roaringBitmap.or(other.roaringBitmap);
+    }
+
     public boolean checkedAdd(int x) {
         return roaringBitmap.checkedAdd(x);
     }
@@ -51,6 +55,10 @@ public boolean isEmpty() {
         return roaringBitmap.isEmpty();
     }
 
+    public long getCardinality() {
+        return roaringBitmap.getLongCardinality();
+    }
+
     public void serialize(DataOutput out) throws IOException {
         roaringBitmap.runOptimize();
         roaringBitmap.serialize(out);
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
index d15c0d210575..2948c10d6495 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
@@ -35,8 +35,8 @@ public class BitmapDeletionVector implements DeletionVector {
 
     private final RoaringBitmap32 roaringBitmap;
 
-    BitmapDeletionVector() {
-        roaringBitmap = new RoaringBitmap32();
+    public BitmapDeletionVector() {
+        this.roaringBitmap = new RoaringBitmap32();
     }
 
     private BitmapDeletionVector(RoaringBitmap32 roaringBitmap) {
@@ -49,6 +49,16 @@ public void delete(long position) {
         roaringBitmap.add((int) position);
     }
 
+    @Override
+    public void merge(DeletionVector deletionVector) {
+        if (deletionVector instanceof BitmapDeletionVector) {
+            roaringBitmap.or(((BitmapDeletionVector) deletionVector).roaringBitmap);
+        } else {
+            throw new IllegalArgumentException(
+                    "Only instance with the same class type can be merged.");
+        }
+    }
+
     @Override
     public boolean checkedDelete(long position) {
         checkPosition(position);
@@ -66,6 +76,11 @@ public boolean isEmpty() {
         return roaringBitmap.isEmpty();
     }
 
+    @Override
+    public long getCardinality() {
+        return roaringBitmap.getCardinality();
+    }
+
     @Override
     public byte[] serializeToBytes() {
         try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
index 60d69a436409..aa85c3df8b05 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
@@ -47,6 +47,13 @@ public interface DeletionVector {
      */
     void delete(long position);
 
+    /**
+     * Merges another {@link DeletionVector} into this one.
+     *
+     * @param deletionVector the other {@link DeletionVector} to merge into this one
+     */
+    void merge(DeletionVector deletionVector);
+
     /**
      * Marks the row at the specified position as deleted.
      *
@@ -77,6 +84,9 @@ default boolean checkedDelete(long position) {
      */
     boolean isEmpty();
 
+    /** @return the number of distinct integers added to the DeletionVector. */
+    long getCardinality();
+
     /**
      * Serializes the deletion vector to a byte array for storage or transmission.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index c014037d195a..3beb996238de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -41,21 +41,9 @@ public class DeletionVectorsMaintainer {
     private boolean modified;
 
     private DeletionVectorsMaintainer(
-            IndexFileHandler fileHandler,
-            @Nullable Long snapshotId,
-            BinaryRow partition,
-            int bucket) {
+            IndexFileHandler fileHandler, Map<String, DeletionVector> deletionVectors) {
         this.indexFileHandler = fileHandler;
-        IndexFileMeta indexFile =
-                snapshotId == null
-                        ? null
-                        : fileHandler
-                                .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
-                                .orElse(null);
-        this.deletionVectors =
-                indexFile == null
-                        ? new HashMap<>()
-                        : new HashMap<>(indexFileHandler.readAllDeletionVectors(indexFile));
+        this.deletionVectors = deletionVectors;
         this.modified = false;
     }
 
@@ -74,6 +62,17 @@ public void notifyNewDeletion(String fileName, long position) {
         }
     }
 
+    /**
+     * Records the given deletion vector for the file, replacing any vector previously recorded.
+     *
+     * @param fileName The name of the file the deletion vector belongs to.
+     * @param deletionVector The deletion vector to record for that file.
+     */
+    public void notifyNewDeletion(String fileName, DeletionVector deletionVector) {
+        deletionVectors.put(fileName, deletionVector);
+        modified = true;
+    }
+
     /**
      * Removes the specified file's deletion vector, this method is typically used for remove before
      * files' deletion vector in compaction.
@@ -130,7 +129,33 @@ public Factory(IndexFileHandler handler) {
 
         public DeletionVectorsMaintainer createOrRestore(
                 @Nullable Long snapshotId, BinaryRow partition, int bucket) {
-            return new DeletionVectorsMaintainer(handler, snapshotId, partition, bucket);
+            IndexFileMeta indexFile =
+                    snapshotId == null
+                            ? null
+                            : handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
+                                    .orElse(null);
+            Map<String, DeletionVector> deletionVectors =
+                    indexFile == null
+                            ? new HashMap<>()
+                            : new HashMap<>(handler.readAllDeletionVectors(indexFile));
+            return createOrRestore(deletionVectors);
+        }
+
+        /** Creates a maintainer that starts with an empty set of deletion vectors. */
+        public DeletionVectorsMaintainer create() {
+            return createOrRestore(new HashMap<>());
+        }
+
+        /**
+         * Creates a maintainer from the given deletion vectors.
+         *
+         * <p>The map is passed to the maintainer without copying.
+         *
+         * @param deletionVectors the deletion vectors the maintainer starts with
+         */
+        public DeletionVectorsMaintainer createOrRestore(
+                Map<String, DeletionVector> deletionVectors) {
+            return new DeletionVectorsMaintainer(handler, deletionVectors);
         }
     }
 }