From 25138264c493f030cfff756d78f35b7751e5c8b1 Mon Sep 17 00:00:00 2001 From: Ken Wenzel Date: Mon, 4 Dec 2023 16:05:53 +0100 Subject: [PATCH] Add copy command to CLI. --- bundles/io.github.linkedfactory.core/pom.xml | 3 +- .../io/github/linkedfactory/core/cli/CLI.java | 146 ++++++++++++++++-- 2 files changed, 132 insertions(+), 17 deletions(-) diff --git a/bundles/io.github.linkedfactory.core/pom.xml b/bundles/io.github.linkedfactory.core/pom.xml index a9b5b80d..a06a4b4c 100644 --- a/bundles/io.github.linkedfactory.core/pom.xml +++ b/bundles/io.github.linkedfactory.core/pom.xml @@ -245,7 +245,8 @@ ${project.build.outputDirectory} false - io.github.pcmind,org.iq80.snappy,org.apache.parquet,org.apache.avro,org.apache.hadoop + io.github.pcmind,org.iq80.snappy,org.apache.parquet,org.apache.avro,org.apache.hadoop, + com.fasterxml.woodstox,com.codehaus.woodstox META-INF/versions/** diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/cli/CLI.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/cli/CLI.java index 0176233c..6dcb9dc9 100644 --- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/cli/CLI.java +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/cli/CLI.java @@ -7,31 +7,38 @@ import io.github.linkedfactory.core.kvin.parquet.KvinParquet; import io.github.linkedfactory.core.kvin.util.JsonFormatWriter; import net.enilink.commons.iterator.IExtendedIterator; +import net.enilink.commons.iterator.NiceIterator; +import net.enilink.commons.iterator.WrappedIterator; import net.enilink.komma.core.URI; import net.enilink.komma.core.URIs; import picocli.CommandLine; import picocli.CommandLine.*; import picocli.CommandLine.Model.CommandSpec; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Arrays; +import java.util.NoSuchElementException; @Command(name = "CLI", subcommands = {CLI.Copy.class, CLI.Fetch.class, CommandLine.HelpCommand.class}, description = "Interact with KVIN stores") public class CLI { - @Option(names = {"-s", "--store"}, paramLabel = "", required = true, description = "location URI of a KVIN store") + @Option(names = {"-s", "--store"}, paramLabel = "", required = true, description = "location of the KVIN store") protected String storeLocation; - @Spec - CommandSpec spec; public static void main(String[] args) { int exitCode = new CommandLine(new CLI()).execute(args); System.exit(exitCode); } - private static Kvin createStore(String storeLocation) { + private static Kvin createStore(String storeLocation) throws IOException { + return createStore(storeLocation, null); + } + + private static Kvin createStore(String storeLocation, String type) throws IOException { URI locationUri = URIs.createURI(storeLocation); Path path; if (locationUri.scheme() != null && locationUri.scheme().startsWith("http")) { @@ -41,9 +48,15 @@ private static Kvin createStore(String storeLocation) { } else { path = Paths.get(storeLocation); } - if (Files.isDirectory(path.resolve("ids"))) { + if ("leveldb".equals(type) || Files.isDirectory(path.resolve("ids"))) { + if (!Files.isDirectory(path)) { + Files.createDirectories(path); + } return new KvinLevelDb(path.toFile()); - } else if (Files.isDirectory(path.resolve("metadata"))) { + } else if ("parquet".equals(type) || Files.isDirectory(path.resolve("metadata"))) { + if (!Files.isDirectory(path)) { + Files.createDirectories(path); + } return new KvinParquet(path.toString()); } else { throw new IllegalArgumentException("Invalid store location: " + storeLocation); @@ -80,8 +93,94 @@ int properties(@Parameters(paramLabel = "", description = "the target item @Command(name = "copy", description = "Copies values from one store to another") static class Copy extends FetchBase implements Runnable { + @Parameters(paramLabel = "", description = "location of the sink KVIN store") + protected String sinkLocation; + @Option(names = {"--sink-type"}, description = "type of the sink KVIN store (if newly created): leveldb, parquet") + String sinkType; + @Override public void run() { + URI propertyUri = property == null ? null : URIs.createURI(property); + URI contextUri = context == null ? Kvin.DEFAULT_CONTEXT : URIs.createURI(context); + + try (Kvin store = createStore(cli.storeLocation)) { + Kvin sink; + try { + sink = createStore(sinkLocation, sinkType); + } catch (IllegalArgumentException e) { + System.err.println("Please specify --sink-type"); + throw e; + } + + try (sink) { + IExtendedIterator items; + if ("*".equals(item)) { + items = store.descendants(URIs.createURI("")); + } else { + items = WrappedIterator.create(Arrays.asList(URIs.createURI(item)).iterator()); + } + + IExtendedIterator allTuples = new NiceIterator<>() { + KvinTuple next; + IExtendedIterator tuples; + + @Override + public boolean hasNext() { + if (next != null) { + return true; + } + while ((tuples == null || !tuples.hasNext()) && items.hasNext()) { + if (tuples != null) { + tuples.close(); + } + tuples = store.fetch(items.next(), propertyUri, contextUri, + to != null ? to : KvinTuple.TIME_MAX_VALUE, + from != null ? from : 0, limit != null ? limit : 0, 0, null); + } + if (tuples != null) { + if (tuples.hasNext()) { + next = tuples.next(); + } else { + tuples.close(); + } + } + if (next == null) { + items.close(); + return false; + } + return true; + } + + @Override + public KvinTuple next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + KvinTuple result = next; + next = null; + return result; + } + + @Override + public void close() { + items.close(); + if (tuples != null) { + tuples.close(); + } + super.close(); + } + }; + + try { + sink.put(allTuples); + } finally { + allTuples.close(); + } + } + } catch (Exception e) { + System.err.println(e.getMessage()); + throw (e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e)); + } } } @@ -94,23 +193,38 @@ static class Fetch extends FetchBase implements Runnable { @Override public void run() { - URI itemUri = URIs.createURI(item); URI propertyUri = property == null ? null : URIs.createURI(property); URI contextUri = context == null ? Kvin.DEFAULT_CONTEXT : URIs.createURI(context); try (Kvin store = createStore(cli.storeLocation)) { - IExtendedIterator tuples; - if (op != null) { - tuples = store.fetch(itemUri, propertyUri, contextUri, to != null ? to : KvinTuple.TIME_MAX_VALUE, - from != null ? from : 0, limit != null ? limit : 0, interval != 0 ? interval : 0, op); + IExtendedIterator items; + if ("*".equals(item)) { + items = store.descendants(URIs.createURI("")); } else { - tuples = store.fetch(itemUri, propertyUri, contextUri, limit != null ? limit : 0); + items = WrappedIterator.create(Arrays.asList(URIs.createURI(item)).iterator()); } - JsonFormatWriter writer = new JsonFormatWriter(System.out, this.prettyPrint); - while (tuples.hasNext()) { - writer.writeTuple(tuples.next()); + try { + for (URI itemUri : items) { + IExtendedIterator tuples; + if (op != null) { + tuples = store.fetch(itemUri, propertyUri, contextUri, to != null ? to : KvinTuple.TIME_MAX_VALUE, + from != null ? from : 0, limit != null ? limit : 0, interval != 0 ? interval : 0, op); + } else { + tuples = store.fetch(itemUri, propertyUri, contextUri, limit != null ? limit : 0); + } + try { + JsonFormatWriter writer = new JsonFormatWriter(System.out, this.prettyPrint); + while (tuples.hasNext()) { + writer.writeTuple(tuples.next()); + } + writer.close(); + } finally { + tuples.close(); + } + } + } finally { + items.close(); } - writer.close(); } catch (Exception e) { System.err.println(e.getMessage()); throw (e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e));