Skip to content

Commit

Permalink
Spark session client (#70)
Browse files Browse the repository at this point in the history
- Subclassing of ConnectionManager into JDBCConnectionManager and SparkConnectionManager.
- Fix for logging dependency in pom.xml.
- Moving configuration input parsing methods into their own utility class.
- TelemetryRegistry can rely on any ConnectionManager.
- Move to Java11.

Closes #17
  • Loading branch information
jcamachor authored Jun 15, 2023
1 parent 5ba8bce commit cd25d04
Show file tree
Hide file tree
Showing 36 changed files with 1,102 additions and 476 deletions.
30 changes: 26 additions & 4 deletions .github/workflows/maven.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ on:
branches: [ main ]

env:
JAVA_VERSION: 17
JAVA_VERSION: 11

jobs:
build:
Expand All @@ -42,9 +42,31 @@ jobs:
run: mvn -B package --file pom.xml

## ----------------------------------------------------------------------------------
## Apache Spark
## Apache Spark (Session)
## ----------------------------------------------------------------------------------
spark:
spark-local-session:
needs: build
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
lst: [ delta ]
steps:
- name: Check out repository code
uses: actions/checkout@v3
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: ${{ env.JAVA_VERSION }}
distribution: 'temurin'
cache: maven
- name: Run tests
run: mvn -B test --file pom.xml -Pspark-client -Dlst-bench.test.db=spark -Dlst-bench.test.lst=${{ matrix.lst }} -Dlst-bench.test.connection=spark

## ----------------------------------------------------------------------------------
## Apache Spark (JDBC)
## ----------------------------------------------------------------------------------
spark-jdbc:
needs: build
runs-on: ubuntu-latest
strategy:
Expand Down Expand Up @@ -79,4 +101,4 @@ jobs:
- name: Start Spark Thrift Server
run: ./.github/scripts/spark/start_thrift_server.sh ${{ matrix.lst }}
- name: Run tests
run: mvn -B test --file pom.xml -Pspark-oss -Dlst-bench.test.db=spark -Dlst-bench.test.lst=${{ matrix.lst }}
run: mvn -B test --file pom.xml -Pspark-jdbc -Dlst-bench.test.db=spark -Dlst-bench.test.lst=${{ matrix.lst }} -Dlst-bench.test.connection=jdbc
51 changes: 46 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
<version>0.1-SNAPSHOT</version>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

Expand All @@ -36,6 +36,12 @@
<version>2.9.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down Expand Up @@ -84,12 +90,36 @@
<version>5.9.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>2.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-storage</artifactId>
<version>2.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3.3-bundle_2.12</artifactId>
<version>0.12.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.3_2.12</artifactId>
<version>1.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<!-- Per-engine profile for drivers -->
<profile>
<id>spark-databricks</id>
<id>databricks-jdbc</id>
<dependencies>
<dependency>
<groupId>com.databricks</groupId>
Expand All @@ -99,7 +129,7 @@
</dependencies>
</profile>
<profile>
<id>spark-oss</id>
<id>spark-jdbc</id>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
Expand Down Expand Up @@ -131,7 +161,17 @@
</dependencies>
</profile>
<profile>
<id>trino-oss</id>
<id>spark-client</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.2</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>trino-jdbc</id>
<dependencies>
<dependency>
<groupId>io.trino</groupId>
Expand Down Expand Up @@ -160,6 +200,7 @@
<goal>copy-dependencies</goal>
</goals>
<configuration>
<excludeScope>provided</excludeScope>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
Expand Down
27 changes: 20 additions & 7 deletions src/main/java/com/microsoft/lst_bench/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import com.microsoft.lst_bench.client.ConnectionManager;
import com.microsoft.lst_bench.common.BenchmarkConfig;
import com.microsoft.lst_bench.common.BenchmarkRunnable;
import com.microsoft.lst_bench.common.LSTBenchmarkExecutor;
import com.microsoft.lst_bench.input.BenchmarkObjectFactory;
import com.microsoft.lst_bench.input.TaskLibrary;
import com.microsoft.lst_bench.input.Workload;
import com.microsoft.lst_bench.input.config.ConnectionConfig;
import com.microsoft.lst_bench.input.config.ConnectionsConfig;
import com.microsoft.lst_bench.input.config.ExperimentConfig;
import com.microsoft.lst_bench.input.config.TelemetryConfig;
import com.microsoft.lst_bench.sql.ConnectionManager;
import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry;
import com.microsoft.lst_bench.telemetry.SQLTelemetryRegistry;
import com.microsoft.lst_bench.telemetry.TelemetryHook;
import java.io.File;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -114,10 +115,22 @@ public static void main(String[] args) throws Exception {
final TelemetryConfig telemetryConfig =
mapper.readValue(new File(inputTelemetryConfigFile), TelemetryConfig.class);

run(taskLibrary, workload, connectionsConfig, experimentConfig, telemetryConfig);
}

/** Run benchmark. */
public static void run(
TaskLibrary taskLibrary,
Workload workload,
ConnectionsConfig connectionsConfig,
ExperimentConfig experimentConfig,
TelemetryConfig telemetryConfig)
throws Exception {
// Create connections manager
Map<String, ConnectionManager> idToConnectionManager = new LinkedHashMap<>();
for (ConnectionConfig connectionConfig : connectionsConfig.getConnections()) {
ConnectionManager connectionManager = ConnectionManager.from(connectionConfig);
ConnectionManager connectionManager =
BenchmarkObjectFactory.connectionManager(connectionConfig);
if (idToConnectionManager.containsKey(connectionConfig.getId())) {
throw new IllegalArgumentException("Duplicate connection id: " + connectionConfig.getId());
}
Expand All @@ -126,9 +139,9 @@ public static void main(String[] args) throws Exception {

// Create log utility
final ConnectionManager telemetryConnectionManager =
ConnectionManager.from(telemetryConfig.getConnection());
final JDBCTelemetryRegistry telemetryRegistry =
new JDBCTelemetryRegistry(
BenchmarkObjectFactory.connectionManager(telemetryConfig.getConnection());
final SQLTelemetryRegistry telemetryRegistry =
new SQLTelemetryRegistry(
telemetryConnectionManager,
telemetryConfig.isExecuteDDL(),
telemetryConfig.getDDLFile(),
Expand All @@ -139,7 +152,7 @@ public static void main(String[] args) throws Exception {

// Create experiment configuration
final BenchmarkConfig benchmarkConfig =
BenchmarkConfig.from(experimentConfig, taskLibrary, workload);
BenchmarkObjectFactory.benchmarkConfig(experimentConfig, taskLibrary, workload);

// Run experiment
final BenchmarkRunnable experiment =
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/com/microsoft/lst_bench/client/ClientException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.lst_bench.client;

/** A client exception. */
public class ClientException extends Exception {

public ClientException(Exception cause) {
super(cause);
}

public ClientException(String message) {
super(message);
}
}
24 changes: 24 additions & 0 deletions src/main/java/com/microsoft/lst_bench/client/Connection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.lst_bench.client;

/** A connection. */
public interface Connection extends AutoCloseable {

void execute(String sqlText) throws ClientException;

void close() throws ClientException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.lst_bench.client;

/** A connection manager. */
public interface ConnectionManager {

Connection createConnection() throws ClientException;
}
54 changes: 54 additions & 0 deletions src/main/java/com/microsoft/lst_bench/client/JDBCConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.lst_bench.client;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** A JDBC connection. */
public class JDBCConnection implements Connection {

private final java.sql.Connection connection;

public JDBCConnection(java.sql.Connection connection) {
this.connection = connection;
}

@Override
public void execute(String sqlText) throws ClientException {
try (Statement s = connection.createStatement()) {
boolean hasResults = s.execute(sqlText);
if (hasResults) {
ResultSet rs = s.getResultSet();
while (rs.next()) {
// do nothing
}
}
} catch (Exception e) {
throw new ClientException(e);
}
}

@Override
public void close() throws ClientException {
try {
connection.close();
} catch (SQLException e) {
throw new ClientException(e);
}
}
}
Loading

0 comments on commit cd25d04

Please sign in to comment.