diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java index 597f2a5b..625a71a5 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java @@ -204,8 +204,15 @@ private void compactDataFiles(File weekFolder) throws IOException { while (!nextTuples.isEmpty()) { var pair = nextTuples.poll(); if (prevTuple == null || prevTuple.compareTo(pair.getFirst()) != 0) { - compactionFileWriter.write(pair.getFirst()); - prevTuple = pair.getFirst(); + var tuple = pair.getFirst(); + // update the first flag + if (prevTuple == null || !Arrays.equals(prevTuple.id, tuple.id)) { + tuple.setFirst(true); + } else { + tuple.setFirst(null); + } + compactionFileWriter.write(tuple); + prevTuple = tuple; } else if (prevTuple != null) { // omit tuple as it is duplicate in terms of id, time, and seqNr } diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java index 630d503c..29eb0597 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java @@ -9,6 +9,7 @@ import io.github.linkedfactory.core.kvin.util.AggregatingIterator; import net.enilink.commons.iterator.IExtendedIterator; import net.enilink.commons.iterator.NiceIterator; +import net.enilink.commons.iterator.WrappedIterator; import net.enilink.commons.util.Pair; import net.enilink.komma.core.URI; import net.enilink.komma.core.URIs; @@ -231,6 +232,7 @@ private synchronized void putInternal(Iterable tuples) throws IOExcep WriterState writerState = null; String prevKey = null; + KvinTupleInternal prevTuple = null; for (KvinTuple tuple : tuples) { KvinTupleInternal internalTuple = new KvinTupleInternal(); @@ -268,9 +270,14 @@ private synchronized void putInternal(Iterable tuples) throws IOExcep } else { internalTuple.setValueObject(null); } + // set first flag + if (prevTuple == null || !Arrays.equals(prevTuple.id, internalTuple.id)) { + internalTuple.setFirst(true); + } writerState.writer.write(internalTuple); writerState.minMax[0] = Math.min(writerState.minMax[0], writeContext.itemIdCounter); writerState.minMax[1] = Math.max(writerState.minMax[1], writeContext.itemIdCounter); + prevTuple = internalTuple; } for (WriterState state : writers.values()) { @@ -745,7 +752,7 @@ KvinTuple selectNextTuple() throws IOException { propertyValueCount = 0; } } - if (! skipAfterLimit) { + if (!skipAfterLimit) { prevRecord = min.getFirst(); tuple = convert(min.getFirst()); propertyValueCount++; @@ -926,98 +933,63 @@ public IExtendedIterator descendants(URI item, long limit) { return null; } - private long getNextPropertyId(long itemId, long lastPropertyId, long contextId) { + private List getProperties(long itemId, long contextId) { try { - ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * (lastPropertyId > 0 ? 2 : 1)); - keyBuffer.putLong(itemId); - if (lastPropertyId > 0) { - keyBuffer.putLong(lastPropertyId + 1); - } - FilterPredicate filter = gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())); + ByteBuffer lowKey = ByteBuffer.allocate(Long.BYTES); + lowKey.putLong(itemId); + ByteBuffer highKey = ByteBuffer.allocate(Long.BYTES * 2); + highKey.putLong(itemId); + highKey.putLong(Long.MAX_VALUE); + FilterPredicate filter = and(eq(FilterApi.booleanColumn("first"), true), and( + gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(lowKey.array())), + lt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(highKey.array())))); IdMappings idMappings = new IdMappings(); idMappings.itemId = itemId; List dataFolders = getDataFolders(idMappings); + Set propertyIds = new LinkedHashSet<>(); - long minPropertyId = 0; for (java.nio.file.Path dataFolder : dataFolders) { for (Path dataFile : getDataFiles(dataFolder.toString())) { ParquetReader reader = createGenericReader(getFile(dataFile), FilterCompat.get(filter)); GenericRecord record; while ((record = reader.read()) != null) { ByteBuffer idBb = (ByteBuffer) record.get(0); - if (itemId != idBb.getLong()) { - break; - } - // we are reading property ids which are lower than last given property id + idBb.getLong(); long currentPropertyId = idBb.getLong(); - if (minPropertyId == 0 || currentPropertyId < minPropertyId) { - minPropertyId = currentPropertyId; - } - break; + propertyIds.add(currentPropertyId); } reader.close(); } } - return minPropertyId; - } catch (Exception e) { - throw new RuntimeException(e); + + return propertyIds.stream().map(id -> { + try { + return getProperty(id); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).collect(Collectors.toList()); + } catch (IOException e) { + throw new UncheckedIOException(e); } } @Override public synchronized IExtendedIterator properties(URI item) { + Lock readLock = null; try { - Lock readLock = readLock(); + readLock = readLock(); IdMappings idMappings = getIdMappings(item, null, null); if (idMappings.itemId == 0L) { return NiceIterator.emptyIterator(); } - return new NiceIterator<>() { - long lastPropertyId = 0L; - long nextPropertyId = 0L; - URI nextProperty; - boolean closed = false; - - @Override - public boolean hasNext() { - if (!closed && nextPropertyId == 0) { - nextPropertyId = getNextPropertyId(idMappings.itemId, lastPropertyId, 0); - } - if (nextPropertyId == 0) { - close(); - return false; - } - try { - nextProperty = getProperty(nextPropertyId); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - return true; - } - - @Override - public URI next() { - if (nextProperty == null) { - throw new NoSuchElementException(); - } - URI property = nextProperty; - lastPropertyId = nextPropertyId; - nextPropertyId = 0; - return property; - } - - @Override - public void close() { - super.close(); - closed = true; - if (readLock.isActive()) { - readLock.release(); - } - } - }; + List properties = getProperties(idMappings.itemId, 0L); + return WrappedIterator.create(properties.iterator()); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + readLock.release(); } } diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinTupleInternal.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinTupleInternal.java index eed36ba0..fc7731cb 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinTupleInternal.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinTupleInternal.java @@ -6,6 +6,7 @@ public class KvinTupleInternal implements Comparable { protected byte[] id; protected Long time; protected Integer seqNr; + protected Boolean first; protected Integer valueInt; protected Long valueLong; protected Float valueFloat; @@ -38,6 +39,15 @@ public void setSeqNr(int seqNr) { this.seqNr = seqNr; } + public Boolean getFirst() { + return first; + } + + public KvinTupleInternal setFirst(Boolean first) { + this.first = first; + return this; + } + public Integer getValueInt() { return valueInt; } diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/ParquetHelpers.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/ParquetHelpers.java index 3cf3128c..8ea7b13a 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/ParquetHelpers.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/ParquetHelpers.java @@ -46,6 +46,7 @@ public class ParquetHelpers { .name("id").type().bytesType().noDefault() .name("time").type().longType().noDefault() .name("seqNr").type().intType().intDefault(0) + .name("first").type().nullable().booleanType().noDefault() .name("valueInt").type().nullable().intType().noDefault() .name("valueLong").type().nullable().longType().noDefault() .name("valueFloat").type().nullable().floatType().noDefault() @@ -54,6 +55,8 @@ public class ParquetHelpers { .name("valueBool").type().nullable().intType().noDefault() .name("valueObject").type().nullable().bytesType().noDefault().endRecord(); + static int kvinTupleFirstField = kvinTupleSchema.getField("valueInt").pos(); + static Pattern fileWithSeqNr = Pattern.compile("^([^.].*)__([0-9]+)\\..*$"); static Pattern fileOrDotFileWithSeqNr = Pattern.compile("^\\.?([^.].*)__([0-9]+)\\..*$"); static Configuration configuration = new Configuration(); @@ -140,7 +143,7 @@ public static KvinTuple recordToTuple(URI item, URI property, URI context, Gener int seqNr = (Integer) record.get(2); int fields = record.getSchema().getFields().size(); Object value = null; - for (int i = 3; i < fields; i++) { + for (int i = kvinTupleFirstField; i < fields; i++) { value = record.get(i); if (value != null) { if (i == fields - 1) {