Skip to content

Commit

Permalink
Add copy command to CLI.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Dec 4, 2023
1 parent 4cbe751 commit 2513826
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 17 deletions.
3 changes: 2 additions & 1 deletion bundles/io.github.linkedfactory.core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@
<outputDirectory>${project.build.outputDirectory}</outputDirectory>
<excludeTransitive>false</excludeTransitive>
<includeGroupIds>
io.github.pcmind,org.iq80.snappy,org.apache.parquet,org.apache.avro,org.apache.hadoop
io.github.pcmind,org.iq80.snappy,org.apache.parquet,org.apache.avro,org.apache.hadoop,
com.fasterxml.woodstox,com.codehaus.woodstox
</includeGroupIds>
<excludes>META-INF/versions/**</excludes>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,38 @@
import io.github.linkedfactory.core.kvin.parquet.KvinParquet;
import io.github.linkedfactory.core.kvin.util.JsonFormatWriter;
import net.enilink.commons.iterator.IExtendedIterator;
import net.enilink.commons.iterator.NiceIterator;
import net.enilink.commons.iterator.WrappedIterator;
import net.enilink.komma.core.URI;
import net.enilink.komma.core.URIs;
import picocli.CommandLine;
import picocli.CommandLine.*;
import picocli.CommandLine.Model.CommandSpec;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.NoSuchElementException;

@Command(name = "CLI",
subcommands = {CLI.Copy.class, CLI.Fetch.class, CommandLine.HelpCommand.class},
description = "Interact with KVIN stores")
public class CLI {
@Option(names = {"-s", "--store"}, paramLabel = "<location>", required = true, description = "location URI of a KVIN store")
@Option(names = {"-s", "--store"}, paramLabel = "<location>", required = true, description = "location of the KVIN store")
protected String storeLocation;
@Spec
CommandSpec spec;

public static void main(String[] args) {
int exitCode = new CommandLine(new CLI()).execute(args);
System.exit(exitCode);
}

private static Kvin createStore(String storeLocation) {
private static Kvin createStore(String storeLocation) throws IOException {
return createStore(storeLocation, null);
}

private static Kvin createStore(String storeLocation, String type) throws IOException {
URI locationUri = URIs.createURI(storeLocation);
Path path;
if (locationUri.scheme() != null && locationUri.scheme().startsWith("http")) {
Expand All @@ -41,9 +48,15 @@ private static Kvin createStore(String storeLocation) {
} else {
path = Paths.get(storeLocation);
}
if (Files.isDirectory(path.resolve("ids"))) {
if ("leveldb".equals(type) || Files.isDirectory(path.resolve("ids"))) {
if (!Files.isDirectory(path)) {
Files.createDirectories(path);
}
return new KvinLevelDb(path.toFile());
} else if (Files.isDirectory(path.resolve("metadata"))) {
} else if ("parquet".equals(type) || Files.isDirectory(path.resolve("metadata"))) {
if (!Files.isDirectory(path)) {
Files.createDirectories(path);
}
return new KvinParquet(path.toString());
} else {
throw new IllegalArgumentException("Invalid store location: " + storeLocation);
Expand Down Expand Up @@ -80,8 +93,94 @@ int properties(@Parameters(paramLabel = "<item>", description = "the target item

@Command(name = "copy", description = "Copies values from one store to another")
static class Copy extends FetchBase implements Runnable {
@Parameters(paramLabel = "<sink>", description = "location of the sink KVIN store")
protected String sinkLocation;
@Option(names = {"--sink-type"}, description = "type of the sink KVIN store (if newly created): leveldb, parquet")
String sinkType;

@Override
public void run() {
URI propertyUri = property == null ? null : URIs.createURI(property);
URI contextUri = context == null ? Kvin.DEFAULT_CONTEXT : URIs.createURI(context);

try (Kvin store = createStore(cli.storeLocation)) {
Kvin sink;
try {
sink = createStore(sinkLocation, sinkType);
} catch (IllegalArgumentException e) {
System.err.println("Please specify --sink-type");
throw e;
}

try (sink) {
IExtendedIterator<URI> items;
if ("*".equals(item)) {
items = store.descendants(URIs.createURI(""));
} else {
items = WrappedIterator.create(Arrays.asList(URIs.createURI(item)).iterator());
}

IExtendedIterator<KvinTuple> allTuples = new NiceIterator<>() {
KvinTuple next;
IExtendedIterator<KvinTuple> tuples;

@Override
public boolean hasNext() {
if (next != null) {
return true;
}
while ((tuples == null || !tuples.hasNext()) && items.hasNext()) {
if (tuples != null) {
tuples.close();
}
tuples = store.fetch(items.next(), propertyUri, contextUri,
to != null ? to : KvinTuple.TIME_MAX_VALUE,
from != null ? from : 0, limit != null ? limit : 0, 0, null);
}
if (tuples != null) {
if (tuples.hasNext()) {
next = tuples.next();
} else {
tuples.close();
}
}
if (next == null) {
items.close();
return false;
}
return true;
}

@Override
public KvinTuple next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
KvinTuple result = next;
next = null;
return result;
}

@Override
public void close() {
items.close();
if (tuples != null) {
tuples.close();
}
super.close();
}
};

try {
sink.put(allTuples);
} finally {
allTuples.close();
}
}
} catch (Exception e) {
System.err.println(e.getMessage());
throw (e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e));
}
}
}

Expand All @@ -94,23 +193,38 @@ static class Fetch extends FetchBase implements Runnable {

@Override
public void run() {
URI itemUri = URIs.createURI(item);
URI propertyUri = property == null ? null : URIs.createURI(property);
URI contextUri = context == null ? Kvin.DEFAULT_CONTEXT : URIs.createURI(context);

try (Kvin store = createStore(cli.storeLocation)) {
IExtendedIterator<KvinTuple> tuples;
if (op != null) {
tuples = store.fetch(itemUri, propertyUri, contextUri, to != null ? to : KvinTuple.TIME_MAX_VALUE,
from != null ? from : 0, limit != null ? limit : 0, interval != 0 ? interval : 0, op);
IExtendedIterator<URI> items;
if ("*".equals(item)) {
items = store.descendants(URIs.createURI(""));
} else {
tuples = store.fetch(itemUri, propertyUri, contextUri, limit != null ? limit : 0);
items = WrappedIterator.create(Arrays.asList(URIs.createURI(item)).iterator());
}
JsonFormatWriter writer = new JsonFormatWriter(System.out, this.prettyPrint);
while (tuples.hasNext()) {
writer.writeTuple(tuples.next());
try {
for (URI itemUri : items) {
IExtendedIterator<KvinTuple> tuples;
if (op != null) {
tuples = store.fetch(itemUri, propertyUri, contextUri, to != null ? to : KvinTuple.TIME_MAX_VALUE,
from != null ? from : 0, limit != null ? limit : 0, interval != 0 ? interval : 0, op);
} else {
tuples = store.fetch(itemUri, propertyUri, contextUri, limit != null ? limit : 0);
}
try {
JsonFormatWriter writer = new JsonFormatWriter(System.out, this.prettyPrint);
while (tuples.hasNext()) {
writer.writeTuple(tuples.next());
}
writer.close();
} finally {
tuples.close();
}
}
} finally {
items.close();
}
writer.close();
} catch (Exception e) {
System.err.println(e.getMessage());
throw (e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e));
Expand Down

0 comments on commit 2513826

Please sign in to comment.