Skip to content

Commit

Permalink
[core] merge and getCardinality api in DeletionVector (apache#3319)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron authored May 11, 2024
1 parent 2af82a4 commit 49b11e5
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public void add(int x) {
roaringBitmap.add(x);
}

/**
 * Applies the underlying bitmap's {@code or} operation, using the bitmap wrapped by
 * {@code other} as the operand. Nothing is returned; the result is kept in this instance's
 * underlying bitmap.
 *
 * @param other the bitmap whose underlying bitmap is or-ed into this one
 */
public void or(RoaringBitmap32 other) {
    this.roaringBitmap.or(other.roaringBitmap);
}

/**
 * Delegates to the underlying bitmap's {@code checkedAdd} and hands back its result
 * unchanged.
 *
 * @param x the value to add
 * @return whatever the underlying bitmap's {@code checkedAdd(x)} reports
 */
public boolean checkedAdd(int x) {
    final boolean result = this.roaringBitmap.checkedAdd(x);
    return result;
}
Expand All @@ -51,6 +55,10 @@ public boolean isEmpty() {
return roaringBitmap.isEmpty();
}

/**
 * Reports the cardinality of the underlying bitmap as a {@code long}.
 *
 * @return the value of the underlying bitmap's {@code getLongCardinality()}
 */
public long getCardinality() {
    final long cardinality = this.roaringBitmap.getLongCardinality();
    return cardinality;
}

public void serialize(DataOutput out) throws IOException {
roaringBitmap.runOptimize();
roaringBitmap.serialize(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class BitmapDeletionVector implements DeletionVector {

private final RoaringBitmap32 roaringBitmap;

BitmapDeletionVector() {
roaringBitmap = new RoaringBitmap32();
public BitmapDeletionVector() {
this.roaringBitmap = new RoaringBitmap32();
}

private BitmapDeletionVector(RoaringBitmap32 roaringBitmap) {
Expand All @@ -49,6 +49,15 @@ public void delete(long position) {
roaringBitmap.add((int) position);
}

/**
 * Merges another deletion vector into this one by or-ing the other vector's bitmap into this
 * vector's bitmap.
 *
 * @param deletionVector the deletion vector to merge; must be a {@link BitmapDeletionVector}
 * @throws IllegalArgumentException if {@code deletionVector} is {@code null} or is not a
 *     {@link BitmapDeletionVector}
 */
@Override
public void merge(DeletionVector deletionVector) {
    // instanceof is false for null as well, so a null argument is rejected here too.
    if (!(deletionVector instanceof BitmapDeletionVector)) {
        // IllegalArgumentException is still a RuntimeException, so existing callers that
        // catch RuntimeException keep working while the failure cause becomes explicit.
        throw new IllegalArgumentException(
                "Only instance with the same class type can be merged.");
    }
    roaringBitmap.or(((BitmapDeletionVector) deletionVector).roaringBitmap);
}

@Override
public boolean checkedDelete(long position) {
checkPosition(position);
Expand All @@ -66,6 +75,11 @@ public boolean isEmpty() {
return roaringBitmap.isEmpty();
}

/**
 * Reports the cardinality of the bitmap backing this deletion vector.
 *
 * @return the value returned by the backing bitmap's {@code getCardinality()}
 */
@Override
public long getCardinality() {
    final long cardinality = this.roaringBitmap.getCardinality();
    return cardinality;
}

@Override
public byte[] serializeToBytes() {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public interface DeletionVector {
*/
void delete(long position);

/**
 * Merges another {@link DeletionVector} into this one.
 *
 * @param deletionVector the other {@link DeletionVector} to merge into this one
 */
void merge(DeletionVector deletionVector);

/**
* Marks the row at the specified position as deleted.
*
Expand Down Expand Up @@ -77,6 +84,9 @@ default boolean checkedDelete(long position) {
*/
boolean isEmpty();

/**
 * Returns the cardinality of this deletion vector.
 *
 * @return the number of distinct integers added to the DeletionVector.
 */
long getCardinality();

/**
* Serializes the deletion vector to a byte array for storage or transmission.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,9 @@ public class DeletionVectorsMaintainer {
private boolean modified;

private DeletionVectorsMaintainer(
IndexFileHandler fileHandler,
@Nullable Long snapshotId,
BinaryRow partition,
int bucket) {
IndexFileHandler fileHandler, Map<String, DeletionVector> deletionVectors) {
this.indexFileHandler = fileHandler;
IndexFileMeta indexFile =
snapshotId == null
? null
: fileHandler
.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
.orElse(null);
this.deletionVectors =
indexFile == null
? new HashMap<>()
: new HashMap<>(indexFileHandler.readAllDeletionVectors(indexFile));
this.deletionVectors = deletionVectors;
this.modified = false;
}

Expand All @@ -74,6 +62,17 @@ public void notifyNewDeletion(String fileName, long position) {
}
}

/**
 * Records the given deletion vector under the given file name and flags this maintainer as
 * modified. The vector is stored with {@code put}, so any entry already held for the same
 * file name is replaced.
 *
 * @param fileName The name of the file where the deletion occurred.
 * @param deletionVector The deletion vector to store for that file.
 */
public void notifyNewDeletion(String fileName, DeletionVector deletionVector) {
    // Store first, then flag: if the store throws, the modified flag stays untouched.
    this.deletionVectors.put(fileName, deletionVector);
    this.modified = true;
}

/**
* Removes the specified file's deletion vector, this method is typically used for remove before
* files' deletion vector in compaction.
Expand Down Expand Up @@ -130,7 +129,25 @@ public Factory(IndexFileHandler handler) {

public DeletionVectorsMaintainer createOrRestore(
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
return new DeletionVectorsMaintainer(handler, snapshotId, partition, bucket);
IndexFileMeta indexFile =
snapshotId == null
? null
: handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
.orElse(null);
Map<String, DeletionVector> deletionVectors =
indexFile == null
? new HashMap<>()
: new HashMap<>(handler.readAllDeletionVectors(indexFile));
return createOrRestore(deletionVectors);
}

/**
 * Creates a maintainer that starts out with a new, empty {@link HashMap} of deletion
 * vectors.
 *
 * @return the maintainer produced by {@code createOrRestore} for the empty map
 */
public DeletionVectorsMaintainer create() {
    Map<String, DeletionVector> emptyVectors = new HashMap<>();
    return createOrRestore(emptyVectors);
}

/**
 * Creates a maintainer from this factory's handler and the supplied deletion vectors. The
 * map is passed to the maintainer's constructor as given; this method does not copy it.
 *
 * @param deletionVectors the deletion vectors to hand to the new maintainer
 * @return a new {@link DeletionVectorsMaintainer}
 */
public DeletionVectorsMaintainer createOrRestore(
        Map<String, DeletionVector> deletionVectors) {
    final DeletionVectorsMaintainer maintainer =
            new DeletionVectorsMaintainer(handler, deletionVectors);
    return maintainer;
}
}
}

0 comments on commit 49b11e5

Please sign in to comment.