Skip to content

Commit

Permalink
Formatting updates to use spaces for indentation and cleanup diff aga…
Browse files Browse the repository at this point in the history
…inst titan11
  • Loading branch information
sjudeng authored and dylanht committed Feb 8, 2017
1 parent 4d7efef commit 42d776e
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.thinkaurelius.titan.core.TitanException;
import com.thinkaurelius.titan.core.TitanGraphComputer;
import com.thinkaurelius.titan.core.TitanTransaction;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
Expand Down Expand Up @@ -47,36 +47,7 @@
/**
* @author Matthias Broecheler ([email protected])
*/
public class FulgoraGraphComputer implements GraphComputer {

public enum ResultMode {
NONE, PERSIST, LOCALTX;

public ResultGraph toResultGraph() {
switch(this) {
case NONE: return ResultGraph.ORIGINAL;
case PERSIST: return ResultGraph.ORIGINAL;
case LOCALTX: return ResultGraph.NEW;
default: throw new AssertionError("Unrecognized option: " + this);
}
}

public Persist toPersist() {
switch(this) {
case NONE: return Persist.NOTHING;
case PERSIST: return Persist.VERTEX_PROPERTIES;
case LOCALTX: return Persist.VERTEX_PROPERTIES;
default: throw new AssertionError("Unrecognized option: " + this);
}
}

}

public GraphComputer resultMode(ResultMode mode) {
result(mode.toResultGraph());
persist(mode.toPersist());
return this;
}
public class FulgoraGraphComputer implements TitanGraphComputer {

private static final Logger log =
LoggerFactory.getLogger(FulgoraGraphComputer.class);
Expand Down Expand Up @@ -112,16 +83,14 @@ public FulgoraGraphComputer(final StandardTitanGraph graph, final Configuration

@Override
public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
// throw GraphComputer.Exceptions.graphFilterNotSupported();
this.graphFilter.setVertexFilter(vertexFilter);
return this;
this.graphFilter.setVertexFilter(vertexFilter);
return this;
}

@Override
public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) {
// throw GraphComputer.Exceptions.graphFilterNotSupported();
this.graphFilter.setEdgeFilter(edgeFilter);
return this;
this.graphFilter.setEdgeFilter(edgeFilter);
return this;
}

@Override
Expand All @@ -139,7 +108,7 @@ public GraphComputer persist(Persist persist) {
}

@Override
public GraphComputer workers(int threads) {
public TitanGraphComputer workers(int threads) {
Preconditions.checkArgument(threads > 0, "Invalid number of threads: %s", threads);
numThreads = threads;
return this;
Expand All @@ -160,16 +129,16 @@ public GraphComputer mapReduce(final MapReduce mapReduce) {

@Override
public Future<ComputerResult> submit() {
if (this.executed)
if (executed)
throw Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
else
this.executed = true;
executed = true;

// it is not possible execute a computer if it has no vertex program nor mapreducers
if (null == vertexProgram && this.mapReduces.isEmpty())
if (null == vertexProgram && mapReduces.isEmpty())
throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
// it is possible to run mapreducers without a vertex program
if (null != this.vertexProgram) {
if (null != vertexProgram) {
GraphComputerHelper.validateProgramOnComputer(this, vertexProgram);
this.mapReduces.addAll(this.vertexProgram.getMapReducers());
}
Expand All @@ -180,26 +149,22 @@ public Future<ComputerResult> submit() {
// determine the legality persistence and result graph options
if (!this.features().supportsResultGraphPersistCombination(this.resultGraphMode, this.persistMode))
throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraphMode, this.persistMode);
// ensure requested workers are not larger than supported workers
if (this.numThreads > this.features().getMaxWorkers())
throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.numThreads, this.features().getMaxWorkers());
this.memory = new FulgoraMemory(this.vertexProgram, this.mapReduces);
// ensure requested workers are not larger than supported workers
if (this.numThreads > this.features().getMaxWorkers())
throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.numThreads, this.features().getMaxWorkers());

memory = new FulgoraMemory(vertexProgram, mapReduces);

return CompletableFuture.<ComputerResult>supplyAsync(() -> {
final long time = System.currentTimeMillis();
// DIVERGES: TinkerGraphComputerView and try (final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers)) {
if (null != vertexProgram) {
// ##### Execute vertex program - DIVERGES: no FulgoraVertexMemory in TinkerGraphComputer, and no view in Fulgora
// ##### Execute vertex program
vertexMemory = new FulgoraVertexMemory(expectedNumVertices, graph.getIDManager(), vertexProgram);
// execute the vertex program
this.vertexProgram.setup(this.memory);
// DIVERGES: This is inside the while (true) { loop in TinkerGraphComputer... will put inExecute... hmmm...
// memory.completeSubRound();
vertexProgram.setup(memory);

try (VertexProgramScanJob.Executor job = VertexProgramScanJob.getVertexProgramScanJob(graph, memory, vertexMemory, vertexProgram)) {
for (int iteration = 1; ; iteration++) {
// Added here instead?
memory.completeSubRound();
vertexMemory.nextIteration(vertexProgram.getMessageScopes(memory));

Expand Down Expand Up @@ -227,19 +192,18 @@ public Future<ComputerResult> submit() {
throw new TitanException(e);
}

this.vertexMemory.completeIteration();
this.memory.completeSubRound();
vertexMemory.completeIteration();
memory.completeSubRound();
try {
if (this.vertexProgram.terminate(this.memory)) {
break;
}
} finally {
this.memory.incrIteration();
memory.incrIteration();
}
}
}
}
// DIVERGE: Missing the MapReduce only stuff here... still need to figure out the difference between the view and Titan's setup

// ##### Execute mapreduce jobs
// Collect map jobs
Expand Down Expand Up @@ -446,11 +410,11 @@ public boolean supportsEdgePropertyRemoval() {
return false;
}

@Override
public boolean supportsGraphFilter() {
return false;
}

};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public FulgoraMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce>

@Override
public Set<String> keys() {
return this.previousMap.keySet().stream().filter(key -> !this.inExecute || this.memoryKeys.get(key).isBroadcast()).collect(Collectors.toSet());
return this.previousMap.keySet().stream().filter(key -> !this.inExecute || this.memoryKeys.get(key).isBroadcast()).collect(Collectors.toSet());
}

@Override
Expand Down Expand Up @@ -85,12 +85,12 @@ public long getRuntime() {
protected void complete() {
this.iteration.decrementAndGet();
this.previousMap = this.currentMap;
this.memoryKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(computeKey -> this.previousMap.remove(computeKey.getKey()));
this.memoryKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(computeKey -> this.previousMap.remove(computeKey.getKey()));
}

protected void completeSubRound() {
this.previousMap = new ConcurrentHashMap<>(this.currentMap);
this.inExecute = !this.inExecute;
this.inExecute = !this.inExecute;
}

@Override
Expand All @@ -103,27 +103,27 @@ public <R> R get(final String key) throws IllegalArgumentException {
final R r = (R) this.previousMap.get(key);
if (null == r)
throw Memory.Exceptions.memoryDoesNotExist(key);
else if (this.inExecute && !this.memoryKeys.get(key).isBroadcast())
throw Memory.Exceptions.memoryDoesNotExist(key);
else if (this.inExecute && !this.memoryKeys.get(key).isBroadcast())
throw Memory.Exceptions.memoryDoesNotExist(key);
else
return r;
}

@Override
public void add(final String key, final Object value) {
checkKeyValue(key, value);
checkKeyValue(key, value);
if (!this.inExecute && ("incr".equals(key) || "and".equals(key) || "or".equals(key)))
throw Memory.Exceptions.memoryIsCurrentlyImmutable();
else if (!this.inExecute)
throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
this.currentMap.compute(key, (k, v) -> null == v ? value : this.memoryKeys.get(key).getReducer().apply(v, value));
else if (!this.inExecute)
throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
this.currentMap.compute(key, (k, v) -> null == v ? value : this.memoryKeys.get(key).getReducer().apply(v, value));
}

@Override
public void set(final String key, final Object value) {
checkKeyValue(key, value);
if (this.inExecute)
throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
if (this.inExecute)
throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
this.currentMap.put(key, value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
import com.thinkaurelius.titan.diskstorage.EntryList;
import com.thinkaurelius.titan.graphdb.idmanagement.IDManager;
import com.thinkaurelius.titan.graphdb.vertices.PreloadedVertex;
import org.apache.tinkerpop.gremlin.process.computer.*;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;

Expand Down Expand Up @@ -44,7 +48,7 @@ public FulgoraVertexMemory(int numVertices, final IDManager idManager, final Ver
this.combiner = FulgoraUtil.getMessageCombiner(vertexProgram);
this.computeKeys = vertexProgram.getVertexComputeKeys();
this.elementKeyMap = getIdMap(vertexProgram.getVertexComputeKeys().stream().map( k ->
k.getKey() ).collect(Collectors.toCollection(HashSet::new)));
k.getKey() ).collect(Collectors.toCollection(HashSet::new)));
this.previousScopes = ImmutableMap.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,5 @@ public List<HasContainer> getHasContainers() {
public void addHasContainer(final HasContainer hasContainer) {
this.addAll(Collections.singleton(hasContainer));
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public RecordWriter<NullWritable, VertexWritable> getRecordWriter(TaskAttemptCon
// returned by VertexProgram.getComputeKeys()
if (null == persistableKeys) {
try {
Stream<VertexComputeKey> persistableKeysStream = VertexProgram.createVertexProgram(graph, ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration())).getVertexComputeKeys().stream();
Stream<VertexComputeKey> persistableKeysStream = VertexProgram.createVertexProgram(graph, ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration())).getVertexComputeKeys().stream();
persistableKeys = persistableKeysStream.map( k -> k.getKey()).collect(Collectors.toCollection(HashSet::new));
log.debug("Set persistableKeys={}", Joiner.on(",").join(persistableKeys));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3442,7 +3442,6 @@ public void testTinkerPopOptimizationStrategies() {
t = gts.V().has("id", sid).local(__.outE("knows").has("weight", P.between(1, 3)).order().by("weight", decr).limit(10)).profile("~metrics");
assertCount(superV * 10, t);
metrics = (TraversalMetrics) t.asAdmin().getSideEffects().get("~metrics");
// System.out.println(metrics);
//verifyMetrics(metrics.getMetrics(0), true, false);
//verifyMetrics(metrics.getMetrics(1), true, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ private void testVertexPartitionOlap(CommitMode commitMode) throws Exception {
clopen(options);

//Test OLAP works with partitioned vertices
FulgoraGraphComputer computer = graph.compute(FulgoraGraphComputer.class);
computer.resultMode(FulgoraGraphComputer.ResultMode.NONE);
TitanGraphComputer computer = graph.compute(FulgoraGraphComputer.class);
computer.resultMode(TitanGraphComputer.ResultMode.NONE);
computer.workers(1);
computer.program(new OLAPTest.DegreeCounter());
computer.mapReduce(new OLAPTest.DegreeMapper());
Expand Down
34 changes: 17 additions & 17 deletions titan-test/src/main/java/com/thinkaurelius/titan/olap/OLAPTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ public void degreeCounting() throws Exception {
int numE = generateRandomGraph(numV);
clopen();

final FulgoraGraphComputer computer = graph.compute();
computer.resultMode(FulgoraGraphComputer.ResultMode.NONE);
final TitanGraphComputer computer = graph.compute();
computer.resultMode(TitanGraphComputer.ResultMode.NONE);
computer.workers(4);
computer.program(new DegreeCounter());
computer.mapReduce(new DegreeMapper());
Expand Down Expand Up @@ -228,8 +228,8 @@ public void vertexProgramExceptionPropagatesToCaller() throws InterruptedExcepti
int numE = generateRandomGraph(numV);
clopen();

final FulgoraGraphComputer computer = graph.compute();
computer.resultMode(FulgoraGraphComputer.ResultMode.NONE);
final TitanGraphComputer computer = graph.compute();
computer.resultMode(TitanGraphComputer.ResultMode.NONE);
computer.workers(1);
computer.program(new ExceptionProgram());

Expand All @@ -246,9 +246,9 @@ public void degreeCountingDistance() throws Exception {
int numE = generateRandomGraph(numV);
clopen();

// TODO does this iteration over FulgoraGraphComputer.ResultMode values imply that DegreeVariation's ResultGraph/Persist should also change?
for (FulgoraGraphComputer.ResultMode mode : FulgoraGraphComputer.ResultMode.values()) {
final FulgoraGraphComputer computer = graph.compute();
// TODO does this iteration over TitanGraphComputer.ResultMode values imply that DegreeVariation's ResultGraph/Persist should also change?
for (TitanGraphComputer.ResultMode mode : TitanGraphComputer.ResultMode.values()) {
final TitanGraphComputer computer = graph.compute();
computer.resultMode(mode);
computer.workers(1);
computer.program(new DegreeCounter(2));
Expand All @@ -273,9 +273,9 @@ public void degreeCountingDistance() throws Exception {
}
assertEquals(actualDegree2,degree2);
}
if (mode== FulgoraGraphComputer.ResultMode.LOCALTX) {
if (mode== TitanGraphComputer.ResultMode.LOCALTX) {
assertTrue(gview instanceof TitanTransaction);
((TitanTransaction)gview).rollback();
((TitanTransaction) gview).rollback();
}
}
}
Expand Down Expand Up @@ -376,10 +376,10 @@ public Set<VertexComputeKey> getVertexComputeKeys() {
return new HashSet<>(Arrays.asList(VertexComputeKey.of(DEGREE, false)));
}

@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return new HashSet<>(Arrays.asList(MemoryComputeKey.of(DEGREE, Operator.assign, true, false)));
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return new HashSet<>(Arrays.asList(MemoryComputeKey.of(DEGREE, Operator.assign, true, false)));
}

@Override
public Optional<MessageCombiner<Integer>> getMessageCombiner() {
Expand Down Expand Up @@ -531,8 +531,8 @@ public void testPageRank() throws ExecutionException, InterruptedException {
correctPRSum += correctPR[iv.next().<Integer>value("distance")];
}

final FulgoraGraphComputer computer = graph.compute();
computer.resultMode(FulgoraGraphComputer.ResultMode.NONE);
final TitanGraphComputer computer = graph.compute();
computer.resultMode(TitanGraphComputer.ResultMode.NONE);
computer.workers(4);
computer.program(PageRankVertexProgram.build().iterations(10).vertexCount(numV).dampingFactor(alpha).create(graph));
computer.mapReduce(PageRankMapReduce.build().create());
Expand Down Expand Up @@ -588,8 +588,8 @@ public void testShortestDistance() throws Exception {

clopen();

final FulgoraGraphComputer computer = graph.compute();
computer.resultMode(FulgoraGraphComputer.ResultMode.NONE);
final TitanGraphComputer computer = graph.compute();
computer.resultMode(TitanGraphComputer.ResultMode.NONE);
computer.workers(4);
computer.program(ShortestDistanceVertexProgram.build().seed((long)vertex.id()).maxDepth(maxDepth + 4).create(graph));
computer.mapReduce(ShortestDistanceMapReduce.build().create());
Expand Down

0 comments on commit 42d776e

Please sign in to comment.