Skip to content

Commit

Permalink
Support contexts for all Kvin operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jul 23, 2024
1 parent 74e3b89 commit 4b768a2
Show file tree
Hide file tree
Showing 33 changed files with 345 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ private static Kvin createStore(String storeLocation, String type) throws IOExce
}

@Command(description = "Retrieves all descendants of a given item")
int descendants(@Parameters(paramLabel = "<item>", defaultValue = "", description = "the target item") String item) {
int descendants(@Parameters(paramLabel = "<item>", defaultValue = "", description = "the target item") String item,
@Parameters(paramLabel = "<context>", defaultValue = Kvin.DEFAULT_CONTEXT_VALUE, description = "the context") String context) {
URI itemUri = URIs.createURI(item);
URI contextUri = URIs.createURI(context);
try (Kvin store = createStore(storeLocation)) {
store.descendants(itemUri).forEach(d -> {
store.descendants(itemUri, contextUri).forEach(d -> {
System.out.println(d);
});
} catch (Exception e) {
Expand All @@ -80,10 +82,12 @@ int descendants(@Parameters(paramLabel = "<item>", defaultValue = "", descriptio
}

@Command(description = "Retrieves all properties of a given item")
int properties(@Parameters(paramLabel = "<item>", description = "the target item") String item) {
int properties(@Parameters(paramLabel = "<item>", description = "the target item") String item,
@Parameters(paramLabel = "<context>", defaultValue = Kvin.DEFAULT_CONTEXT_VALUE, description = "the context") String context) {
URI itemUri = URIs.createURI(item);
URI contextUri = URIs.createURI(context);
try (Kvin store = createStore(storeLocation)) {
store.properties(itemUri).forEach(p -> {
store.properties(itemUri, contextUri).forEach(p -> {
System.out.println(p);
});
} catch (Exception e) {
Expand Down Expand Up @@ -117,7 +121,7 @@ public void run() {
try (sink) {
IExtendedIterator<URI> items;
if ("*".equals(item)) {
items = store.descendants(URIs.createURI(""));
items = store.descendants(URIs.createURI(""), contextUri);
} else {
items = WrappedIterator.create(Arrays.asList(URIs.createURI(item)).iterator());
}
Expand Down Expand Up @@ -201,7 +205,7 @@ public void run() {
try (Kvin store = createStore(cli.storeLocation)) {
IExtendedIterator<URI> items;
if ("*".equals(item)) {
items = store.descendants(URIs.createURI(""));
items = store.descendants(URIs.createURI(""), contextUri);
} else {
items = WrappedIterator.create(Arrays.asList(URIs.createURI(item)).iterator());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,23 @@ public long delete(URI item, URI property, URI context, long end, long begin) {
}

@Override
public boolean delete(URI item) {
return getDelegate().delete(item);
public boolean delete(URI item, URI context) {
return getDelegate().delete(item, context);
}

@Override
public IExtendedIterator<URI> descendants(URI item) {
return getDelegate().descendants(item);
public IExtendedIterator<URI> descendants(URI item, URI context) {
return getDelegate().descendants(item, context);
}

@Override
public IExtendedIterator<URI> descendants(URI item, long limit) {
return getDelegate().descendants(item, limit);
public IExtendedIterator<URI> descendants(URI item, URI context, long limit) {
return getDelegate().descendants(item, context, limit);
}

@Override
public IExtendedIterator<URI> properties(URI item) {
return getDelegate().properties(item);
public IExtendedIterator<URI> properties(URI item, URI context) {
return getDelegate().properties(item, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ public interface Kvin extends Closeable {
/**
* The default context that is used for <code>null</code> values.
*/
URI DEFAULT_CONTEXT = URIs.createURI("kvin:nil");
String DEFAULT_CONTEXT_VALUE = "kvin:nil";

/**
* The default context that is used for <code>null</code> values.
*/
URI DEFAULT_CONTEXT = URIs.createURI(DEFAULT_CONTEXT_VALUE);

/**
* Add a listener to be notified of changes.
Expand Down Expand Up @@ -102,34 +107,38 @@ IExtendedIterator<KvinTuple> fetch(URI item, URI property, URI context, long end
* Deletes the given item and all of its associated values from the store.
*
* @param item The item URI.
* @param context The context URI.
* @return <code>true</code> if item exists in the store else
* <code>false</code>.
*/
boolean delete(URI item);
boolean delete(URI item, URI context);

/**
* Returns all known sub-items of a given item.
*
* @param item The item URI.
* @param context The context URI.
* @return A list with descendants of the given item.
*/
IExtendedIterator<URI> descendants(URI item);
IExtendedIterator<URI> descendants(URI item, URI context);

/**
* Returns all known sub-items of a given item.
*
* @param item The item URI.
* @param context The context URI.
* @return A list with descendants of the given item.
*/
IExtendedIterator<URI> descendants(URI item, long limit);
IExtendedIterator<URI> descendants(URI item, URI context, long limit);

/**
* Returns all known properties of a given item.
*
* @param item The item URI.
* @param context The context URI.
* @return A list with properties of the given item.
*/
IExtendedIterator<URI> properties(URI item);
IExtendedIterator<URI> properties(URI item, URI context);

/**
* Closes the store and frees resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ private IExtendedIterator<KvinTuple> fetchInternal(URI item, URI property, URI c
uriBuilder.setParameter("item", item.toString());
if (item != null) uriBuilder.setParameter("item", item.toString());
if (property != null) uriBuilder.setParameter("property", property.toString());
if (context != null) uriBuilder.setParameter("model", context.toString());
if (limit != null) uriBuilder.setParameter("limit", Long.toString(limit));
if (end != null) uriBuilder.setParameter("to", Long.toString(end));
if (begin != null) uriBuilder.setParameter("from", Long.toString(begin));
Expand Down Expand Up @@ -221,25 +222,26 @@ public long delete(URI item, URI property, URI context, long end, long begin) {
}

@Override
public boolean delete(URI item) {
public boolean delete(URI item, URI context) {
return false;
}

@Override
public IExtendedIterator<URI> descendants(URI item) {
return descendantsInternal(item, null);
public IExtendedIterator<URI> descendants(URI item, URI context) {
return descendantsInternal(item, context, null);
}

@Override
public IExtendedIterator<URI> descendants(URI item, long limit) {
return descendantsInternal(item, limit);
public IExtendedIterator<URI> descendants(URI item, URI context, long limit) {
return descendantsInternal(item, context, limit);
}

private IExtendedIterator<URI> descendantsInternal(URI item, Long limit) {
private IExtendedIterator<URI> descendantsInternal(URI item, URI context, Long limit) {
try {
// building url
URIBuilder uriBuilder = new URIBuilder(this.hostEndpoint + "/**");
uriBuilder.setParameter("item", item.toString());
if (context != null) uriBuilder.setParameter("model", context.toString());
if (limit != null) uriBuilder.setParameter("limit", Long.toString(limit));
java.net.URI getRequestUri = uriBuilder.build();

Expand Down Expand Up @@ -304,11 +306,12 @@ public void close() {
}

@Override
public IExtendedIterator<URI> properties(URI item) {
public IExtendedIterator<URI> properties(URI item, URI context) {
try {
// building url
URIBuilder uriBuilder = new URIBuilder(this.hostEndpoint + "/properties");
uriBuilder.setParameter("item", item.toString());
if (context != null) uriBuilder.setParameter("model", context.toString());
java.net.URI getRequestUri = uriBuilder.build();

// sending get request to the endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,8 @@ private byte[] generateId(KvinTuple tuple,

ByteBuffer idBuffer = ByteBuffer.allocate(Long.BYTES * 3);
idBuffer.putLong(itemId);
idBuffer.putLong(propertyId);
idBuffer.putLong(contextId);
idBuffer.putLong(propertyId);
return idBuffer.array();
}

Expand Down Expand Up @@ -584,20 +584,20 @@ private IdMappings getIdMappings(URI item, URI property, URI context) throws IOE
}

private FilterPredicate generateFetchFilter(IdMappings idMappings) {
if (idMappings.propertyId != 0L && idMappings.contextId != 0L) {
if (idMappings.itemId != 0L && idMappings.propertyId != 0L && idMappings.contextId != 0L) {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 3);
keyBuffer.putLong(idMappings.itemId);
keyBuffer.putLong(idMappings.propertyId);
keyBuffer.putLong(idMappings.contextId);
keyBuffer.putLong(idMappings.propertyId);
return eq(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array()));
} else if (idMappings.propertyId != 0L) {
} else if (idMappings.contextId != 0L) {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 2);
keyBuffer.putLong(idMappings.itemId);
keyBuffer.putLong(idMappings.propertyId);
keyBuffer.putLong(idMappings.contextId);
return and(gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())),
lt(FilterApi.binaryColumn("id"),
Binary.fromConstantByteArray(ByteBuffer.allocate(Long.BYTES * 2)
.putLong(idMappings.itemId).putLong(idMappings.propertyId + 1).array())));
.putLong(idMappings.itemId).putLong(idMappings.contextId + 1).array())));
} else {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES);
keyBuffer.putLong(idMappings.itemId);
Expand Down Expand Up @@ -638,6 +638,8 @@ protected KvinTuple createElement(URI item, URI property, URI context, long time
public URI getProperty(ByteBuffer idBuffer) throws IOException {
// skip item id
idBuffer.getLong();
// skip context id
idBuffer.getLong();
return getProperty(idBuffer.getLong());
}

Expand Down Expand Up @@ -683,11 +685,11 @@ private ParquetReader<GenericRecord> createGenericReader(InputFile file, FilterC
private synchronized IExtendedIterator<KvinTuple> fetchInternal(URI item, URI property, URI context, Long end, Long begin, Long limit) throws IOException {
Lock readLock = readLock();
try {
URI contextFinal = context != null ? context : Kvin.DEFAULT_CONTEXT;
// filters
IdMappings idMappings = getIdMappings(item, property, context);
if (idMappings.itemId == 0L
|| property != null && idMappings.propertyId == 0L
|| context != null && idMappings.contextId == 0L) {
IdMappings idMappings = getIdMappings(item, property, contextFinal);
if (idMappings.itemId == 0L || idMappings.contextId == 0L
|| property != null && idMappings.propertyId == 0L) {
// ensure read lock is freed
readLock.release();
return NiceIterator.emptyIterator();
Expand Down Expand Up @@ -822,7 +824,7 @@ public void close() {

KvinTuple convert(GenericRecord record) throws IOException {
URI p = property != null ? property : getProperty((ByteBuffer) record.get(0));
return recordToTuple(item, p, context, record);
return recordToTuple(item, p, contextFinal, record);
}

void nextReaders() throws IOException {
Expand Down Expand Up @@ -855,7 +857,7 @@ public long delete(URI item, URI property, URI context, long end, long begin) {
}

@Override
public boolean delete(URI item) {
public boolean delete(URI item, URI context) {
return false;
}

Expand Down Expand Up @@ -924,28 +926,31 @@ private List<java.nio.file.Path> getDataFolders(IdMappings idMappings) throws IO
}

@Override
public IExtendedIterator<URI> descendants(URI item) {
public IExtendedIterator<URI> descendants(URI item, URI context) {
return null;
}

@Override
public IExtendedIterator<URI> descendants(URI item, long limit) {
public IExtendedIterator<URI> descendants(URI item, URI context, long limit) {
return null;
}

private List<URI> getProperties(long itemId, long contextId) {
try {
ByteBuffer lowKey = ByteBuffer.allocate(Long.BYTES);
ByteBuffer lowKey = ByteBuffer.allocate(Long.BYTES * 2);
lowKey.putLong(itemId);
ByteBuffer highKey = ByteBuffer.allocate(Long.BYTES * 2);
lowKey.putLong(contextId);
ByteBuffer highKey = ByteBuffer.allocate(Long.BYTES * 3);
highKey.putLong(itemId);
highKey.putLong(contextId);
highKey.putLong(Long.MAX_VALUE);
FilterPredicate filter = and(eq(FilterApi.booleanColumn("first"), true), and(
gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(lowKey.array())),
lt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(highKey.array()))));

IdMappings idMappings = new IdMappings();
idMappings.itemId = itemId;
idMappings.contextId = contextId;
List<java.nio.file.Path> dataFolders = getDataFolders(idMappings);
Set<Long> propertyIds = new LinkedHashSet<>();

Expand All @@ -955,6 +960,9 @@ private List<URI> getProperties(long itemId, long contextId) {
GenericRecord record;
while ((record = reader.read()) != null) {
ByteBuffer idBb = (ByteBuffer) record.get(0);
// skip item id
idBb.getLong();
// skip context id
idBb.getLong();
long currentPropertyId = idBb.getLong();
propertyIds.add(currentPropertyId);
Expand All @@ -976,15 +984,18 @@ private List<URI> getProperties(long itemId, long contextId) {
}

@Override
public synchronized IExtendedIterator<URI> properties(URI item) {
public synchronized IExtendedIterator<URI> properties(URI item, URI context) {
if (context == null) {
context = Kvin.DEFAULT_CONTEXT;
}
Lock readLock = null;
try {
readLock = readLock();
IdMappings idMappings = getIdMappings(item, null, null);
if (idMappings.itemId == 0L) {
IdMappings idMappings = getIdMappings(item, null, context);
if (idMappings.itemId == 0L || idMappings.contextId == 0L) {
return NiceIterator.emptyIterator();
}
List<URI> properties = getProperties(idMappings.itemId, 0L);
List<URI> properties = getProperties(idMappings.itemId, idMappings.contextId);
return WrappedIterator.create(properties.iterator());
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,19 +322,19 @@ public long delete(URI item, URI property, URI context, long end, long begin) {
}

@Override
public boolean delete(URI item) {
public boolean delete(URI item, URI context) {
Lock readLock = readLock();
try {
return hotStore.delete(item);
return hotStore.delete(item, context);
} finally {
readLock.release();
}
}

@Override
public IExtendedIterator<URI> descendants(URI item) {
public IExtendedIterator<URI> descendants(URI item, URI context) {
Lock readLock = readLock();
IExtendedIterator<URI> results = hotStore.descendants(item);
IExtendedIterator<URI> results = hotStore.descendants(item, context);
return new NiceIterator<>() {
boolean closed;

Expand Down Expand Up @@ -371,9 +371,9 @@ public void close() {
}

@Override
public IExtendedIterator<URI> descendants(URI item, long limit) {
public IExtendedIterator<URI> descendants(URI item, URI context, long limit) {
Lock readLock = readLock();
IExtendedIterator<URI> results = hotStore.descendants(item, limit);
IExtendedIterator<URI> results = hotStore.descendants(item, context, limit);
return new NiceIterator<>() {
boolean closed;

Expand Down Expand Up @@ -410,15 +410,15 @@ public void close() {
}

@Override
public IExtendedIterator<URI> properties(URI item) {
public IExtendedIterator<URI> properties(URI item, URI context) {
Set<URI> properties = new HashSet<>();
Lock readLock = readLock();
try {
properties.addAll(hotStore.properties(item).toList());
properties.addAll(hotStore.properties(item, context).toList());
if (hotStoreArchive != null) {
properties.addAll(hotStoreArchive.properties(item).toList());
properties.addAll(hotStoreArchive.properties(item, context).toList());
}
properties.addAll(archiveStore.properties(item).toList());
properties.addAll(archiveStore.properties(item, context).toList());
} finally {
readLock.release();
}
Expand Down
Loading

0 comments on commit 4b768a2

Please sign in to comment.