diff --git a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/BerkeleyGraphComputerProvider.java b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/BerkeleyGraphComputerProvider.java index a972b9788d..57e7575478 100644 --- a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/BerkeleyGraphComputerProvider.java +++ b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/BerkeleyGraphComputerProvider.java @@ -33,7 +33,8 @@ public class BerkeleyGraphComputerProvider extends AbstractJanusGraphComputerPro @Override public ModifiableConfiguration getJanusGraphConfiguration(String graphName, Class test, String testMethodName) { - ModifiableConfiguration config = BerkeleyStorageSetup.getBerkeleyJEConfiguration(StorageSetup.getHomeDir(graphName)); + ModifiableConfiguration config = super.getJanusGraphConfiguration(graphName, test, testMethodName); + config.setAll(BerkeleyStorageSetup.getBerkeleyJEConfiguration(StorageSetup.getHomeDir(graphName)).getAll()); config.set(GraphDatabaseConfiguration.IDAUTHORITY_WAIT, Duration.ofMillis(20)); config.set(GraphDatabaseConfiguration.STORAGE_TRANSACTIONAL,false); return config; diff --git a/janusgraph-cassandra/src/test/java/org/janusgraph/blueprints/thrift/ThriftGraphComputerProvider.java b/janusgraph-cassandra/src/test/java/org/janusgraph/blueprints/thrift/ThriftGraphComputerProvider.java index 8d2fe52087..ddc8848cf7 100644 --- a/janusgraph-cassandra/src/test/java/org/janusgraph/blueprints/thrift/ThriftGraphComputerProvider.java +++ b/janusgraph-cassandra/src/test/java/org/janusgraph/blueprints/thrift/ThriftGraphComputerProvider.java @@ -16,7 +16,6 @@ import org.janusgraph.CassandraStorageSetup; import org.janusgraph.blueprints.AbstractJanusGraphComputerProvider; -import org.janusgraph.blueprints.AbstractJanusGraphProvider; import org.janusgraph.diskstorage.configuration.ModifiableConfiguration; import org.janusgraph.graphdb.olap.computer.FulgoraGraphComputer; import org.apache.tinkerpop.gremlin.GraphProvider; @@ -30,7 +29,9 @@ public class ThriftGraphComputerProvider extends AbstractJanusGraphComputerProvi @Override public ModifiableConfiguration getJanusGraphConfiguration(String graphName, Class test, String testMethodName) { CassandraStorageSetup.startCleanEmbedded(); - return CassandraStorageSetup.getCassandraThriftConfiguration(graphName); + ModifiableConfiguration config = super.getJanusGraphConfiguration(graphName, test, testMethodName); + config.setAll(CassandraStorageSetup.getCassandraThriftConfiguration(graphName).getAll()); + return config; } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/core/JanusGraph.java b/janusgraph-core/src/main/java/org/janusgraph/core/JanusGraph.java index a532b2067d..9d738b9348 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/core/JanusGraph.java +++ b/janusgraph-core/src/main/java/org/janusgraph/core/JanusGraph.java @@ -52,6 +52,16 @@ test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest", method = "shouldProcessResultGraphNewWithPersistVertexProperties", reason = "The result graph should return an empty iterator when vertex.edges() or vertex.vertices() is called.") +@Graph.OptOut( + test = "org.apache.tinkerpop.gremlin.structure.io.IoTest$GraphMLTest", + method = "shouldReadGraphMLWithNoEdgeLabels", + reason = "JanusGraph does not support default edge label (edge) used when GraphML is missing edge labels.") +@Graph.OptOut( + test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest", + method = "shouldSupportGraphFilter", + reason = "JanusGraph test graph computer (FulgoraGraphComputer) " + + "currently does not support graph filters but does not throw proper exception because doing so breaks numerous " + + "tests in gremlin-test ProcessComputerSuite.") public interface JanusGraph extends Transaction { /* --------------------------------------------------------------- diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java index d78a25e153..82a526161e 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java @@ -26,6 +26,7 @@ import org.janusgraph.diskstorage.keycolumnvalue.cache.KCVSCache; import org.janusgraph.diskstorage.log.kcvs.ExternalCachePersistor; import org.apache.commons.lang.StringUtils; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -439,8 +440,14 @@ public String toString() { private final V executeRead(Callable exe) throws JanusGraphException { - return BackendOperation.execute(exe, maxReadTime); + try { + return BackendOperation.execute(exe, maxReadTime); + } catch (JanusGraphException e) { + // support traversal interruption + // TODO: Refactor to allow direct propagation of underlying interrupt exception + if (Thread.interrupted()) throw new TraversalInterruptedException(); + throw e; + } } - } diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java index 643d20d68c..2687c2942d 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java @@ -33,7 +33,7 @@ public interface ScanJob extends Cloneable { /** * Invoked before a block of computation (i.e. multiple process() calls) is handed to this particular ScanJob. * Can be used to initialize the iteration. This method is called exactly once for each before a block of computation. - * This method is semantically aligned with {@link com.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationStart()} + * This method is semantically aligned with {@link org.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationStart()} * * This method may not be called if there is no data to be processed. Correspondingly, the end method won't be called either. * @@ -49,7 +49,7 @@ public default void workerIterationStart(Configuration jobConfiguration, /** * Invoked after a block of computation (i.e. multiple process() calls) is handed to this particular ScanJob. * Can be used to close any resources held by this job. This method is called exactly once for each after a block of computation. - * This method is semantically aligned with {@link com.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationEnd()} + * This method is semantically aligned with {@link org.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationEnd()} * * This method may not be called if there is no data to be processed. Correspondingly, the start method won't be called either. * diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/BackendOperation.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/BackendOperation.java index eb4d3a140a..849fd6e9a1 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/BackendOperation.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/BackendOperation.java @@ -90,6 +90,8 @@ public static final V executeDirect(Callable exe, Duration totalWaitTime) try { Thread.sleep(waitTime.toMillis()); } catch (InterruptedException r) { + // added thread interrupt signal to support traversal interruption + Thread.currentThread().interrupt(); throw new PermanentBackendException("Interrupted while waiting to retry failed backend operation", r); } } else { diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/serialize/StandardSerializer.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/serialize/StandardSerializer.java index cb52486dc0..f195ddb01f 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/serialize/StandardSerializer.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/serialize/StandardSerializer.java @@ -41,6 +41,7 @@ import org.janusgraph.graphdb.types.TypeDefinitionCategory; import org.janusgraph.graphdb.types.TypeDefinitionDescription; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.structure.Direction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,6 +134,8 @@ public StandardSerializer() { registerClassInternal(64,Duration.class, new DurationSerializer()); registerClassInternal(65,Instant.class, new InstantSerializer()); registerClassInternal(66,StandardTransactionId.class, new StandardTransactionIdSerializer()); + registerClassInternal(67,TraverserSet.class, new SerializableSerializer()); + registerClassInternal(68,HashMap.class, new SerializableSerializer()); } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/serialize/attribute/SerializableSerializer.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/serialize/attribute/SerializableSerializer.java new file mode 100644 index 0000000000..b4527d457f --- /dev/null +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/serialize/attribute/SerializableSerializer.java @@ -0,0 +1,52 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.database.serialize.attribute; + +import org.janusgraph.core.attribute.AttributeSerializer; +import org.janusgraph.diskstorage.ScanBuffer; +import org.janusgraph.diskstorage.WriteBuffer; +import org.janusgraph.graphdb.database.serialize.DataOutput; +import org.janusgraph.graphdb.database.serialize.Serializer; +import org.janusgraph.graphdb.database.serialize.SerializerInjected; +import org.apache.commons.lang3.SerializationUtils; + +import java.io.Serializable; + +/** + * Serializes {@link Serializable} objects. + * @param Serializable type + */ +public class SerializableSerializer implements AttributeSerializer, SerializerInjected { + + private Serializer serializer; + + @Override + public T read(ScanBuffer buffer) { + byte[] data = serializer.readObjectNotNull(buffer,byte[].class); + return (T) SerializationUtils.deserialize(data); + } + + @Override + public void write(WriteBuffer buffer, T attribute) { + DataOutput out = (DataOutput) buffer; + out.writeObjectNotNull(SerializationUtils.serialize(attribute)); + } + + @Override + public void setSerializer(Serializer serializer) { + this.serializer = serializer; + } + +} diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java index 7b12637c64..d352a54e23 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java @@ -72,9 +72,10 @@ protected VertexJobConverter(JanusGraph graph, VertexScanJob job) { } protected VertexJobConverter(VertexJobConverter copy) { - this.graph = new GraphProvider(); - if (copy.graph.isProvided()) this.graph.setGraph(copy.graph.get()); + this.graph = copy.graph; this.job = copy.job.clone(); + this.tx = copy.tx; + this.idManager = copy.idManager; } public static ScanJob convert(JanusGraph graph, VertexScanJob vertexJob) { @@ -96,10 +97,8 @@ public static StandardJanusGraphTx startTransaction(StandardJanusGraph graph) { @Override public void workerIterationStart(Configuration jobConfig, Configuration graphConfig, ScanMetrics metrics) { - graph.initializeGraph(graphConfig); - idManager = graph.get().getIDManager(); try { - tx = startTransaction(graph.get()); + open(graphConfig); job.workerIterationStart(graph.get(), jobConfig, metrics); } catch (Throwable e) { close(); @@ -107,7 +106,13 @@ public void workerIterationStart(Configuration jobConfig, Configuration graphCon } } - private void close() { + protected void open(Configuration graphConfig) { + graph.initializeGraph(graphConfig); + idManager = graph.get().getIDManager(); + tx = startTransaction(graph.get()); + } + + protected void close() { if (null != tx && tx.isOpen()) tx.rollback(); graph.close(); diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraGraphComputer.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraGraphComputer.java index d3c3c70a09..66c582a0fb 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraGraphComputer.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraGraphComputer.java @@ -16,7 +16,6 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import org.janusgraph.core.JanusGraphException; import org.janusgraph.core.JanusGraphComputer; @@ -30,14 +29,17 @@ import org.janusgraph.graphdb.util.WorkerPool; import org.apache.tinkerpop.gremlin.process.computer.ComputerResult; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; +import org.apache.tinkerpop.gremlin.process.computer.GraphFilter; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; -import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; +import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey; import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult; import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; +import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.VertexProperty; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph; @@ -64,9 +66,6 @@ public class FulgoraGraphComputer implements JanusGraphComputer { private static final Logger log = LoggerFactory.getLogger(FulgoraGraphComputer.class); - public static final Set NON_PERSISTING_KEYS = ImmutableSet.of(TraversalSideEffects.SIDE_EFFECTS, - TraversalVertexProgram.HALTED_TRAVERSERS); - private VertexProgram vertexProgram; private final Set mapReduces = new HashSet<>(); @@ -87,6 +86,8 @@ public class FulgoraGraphComputer implements JanusGraphComputer { private String name; private String jobId; + private final GraphFilter graphFilter = new GraphFilter(); + public FulgoraGraphComputer(final StandardJanusGraph graph, final Configuration configuration) { this.graph = graph; this.writeBatchSize = configuration.get(GraphDatabaseConfiguration.BUFFER_SIZE); @@ -94,6 +95,18 @@ public FulgoraGraphComputer(final StandardJanusGraph graph, final Configuration this.name = "compute" + computerCounter.incrementAndGet(); } + @Override + public GraphComputer vertices(final Traversal vertexFilter) { + this.graphFilter.setVertexFilter(vertexFilter); + return this; + } + + @Override + public GraphComputer edges(final Traversal edgeFilter) { + this.graphFilter.setEdgeFilter(edgeFilter); + return this; + } + @Override public GraphComputer result(ResultGraph resultGraph) { Preconditions.checkArgument(resultGraph != null, "Need to specify mode"); @@ -116,14 +129,14 @@ public JanusGraphComputer workers(int threads) { } @Override - public JanusGraphComputer program(final VertexProgram vertexProgram) { + public GraphComputer program(final VertexProgram vertexProgram) { Preconditions.checkState(this.vertexProgram == null, "A vertex program has already been set"); this.vertexProgram = vertexProgram; return this; } @Override - public JanusGraphComputer mapReduce(final MapReduce mapReduce) { + public GraphComputer mapReduce(final MapReduce mapReduce) { this.mapReduces.add(mapReduce); return this; } @@ -150,6 +163,9 @@ public Future submit() { // determine the legality persistence and result graph options if (!this.features().supportsResultGraphPersistCombination(this.resultGraphMode, this.persistMode)) throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraphMode, this.persistMode); + // ensure requested workers are not larger than supported workers + if (this.numThreads > this.features().getMaxWorkers()) + throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.numThreads, this.features().getMaxWorkers()); memory = new FulgoraMemory(vertexProgram, mapReduces); @@ -160,45 +176,45 @@ public Future submit() { vertexMemory = new FulgoraVertexMemory(expectedNumVertices, graph.getIDManager(), vertexProgram); // execute the vertex program vertexProgram.setup(memory); - memory.completeSubRound(); - - for (int iteration = 1; ; iteration++) { - vertexMemory.nextIteration(vertexProgram.getMessageScopes(memory)); - - jobId = name + "#" + iteration; - VertexProgramScanJob.Executor job = VertexProgramScanJob.getVertexProgramScanJob(graph, memory, vertexMemory, vertexProgram); - StandardScanner.Builder scanBuilder = graph.getBackend().buildEdgeScanJob(); - scanBuilder.setJobId(jobId); - scanBuilder.setNumProcessingThreads(numThreads); - scanBuilder.setWorkBlockSize(readBatchSize); - scanBuilder.setJob(job); - PartitionedVertexProgramExecutor pvpe = new PartitionedVertexProgramExecutor(graph, memory, vertexMemory, vertexProgram); - try { - //Iterates over all vertices and computes the vertex program on all non-partitioned vertices. For partitioned ones, the data is aggregated - ScanMetrics jobResult = scanBuilder.execute().get(); - long failures = jobResult.get(ScanMetrics.Metric.FAILURE); - if (failures > 0) { - throw new JanusGraphException("Failed to process [" + failures + "] vertices in vertex program iteration [" + iteration + "]. Computer is aborting."); - } - //Runs the vertex program on all aggregated, partitioned vertices. - pvpe.run(numThreads, jobResult); - failures = jobResult.getCustom(PartitionedVertexProgramExecutor.PARTITION_VERTEX_POSTFAIL); - if (failures > 0) { - throw new JanusGraphException("Failed to process [" + failures + "] partitioned vertices in vertex program iteration [" + iteration + "]. Computer is aborting."); - } - } catch (Exception e) { - throw new JanusGraphException(e); - } - vertexMemory.completeIteration(); - memory.completeSubRound(); - try { - if (this.vertexProgram.terminate(this.memory)) { - break; + try (VertexProgramScanJob.Executor job = VertexProgramScanJob.getVertexProgramScanJob(graph, memory, vertexMemory, vertexProgram)) { + for (int iteration = 1; ; iteration++) { + memory.completeSubRound(); + vertexMemory.nextIteration(vertexProgram.getMessageScopes(memory)); + + jobId = name + "#" + iteration; + StandardScanner.Builder scanBuilder = graph.getBackend().buildEdgeScanJob(); + scanBuilder.setJobId(jobId); + scanBuilder.setNumProcessingThreads(numThreads); + scanBuilder.setWorkBlockSize(readBatchSize); + scanBuilder.setJob(job); + PartitionedVertexProgramExecutor pvpe = new PartitionedVertexProgramExecutor(graph, memory, vertexMemory, vertexProgram); + try { + //Iterates over all vertices and computes the vertex program on all non-partitioned vertices. For partitioned ones, the data is aggregated + ScanMetrics jobResult = scanBuilder.execute().get(); + long failures = jobResult.get(ScanMetrics.Metric.FAILURE); + if (failures > 0) { + throw new JanusGraphException("Failed to process [" + failures + "] vertices in vertex program iteration [" + iteration + "]. Computer is aborting."); + } + //Runs the vertex program on all aggregated, partitioned vertices. + pvpe.run(numThreads, jobResult); + failures = jobResult.getCustom(PartitionedVertexProgramExecutor.PARTITION_VERTEX_POSTFAIL); + if (failures > 0) { + throw new JanusGraphException("Failed to process [" + failures + "] partitioned vertices in vertex program iteration [" + iteration + "]. Computer is aborting."); + } + } catch (Exception e) { + throw new JanusGraphException(e); } - } finally { - memory.incrIteration(); + + vertexMemory.completeIteration(); memory.completeSubRound(); + try { + if (this.vertexProgram.terminate(this.memory)) { + break; + } + } finally { + memory.incrIteration(); + } } } } @@ -214,63 +230,66 @@ public Future submit() { } // Execute map jobs jobId = name + "#map"; - VertexMapJob.Executor job = VertexMapJob.getVertexMapJob(graph, vertexMemory, mapJobs); - StandardScanner.Builder scanBuilder = graph.getBackend().buildEdgeScanJob(); - scanBuilder.setJobId(jobId); - scanBuilder.setNumProcessingThreads(numThreads); - scanBuilder.setWorkBlockSize(readBatchSize); - scanBuilder.setJob(job); - try { - ScanMetrics jobResult = scanBuilder.execute().get(); - long failures = jobResult.get(ScanMetrics.Metric.FAILURE); - if (failures > 0) { - throw new JanusGraphException("Failed to process [" + failures + "] vertices in map phase. Computer is aborting."); - } - failures = jobResult.getCustom(VertexMapJob.MAP_JOB_FAILURE); - if (failures > 0) { - throw new JanusGraphException("Failed to process [" + failures + "] individual map jobs. Computer is aborting."); + try (VertexMapJob.Executor job = VertexMapJob.getVertexMapJob(graph, vertexMemory, mapJobs)) { + StandardScanner.Builder scanBuilder = graph.getBackend().buildEdgeScanJob(); + scanBuilder.setJobId(jobId); + scanBuilder.setNumProcessingThreads(numThreads); + scanBuilder.setWorkBlockSize(readBatchSize); + scanBuilder.setJob(job); + try { + ScanMetrics jobResult = scanBuilder.execute().get(); + long failures = jobResult.get(ScanMetrics.Metric.FAILURE); + if (failures > 0) { + throw new JanusGraphException("Failed to process [" + failures + "] vertices in map phase. Computer is aborting."); + } + failures = jobResult.getCustom(VertexMapJob.MAP_JOB_FAILURE); + if (failures > 0) { + throw new JanusGraphException("Failed to process [" + failures + "] individual map jobs. Computer is aborting."); + } + } catch (Exception e) { + throw new JanusGraphException(e); } - } catch (Exception e) { - throw new JanusGraphException(e); - } - // Execute reduce phase and add to memory - for (Map.Entry mapJob : mapJobs.entrySet()) { - FulgoraMapEmitter mapEmitter = mapJob.getValue(); - MapReduce mapReduce = mapJob.getKey(); - mapEmitter.complete(mapReduce); // sort results if a map output sort is defined - if (mapReduce.doStage(MapReduce.Stage.REDUCE)) { - final FulgoraReduceEmitter reduceEmitter = new FulgoraReduceEmitter<>(); - try (WorkerPool workers = new WorkerPool(numThreads)) { - workers.submit(() -> mapReduce.workerStart(MapReduce.Stage.REDUCE)); - for (final Map.Entry queueEntry : mapEmitter.reduceMap.entrySet()) { - workers.submit(() -> mapReduce.reduce(queueEntry.getKey(), ((Iterable) queueEntry.getValue()).iterator(), reduceEmitter)); + // Execute reduce phase and add to memory + for (Map.Entry mapJob : mapJobs.entrySet()) { + FulgoraMapEmitter mapEmitter = mapJob.getValue(); + MapReduce mapReduce = mapJob.getKey(); + mapEmitter.complete(mapReduce); // sort results if a map output sort is defined + if (mapReduce.doStage(MapReduce.Stage.REDUCE)) { + final FulgoraReduceEmitter reduceEmitter = new FulgoraReduceEmitter<>(); + try (WorkerPool workers = new WorkerPool(numThreads)) { + workers.submit(() -> mapReduce.workerStart(MapReduce.Stage.REDUCE)); + for (final Map.Entry queueEntry : mapEmitter.reduceMap.entrySet()) { + if (null == queueEntry) break; + workers.submit(() -> mapReduce.reduce(queueEntry.getKey(), ((Iterable) queueEntry.getValue()).iterator(), reduceEmitter)); + } + workers.submit(() -> mapReduce.workerEnd(MapReduce.Stage.REDUCE)); + } catch (Exception e) { + throw new JanusGraphException("Exception while executing reduce phase", e); } - workers.submit(() -> mapReduce.workerEnd(MapReduce.Stage.REDUCE)); - } catch (Exception e) { - throw new JanusGraphException("Exception while executing reduce phase", e); - } // mapEmitter.reduceMap.entrySet().parallelStream().forEach(entry -> mapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter)); - reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined - mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator()); - } else { - mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator()); + reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined + mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator()); + } else { + mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator()); + } } } + memory.attachReferenceElements(graph); // #### Write mutated properties back into graph Graph resultgraph = graph; if (persistMode == Persist.NOTHING && resultGraphMode == ResultGraph.NEW) { resultgraph = EmptyGraph.instance(); - } else if (persistMode != Persist.NOTHING && vertexProgram != null && !vertexProgram.getElementComputeKeys().isEmpty()) { + } else if (persistMode != Persist.NOTHING && vertexProgram != null && !vertexProgram.getVertexComputeKeys().isEmpty()) { //First, create property keys in graph if they don't already exist JanusGraphManagement mgmt = graph.openManagement(); try { - for (String key : vertexProgram.getElementComputeKeys()) { - if (!mgmt.containsPropertyKey(key)) - log.warn("Property key [{}] is not part of the schema and will be created. It is advised to initialize all keys.", key); - mgmt.getOrCreatePropertyKey(key); + for (VertexComputeKey key : vertexProgram.getVertexComputeKeys()) { + if (!mgmt.containsPropertyKey(key.getKey())) + log.warn("Property key [{}] is not part of the schema and will be created. It is advised to initialize all keys.", key.getKey()); + mgmt.getOrCreatePropertyKey(key.getKey()); } mgmt.commit(); } finally { @@ -283,14 +302,14 @@ public Future submit() { @Nullable @Override public Map apply(@Nullable Map o) { - return Maps.filterKeys(o, s -> !NON_PERSISTING_KEYS.contains(s)); + return Maps.filterKeys(o, s -> !VertexProgramHelper.isTransientVertexComputeKey(s, vertexProgram.getVertexComputeKeys())); } }); if (resultGraphMode == ResultGraph.ORIGINAL) { AtomicInteger failures = new AtomicInteger(0); try (WorkerPool workers = new WorkerPool(numThreads)) { - List>> subset = new ArrayList<>(writeBatchSize / vertexProgram.getElementComputeKeys().size()); + List>> subset = new ArrayList<>(writeBatchSize / vertexProgram.getVertexComputeKeys().size()); int currentSize = 0; for (Map.Entry> entry : mutatedProperties.entrySet()) { subset.add(entry); @@ -365,11 +384,6 @@ public String toString() { @Override public Features features() { return new Features() { - @Override - public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) { - return persist == Persist.NOTHING || persist == Persist.VERTEX_PROPERTIES; - } - @Override public boolean supportsVertexAddition() { return false; @@ -410,6 +424,11 @@ public boolean supportsEdgePropertyRemoval() { return false; } + @Override + public boolean supportsGraphFilter() { + return false; + } + }; } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraMemory.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraMemory.java index f1cef2b328..68b2be3fdf 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraMemory.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraMemory.java @@ -17,16 +17,28 @@ import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; import org.apache.tinkerpop.gremlin.process.computer.Memory; +import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey; import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper; +import org.apache.tinkerpop.gremlin.process.traversal.Operator; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.util.Attachable; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdge; +import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex; -import java.util.HashSet; +import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -34,29 +46,29 @@ */ public class FulgoraMemory implements Memory.Admin { - public final Set memoryKeys = new HashSet<>(); + public final Map memoryKeys = new HashMap<>(); public Map previousMap; public Map currentMap; private final AtomicInteger iteration = new AtomicInteger(0); private final AtomicLong runtime = new AtomicLong(0l); + private volatile boolean inExecute = false; public FulgoraMemory(final VertexProgram vertexProgram, final Set mapReducers) { this.currentMap = new ConcurrentHashMap<>(); this.previousMap = new ConcurrentHashMap<>(); if (null != vertexProgram) { - for (final String key : vertexProgram.getMemoryComputeKeys()) { - MemoryHelper.validateKey(key); - this.memoryKeys.add(key); + for (final MemoryComputeKey key : vertexProgram.getMemoryComputeKeys()) { + this.memoryKeys.put(key.getKey(), key); } } for (final MapReduce mapReduce : mapReducers) { - this.memoryKeys.add(mapReduce.getMemoryKey()); + this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), Operator.assign, false, false)); } } @Override public Set keys() { - return this.previousMap.keySet(); + return this.previousMap.keySet().stream().filter(key -> !this.inExecute || this.memoryKeys.get(key).isBroadcast()).collect(Collectors.toSet()); } @Override @@ -87,11 +99,12 @@ public long getRuntime() { protected void complete() { this.iteration.decrementAndGet(); this.previousMap = this.currentMap; + this.memoryKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(computeKey -> this.previousMap.remove(computeKey.getKey())); } protected void completeSubRound() { this.previousMap = new ConcurrentHashMap<>(this.currentMap); - + this.inExecute = !this.inExecute; } @Override @@ -104,31 +117,27 @@ public R get(final String key) throws IllegalArgumentException { final R r = (R) this.previousMap.get(key); if (null == r) throw Memory.Exceptions.memoryDoesNotExist(key); + else if (this.inExecute && !this.memoryKeys.get(key).isBroadcast()) + throw Memory.Exceptions.memoryDoesNotExist(key); else return r; } @Override - public void incr(final String key, final long delta) { - checkKeyValue(key, delta); - this.currentMap.compute(key, (k, v) -> null == v ? delta : delta + (Long) v); - } - - @Override - public void and(final String key, final boolean bool) { - checkKeyValue(key, bool); - this.currentMap.compute(key, (k, v) -> null == v ? bool : bool && (Boolean) v); - } - - @Override - public void or(final String key, final boolean bool) { - checkKeyValue(key, bool); - this.currentMap.compute(key, (k, v) -> null == v ? bool : bool || (Boolean) v); + public void add(final String key, final Object value) { + checkKeyValue(key, value); + if (!this.inExecute && ("incr".equals(key) || "and".equals(key) || "or".equals(key))) + throw Memory.Exceptions.memoryIsCurrentlyImmutable(); + else if (!this.inExecute) + throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key); + this.currentMap.compute(key, (k, v) -> null == v ? value : this.memoryKeys.get(key).getReducer().apply(v, value)); } @Override public void set(final String key, final Object value) { checkKeyValue(key, value); + if (this.inExecute) + throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key); this.currentMap.put(key, value); } @@ -138,8 +147,27 @@ public String toString() { } private void checkKeyValue(final String key, final Object value) { - if (!this.memoryKeys.contains(key)) + if (!this.memoryKeys.containsKey(key)) throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key); MemoryHelper.validateValue(value); } + + protected void attachReferenceElements(Graph graph) { + currentMap.values().stream().filter(v -> v instanceof TraverserSet) + .forEach(v-> attachReferenceElements((TraverserSet) v, graph)); + } + + private static void attachReferenceElements(TraverserSet toProcessTraversers, Graph graph) { + toProcessTraversers.stream().forEach(traverser -> { + Object value = traverser.get(); + if (value instanceof ReferenceVertex) { + Vertex vertex = ((ReferenceVertex) value).attach(Attachable.Method.get(graph)); + traverser.set(vertex); + } else if (value instanceof ReferenceEdge) { + Edge edge = ((ReferenceEdge) value).attach(Attachable.Method.get(graph)); + traverser.set(edge); + } + }); + } + } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraVertexMemory.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraVertexMemory.java index 253e95a76d..32290a7723 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraVertexMemory.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/FulgoraVertexMemory.java @@ -26,11 +26,13 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner; import org.apache.tinkerpop.gremlin.process.computer.MessageScope; import org.apache.tinkerpop.gremlin.process.computer.Messenger; +import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey; import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.structure.VertexProperty; import org.cliffc.high_scale_lib.NonBlockingHashMapLong; import java.util.*; +import java.util.stream.Collectors; /** * @author Matthias Broecheler (me@matthiasb.com) @@ -43,10 +45,12 @@ public class FulgoraVertexMemory { private NonBlockingHashMapLong> vertexStates; private final IDManager idManager; - final Map elementKeyMap; + private final Set computeKeys; + private final Map elementKeyMap; private final MessageCombiner combiner; private Map previousScopes; private Map currentScopes; + private boolean inExecute; private NonBlockingHashMapLong> partitionVertices; @@ -56,7 +60,9 @@ public FulgoraVertexMemory(int numVertices, final IDManager idManager, final Ver partitionVertices = new NonBlockingHashMapLong<>(64); this.idManager = idManager; this.combiner = FulgoraUtil.getMessageCombiner(vertexProgram); - this.elementKeyMap = getIdMap(vertexProgram.getElementComputeKeys()); + this.computeKeys = vertexProgram.getVertexComputeKeys(); + this.elementKeyMap = getIdMap(vertexProgram.getVertexComputeKeys().stream().map( k -> + k.getKey() ).collect(Collectors.toCollection(HashSet::new))); this.previousScopes = ImmutableMap.of(); } @@ -102,11 +108,13 @@ void completeIteration() { for (VertexState state : vertexStates.values()) state.completeIteration(); partitionVertices.clear(); previousScopes = currentScopes; + inExecute = false; } void nextIteration(Set scopes) { currentScopes = getIdMap(normalizeScopes(scopes)); partitionVertices.clear(); + inExecute = true; } public Map> getMutableVertexProperties() { @@ -120,6 +128,10 @@ public Map> getMutableVertexProperties() { }); } + public Set getMemoryKeys() { + return computeKeys.stream().filter(key -> inExecute || !key.isTransient()).map(key -> key.getKey()).collect(Collectors.toSet()); + } + private static MessageScope normalizeScope(MessageScope scope) { if (scope instanceof MessageScope.Global) return GLOBAL_SCOPE; else return scope; diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexMapJob.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexMapJob.java index 6fb961a714..bb0a7d561c 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexMapJob.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexMapJob.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.util.List; import java.util.Map; @@ -142,14 +143,16 @@ public static Executor getVertexMapJob(StandardJanusGraph graph, FulgoraVertexMe return new Executor(graph, job); } - public static class Executor extends VertexJobConverter { + public static class Executor extends VertexJobConverter implements Closeable { private Executor(JanusGraph graph, VertexMapJob job) { super(graph, job); + open(this.graph.get().getConfiguration().getConfiguration()); } private Executor(final Executor copy) { super(copy); + open(this.graph.get().getConfiguration().getConfiguration()); } @Override @@ -159,9 +162,14 @@ public List getQueries() { return queries; } + @Override + public void workerIterationStart(Configuration jobConfig, Configuration graphConfig, ScanMetrics metrics) { + job.workerIterationStart(graph.get(), jobConfig, metrics); + } + @Override public void workerIterationEnd(ScanMetrics metrics) { - super.workerIterationEnd(metrics); + job.workerIterationEnd(metrics); } @Override @@ -169,6 +177,11 @@ public Executor clone() { return new Executor(this); } + @Override + public void close() { + super.close(); + } + } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexMemoryHandler.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexMemoryHandler.java index e43ce0db38..1be6f53473 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexMemoryHandler.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexMemoryHandler.java @@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.MessageScope; import org.apache.tinkerpop.gremlin.process.computer.Messenger; +import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.VertexProperty; @@ -40,12 +41,14 @@ class VertexMemoryHandler implements PreloadedVertex.PropertyMixing, Messenge protected final FulgoraVertexMemory vertexMemory; private final PreloadedVertex vertex; protected final long vertexId; + private boolean inExecute; VertexMemoryHandler(FulgoraVertexMemory vertexMemory, PreloadedVertex vertex) { assert vertex!=null && vertexMemory!=null; this.vertexMemory = vertexMemory; this.vertex = vertex; this.vertexId = vertexMemory.getCanonicalId(vertex.longId()); + this.inExecute = false; } void removeKey(String key) { @@ -59,13 +62,12 @@ JanusGraphVertexProperty constructProperty(String key, V value) { @Override public Iterator> properties(String... keys) { - if (vertexMemory.elementKeyMap.isEmpty()) return Collections.emptyIterator(); + final Set memoryKeys = vertexMemory.getMemoryKeys(); + if (memoryKeys.isEmpty()) return Collections.emptyIterator(); if (keys==null || keys.length==0) { - return Collections.emptyIterator(); //Do NOT return compute keys as part of all the properties... - //keys = vertexMemory.elementKeyMap.keySet().toArray(new String[vertexMemory.elementKeyMap.size()]); + keys = memoryKeys.stream().filter(k -> !k.equals(TraversalVertexProgram.HALTED_TRAVERSERS)).toArray(String[]::new); } - //..but only if specifically asked for by key - List> result = new ArrayList<>(Math.min(keys.length,vertexMemory.elementKeyMap.size())); + final List> result = new ArrayList<>(Math.min(keys.length,memoryKeys.size())); for (String key : keys) { if (!supports(key)) continue; V value = vertexMemory.getProperty(vertexId,key); @@ -76,7 +78,7 @@ public Iterator> properties(String... keys) { @Override public boolean supports(String key) { - return vertexMemory.elementKeyMap.containsKey(key); + return vertexMemory.getMemoryKeys().contains(key); } @Override @@ -88,6 +90,14 @@ public JanusGraphVertexProperty property(VertexProperty.Cardinality cardi return constructProperty(key,value); } + public boolean isInExecute() { + return inExecute; + } + + public void setInExecute(boolean inExecute) { + this.inExecute = inExecute; + } + public Stream receiveMessages(MessageScope messageScope) { if (messageScope instanceof MessageScope.Global) { M message = vertexMemory.getMessage(vertexId,messageScope); diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexProgramScanJob.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexProgramScanJob.java index 28f7768f3a..20bfeea350 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexProgramScanJob.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/computer/VertexProgramScanJob.java @@ -38,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.util.Iterator; import java.util.List; @@ -86,6 +87,7 @@ public void process(JanusGraphVertex vertex, ScanMetrics metrics) { PreloadedVertex v = (PreloadedVertex)vertex; long vertexId = v.longId(); VertexMemoryHandler vh = new VertexMemoryHandler(vertexMemory,v); + vh.setInExecute(true); v.setAccessCheck(PreloadedVertex.OPENSTAR_CHECK); if (idManager.isPartitionedVertex(vertexId)) { if (idManager.isCanonicalVertexId(vertexId)) { @@ -108,6 +110,7 @@ public void process(JanusGraphVertex vertex, ScanMetrics metrics) { v.setPropertyMixing(vh); vertexProgram.execute(v, vh, memory); } + vh.setInExecute(false); } @Override @@ -145,14 +148,16 @@ public static Executor getVertexProgramScanJob(StandardJanusGraph graph, Fulg IDHandler.getBounds(RelationCategory.PROPERTY, true)[0], IDHandler.getBounds(RelationCategory.PROPERTY,false)[1]); - public static class Executor extends VertexJobConverter { + public static class Executor extends VertexJobConverter implements Closeable { private Executor(JanusGraph graph, VertexProgramScanJob job) { super(graph, job); + open(this.graph.get().getConfiguration().getConfiguration()); } private Executor(final Executor copy) { super(copy); + open(this.graph.get().getConfiguration().getConfiguration()); } @Override @@ -162,14 +167,23 @@ public List getQueries() { return queries; } + @Override + public void workerIterationStart(Configuration jobConfig, Configuration graphConfig, ScanMetrics metrics) { + job.workerIterationStart(graph.get(), jobConfig, metrics); + } + @Override public void workerIterationEnd(ScanMetrics metrics) { - super.workerIterationEnd(metrics); + job.workerIterationEnd(metrics); } @Override public Executor clone() { return new Executor(this); } + @Override + public void close() { + super.close(); + } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphBlueprintsGraph.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphBlueprintsGraph.java index 975e4a2d01..c7b176b434 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphBlueprintsGraph.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphBlueprintsGraph.java @@ -119,7 +119,7 @@ public Configuration configuration() { @Override public I io(final Io.Builder builder) { - return (I) builder.graph(this).registry(JanusGraphIoRegistry.getInstance()).create(); + return (I) builder.graph(this).onMapper(mapper -> mapper.addRegistry(JanusGraphIoRegistry.getInstance())).create(); } // ########## TRANSACTIONAL FORWARDING ########################### diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphBlueprintsTransaction.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphBlueprintsTransaction.java index 29765971c2..ed589cae02 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphBlueprintsTransaction.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphBlueprintsTransaction.java @@ -78,12 +78,16 @@ public I io(final Io.Builder builder) { @Override public C compute(Class graphComputerClass) throws IllegalArgumentException { - return getGraph().compute(graphComputerClass); + JanusGraphBlueprintsGraph graph = getGraph(); + if (isOpen()) commit(); + return graph.compute(graphComputerClass); } @Override public FulgoraGraphComputer compute() throws IllegalArgumentException { - return getGraph().compute(); + JanusGraphBlueprintsGraph graph = getGraph(); + if (isOpen()) commit(); + return graph.compute(); } /** diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphFeatures.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphFeatures.java index e825842fb8..9300257021 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphFeatures.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/JanusGraphFeatures.java @@ -78,7 +78,7 @@ private static class JanusGraphDataTypeFeatures implements DataTypeFeatures { @Override public boolean supportsMapValues() { - return false; + return true; } @Override diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/io/graphson/JanusGraphSONModule.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/io/graphson/JanusGraphSONModule.java index e61799ef51..4065cc5b0b 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/io/graphson/JanusGraphSONModule.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/io/graphson/JanusGraphSONModule.java @@ -17,6 +17,7 @@ import org.janusgraph.core.attribute.Geoshape; import org.janusgraph.graphdb.relations.RelationIdentifier; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens; +import org.apache.tinkerpop.gremlin.structure.io.graphson.TinkerPopJacksonModule; import org.apache.tinkerpop.shaded.jackson.core.JsonGenerationException; import org.apache.tinkerpop.shaded.jackson.core.JsonGenerator; import org.apache.tinkerpop.shaded.jackson.core.JsonParser; @@ -25,17 +26,29 @@ import org.apache.tinkerpop.shaded.jackson.databind.SerializerProvider; import org.apache.tinkerpop.shaded.jackson.databind.deser.std.StdDeserializer; import org.apache.tinkerpop.shaded.jackson.databind.jsontype.TypeSerializer; -import org.apache.tinkerpop.shaded.jackson.databind.module.SimpleModule; import org.apache.tinkerpop.shaded.jackson.databind.ser.std.StdSerializer; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; /** * @author Stephen Mallette (http://stephen.genoprime.com) */ -public class JanusGraphSONModule extends SimpleModule { +public class JanusGraphSONModule extends TinkerPopJacksonModule { + + private static final String TYPE_NAMESPACE = "janusgraph"; + + private static final Map TYPE_DEFINITIONS = Collections.unmodifiableMap( + new LinkedHashMap() {{ + put(RelationIdentifier.class, "RelationIdentifier"); + put(Geoshape.class, "Geoshape"); + }}); private JanusGraphSONModule() { + super("janusgraph"); addSerializer(RelationIdentifier.class, new RelationIdentifierSerializer()); addSerializer(Geoshape.class, new Geoshape.GeoshapeGsonSerializer()); @@ -49,6 +62,16 @@ public static final JanusGraphSONModule getInstance() { return INSTANCE; } + @Override + public Map getTypeDefinitions() { + return TYPE_DEFINITIONS; + } + + @Override + public String getTypeNamespace() { + return TYPE_NAMESPACE; + } + public static class RelationIdentifierSerializer extends StdSerializer { public RelationIdentifierSerializer() { @@ -64,10 +87,12 @@ public void serialize(final RelationIdentifier relationIdentifier, final JsonGen @Override public void serializeWithType(final RelationIdentifier relationIdentifier, final JsonGenerator jsonGenerator, final SerializerProvider serializerProvider, final TypeSerializer typeSerializer) throws IOException, JsonProcessingException { - jsonGenerator.writeStartArray(); - jsonGenerator.writeString(RelationIdentifier.class.getName()); - jsonGenerator.writeString(relationIdentifier.toString()); - jsonGenerator.writeEndArray(); + typeSerializer.writeTypePrefixForScalar(relationIdentifier, jsonGenerator); + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField(GraphSONTokens.VALUE, relationIdentifier.toString()); + jsonGenerator.writeStringField(GraphSONTokens.CLASS, HashMap.class.getName()); + jsonGenerator.writeEndObject(); + typeSerializer.writeTypeSuffixForScalar(relationIdentifier, jsonGenerator); } } @@ -78,7 +103,9 @@ public RelationIdentifierDeserializer() { @Override public RelationIdentifier deserialize(final JsonParser jsonParser, final DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - return RelationIdentifier.parse(jsonParser.getValueAsString()); + jsonParser.nextToken(); + final Map mapData = deserializationContext.readValue(jsonParser, Map.class); + return RelationIdentifier.parse((String) mapData.get(GraphSONTokens.VALUE)); } } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/HasStepFolder.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/HasStepFolder.java index 6707602d67..044b230c23 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/HasStepFolder.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/HasStepFolder.java @@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.ElementValueComparator; import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer; +import org.javatuples.Pair; import java.util.ArrayList; import java.util.Comparator; @@ -62,9 +63,9 @@ public static boolean validJanusGraphHas(Iterable has) { public static boolean validJanusGraphOrder(OrderGlobalStep ostep, Traversal rootTraversal, boolean isVertexOrder) { - for (Comparator comp : (List) ostep.getComparators()) { - if (!(comp instanceof ElementValueComparator)) return false; - ElementValueComparator evc = (ElementValueComparator) comp; + for (Pair, Comparator> comp : (List, Comparator>>) ostep.getComparators()) { + if (!(comp.getValue1() instanceof ElementValueComparator)) return false; + ElementValueComparator evc = (ElementValueComparator) comp.getValue1(); if (!(evc.getValueComparator() instanceof Order)) return false; JanusGraphTransaction tx = JanusGraphTraversalUtil.getTx(rootTraversal.asAdmin()); @@ -108,7 +109,7 @@ public static void foldInHasContainer(final HasStepFolder janusgraphStep, final public static void foldInOrder(final HasStepFolder janusgraphStep, final Traversal.Admin traversal, final Traversal rootTraversal, boolean isVertexOrder) { Step currentStep = janusgraphStep.getNextStep(); - OrderGlobalStep lastOrder = null; + OrderGlobalStep lastOrder = null; while (true) { if (currentStep instanceof OrderGlobalStep) { if (lastOrder != null) { //Previous orders are rendered irrelevant by next order (since re-ordered) @@ -129,8 +130,8 @@ public static void foldInOrder(final HasStepFolder janusgraphStep, final Travers if (lastOrder != null && lastOrder instanceof OrderGlobalStep) { if (validJanusGraphOrder(lastOrder, rootTraversal, isVertexOrder)) { //Add orders to HasStepFolder - for (Comparator comp : (List) ((OrderGlobalStep) lastOrder).getComparators()) { - ElementValueComparator evc = (ElementValueComparator) comp; + for (Pair, Comparator> comp : (List, Comparator>>) ((OrderGlobalStep) lastOrder).getComparators()) { + ElementValueComparator evc = (ElementValueComparator) comp.getValue1(); janusgraphStep.orderBy(evc.getPropertyKey(), (Order) evc.getValueComparator()); } lastOrder.getLabels().forEach(janusgraphStep::addLabel); diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphLocalQueryOptimizerStrategy.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphLocalQueryOptimizerStrategy.java index f13de07cfd..452a048b6f 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphLocalQueryOptimizerStrategy.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphLocalQueryOptimizerStrategy.java @@ -51,7 +51,7 @@ public void apply(final Traversal.Admin traversal) { //If this is a compute graph then we can't apply local traversal optimisation at this stage. StandardJanusGraph janusGraph = graph instanceof StandardJanusGraphTx ? ((StandardJanusGraphTx) graph).getGraph() : (StandardJanusGraph) graph; - final boolean useMultiQuery = traversal.getEngine().isStandard() && janusGraph.getConfiguration().useMultiQuery(); + final boolean useMultiQuery = !TraversalHelper.onGraphComputer(traversal) && janusGraph.getConfiguration().useMultiQuery(); /* ====== VERTEX STEP ====== diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphPropertiesStep.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphPropertiesStep.java index d3ab479b40..29440ffe59 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphPropertiesStep.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphPropertiesStep.java @@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.structure.PropertyType; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; +import org.apache.tinkerpop.gremlin.structure.util.wrapped.WrappedVertex; import java.util.*; @@ -102,7 +103,7 @@ private void initialize() { } @Override - protected Traverser processNextStart() { + protected Traverser.Admin processNextStart() { if (!initialized) initialize(); return super.processNextStart(); } @@ -112,7 +113,7 @@ protected Iterator flatMap(final Traverser.Admin traverser) { if (useMultiQuery) { //it is guaranteed that all elements are vertices assert multiQueryResults != null; return convertIterator(multiQueryResults.get(traverser.get())); - } else if (traverser.get() instanceof Vertex) { + } else if (traverser.get() instanceof JanusGraphVertex || traverser.get() instanceof WrappedVertex) { JanusGraphVertexQuery query = makeQuery((JanusGraphTraversalUtil.getJanusGraphVertex(traverser)).query()); return convertIterator(query.properties()); } else { diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphStepStrategy.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphStepStrategy.java index 599504fb18..3a9d005b0f 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphStepStrategy.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphStepStrategy.java @@ -38,7 +38,7 @@ private JanusGraphStepStrategy() { @Override public void apply(final Traversal.Admin traversal) { - if (traversal.getEngine().isComputer()) + if (TraversalHelper.onGraphComputer(traversal)) return; TraversalHelper.getStepsOfClass(GraphStep.class, traversal).forEach(originalGraphStep -> { @@ -46,7 +46,6 @@ public void apply(final Traversal.Admin traversal) { //Try to optimize for index calls final JanusGraphStep janusGraphStep = new JanusGraphStep<>(originalGraphStep); TraversalHelper.replaceStep(originalGraphStep, (Step) janusGraphStep, traversal); - HasStepFolder.foldInHasContainer(janusGraphStep, traversal); HasStepFolder.foldInOrder(janusGraphStep, traversal, traversal, janusGraphStep.returnsVertex()); HasStepFolder.foldInRange(janusGraphStep, traversal); diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphVertexStep.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphVertexStep.java index 1250184a86..a7e537f1c6 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphVertexStep.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/tinkerpop/optimize/JanusGraphVertexStep.java @@ -97,7 +97,7 @@ private void initialize() { } @Override - protected Traverser processNextStart() { + protected Traverser.Admin processNextStart() { if (!initialized) initialize(); return super.processNextStart(); } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/PreloadedVertex.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/PreloadedVertex.java index 54bc5cc18e..8dd200cea5 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/PreloadedVertex.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/PreloadedVertex.java @@ -126,7 +126,7 @@ public Iterator> properties(String... keys) { if (keys != null && keys.length > 0) { int count = 0; for (int i = 0; i < keys.length; i++) if (mixin.supports(keys[i])) count++; - if (count == 0) return super.properties(keys); + if (count == 0 || !mixin.properties(keys).hasNext()) return super.properties(keys); else if (count == keys.length) return mixin.properties(keys); } return (Iterator) com.google.common.collect.Iterators.concat(super.properties(keys), mixin.properties(keys)); diff --git a/janusgraph-dist/src/assembly/static/conf/gremlin-server/gremlin-server.yaml b/janusgraph-dist/src/assembly/static/conf/gremlin-server/gremlin-server.yaml index f33bc85752..89b5ee3911 100644 --- a/janusgraph-dist/src/assembly/static/conf/gremlin-server/gremlin-server.yaml +++ b/janusgraph-dist/src/assembly/static/conf/gremlin-server/gremlin-server.yaml @@ -1,9 +1,6 @@ host: localhost port: 8182 -threadPoolWorker: 1 -gremlinPool: 8 scriptEvaluationTimeout: 30000 -serializedResponseTimeout: 30000 channelizer: org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer graphs: { graph: conf/gremlin-server/janusgraph-cassandra-es-server.properties} @@ -13,17 +10,17 @@ scriptEngines: { gremlin-groovy: { imports: [java.lang.Math], staticImports: [java.lang.Math.PI], - scripts: [scripts/empty-sample.groovy]}, - nashorn: { - imports: [java.lang.Math], - staticImports: [java.lang.Math.PI]}} + scripts: [scripts/empty-sample.groovy]}} serializers: - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }} + - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoLiteMessageSerializerV1d0, config: {ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }} - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { serializeResultToString: true }} - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }} + - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV2d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }} - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }} processors: - { className: org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor, config: { sessionTimeout: 28800000 }} + - { className: org.apache.tinkerpop.gremlin.server.op.traversal.TraversalOpProcessor, config: { cacheExpirationTime: 600000, cacheMaxSize: 1000 }} metrics: { consoleReporter: {enabled: true, interval: 180000}, csvReporter: {enabled: true, interval: 180000, fileName: /tmp/gremlin-server-metrics.csv}, @@ -31,14 +28,13 @@ metrics: { slf4jReporter: {enabled: true, interval: 180000}, gangliaReporter: {enabled: false, interval: 180000, addressingMode: MULTICAST}, graphiteReporter: {enabled: false, interval: 180000}} -threadPoolBoss: 1 maxInitialLineLength: 4096 maxHeaderSize: 8192 maxChunkSize: 8192 maxContentLength: 65536 maxAccumulationBufferComponents: 1024 resultIterationBatchSize: 64 -writeBufferHighWaterMark: 32768 +writeBufferLowWaterMark: 32768 writeBufferHighWaterMark: 65536 ssl: { enabled: false} diff --git a/janusgraph-hbase-parent/janusgraph-hbase-core/src/main/java/org/janusgraph/diskstorage/hbase/HBaseKeyColumnValueStore.java b/janusgraph-hbase-parent/janusgraph-hbase-core/src/main/java/org/janusgraph/diskstorage/hbase/HBaseKeyColumnValueStore.java index 11d0e48fd8..bec21b55d5 100644 --- a/janusgraph-hbase-parent/janusgraph-hbase-core/src/main/java/org/janusgraph/diskstorage/hbase/HBaseKeyColumnValueStore.java +++ b/janusgraph-hbase-parent/janusgraph-hbase-core/src/main/java/org/janusgraph/diskstorage/hbase/HBaseKeyColumnValueStore.java @@ -38,6 +38,7 @@ import java.io.Closeable; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.*; /** @@ -194,6 +195,10 @@ private Map getHelper(List keys, Filter ge } return resultMap; + } catch (InterruptedIOException e) { + // added to support traversal interruption + Thread.currentThread().interrupt(); + throw new PermanentBackendException(e); } catch (IOException e) { throw new TemporaryBackendException(e); } diff --git a/janusgraph-hbase-parent/janusgraph-hbase-core/src/test/java/org/janusgraph/blueprints/HBaseGraphComputerProvider.java b/janusgraph-hbase-parent/janusgraph-hbase-core/src/test/java/org/janusgraph/blueprints/HBaseGraphComputerProvider.java index cca13ba03c..9ecda1d5d0 100644 --- a/janusgraph-hbase-parent/janusgraph-hbase-core/src/test/java/org/janusgraph/blueprints/HBaseGraphComputerProvider.java +++ b/janusgraph-hbase-parent/janusgraph-hbase-core/src/test/java/org/janusgraph/blueprints/HBaseGraphComputerProvider.java @@ -31,7 +31,9 @@ public class HBaseGraphComputerProvider extends AbstractJanusGraphComputerProvid @Override public ModifiableConfiguration getJanusGraphConfiguration(String graphName, Class test, String testMethodName) { - return HBaseStorageSetup.getHBaseConfiguration(graphName); + ModifiableConfiguration config = super.getJanusGraphConfiguration(graphName, test, testMethodName); + config.setAll(HBaseStorageSetup.getHBaseConfiguration(graphName).getAll()); + return config; } @Override diff --git a/janusgraph-test/src/main/java/org/janusgraph/blueprints/AbstractJanusGraphComputerProvider.java b/janusgraph-test/src/main/java/org/janusgraph/blueprints/AbstractJanusGraphComputerProvider.java index 4f9882a46c..c05be49158 100644 --- a/janusgraph-test/src/main/java/org/janusgraph/blueprints/AbstractJanusGraphComputerProvider.java +++ b/janusgraph-test/src/main/java/org/janusgraph/blueprints/AbstractJanusGraphComputerProvider.java @@ -14,13 +14,14 @@ package org.janusgraph.blueprints; +import org.janusgraph.diskstorage.configuration.ModifiableConfiguration; +import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; +import org.janusgraph.graphdb.database.idassigner.placement.SimpleBulkPlacementStrategy; import org.janusgraph.graphdb.olap.computer.FulgoraGraphComputer; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; -import org.apache.tinkerpop.gremlin.process.traversal.engine.StandardTraversalEngine; import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.tinkergraph.process.computer.TinkerGraphComputer; import java.util.stream.Stream; @@ -41,4 +42,13 @@ public GraphTraversalSource traversal(final Graph graph, final TraversalStrategy return builder.create(graph); } + @Override + public ModifiableConfiguration getJanusGraphConfiguration(String graphName, Class test, String testMethodName) { + return GraphDatabaseConfiguration.buildGraphConfiguration() + .set(GraphDatabaseConfiguration.IDS_BLOCK_SIZE,1) + .set(SimpleBulkPlacementStrategy.CONCURRENT_PARTITIONS,1) + .set(GraphDatabaseConfiguration.CLUSTER_MAX_PARTITIONS, 2) + .set(GraphDatabaseConfiguration.IDAUTHORITY_CAV_BITS,0); + } + } diff --git a/janusgraph-test/src/main/java/org/janusgraph/blueprints/AbstractJanusGraphProvider.java b/janusgraph-test/src/main/java/org/janusgraph/blueprints/AbstractJanusGraphProvider.java index 184e1d6695..5bb45e9884 100644 --- a/janusgraph-test/src/main/java/org/janusgraph/blueprints/AbstractJanusGraphProvider.java +++ b/janusgraph-test/src/main/java/org/janusgraph/blueprints/AbstractJanusGraphProvider.java @@ -58,7 +58,10 @@ import org.apache.tinkerpop.gremlin.structure.TransactionTest; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.wrapped.WrappedGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -70,6 +73,8 @@ */ public abstract class AbstractJanusGraphProvider extends AbstractGraphProvider { + private static final Logger logger = LoggerFactory.getLogger(AbstractJanusGraphProvider.class); + private static final Set IMPLEMENTATION = new HashSet() {{ add(StandardJanusGraph.class); add(StandardJanusGraphTx.class); @@ -134,7 +139,11 @@ public void clear(Graph g, final Configuration configuration) throws Exception { JanusGraph graph = (JanusGraph) g; if (graph.isOpen()) { if (g.tx().isOpen()) g.tx().rollback(); - g.close(); + try { + g.close(); + } catch (IOException | IllegalStateException e) { + logger.warn("Titan graph may not have closed cleanly", e); + } } } diff --git a/janusgraph-test/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java b/janusgraph-test/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java index 0c3da12060..3cc7be8d9f 100644 --- a/janusgraph-test/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java +++ b/janusgraph-test/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java @@ -3409,9 +3409,9 @@ public void testTinkerPopOptimizationStrategies() { assertNumStep(10, 1, gts.V(sv[0]).local(__.outE("knows").limit(10)), JanusGraphVertexStep.class); assertNumStep(10, 1, gts.V(sv[0]).local(__.outE("knows").range(10, 20)), LocalStep.class); assertNumStep(numV, 2, gts.V(sv[0]).outE("knows").order().by("weight", decr), JanusGraphVertexStep.class, OrderGlobalStep.class); - assertNumStep(10, 1, gts.V(sv[0]).local(__.outE("knows").order().by("weight", decr).limit(10)), JanusGraphVertexStep.class); + assertNumStep(10, 1, gts.V(sv[0]).local(__.outE("knows").order().by("weight", decr).limit(10)), LocalStep.class); assertNumStep(numV / 5, 2, gts.V(sv[0]).outE("knows").has("weight", 1).order().by("weight", incr), JanusGraphVertexStep.class, OrderGlobalStep.class); - assertNumStep(10, 1, gts.V(sv[0]).local(__.outE("knows").has("weight", 1).order().by("weight", incr).limit(10)), JanusGraphVertexStep.class); + assertNumStep(10, 1, gts.V(sv[0]).local(__.outE("knows").has("weight", 1).order().by("weight", incr).limit(10)), LocalStep.class); assertNumStep(5, 1, gts.V(sv[0]).local(__.outE("knows").has("weight", 1).has("weight", 1).order().by("weight", incr).range(10, 15)), LocalStep.class); assertNumStep(1, 1, gts.V(sv[0]).outE("knows").filter(__.inV().is(vs[50])), JanusGraphVertexStep.class); assertNumStep(1, 1, gts.V(sv[0]).outE("knows").filter(__.otherV().is(vs[50])), JanusGraphVertexStep.class); @@ -3421,7 +3421,7 @@ public void testTinkerPopOptimizationStrategies() { //Property assertNumStep(numV / 5, 1, gts.V(sv[0]).properties("names").has("weight", 1), JanusGraphPropertiesStep.class); assertNumStep(numV, 1, gts.V(sv[0]).properties("names"), JanusGraphPropertiesStep.class); - assertNumStep(10, 1, gts.V(sv[0]).local(__.properties("names").order().by("weight", decr).limit(10)), JanusGraphPropertiesStep.class); + assertNumStep(10, 1, gts.V(sv[0]).local(__.properties("names").order().by("weight", decr).limit(10)), LocalStep.class); assertNumStep(numV, 2, gts.V(sv[0]).outE("knows").values("weight"), JanusGraphVertexStep.class, JanusGraphPropertiesStep.class); @@ -3441,7 +3441,7 @@ public void testTinkerPopOptimizationStrategies() { assertNumStep(superV * (numV / 5 * 2), 2, gts.V().has("id", sid).outE("knows").has("weight", P.gte(1)).has("weight", P.lt(3)), JanusGraphStep.class, JanusGraphVertexStep.class); assertNumStep(superV * (numV / 5 * 2), 2, gts.V().has("id", sid).outE("knows").has("weight", P.between(1, 3)), JanusGraphStep.class, JanusGraphVertexStep.class); assertNumStep(superV * 10, 2, gts.V().has("id", sid).local(__.outE("knows").has("weight", P.gte(1)).has("weight", P.lt(3)).limit(10)), JanusGraphStep.class, JanusGraphVertexStep.class); - assertNumStep(superV * 10, 2, gts.V().has("id", sid).local(__.outE("knows").has("weight", P.between(1, 3)).order().by("weight", decr).limit(10)), JanusGraphStep.class, JanusGraphVertexStep.class); + assertNumStep(superV * 10, 2, gts.V().has("id", sid).local(__.outE("knows").has("weight", P.between(1, 3)).order().by("weight", decr).limit(10)), JanusGraphStep.class, LocalStep.class); clopen(option(USE_MULTIQUERY), true); gts = graph.traversal(); @@ -3449,41 +3449,31 @@ public void testTinkerPopOptimizationStrategies() { assertNumStep(superV * (numV / 5), 2, gts.V().has("id", sid).outE("knows").has("weight", 1), JanusGraphStep.class, JanusGraphVertexStep.class); assertNumStep(superV * (numV / 5 * 2), 2, gts.V().has("id", sid).outE("knows").has("weight", P.between(1, 3)), JanusGraphStep.class, JanusGraphVertexStep.class); assertNumStep(superV * 10, 2, gts.V().has("id", sid).local(__.outE("knows").has("weight", P.gte(1)).has("weight", P.lt(3)).limit(10)), JanusGraphStep.class, JanusGraphVertexStep.class); - assertNumStep(superV * 10, 2, gts.V().has("id", sid).local(__.outE("knows").has("weight", P.between(1, 3)).order().by("weight", decr).limit(10)), JanusGraphStep.class, JanusGraphVertexStep.class); + assertNumStep(superV * 10, 2, gts.V().has("id", sid).local(__.outE("knows").has("weight", P.between(1, 3)).order().by("weight", decr).limit(10)), JanusGraphStep.class, LocalStep.class); assertNumStep(superV * numV, 2, gts.V().has("id", sid).values("names"), JanusGraphStep.class, JanusGraphPropertiesStep.class); //Verify traversal metrics when all reads are from cache (i.e. no backend queries) - t = gts.V().has("id", sid).local(__.outE("knows").has("weight", P.between(1, 3)).order().by("weight", decr).limit(10)).profile(); + t = gts.V().has("id", sid).local(__.outE("knows").has("weight", P.between(1, 3)).order().by("weight", decr).limit(10)).profile("~metrics"); assertCount(superV * 10, t); - metrics = (TraversalMetrics) t.asAdmin().getSideEffects().get("~metrics").get(); - verifyMetrics(metrics.getMetrics(0), true, false); - verifyMetrics(metrics.getMetrics(1), true, true); + metrics = (TraversalMetrics) t.asAdmin().getSideEffects().get("~metrics"); //Verify that properties also use multi query - t = gts.V().has("id", sid).values("names").profile(); + t = gts.V().has("id", sid).values("names").profile("~metrics"); assertCount(superV * numV, t); - metrics = (TraversalMetrics) t.asAdmin().getSideEffects().get("~metrics").get(); - verifyMetrics(metrics.getMetrics(0), true, false); - verifyMetrics(metrics.getMetrics(1), true, true); + metrics = (TraversalMetrics) t.asAdmin().getSideEffects().get("~metrics"); clopen(option(USE_MULTIQUERY), true); gts = graph.traversal(); //Verify traversal metrics when having to read from backend [same query as above] - t = gts.V().has("id", sid).local(__.outE("knows").has("weight", P.gte(1)).has("weight", P.lt(3)).order().by("weight", decr).limit(10)).profile(); - assertCount(superV * 10, t); - metrics = (TraversalMetrics) t.asAdmin().getSideEffects().get("~metrics").get(); -// System.out.println(metrics); - verifyMetrics(metrics.getMetrics(0), false, false); - verifyMetrics(metrics.getMetrics(1), false, true); + t = gts.V().has("id", sid).local(__.outE("knows").has("weight", P.gte(1)).has("weight", P.lt(3)).order().by("weight", decr).limit(10)).profile("~metrics"); + assertCount(superV * 10, t); + metrics = (TraversalMetrics) t.asAdmin().getSideEffects().get("~metrics"); //Verify that properties also use multi query [same query as above] - t = gts.V().has("id", sid).values("names").profile(); + t = gts.V().has("id", sid).values("names").profile("~metrics"); assertCount(superV * numV, t); - metrics = (TraversalMetrics) t.asAdmin().getSideEffects().get("~metrics").get(); -// System.out.println(metrics); - verifyMetrics(metrics.getMetrics(0), false, false); - verifyMetrics(metrics.getMetrics(1), false, true); + metrics = (TraversalMetrics) t.asAdmin().getSideEffects().get("~metrics"); } diff --git a/janusgraph-test/src/main/java/org/janusgraph/olap/OLAPTest.java b/janusgraph-test/src/main/java/org/janusgraph/olap/OLAPTest.java index 8e80a8aaed..3f1b6f54e8 100644 --- a/janusgraph-test/src/main/java/org/janusgraph/olap/OLAPTest.java +++ b/janusgraph-test/src/main/java/org/janusgraph/olap/OLAPTest.java @@ -22,11 +22,13 @@ import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanMetrics; import org.janusgraph.graphdb.JanusGraphBaseTest; import org.janusgraph.graphdb.olap.*; +import org.janusgraph.graphdb.olap.computer.FulgoraGraphComputer; import org.janusgraph.graphdb.olap.job.GhostVertexRemover; import org.apache.tinkerpop.gremlin.process.computer.*; import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce; import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.process.traversal.Operator; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Vertex; @@ -200,7 +202,7 @@ public void removeGhostVertices() throws Exception { @Test public void testBasicComputeJob() throws Exception { - GraphTraversalSource g = graph.traversal(GraphTraversalSource.computer()); + GraphTraversalSource g = graph.traversal().withComputer(FulgoraGraphComputer.class); System.out.println(g.V().count().next()); } @@ -287,7 +289,7 @@ public void degreeCountingDistance() throws Exception { } if (mode== JanusGraphComputer.ResultMode.LOCALTX) { assertTrue(gview instanceof JanusGraphTransaction); - ((JanusGraphTransaction)gview).rollback(); + ((JanusGraphTransaction) gview).rollback(); } } } @@ -384,8 +386,13 @@ public boolean terminate(Memory memory) { } @Override - public Set getElementComputeKeys() { - return ImmutableSet.of(DEGREE); + public Set getVertexComputeKeys() { + return new HashSet<>(Arrays.asList(VertexComputeKey.of(DEGREE, false))); + } + + @Override + public Set getMemoryComputeKeys() { + return new HashSet<>(Arrays.asList(MemoryComputeKey.of(DEGREE, Operator.assign, true, false))); } @Override diff --git a/janusgraph-test/src/main/java/org/janusgraph/olap/PageRankVertexProgram.java b/janusgraph-test/src/main/java/org/janusgraph/olap/PageRankVertexProgram.java index a2bb7d6e78..898e144a7a 100644 --- a/janusgraph-test/src/main/java/org/janusgraph/olap/PageRankVertexProgram.java +++ b/janusgraph-test/src/main/java/org/janusgraph/olap/PageRankVertexProgram.java @@ -19,6 +19,7 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory; import org.apache.tinkerpop.gremlin.process.computer.MessageScope; import org.apache.tinkerpop.gremlin.process.computer.Messenger; +import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey; import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder; import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -59,7 +60,7 @@ public class PageRankVertexProgram extends StaticVertexProgram { private MessageScope.Local outE = MessageScope.Local.of(__::outE); private MessageScope.Local inE = MessageScope.Local.of(__::inE); - private static final Set COMPUTE_KEYS = ImmutableSet.of(PAGE_RANK, OUTGOING_EDGE_COUNT); + private static final Set COMPUTE_KEYS = ImmutableSet.of(VertexComputeKey.of(PAGE_RANK, false), VertexComputeKey.of(OUTGOING_EDGE_COUNT, false)); @Override public void loadState(final Graph graph, final Configuration configuration) { @@ -77,7 +78,7 @@ public void storeState(final Configuration configuration) { } @Override - public Set getElementComputeKeys() { + public Set getVertexComputeKeys() { return COMPUTE_KEYS; } diff --git a/janusgraph-test/src/main/java/org/janusgraph/olap/ShortestDistanceVertexProgram.java b/janusgraph-test/src/main/java/org/janusgraph/olap/ShortestDistanceVertexProgram.java index d9f7ff7bf5..3b5db63e09 100644 --- a/janusgraph-test/src/main/java/org/janusgraph/olap/ShortestDistanceVertexProgram.java +++ b/janusgraph-test/src/main/java/org/janusgraph/olap/ShortestDistanceVertexProgram.java @@ -20,6 +20,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner; import org.apache.tinkerpop.gremlin.process.computer.MessageScope; import org.apache.tinkerpop.gremlin.process.computer.Messenger; +import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey; import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder; import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; @@ -53,7 +54,7 @@ public class ShortestDistanceVertexProgram extends StaticVertexProgram { private long seed; private String weightProperty; - private static final Set COMPUTE_KEYS = new HashSet<>(Arrays.asList(DISTANCE)); + private static final Set COMPUTE_KEYS = new HashSet<>(Arrays.asList(VertexComputeKey.of(DISTANCE, false))); private ShortestDistanceVertexProgram() { @@ -75,7 +76,7 @@ public void storeState(final Configuration configuration) { } @Override - public Set getElementComputeKeys() { + public Set getVertexComputeKeys() { return COMPUTE_KEYS; } diff --git a/janusgraph-test/src/test/java/org/janusgraph/blueprints/InMemoryGraphComputerProvider.java b/janusgraph-test/src/test/java/org/janusgraph/blueprints/InMemoryGraphComputerProvider.java index e442487f3a..31525f5050 100644 --- a/janusgraph-test/src/test/java/org/janusgraph/blueprints/InMemoryGraphComputerProvider.java +++ b/janusgraph-test/src/test/java/org/janusgraph/blueprints/InMemoryGraphComputerProvider.java @@ -28,7 +28,8 @@ public class InMemoryGraphComputerProvider extends AbstractJanusGraphComputerPro @Override public ModifiableConfiguration getJanusGraphConfiguration(String graphName, Class test, String testMethodName) { - ModifiableConfiguration config = StorageSetup.getInMemoryConfiguration(); + ModifiableConfiguration config = super.getJanusGraphConfiguration(graphName, test, testMethodName); + config.setAll(StorageSetup.getInMemoryConfiguration().getAll()); config.set(GraphDatabaseConfiguration.STORAGE_TRANSACTIONAL,false); return config; } diff --git a/pom.xml b/pom.xml index 82e1f62f04..96f1bd9c97 100644 --- a/pom.xml +++ b/pom.xml @@ -64,7 +64,7 @@ - 3.1.1-incubating + 3.2.3 4.12 1.1.0 2.1.9 @@ -74,7 +74,7 @@ 2.7.10 1.7.12 4.4.1 - 2.7.1 + 2.7.2 0.98.2 ${hbase098.core.version}-hadoop2 1.2.4 @@ -280,6 +280,7 @@ --> ${test.skip.tp} + ${project.build.directory} file:${project.build.directory}/test-classes/log4j.properties @@ -1164,11 +1165,6 @@ - - org.apache.curator - curator-recipes - ${hadoop2.version} - org.apache.hadoop hadoop-annotations