Skip to content

Commit

Permalink
Speed up reading of properties in parquet files.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jul 23, 2024
1 parent 405d9aa commit 4d037b8
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,15 @@ private void compactDataFiles(File weekFolder) throws IOException {
while (!nextTuples.isEmpty()) {
var pair = nextTuples.poll();
if (prevTuple == null || prevTuple.compareTo(pair.getFirst()) != 0) {
compactionFileWriter.write(pair.getFirst());
prevTuple = pair.getFirst();
var tuple = pair.getFirst();
// update the first flag
if (prevTuple == null || !Arrays.equals(prevTuple.id, tuple.id)) {
tuple.setFirst(true);
} else {
tuple.setFirst(null);
}
compactionFileWriter.write(tuple);
prevTuple = tuple;
} else if (prevTuple != null) {
// omit tuple as it is duplicate in terms of id, time, and seqNr
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.github.linkedfactory.core.kvin.util.AggregatingIterator;
import net.enilink.commons.iterator.IExtendedIterator;
import net.enilink.commons.iterator.NiceIterator;
import net.enilink.commons.iterator.WrappedIterator;
import net.enilink.commons.util.Pair;
import net.enilink.komma.core.URI;
import net.enilink.komma.core.URIs;
Expand Down Expand Up @@ -231,6 +232,7 @@ private synchronized void putInternal(Iterable<KvinTuple> tuples) throws IOExcep

WriterState writerState = null;
String prevKey = null;
KvinTupleInternal prevTuple = null;
for (KvinTuple tuple : tuples) {
KvinTupleInternal internalTuple = new KvinTupleInternal();

Expand Down Expand Up @@ -268,9 +270,14 @@ private synchronized void putInternal(Iterable<KvinTuple> tuples) throws IOExcep
} else {
internalTuple.setValueObject(null);
}
// set first flag
if (prevTuple == null || !Arrays.equals(prevTuple.id, internalTuple.id)) {
internalTuple.setFirst(true);
}
writerState.writer.write(internalTuple);
writerState.minMax[0] = Math.min(writerState.minMax[0], writeContext.itemIdCounter);
writerState.minMax[1] = Math.max(writerState.minMax[1], writeContext.itemIdCounter);
prevTuple = internalTuple;
}

for (WriterState state : writers.values()) {
Expand Down Expand Up @@ -745,7 +752,7 @@ KvinTuple selectNextTuple() throws IOException {
propertyValueCount = 0;
}
}
if (! skipAfterLimit) {
if (!skipAfterLimit) {
prevRecord = min.getFirst();
tuple = convert(min.getFirst());
propertyValueCount++;
Expand Down Expand Up @@ -926,98 +933,63 @@ public IExtendedIterator<URI> descendants(URI item, long limit) {
return null;
}

private long getNextPropertyId(long itemId, long lastPropertyId, long contextId) {
private List<URI> getProperties(long itemId, long contextId) {
try {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * (lastPropertyId > 0 ? 2 : 1));
keyBuffer.putLong(itemId);
if (lastPropertyId > 0) {
keyBuffer.putLong(lastPropertyId + 1);
}
FilterPredicate filter = gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array()));
ByteBuffer lowKey = ByteBuffer.allocate(Long.BYTES);
lowKey.putLong(itemId);
ByteBuffer highKey = ByteBuffer.allocate(Long.BYTES * 2);
highKey.putLong(itemId);
highKey.putLong(Long.MAX_VALUE);
FilterPredicate filter = and(eq(FilterApi.booleanColumn("first"), true), and(
gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(lowKey.array())),
lt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(highKey.array()))));

IdMappings idMappings = new IdMappings();
idMappings.itemId = itemId;
List<java.nio.file.Path> dataFolders = getDataFolders(idMappings);
Set<Long> propertyIds = new LinkedHashSet<>();

long minPropertyId = 0;
for (java.nio.file.Path dataFolder : dataFolders) {
for (Path dataFile : getDataFiles(dataFolder.toString())) {
ParquetReader<GenericRecord> reader = createGenericReader(getFile(dataFile), FilterCompat.get(filter));
GenericRecord record;
while ((record = reader.read()) != null) {
ByteBuffer idBb = (ByteBuffer) record.get(0);
if (itemId != idBb.getLong()) {
break;
}
// we are reading property ids which are lower than last given property id
idBb.getLong();
long currentPropertyId = idBb.getLong();
if (minPropertyId == 0 || currentPropertyId < minPropertyId) {
minPropertyId = currentPropertyId;
}
break;
propertyIds.add(currentPropertyId);
}
reader.close();
}
}
return minPropertyId;
} catch (Exception e) {
throw new RuntimeException(e);

return propertyIds.stream().map(id -> {
try {
return getProperty(id);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).collect(Collectors.toList());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public synchronized IExtendedIterator<URI> properties(URI item) {
Lock readLock = null;
try {
Lock readLock = readLock();
readLock = readLock();
IdMappings idMappings = getIdMappings(item, null, null);
if (idMappings.itemId == 0L) {
return NiceIterator.emptyIterator();
}
return new NiceIterator<>() {
long lastPropertyId = 0L;
long nextPropertyId = 0L;
URI nextProperty;
boolean closed = false;

@Override
public boolean hasNext() {
if (!closed && nextPropertyId == 0) {
nextPropertyId = getNextPropertyId(idMappings.itemId, lastPropertyId, 0);
}
if (nextPropertyId == 0) {
close();
return false;
}
try {
nextProperty = getProperty(nextPropertyId);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return true;
}

@Override
public URI next() {
if (nextProperty == null) {
throw new NoSuchElementException();
}
URI property = nextProperty;
lastPropertyId = nextPropertyId;
nextPropertyId = 0;
return property;
}

@Override
public void close() {
super.close();
closed = true;
if (readLock.isActive()) {
readLock.release();
}
}
};
List<URI> properties = getProperties(idMappings.itemId, 0L);
return WrappedIterator.create(properties.iterator());
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
readLock.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public class KvinTupleInternal implements Comparable<KvinTupleInternal> {
protected byte[] id;
protected Long time;
protected Integer seqNr;
protected Boolean first;
protected Integer valueInt;
protected Long valueLong;
protected Float valueFloat;
Expand Down Expand Up @@ -38,6 +39,15 @@ public void setSeqNr(int seqNr) {
this.seqNr = seqNr;
}

public Boolean getFirst() {
return first;
}

public KvinTupleInternal setFirst(Boolean first) {
this.first = first;
return this;
}

public Integer getValueInt() {
return valueInt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ParquetHelpers {
.name("id").type().bytesType().noDefault()
.name("time").type().longType().noDefault()
.name("seqNr").type().intType().intDefault(0)
.name("first").type().nullable().booleanType().noDefault()
.name("valueInt").type().nullable().intType().noDefault()
.name("valueLong").type().nullable().longType().noDefault()
.name("valueFloat").type().nullable().floatType().noDefault()
Expand All @@ -54,6 +55,8 @@ public class ParquetHelpers {
.name("valueBool").type().nullable().intType().noDefault()
.name("valueObject").type().nullable().bytesType().noDefault().endRecord();

static int kvinTupleFirstField = kvinTupleSchema.getField("valueInt").pos();

static Pattern fileWithSeqNr = Pattern.compile("^([^.].*)__([0-9]+)\\..*$");
static Pattern fileOrDotFileWithSeqNr = Pattern.compile("^\\.?([^.].*)__([0-9]+)\\..*$");
static Configuration configuration = new Configuration();
Expand Down Expand Up @@ -140,7 +143,7 @@ public static KvinTuple recordToTuple(URI item, URI property, URI context, Gener
int seqNr = (Integer) record.get(2);
int fields = record.getSchema().getFields().size();
Object value = null;
for (int i = 3; i < fields; i++) {
for (int i = kvinTupleFirstField; i < fields; i++) {
value = record.get(i);
if (value != null) {
if (i == fields - 1) {
Expand Down

0 comments on commit 4d037b8

Please sign in to comment.