Skip to content

Commit

Permalink
[FLINK] Changes to SystemML
Browse files Browse the repository at this point in the history
Changes to SystemML in order to support Flink.
  • Loading branch information
carabolic authored and akunft committed May 19, 2016
1 parent 9cc0722 commit 11d910c
Show file tree
Hide file tree
Showing 38 changed files with 1,181 additions and 309 deletions.
31 changes: 31 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
<spark.version>1.4.1</spark.version>
<scala.version>2.10.5</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<flink.version>1.0.0</flink.version>

<!-- OS-specific JVM arguments for running integration tests -->
<integrationTestExtraJVMArgs />
</properties>
Expand Down Expand Up @@ -754,6 +756,35 @@


<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/org/apache/sysml/api/DMLScript.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysml.runtime.controlprogram.context.FlinkExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
Expand All @@ -93,7 +94,9 @@ public enum RUNTIME_PLATFORM {
SINGLE_NODE, // execute all matrix operations in CP
HYBRID, // execute matrix operations in CP or MR
HYBRID_SPARK, // execute matrix operations in CP or Spark
SPARK // execute matrix operations in Spark
SPARK, // execute matrix operations in Spark
FLINK, // execute matrix operations in Flink (by parallel with SPARK above — confirm against FlinkExecutionContext)
HYBRID_FLINK // execute matrix operations in CP or Flink (by parallel with HYBRID_SPARK above)
}

public static RUNTIME_PLATFORM rtplatform = RUNTIME_PLATFORM.HYBRID; //default exec mode
Expand Down Expand Up @@ -520,6 +523,10 @@ else if ( platform.equalsIgnoreCase("spark"))
lrtplatform = RUNTIME_PLATFORM.SPARK;
else if ( platform.equalsIgnoreCase("hybrid_spark"))
lrtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
else if ( platform.equalsIgnoreCase("flink"))
lrtplatform = RUNTIME_PLATFORM.FLINK;
else if ( platform.equalsIgnoreCase("hybrid_flink"))
lrtplatform = RUNTIME_PLATFORM.HYBRID_FLINK;
else
System.err.println("ERROR: Unknown runtime platform: " + platform);

Expand Down Expand Up @@ -674,8 +681,11 @@ private static void execute(String dmlScriptStr, String fnameOptConfig, Map<Stri

//run execute (w/ exception handling to ensure proper shutdown)
ec = ExecutionContextFactory.createContext(rtprog);
rtprog.execute( ec );

rtprog.execute( ec );
if (ec instanceof FlinkExecutionContext) {
((FlinkExecutionContext) ec).execute();

}
}
finally //ensure cleanup/shutdown
{
Expand Down
55 changes: 0 additions & 55 deletions src/main/java/org/apache/sysml/api/FlinkMLOutput.java

This file was deleted.

Loading

0 comments on commit 11d910c

Please sign in to comment.