Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Tinkerpop 3.2.3 #1312

Open
wants to merge 4 commits into
base: titan11
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
</scm>
<properties>
<titan.compatible.versions />
<tinkerpop.version>3.1.1-incubating</tinkerpop.version>
<tinkerpop.version>3.2.3</tinkerpop.version>
<junit.version>4.12</junit.version>
<mrunit.version>1.1.0</mrunit.version>
<cassandra.version>2.1.9</cassandra.version>
Expand All @@ -75,7 +75,7 @@
<slf4j.version>1.7.12</slf4j.version>
<httpcomponents.version>4.4.1</httpcomponents.version>
<hadoop1.version>1.2.1</hadoop1.version>
<hadoop2.version>2.7.1</hadoop2.version>
<hadoop2.version>2.7.2</hadoop2.version>
<hbase094.version>0.94.25</hbase094.version>
<hbase096.core.version>0.96.2</hbase096.core.version>
<hbase096.version>${hbase096.core.version}-hadoop2</hbase096.version>
Expand Down Expand Up @@ -275,6 +275,9 @@
<exclude>**/*</exclude>
</excludes> -->
<skipTests>${test.skip.tp}</skipTests>
<systemPropertyVariables>
<build.dir>${project.build.directory}</build.dir>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
Expand Down Expand Up @@ -1169,11 +1172,6 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${hadoop2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class BerkeleyGraphComputerProvider extends AbstractTitanGraphComputerPro

@Override
public ModifiableConfiguration getTitanConfiguration(String graphName, Class<?> test, String testMethodName) {
ModifiableConfiguration config = BerkeleyStorageSetup.getBerkeleyJEConfiguration(StorageSetup.getHomeDir(graphName));
ModifiableConfiguration config = super.getTitanConfiguration(graphName, test, testMethodName);
config.setAll(BerkeleyStorageSetup.getBerkeleyJEConfiguration(StorageSetup.getHomeDir(graphName)).getAll());
config.set(GraphDatabaseConfiguration.IDAUTHORITY_WAIT, Duration.ofMillis(20));
config.set(GraphDatabaseConfiguration.STORAGE_TRANSACTIONAL,false);
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.thinkaurelius.titan.CassandraStorageSetup;
import com.thinkaurelius.titan.blueprints.AbstractTitanGraphComputerProvider;
import com.thinkaurelius.titan.blueprints.AbstractTitanGraphProvider;
import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
import com.thinkaurelius.titan.graphdb.olap.computer.FulgoraGraphComputer;
import org.apache.tinkerpop.gremlin.GraphProvider;
Expand All @@ -16,7 +15,9 @@ public class ThriftGraphComputerProvider extends AbstractTitanGraphComputerProvi
@Override
public ModifiableConfiguration getTitanConfiguration(String graphName, Class<?> test, String testMethodName) {
CassandraStorageSetup.startCleanEmbedded();
return CassandraStorageSetup.getCassandraThriftConfiguration(graphName);
ModifiableConfiguration config = super.getTitanConfiguration(graphName, test, testMethodName);
config.setAll(CassandraStorageSetup.getCassandraThriftConfiguration(graphName).getAll());
return config;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@
test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
method = "shouldProcessResultGraphNewWithPersistVertexProperties",
reason = "The result graph should return an empty iterator when vertex.edges() or vertex.vertices() is called.")
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.structure.io.IoTest$GraphMLTest",
method = "shouldReadGraphMLWithNoEdgeLabels",
reason = "Titan does not support default edge label (edge) used when GraphML is missing edge labels.")
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
method = "shouldSupportGraphFilter",
reason = "Titan currently does not support graph filters but does not throw proper exception because doing so breaks numerous tests in gremlin-test ProcessComputerSuite.")
public interface TitanGraph extends TitanGraphTransaction {

/* ---------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.cache.KCVSCache;
import com.thinkaurelius.titan.diskstorage.log.kcvs.ExternalCachePersistor;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -425,7 +426,14 @@ public String toString() {


private final <V> V executeRead(Callable<V> exe) throws TitanException {
return BackendOperation.execute(exe, maxReadTime);
try {
return BackendOperation.execute(exe, maxReadTime);
} catch (TitanException e) {
// support traversal interruption
// TODO: Refactor to allow direct propagation of underlying interrupt exception
if (Thread.interrupted()) throw new TraversalInterruptedException();
throw e;
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface ScanJob extends Cloneable {
/**
* Invoked before a block of computation (i.e. multiple process() calls) is handed to this particular ScanJob.
* Can be used to initialize the iteration. This method is called exactly once for each before a block of computation.
* This method is semantically aligned with {@link com.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationStart()}
* This method is semantically aligned with {@link org.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationStart()}
*
* This method may not be called if there is no data to be processed. Correspondingly, the end method won't be called either.
*
Expand All @@ -35,7 +35,7 @@ public default void workerIterationStart(Configuration jobConfiguration,
/**
* Invoked after a block of computation (i.e. multiple process() calls) is handed to this particular ScanJob.
* Can be used to close any resources held by this job. This method is called exactly once for each after a block of computation.
* This method is semantically aligned with {@link com.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationEnd()}
* This method is semantically aligned with {@link org.tinkerpop.gremlin.process.computer.VertexProgram#workerIterationEnd()}
*
* This method may not be called if there is no data to be processed. Correspondingly, the start method won't be called either.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public static final<V> V executeDirect(Callable<V> exe, Duration totalWaitTime)
try {
Thread.sleep(waitTime.toMillis());
} catch (InterruptedException r) {
// added thread interrupt signal to support traversal interruption
Thread.currentThread().interrupt();
throw new PermanentBackendException("Interrupted while waiting to retry failed backend operation", r);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.thinkaurelius.titan.graphdb.types.TypeDefinitionCategory;
import com.thinkaurelius.titan.graphdb.types.TypeDefinitionDescription;

import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,6 +120,8 @@ public StandardSerializer() {
registerClassInternal(64,Duration.class, new DurationSerializer());
registerClassInternal(65,Instant.class, new InstantSerializer());
registerClassInternal(66,StandardTransactionId.class, new StandardTransactionIdSerializer());
registerClassInternal(67,TraverserSet.class, new SerializableSerializer());
registerClassInternal(68,HashMap.class, new SerializableSerializer());

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.thinkaurelius.titan.graphdb.database.serialize.attribute;

import com.thinkaurelius.titan.core.attribute.AttributeSerializer;
import com.thinkaurelius.titan.diskstorage.ScanBuffer;
import com.thinkaurelius.titan.diskstorage.WriteBuffer;
import com.thinkaurelius.titan.graphdb.database.serialize.DataOutput;
import com.thinkaurelius.titan.graphdb.database.serialize.Serializer;
import com.thinkaurelius.titan.graphdb.database.serialize.SerializerInjected;
import org.apache.commons.lang3.SerializationUtils;

import java.io.Serializable;
import java.util.HashMap;

public class SerializableSerializer<T extends Serializable> implements AttributeSerializer<T>, SerializerInjected {

private Serializer serializer;

@Override
public T read(ScanBuffer buffer) {
byte[] data = serializer.readObjectNotNull(buffer,byte[].class);
return (T) SerializationUtils.deserialize(data);
}

@Override
public void write(WriteBuffer buffer, T attribute) {
DataOutput out = (DataOutput) buffer;
out.writeObjectNotNull(SerializationUtils.serialize(attribute));
}

@Override
public void setSerializer(Serializer serializer) {
this.serializer = serializer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ protected VertexJobConverter(TitanGraph graph, VertexScanJob job) {
}

protected VertexJobConverter(VertexJobConverter copy) {
this.graph = new GraphProvider();
if (copy.graph.isProvided()) this.graph.setGraph(copy.graph.get());
this.graph = copy.graph;
this.job = copy.job.clone();
this.tx = copy.tx;
this.idManager = copy.idManager;
}

public static ScanJob convert(TitanGraph graph, VertexScanJob vertexJob) {
Expand All @@ -82,18 +83,22 @@ public static StandardTitanTx startTransaction(StandardTitanGraph graph) {

@Override
public void workerIterationStart(Configuration jobConfig, Configuration graphConfig, ScanMetrics metrics) {
graph.initializeGraph(graphConfig);
idManager = graph.get().getIDManager();
try {
tx = startTransaction(graph.get());
open(graphConfig);
job.workerIterationStart(graph.get(), jobConfig, metrics);
} catch (Throwable e) {
close();
throw e;
}
}

private void close() {
protected void open(Configuration graphConfig) {
graph.initializeGraph(graphConfig);
idManager = graph.get().getIDManager();
tx = startTransaction(graph.get());
}

protected void close() {
if (null != tx && tx.isOpen())
tx.rollback();
graph.close();
Expand Down
Loading