diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml index 054582e1..d8724ef6 100644 --- a/.github/workflows/maven.yaml +++ b/.github/workflows/maven.yaml @@ -21,7 +21,7 @@ on: branches: [ main ] env: - JAVA_VERSION: 17 + JAVA_VERSION: 11 jobs: build: @@ -42,9 +42,31 @@ jobs: run: mvn -B package --file pom.xml ## ---------------------------------------------------------------------------------- - ## Apache Spark + ## Apache Spark (Session) ## ---------------------------------------------------------------------------------- - spark: + spark-local-session: + needs: build + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + lst: [ delta ] + steps: + - name: Check out repository code + uses: actions/checkout@v3 + - name: Set up JDK + uses: actions/setup-java@v3 + with: + java-version: ${{ env.JAVA_VERSION }} + distribution: 'temurin' + cache: maven + - name: Run tests + run: mvn -B test --file pom.xml -Pspark-client -Dlst-bench.test.db=spark -Dlst-bench.test.lst=${{ matrix.lst }} -Dlst-bench.test.connection=spark + + ## ---------------------------------------------------------------------------------- + ## Apache Spark (JDBC) + ## ---------------------------------------------------------------------------------- + spark-jdbc: needs: build runs-on: ubuntu-latest strategy: @@ -79,4 +101,4 @@ jobs: - name: Start Spark Thrift Server run: ./.github/scripts/spark/start_thrift_server.sh ${{ matrix.lst }} - name: Run tests - run: mvn -B test --file pom.xml -Pspark-oss -Dlst-bench.test.db=spark -Dlst-bench.test.lst=${{ matrix.lst }} + run: mvn -B test --file pom.xml -Pspark-jdbc -Dlst-bench.test.db=spark -Dlst-bench.test.lst=${{ matrix.lst }} -Dlst-bench.test.connection=jdbc diff --git a/pom.xml b/pom.xml index b71b92f9..34254e3e 100644 --- a/pom.xml +++ b/pom.xml @@ -9,8 +9,8 @@ 0.1-SNAPSHOT - 17 - 17 + 11 + 11 UTF-8 @@ -36,6 +36,12 @@ 2.9.3 provided + + org.apache.spark + spark-sql_2.12 + 3.3.2 + provided + com.google.code.findbugs jsr305 @@ -84,12 +90,36 @@ 5.9.3 test + + io.delta + delta-core_2.12 + 2.2.0 + test + + + io.delta + delta-storage + 2.2.0 + test + + + org.apache.hudi + hudi-spark3.3-bundle_2.12 + 0.12.2 + test + + + org.apache.iceberg + iceberg-spark-runtime-3.3_2.12 + 1.1.0 + test + - spark-databricks + databricks-jdbc com.databricks @@ -99,7 +129,7 @@ - spark-oss + spark-jdbc org.apache.hive @@ -131,7 +161,17 @@ - trino-oss + spark-client + + + org.apache.spark + spark-sql_2.12 + 3.3.2 + + + + + trino-jdbc io.trino @@ -160,6 +200,7 @@ copy-dependencies + provided ${project.build.directory}/lib diff --git a/src/main/java/com/microsoft/lst_bench/Driver.java b/src/main/java/com/microsoft/lst_bench/Driver.java index 0dfb584c..738c1bf2 100644 --- a/src/main/java/com/microsoft/lst_bench/Driver.java +++ b/src/main/java/com/microsoft/lst_bench/Driver.java @@ -17,17 +17,18 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; +import com.microsoft.lst_bench.client.ConnectionManager; import com.microsoft.lst_bench.common.BenchmarkConfig; import com.microsoft.lst_bench.common.BenchmarkRunnable; import com.microsoft.lst_bench.common.LSTBenchmarkExecutor; +import com.microsoft.lst_bench.input.BenchmarkObjectFactory; import com.microsoft.lst_bench.input.TaskLibrary; import com.microsoft.lst_bench.input.Workload; import com.microsoft.lst_bench.input.config.ConnectionConfig; import com.microsoft.lst_bench.input.config.ConnectionsConfig; import com.microsoft.lst_bench.input.config.ExperimentConfig; import 
com.microsoft.lst_bench.input.config.TelemetryConfig;
-import com.microsoft.lst_bench.sql.ConnectionManager;
-import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry;
+import com.microsoft.lst_bench.telemetry.SQLTelemetryRegistry;
 import com.microsoft.lst_bench.telemetry.TelemetryHook;
 import java.io.File;
 import java.util.LinkedHashMap;
@@ -114,10 +115,22 @@ public static void main(String[] args) throws Exception {
     final TelemetryConfig telemetryConfig =
         mapper.readValue(new File(inputTelemetryConfigFile), TelemetryConfig.class);
 
+    run(taskLibrary, workload, connectionsConfig, experimentConfig, telemetryConfig);
+  }
+
+  /** Run benchmark. */
+  public static void run(
+      TaskLibrary taskLibrary,
+      Workload workload,
+      ConnectionsConfig connectionsConfig,
+      ExperimentConfig experimentConfig,
+      TelemetryConfig telemetryConfig)
+      throws Exception {
     // Create connections manager
     Map<String, ConnectionManager> idToConnectionManager = new LinkedHashMap<>();
     for (ConnectionConfig connectionConfig : connectionsConfig.getConnections()) {
-      ConnectionManager connectionManager = ConnectionManager.from(connectionConfig);
+      ConnectionManager connectionManager =
+          BenchmarkObjectFactory.connectionManager(connectionConfig);
       if (idToConnectionManager.containsKey(connectionConfig.getId())) {
         throw new IllegalArgumentException("Duplicate connection id: " + connectionConfig.getId());
       }
@@ -126,9 +139,9 @@ public static void main(String[] args) throws Exception {
 
     // Create log utility
     final ConnectionManager telemetryConnectionManager =
-        ConnectionManager.from(telemetryConfig.getConnection());
-    final JDBCTelemetryRegistry telemetryRegistry =
-        new JDBCTelemetryRegistry(
+        BenchmarkObjectFactory.connectionManager(telemetryConfig.getConnection());
+    final SQLTelemetryRegistry telemetryRegistry =
+        new SQLTelemetryRegistry(
             telemetryConnectionManager,
             telemetryConfig.isExecuteDDL(),
             telemetryConfig.getDDLFile(),
@@ -139,7 +152,7 @@ public static void main(String[] args) throws Exception {
 
     // Create experiment configuration
     final BenchmarkConfig benchmarkConfig =
-        BenchmarkConfig.from(experimentConfig, taskLibrary, workload);
+        BenchmarkObjectFactory.benchmarkConfig(experimentConfig, taskLibrary, workload);
 
     // Run experiment
     final BenchmarkRunnable experiment =
diff --git a/src/main/java/com/microsoft/lst_bench/client/ClientException.java b/src/main/java/com/microsoft/lst_bench/client/ClientException.java
new file mode 100644
index 00000000..d1f2af90
--- /dev/null
+++ b/src/main/java/com/microsoft/lst_bench/client/ClientException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.microsoft.lst_bench.client;
+
+/**
+ * A client exception.
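+ *
+ * <p>Illustrative use (a sketch, not part of this patch): callers typically wrap the
+ * underlying driver or session error, which remains reachable through {@code getCause()}:
+ *
+ * <pre>{@code
+ * try {
+ *   connection.execute("SELECT 1");
+ * } catch (ClientException e) {
+ *   Throwable cause = e.getCause(); // e.g., the original SQLException; may be null
+ * }
+ * }</pre>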
*/ +public class ClientException extends Exception { + + public ClientException(Exception cause) { + super(cause); + } + + public ClientException(String message) { + super(message); + } +} diff --git a/src/main/java/com/microsoft/lst_bench/client/Connection.java b/src/main/java/com/microsoft/lst_bench/client/Connection.java new file mode 100644 index 00000000..9bf9deb7 --- /dev/null +++ b/src/main/java/com/microsoft/lst_bench/client/Connection.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.microsoft.lst_bench.client; + +/** A connection. */ +public interface Connection extends AutoCloseable { + + void execute(String sqlText) throws ClientException; + + void close() throws ClientException; +} diff --git a/src/main/java/com/microsoft/lst_bench/client/ConnectionManager.java b/src/main/java/com/microsoft/lst_bench/client/ConnectionManager.java new file mode 100644 index 00000000..553723db --- /dev/null +++ b/src/main/java/com/microsoft/lst_bench/client/ConnectionManager.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.microsoft.lst_bench.client; + +/** A connection manager. */ +public interface ConnectionManager { + + Connection createConnection() throws ClientException; +} diff --git a/src/main/java/com/microsoft/lst_bench/client/JDBCConnection.java b/src/main/java/com/microsoft/lst_bench/client/JDBCConnection.java new file mode 100644 index 00000000..53b6ca56 --- /dev/null +++ b/src/main/java/com/microsoft/lst_bench/client/JDBCConnection.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.microsoft.lst_bench.client; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +/** A JDBC connection. 
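+ *
+ * <p>Usage sketch (illustrative; this mirrors what JDBCConnectionManager does when it
+ * creates connections):
+ *
+ * <pre>{@code
+ * Connection connection = new JDBCConnection(DriverManager.getConnection(url));
+ * try {
+ *   connection.execute("SHOW TABLES");
+ * } finally {
+ *   connection.close();
+ * }
+ * }</pre>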
*/ +public class JDBCConnection implements Connection { + + private final java.sql.Connection connection; + + public JDBCConnection(java.sql.Connection connection) { + this.connection = connection; + } + + @Override + public void execute(String sqlText) throws ClientException { + try (Statement s = connection.createStatement()) { + boolean hasResults = s.execute(sqlText); + if (hasResults) { + ResultSet rs = s.getResultSet(); + while (rs.next()) { + // do nothing + } + } + } catch (Exception e) { + throw new ClientException(e); + } + } + + @Override + public void close() throws ClientException { + try { + connection.close(); + } catch (SQLException e) { + throw new ClientException(e); + } + } +} diff --git a/src/main/java/com/microsoft/lst_bench/sql/ConnectionManager.java b/src/main/java/com/microsoft/lst_bench/client/JDBCConnectionManager.java similarity index 52% rename from src/main/java/com/microsoft/lst_bench/sql/ConnectionManager.java rename to src/main/java/com/microsoft/lst_bench/client/JDBCConnectionManager.java index f661145a..8eee9f9e 100644 --- a/src/main/java/com/microsoft/lst_bench/sql/ConnectionManager.java +++ b/src/main/java/com/microsoft/lst_bench/client/JDBCConnectionManager.java @@ -13,17 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.microsoft.lst_bench.sql; +package com.microsoft.lst_bench.client; -import com.microsoft.lst_bench.input.config.ConnectionConfig; -import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; /** Simple JDBC connection manager. */ -public class ConnectionManager { +public class JDBCConnectionManager implements ConnectionManager { private final String url; @@ -31,28 +29,22 @@ public class ConnectionManager { @Nullable private final String password; - private ConnectionManager(String url, String username, String password) { + public JDBCConnectionManager(String url, String username, String password) { this.url = url; this.username = username; this.password = password; } - public Connection createConnection() throws SQLException { - if (StringUtils.isEmpty(username)) { - return DriverManager.getConnection(url); - } else { - return DriverManager.getConnection(url, username, password); - } - } - - public static ConnectionManager from(ConnectionConfig connectionConfig) { + @Override + public Connection createConnection() throws ClientException { try { - Class.forName(connectionConfig.getDriver()); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException( - "Unable to load driver class: " + connectionConfig.getDriver(), e); + if (StringUtils.isEmpty(username)) { + return new JDBCConnection(DriverManager.getConnection(url)); + } else { + return new JDBCConnection(DriverManager.getConnection(url, username, password)); + } + } catch (SQLException e) { + throw new ClientException(e); } - return new ConnectionManager( - connectionConfig.getUrl(), connectionConfig.getUsername(), connectionConfig.getPassword()); } } diff --git a/src/main/java/com/microsoft/lst_bench/client/SparkConnection.java b/src/main/java/com/microsoft/lst_bench/client/SparkConnection.java new file mode 100644 index 00000000..3e5ae7a2 --- /dev/null +++ b/src/main/java/com/microsoft/lst_bench/client/SparkConnection.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) Microsoft Corporation. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.microsoft.lst_bench.client;
+
+import java.util.List;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/** A Spark connection. */
+public class SparkConnection implements Connection {
+
+  private final SparkSession session;
+
+  public SparkConnection(SparkSession session) {
+    this.session = session;
+  }
+
+  @Override
+  public void execute(String sqlText) {
+    List<Row> results = session.sql(sqlText).collectAsList();
+    for (Row row : results) {
+      // do nothing
+    }
+  }
+
+  @Override
+  public void close() {
+    session.close();
+  }
+}
diff --git a/src/main/java/com/microsoft/lst_bench/client/SparkConnectionManager.java b/src/main/java/com/microsoft/lst_bench/client/SparkConnectionManager.java
new file mode 100644
index 00000000..9f92e839
--- /dev/null
+++ b/src/main/java/com/microsoft/lst_bench/client/SparkConnectionManager.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.microsoft.lst_bench.client;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Simple Spark session connection manager.
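+ *
+ * <p>Illustrative use (the master URL and setting below are taken from the sample
+ * connections config in this patch; adjust for your deployment):
+ *
+ * <pre>{@code
+ * Map<String, String> config = new HashMap<>();
+ * config.put("spark.worker.timeout", "60");
+ * ConnectionManager manager = new SparkConnectionManager("spark://127.0.0.1:7077", config);
+ * try (Connection connection = manager.createConnection()) {
+ *   connection.execute("SELECT 1");
+ * }
+ * }</pre>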
+ */
+public class SparkConnectionManager implements ConnectionManager {
+
+  private final String url;
+
+  private final Map<String, String> config;
+
+  public SparkConnectionManager(String url, Map<String, String> config) {
+    this.url = url;
+    this.config = Collections.unmodifiableMap(config == null ? new HashMap<>() : config);
+  }
+
+  public Connection createConnection() {
+    SparkSession.Builder builder = SparkSession.builder().master(url);
+    this.config.forEach(builder::config);
+    return new SparkConnection(builder.getOrCreate());
+  }
+}
diff --git a/src/main/java/com/microsoft/lst_bench/common/BenchmarkConfig.java b/src/main/java/com/microsoft/lst_bench/common/BenchmarkConfig.java
index 067279b1..e4f1ecd5 100644
--- a/src/main/java/com/microsoft/lst_bench/common/BenchmarkConfig.java
+++ b/src/main/java/com/microsoft/lst_bench/common/BenchmarkConfig.java
@@ -15,31 +15,10 @@
  */
 package com.microsoft.lst_bench.common;
 
-import com.microsoft.lst_bench.exec.FileExec;
-import com.microsoft.lst_bench.exec.ImmutablePhaseExec;
-import com.microsoft.lst_bench.exec.ImmutableSessionExec;
-import com.microsoft.lst_bench.exec.ImmutableTaskExec;
-import com.microsoft.lst_bench.exec.ImmutableWorkloadExec;
-import com.microsoft.lst_bench.exec.PhaseExec;
-import com.microsoft.lst_bench.exec.SessionExec;
-import com.microsoft.lst_bench.exec.TaskExec;
 import com.microsoft.lst_bench.exec.WorkloadExec;
-import com.microsoft.lst_bench.input.Phase;
-import com.microsoft.lst_bench.input.Session;
-import com.microsoft.lst_bench.input.Task;
-import com.microsoft.lst_bench.input.TaskLibrary;
-import com.microsoft.lst_bench.input.TaskTemplate;
-import com.microsoft.lst_bench.input.Workload;
-import com.microsoft.lst_bench.input.config.ExperimentConfig;
-import com.microsoft.lst_bench.sql.SQLParser;
-import com.microsoft.lst_bench.util.FileParser;
-import com.microsoft.lst_bench.util.StringUtils;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /** A benchmark configuration. */
 public class BenchmarkConfig {
@@ -49,7 +28,7 @@ public class BenchmarkConfig {
   private final Map<String, Object> metadata;
   private final WorkloadExec workload;
 
-  private BenchmarkConfig(
+  public BenchmarkConfig(
       String id, int repetitions, Map<String, Object> metadata, WorkloadExec workload) {
     this.id = id;
     this.repetitions = repetitions;
@@ -72,227 +51,4 @@ public Map<String, Object> getMetadata() {
   public WorkloadExec getWorkload() {
     return workload;
   }
-
-  /**
-   * Creates a benchmark configuration from the experiment configuration, task library, and
-   * workload.
-   *
-   * @param experimentConfig the experiment configuration
-   * @param taskLibrary the task library
-   * @param workload the workload
-   * @return a benchmark configuration
-   */
-  public static BenchmarkConfig from(
-      ExperimentConfig experimentConfig, TaskLibrary taskLibrary, Workload workload) {
-    Map<String, TaskTemplate> idToTaskTemplate = parseTaskLibrary(taskLibrary);
-    ImmutableWorkloadExec workloadExec =
-        createWorkloadExec(workload, idToTaskTemplate, experimentConfig);
-    return new BenchmarkConfig(
-        experimentConfig.getId(),
-        experimentConfig.getRepetitions(),
-        experimentConfig.getMetadata(),
-        workloadExec);
-  }
-
-  /**
-   * Parses the task library to create a map of task templates with unique IDs.
- * - * @param taskLibrary the task library to parse - * @return a map of task templates with unique IDs - * @throws IllegalArgumentException if there are duplicate task template IDs - */ - private static Map parseTaskLibrary(TaskLibrary taskLibrary) { - Map idToTaskTemplate = new HashMap<>(); - for (TaskTemplate taskTemplate : taskLibrary.getTaskTemplates()) { - if (idToTaskTemplate.containsKey(taskTemplate.getId())) { - throw new IllegalArgumentException("Duplicate task template id: " + taskTemplate.getId()); - } - idToTaskTemplate.put(taskTemplate.getId(), taskTemplate); - } - return idToTaskTemplate; - } - - /** - * Creates a workload execution from the workload and task library. - * - * @param workload the workload to execute - * @param idToTaskTemplate a map of task templates with unique IDs - * @param experimentConfig the experiment configuration - * @return a workload execution - * @throws IllegalArgumentException if the workload contains an invalid task template ID - */ - private static ImmutableWorkloadExec createWorkloadExec( - Workload workload, - Map idToTaskTemplate, - ExperimentConfig experimentConfig) { - Map taskTemplateIdToPermuteOrderCounter = new HashMap<>(); - Map taskTemplateIdToParameterValuesCounter = new HashMap<>(); - List phases = new ArrayList<>(); - for (Phase phase : workload.getPhases()) { - PhaseExec phaseExec = - createPhaseExec( - phase, - idToTaskTemplate, - experimentConfig, - taskTemplateIdToPermuteOrderCounter, - taskTemplateIdToParameterValuesCounter); - phases.add(phaseExec); - } - return ImmutableWorkloadExec.of(workload.getId(), phases); - } - - private static PhaseExec createPhaseExec( - Phase phase, - Map idToTaskTemplate, - ExperimentConfig experimentConfig, - Map taskTemplateIdToPermuteOrderCounter, - Map taskTemplateIdToParameterValuesCounter) { - List sessions = new ArrayList<>(); - for (int i = 0; i < phase.getSessions().size(); i++) { - Session session = phase.getSessions().get(i); - String sessionId = String.valueOf(i); - SessionExec sessionExec = - createSessionExec( - sessionId, - session, - idToTaskTemplate, - experimentConfig, - taskTemplateIdToPermuteOrderCounter, - taskTemplateIdToParameterValuesCounter); - sessions.add(sessionExec); - } - return ImmutablePhaseExec.of(phase.getId(), sessions); - } - - private static SessionExec createSessionExec( - String sessionId, - Session session, - Map idToTaskTemplate, - ExperimentConfig experimentConfig, - Map taskTemplateIdToPermuteOrderCounter, - Map taskTemplateIdToParameterValuesCounter) { - List tasks = new ArrayList<>(); - for (int j = 0; j < session.getTasks().size(); j++) { - Task task = session.getTasks().get(j); - String taskId = task.getTemplateId() + "_" + j; - TaskExec taskExec = - createTaskExec( - taskId, - task, - idToTaskTemplate, - experimentConfig, - taskTemplateIdToPermuteOrderCounter, - taskTemplateIdToParameterValuesCounter); - tasks.add(taskExec); - } - return ImmutableSessionExec.of(sessionId, tasks); - } - - private static TaskExec createTaskExec( - String taskId, - Task task, - Map idToTaskTemplate, - ExperimentConfig experimentConfig, - Map taskTemplateIdToPermuteOrderCounter, - Map taskTemplateIdToParameterValuesCounter) { - TaskTemplate taskTemplate = idToTaskTemplate.get(task.getTemplateId()); - if (taskTemplate == null) { - throw new IllegalArgumentException("Unknown task template id: " + task.getTemplateId()); - } - List files = - createFileExecList( - taskTemplate, - task, - experimentConfig, - taskTemplateIdToPermuteOrderCounter, - 
taskTemplateIdToParameterValuesCounter); - return ImmutableTaskExec.of(taskId, files).withTimeTravelPhaseId(task.getTimeTravelPhaseId()); - } - - private static List createFileExecList( - TaskTemplate taskTemplate, - Task task, - ExperimentConfig experimentConfig, - Map taskTemplateIdToPermuteOrderCounter, - Map taskTemplateIdToParameterValuesCounter) { - List files = new ArrayList<>(); - for (String file : taskTemplate.getFiles()) { - files.add(SQLParser.getStatements(file)); - } - files = applyPermutationOrder(taskTemplate, task, taskTemplateIdToPermuteOrderCounter, files); - files = applyReplaceRegex(task, files); - files = - applyParameterValues( - taskTemplate, experimentConfig, taskTemplateIdToParameterValuesCounter, files); - return files; - } - - private static List applyPermutationOrder( - TaskTemplate taskTemplate, - Task task, - Map taskTemplateIdToPermuteOrderCounter, - List files) { - if (taskTemplate.getPermutationOrdersDirectory() == null) { - // Create statements with certain order - return files; - } - Map idToFile = new HashMap<>(); - for (FileExec file : files) { - idToFile.put(file.getId(), file); - } - int counter; - if (Boolean.TRUE.equals(task.isPermuteOrder())) { - counter = - taskTemplateIdToPermuteOrderCounter.compute( - taskTemplate.getId(), (k, v) -> v == null ? 1 : v + 1); - } else { - counter = 0; - } - List permutationOrder = - FileParser.getPermutationOrder(taskTemplate.getPermutationOrdersDirectory(), counter); - List sortedFiles = new ArrayList<>(); - for (String fileId : permutationOrder) { - sortedFiles.add(idToFile.get(fileId)); - } - return sortedFiles; - } - - private static List applyReplaceRegex(Task task, List files) { - if (task.getReplaceRegex() == null) { - return files; - } - return files.stream() - .map( - file -> { - for (Task.ReplaceRegex regex : task.getReplaceRegex()) { - file = StringUtils.replaceRegex(file, regex.getPattern(), regex.getReplacement()); - } - return file; - }) - .collect(Collectors.toList()); - } - - private static List applyParameterValues( - TaskTemplate taskTemplate, - ExperimentConfig experimentConfig, - Map taskTemplateIdToParameterValuesCounter, - List files) { - Map parameterValues = new HashMap<>(); - if (taskTemplate.getParameterValuesFile() != null) { - // Include parameter values defined in the task template - parameterValues.putAll( - FileParser.getParameterValues( - taskTemplate.getParameterValuesFile(), - taskTemplateIdToParameterValuesCounter.compute( - taskTemplate.getId(), (k, v) -> v == null ? 
1 : v + 1))); - } - if (experimentConfig.getParameterValues() != null) { - // Include experiment-specific parameter values (they can override the ones defined in - // the task template) - parameterValues.putAll(experimentConfig.getParameterValues()); - } - return files.stream() - .map(f -> StringUtils.replaceParameters(f, parameterValues)) - .collect(Collectors.toList()); - } } diff --git a/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java b/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java index d6e7a1ce..f54eac66 100644 --- a/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java +++ b/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java @@ -16,24 +16,22 @@ package com.microsoft.lst_bench.common; import com.fasterxml.jackson.databind.ObjectMapper; +import com.microsoft.lst_bench.client.ClientException; +import com.microsoft.lst_bench.client.Connection; +import com.microsoft.lst_bench.client.ConnectionManager; import com.microsoft.lst_bench.exec.FileExec; import com.microsoft.lst_bench.exec.PhaseExec; import com.microsoft.lst_bench.exec.SessionExec; import com.microsoft.lst_bench.exec.StatementExec; import com.microsoft.lst_bench.exec.TaskExec; import com.microsoft.lst_bench.exec.WorkloadExec; -import com.microsoft.lst_bench.sql.ConnectionManager; import com.microsoft.lst_bench.telemetry.EventInfo; import com.microsoft.lst_bench.telemetry.EventInfo.EventType; import com.microsoft.lst_bench.telemetry.EventInfo.Status; import com.microsoft.lst_bench.telemetry.ImmutableEventInfo; -import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry; +import com.microsoft.lst_bench.telemetry.SQLTelemetryRegistry; import com.microsoft.lst_bench.util.DateTimeFormatter; import com.microsoft.lst_bench.util.StringUtils; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -59,7 +57,7 @@ public class LSTBenchmarkExecutor extends BenchmarkRunnable { private final Map idToConnectionManager; private final BenchmarkConfig config; - private final JDBCTelemetryRegistry telemetryRegistry; + private final SQLTelemetryRegistry telemetryRegistry; // UUID to identify the experiment run. The experiment telemetry will be tagged with this UUID. 
private final UUID experimentRunId; @@ -67,7 +65,7 @@ public class LSTBenchmarkExecutor extends BenchmarkRunnable { public LSTBenchmarkExecutor( Map idToConnectionManager, BenchmarkConfig config, - JDBCTelemetryRegistry telemetryRegistry) { + SQLTelemetryRegistry telemetryRegistry) { super(); this.idToConnectionManager = Collections.unmodifiableMap(idToConnectionManager); this.config = config; @@ -238,7 +236,7 @@ public Worker( } @Override - public Boolean call() throws SQLException { + public Boolean call() throws ClientException { Instant sessionStartTime = Instant.now(); try (Connection connection = connectionManager.createConnection()) { for (TaskExec task : session.getTasks()) { @@ -263,21 +261,14 @@ public Boolean call() throws SQLException { } private void executeTask(Connection connection, TaskExec task, Map values) - throws SQLException { + throws ClientException { for (FileExec file : task.getFiles()) { Instant fileStartTime = Instant.now(); try { for (StatementExec statement : file.getStatements()) { Instant statementStartTime = Instant.now(); - try (Statement s = connection.createStatement()) { - boolean hasResults = - s.execute(StringUtils.replaceParameters(statement, values).getStatement()); - if (hasResults) { - ResultSet rs = s.getResultSet(); - while (rs.next()) { - // do nothing - } - } + try { + connection.execute(StringUtils.replaceParameters(statement, values).getStatement()); } catch (Exception e) { LOGGER.error("Exception executing statement: " + statement.getId()); writeStatementEvent(statementStartTime, statement.getId(), Status.FAILURE); diff --git a/src/main/java/com/microsoft/lst_bench/input/BenchmarkObjectFactory.java b/src/main/java/com/microsoft/lst_bench/input/BenchmarkObjectFactory.java new file mode 100644 index 00000000..2bb62ac4 --- /dev/null +++ b/src/main/java/com/microsoft/lst_bench/input/BenchmarkObjectFactory.java @@ -0,0 +1,305 @@ +/* + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.microsoft.lst_bench.input; + +import com.microsoft.lst_bench.client.ConnectionManager; +import com.microsoft.lst_bench.client.JDBCConnectionManager; +import com.microsoft.lst_bench.client.SparkConnectionManager; +import com.microsoft.lst_bench.common.BenchmarkConfig; +import com.microsoft.lst_bench.exec.FileExec; +import com.microsoft.lst_bench.exec.ImmutablePhaseExec; +import com.microsoft.lst_bench.exec.ImmutableSessionExec; +import com.microsoft.lst_bench.exec.ImmutableTaskExec; +import com.microsoft.lst_bench.exec.ImmutableWorkloadExec; +import com.microsoft.lst_bench.exec.PhaseExec; +import com.microsoft.lst_bench.exec.SessionExec; +import com.microsoft.lst_bench.exec.TaskExec; +import com.microsoft.lst_bench.input.config.ConnectionConfig; +import com.microsoft.lst_bench.input.config.ExperimentConfig; +import com.microsoft.lst_bench.input.config.JDBCConnectionConfig; +import com.microsoft.lst_bench.input.config.SparkConnectionConfig; +import com.microsoft.lst_bench.sql.SQLParser; +import com.microsoft.lst_bench.util.FileParser; +import com.microsoft.lst_bench.util.StringUtils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Factory class for creating benchmark objects from the input configuration. */ +public class BenchmarkObjectFactory { + + private BenchmarkObjectFactory() { + // Defeat instantiation + } + + /** + * Creates a connection manager from the connection configuration. + * + * @param connectionConfig the connection configuration + */ + public static ConnectionManager connectionManager(ConnectionConfig connectionConfig) { + if (connectionConfig instanceof JDBCConnectionConfig) { + return jdbcConnectionManager((JDBCConnectionConfig) connectionConfig); + } else if (connectionConfig instanceof SparkConnectionConfig) { + return sparkConnectionManager((SparkConnectionConfig) connectionConfig); + } else { + throw new IllegalArgumentException( + "Unsupported connection config type: " + connectionConfig.getClass().getName()); + } + } + + private static JDBCConnectionManager jdbcConnectionManager( + JDBCConnectionConfig connectionConfig) { + try { + Class.forName(connectionConfig.getDriver()); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException( + "Unable to load driver class: " + connectionConfig.getDriver(), e); + } + return new JDBCConnectionManager( + connectionConfig.getUrl(), connectionConfig.getUsername(), connectionConfig.getPassword()); + } + + private static SparkConnectionManager sparkConnectionManager( + SparkConnectionConfig connectionConfig) { + return new SparkConnectionManager(connectionConfig.getUrl(), connectionConfig.getConfig()); + } + + /** + * Creates a benchmark configuration from the experiment configuration, task library, and + * workload. 
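+ *
+ * <p>Illustrative call, matching how Driver.run wires it up:
+ *
+ * <pre>{@code
+ * BenchmarkConfig config =
+ *     BenchmarkObjectFactory.benchmarkConfig(experimentConfig, taskLibrary, workload);
+ * }</pre>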
+ * + * @param experimentConfig the experiment configuration + * @param taskLibrary the task library + * @param workload the workload + * @return a benchmark configuration + */ + public static BenchmarkConfig benchmarkConfig( + ExperimentConfig experimentConfig, TaskLibrary taskLibrary, Workload workload) { + Map idToTaskTemplate = parseTaskLibrary(taskLibrary); + ImmutableWorkloadExec workloadExec = + createWorkloadExec(workload, idToTaskTemplate, experimentConfig); + return new BenchmarkConfig( + experimentConfig.getId(), + experimentConfig.getRepetitions(), + experimentConfig.getMetadata(), + workloadExec); + } + + /** + * Parses the task library to create a map of task templates with unique IDs. + * + * @param taskLibrary the task library to parse + * @return a map of task templates with unique IDs + * @throws IllegalArgumentException if there are duplicate task template IDs + */ + private static Map parseTaskLibrary(TaskLibrary taskLibrary) { + Map idToTaskTemplate = new HashMap<>(); + for (TaskTemplate taskTemplate : taskLibrary.getTaskTemplates()) { + if (idToTaskTemplate.containsKey(taskTemplate.getId())) { + throw new IllegalArgumentException("Duplicate task template id: " + taskTemplate.getId()); + } + idToTaskTemplate.put(taskTemplate.getId(), taskTemplate); + } + return idToTaskTemplate; + } + + /** + * Creates a workload execution from the workload and task library. + * + * @param workload the workload to execute + * @param idToTaskTemplate a map of task templates with unique IDs + * @param experimentConfig the experiment configuration + * @return a workload execution + * @throws IllegalArgumentException if the workload contains an invalid task template ID + */ + private static ImmutableWorkloadExec createWorkloadExec( + Workload workload, + Map idToTaskTemplate, + ExperimentConfig experimentConfig) { + Map taskTemplateIdToPermuteOrderCounter = new HashMap<>(); + Map taskTemplateIdToParameterValuesCounter = new HashMap<>(); + List phases = new ArrayList<>(); + for (Phase phase : workload.getPhases()) { + PhaseExec phaseExec = + createPhaseExec( + phase, + idToTaskTemplate, + experimentConfig, + taskTemplateIdToPermuteOrderCounter, + taskTemplateIdToParameterValuesCounter); + phases.add(phaseExec); + } + return ImmutableWorkloadExec.of(workload.getId(), phases); + } + + private static PhaseExec createPhaseExec( + Phase phase, + Map idToTaskTemplate, + ExperimentConfig experimentConfig, + Map taskTemplateIdToPermuteOrderCounter, + Map taskTemplateIdToParameterValuesCounter) { + List sessions = new ArrayList<>(); + for (int i = 0; i < phase.getSessions().size(); i++) { + Session session = phase.getSessions().get(i); + String sessionId = String.valueOf(i); + SessionExec sessionExec = + createSessionExec( + sessionId, + session, + idToTaskTemplate, + experimentConfig, + taskTemplateIdToPermuteOrderCounter, + taskTemplateIdToParameterValuesCounter); + sessions.add(sessionExec); + } + return ImmutablePhaseExec.of(phase.getId(), sessions); + } + + private static SessionExec createSessionExec( + String sessionId, + Session session, + Map idToTaskTemplate, + ExperimentConfig experimentConfig, + Map taskTemplateIdToPermuteOrderCounter, + Map taskTemplateIdToParameterValuesCounter) { + List tasks = new ArrayList<>(); + for (int j = 0; j < session.getTasks().size(); j++) { + Task task = session.getTasks().get(j); + String taskId = task.getTemplateId() + "_" + j; + TaskExec taskExec = + createTaskExec( + taskId, + task, + idToTaskTemplate, + experimentConfig, + 
taskTemplateIdToPermuteOrderCounter, + taskTemplateIdToParameterValuesCounter); + tasks.add(taskExec); + } + return ImmutableSessionExec.of(sessionId, tasks); + } + + private static TaskExec createTaskExec( + String taskId, + Task task, + Map idToTaskTemplate, + ExperimentConfig experimentConfig, + Map taskTemplateIdToPermuteOrderCounter, + Map taskTemplateIdToParameterValuesCounter) { + TaskTemplate taskTemplate = idToTaskTemplate.get(task.getTemplateId()); + if (taskTemplate == null) { + throw new IllegalArgumentException("Unknown task template id: " + task.getTemplateId()); + } + List files = + createFileExecList( + taskTemplate, + task, + experimentConfig, + taskTemplateIdToPermuteOrderCounter, + taskTemplateIdToParameterValuesCounter); + return ImmutableTaskExec.of(taskId, files).withTimeTravelPhaseId(task.getTimeTravelPhaseId()); + } + + private static List createFileExecList( + TaskTemplate taskTemplate, + Task task, + ExperimentConfig experimentConfig, + Map taskTemplateIdToPermuteOrderCounter, + Map taskTemplateIdToParameterValuesCounter) { + List files = new ArrayList<>(); + for (String file : taskTemplate.getFiles()) { + files.add(SQLParser.getStatements(file)); + } + files = applyPermutationOrder(taskTemplate, task, taskTemplateIdToPermuteOrderCounter, files); + files = applyReplaceRegex(task, files); + files = + applyParameterValues( + taskTemplate, experimentConfig, taskTemplateIdToParameterValuesCounter, files); + return files; + } + + private static List applyPermutationOrder( + TaskTemplate taskTemplate, + Task task, + Map taskTemplateIdToPermuteOrderCounter, + List files) { + if (taskTemplate.getPermutationOrdersDirectory() == null) { + // Create statements with certain order + return files; + } + Map idToFile = new HashMap<>(); + for (FileExec file : files) { + idToFile.put(file.getId(), file); + } + int counter; + if (Boolean.TRUE.equals(task.isPermuteOrder())) { + counter = + taskTemplateIdToPermuteOrderCounter.compute( + taskTemplate.getId(), (k, v) -> v == null ? 1 : v + 1); + } else { + counter = 0; + } + List permutationOrder = + FileParser.getPermutationOrder(taskTemplate.getPermutationOrdersDirectory(), counter); + List sortedFiles = new ArrayList<>(); + for (String fileId : permutationOrder) { + sortedFiles.add(idToFile.get(fileId)); + } + return sortedFiles; + } + + private static List applyReplaceRegex(Task task, List files) { + if (task.getReplaceRegex() == null) { + return files; + } + return files.stream() + .map( + file -> { + for (Task.ReplaceRegex regex : task.getReplaceRegex()) { + file = StringUtils.replaceRegex(file, regex.getPattern(), regex.getReplacement()); + } + return file; + }) + .collect(Collectors.toList()); + } + + private static List applyParameterValues( + TaskTemplate taskTemplate, + ExperimentConfig experimentConfig, + Map taskTemplateIdToParameterValuesCounter, + List files) { + Map parameterValues = new HashMap<>(); + if (taskTemplate.getParameterValuesFile() != null) { + // Include parameter values defined in the task template + parameterValues.putAll( + FileParser.getParameterValues( + taskTemplate.getParameterValuesFile(), + taskTemplateIdToParameterValuesCounter.compute( + taskTemplate.getId(), (k, v) -> v == null ? 
1 : v + 1)));
+    }
+    if (experimentConfig.getParameterValues() != null) {
+      // Include experiment-specific parameter values (they can override the ones defined in
+      // the task template)
+      parameterValues.putAll(experimentConfig.getParameterValues());
+    }
+    return files.stream()
+        .map(f -> StringUtils.replaceParameters(f, parameterValues))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/src/main/java/com/microsoft/lst_bench/input/config/ConnectionConfig.java b/src/main/java/com/microsoft/lst_bench/input/config/ConnectionConfig.java
index 39727a60..409e22ec 100644
--- a/src/main/java/com/microsoft/lst_bench/input/config/ConnectionConfig.java
+++ b/src/main/java/com/microsoft/lst_bench/input/config/ConnectionConfig.java
@@ -15,26 +15,20 @@
  */
 package com.microsoft.lst_bench.input.config;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import javax.annotation.Nullable;
-import org.immutables.value.Value;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 /** Represents a single input connection configuration. */
-@Value.Immutable
-@Value.Style(jdkOnly = true)
-@JsonSerialize(as = ImmutableConnectionConfig.class)
-@JsonDeserialize(as = ImmutableConnectionConfig.class)
-@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    property = "type",
+    defaultImpl = JDBCConnectionConfig.class)
+@JsonSubTypes({
+  @JsonSubTypes.Type(value = JDBCConnectionConfig.class, name = "jdbc"),
+  @JsonSubTypes.Type(value = SparkConnectionConfig.class, name = "spark")
+})
 public interface ConnectionConfig {
   String getId();
 
-  String getDriver();
-
   String getUrl();
-
-  @Nullable String getUsername();
-
-  @Nullable String getPassword();
 }
diff --git a/src/main/java/com/microsoft/lst_bench/input/config/JDBCConnectionConfig.java b/src/main/java/com/microsoft/lst_bench/input/config/JDBCConnectionConfig.java
new file mode 100644
index 00000000..2ac6f47c
--- /dev/null
+++ b/src/main/java/com/microsoft/lst_bench/input/config/JDBCConnectionConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.microsoft.lst_bench.input.config;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import javax.annotation.Nullable;
+import org.immutables.value.Value;
+
+/**
+ * Represents a single JDBC connection configuration.
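+ *
+ * <p>Example YAML entry (from the sample connections config in this patch; {@code type: jdbc}
+ * is also the default when no type is given):
+ *
+ * <pre>{@code
+ * - id: spark_1
+ *   type: jdbc
+ *   driver: org.apache.hive.jdbc.HiveDriver
+ *   url: jdbc:hive2://127.0.0.1:10001
+ *   username: admin
+ *   password: p@ssw0rd1
+ * }</pre>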
+ */
+@Value.Immutable
+@Value.Style(jdkOnly = true)
+@JsonSerialize(as = ImmutableJDBCConnectionConfig.class)
+@JsonDeserialize(as = ImmutableJDBCConnectionConfig.class)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public interface JDBCConnectionConfig extends ConnectionConfig {
+  String getDriver();
+
+  @Nullable String getUsername();
+
+  @Nullable String getPassword();
+}
diff --git a/src/main/java/com/microsoft/lst_bench/input/config/SparkConnectionConfig.java b/src/main/java/com/microsoft/lst_bench/input/config/SparkConnectionConfig.java
new file mode 100644
index 00000000..ccddb229
--- /dev/null
+++ b/src/main/java/com/microsoft/lst_bench/input/config/SparkConnectionConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.microsoft.lst_bench.input.config;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.immutables.value.Value;
+
+/** Represents a single Spark connection configuration. */
+@Value.Immutable
+@Value.Style(jdkOnly = true)
+@JsonSerialize(as = ImmutableSparkConnectionConfig.class)
+@JsonDeserialize(as = ImmutableSparkConnectionConfig.class)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public interface SparkConnectionConfig extends ConnectionConfig {
+  @Nullable Map<String, String> getConfig();
+}
diff --git a/src/main/java/com/microsoft/lst_bench/telemetry/JDBCTelemetryRegistry.java b/src/main/java/com/microsoft/lst_bench/telemetry/SQLTelemetryRegistry.java
similarity index 84%
rename from src/main/java/com/microsoft/lst_bench/telemetry/JDBCTelemetryRegistry.java
rename to src/main/java/com/microsoft/lst_bench/telemetry/SQLTelemetryRegistry.java
index 8a23fa66..2a0249b1 100644
--- a/src/main/java/com/microsoft/lst_bench/telemetry/JDBCTelemetryRegistry.java
+++ b/src/main/java/com/microsoft/lst_bench/telemetry/SQLTelemetryRegistry.java
@@ -15,13 +15,12 @@
  */
 package com.microsoft.lst_bench.telemetry;
 
+import com.microsoft.lst_bench.client.ClientException;
+import com.microsoft.lst_bench.client.Connection;
+import com.microsoft.lst_bench.client.ConnectionManager;
 import com.microsoft.lst_bench.exec.StatementExec;
-import com.microsoft.lst_bench.sql.ConnectionManager;
 import com.microsoft.lst_bench.sql.SQLParser;
 import com.microsoft.lst_bench.util.StringUtils;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,10 +30,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** A telemetry registry that writes events to a JDBC database. */
-public class JDBCTelemetryRegistry {
+/**
+ * A telemetry registry that writes events to a SQL-compatible database.
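+ *
+ * <p>Lifecycle sketch (illustrative; {@code writeEvent} is the pre-existing entry point
+ * and is assumed unchanged by this patch):
+ *
+ * <pre>{@code
+ * SQLTelemetryRegistry registry =
+ *     new SQLTelemetryRegistry(manager, true, ddlFile, insertFile, parameterValues);
+ * registry.writeEvent(eventInfo); // buffered in memory
+ * registry.flush(); // runs the parameterized INSERT statements on a fresh Connection
+ * }</pre>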
*/ +public class SQLTelemetryRegistry { - private static final Logger LOGGER = LoggerFactory.getLogger(JDBCTelemetryRegistry.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SQLTelemetryRegistry.class); private final ConnectionManager connectionManager; @@ -43,13 +42,13 @@ public class JDBCTelemetryRegistry { // TODO: Make writing events thread-safe. private List eventsStream; - public JDBCTelemetryRegistry( + public SQLTelemetryRegistry( ConnectionManager connectionManager, boolean executeDdl, String ddlFile, String insertFile, Map parameterValues) - throws SQLException { + throws ClientException { this.connectionManager = connectionManager; this.eventsStream = Collections.synchronizedList(new ArrayList<>()); this.insertFileStatements = @@ -63,14 +62,14 @@ public JDBCTelemetryRegistry( } } - private void executeDdl(String ddlFile, Map parameterValues) throws SQLException { + private void executeDdl(String ddlFile, Map parameterValues) + throws ClientException { LOGGER.info("Creating new logging tables..."); - try (Connection connection = connectionManager.createConnection(); - Statement statement = connection.createStatement()) { + try (Connection connection = connectionManager.createConnection()) { List ddlFileStatements = SQLParser.getStatements(ddlFile).getStatements(); for (StatementExec query : ddlFileStatements) { String currentQuery = StringUtils.replaceParameters(query, parameterValues).getStatement(); - statement.execute(currentQuery); + connection.execute(currentQuery); } } LOGGER.info("Logging tables created."); @@ -86,8 +85,7 @@ public void flush() throws EventException { if (eventsStream.isEmpty()) return; LOGGER.info("Flushing events to database..."); - try (Connection connection = connectionManager.createConnection(); - Statement statement = connection.createStatement()) { + try (Connection connection = connectionManager.createConnection()) { Map values = new HashMap<>(); values.put( "tuples", @@ -106,12 +104,12 @@ public void flush() throws EventException { .collect(Collectors.joining("),(", "(", ")"))); for (StatementExec query : insertFileStatements) { String currentQuery = StringUtils.replaceParameters(query, values).getStatement(); - statement.execute(currentQuery); + connection.execute(currentQuery); } eventsStream = Collections.synchronizedList(new ArrayList<>()); LOGGER.info("Events flushed to database."); - } catch (SQLException e) { + } catch (ClientException e) { throw new EventException(e); } } diff --git a/src/main/java/com/microsoft/lst_bench/telemetry/TelemetryHook.java b/src/main/java/com/microsoft/lst_bench/telemetry/TelemetryHook.java index 1452c411..eeb46e17 100644 --- a/src/main/java/com/microsoft/lst_bench/telemetry/TelemetryHook.java +++ b/src/main/java/com/microsoft/lst_bench/telemetry/TelemetryHook.java @@ -17,9 +17,9 @@ public class TelemetryHook extends Thread { - private final JDBCTelemetryRegistry telemetryRegistry; + private final SQLTelemetryRegistry telemetryRegistry; - public TelemetryHook(JDBCTelemetryRegistry telemetryRegistry) { + public TelemetryHook(SQLTelemetryRegistry telemetryRegistry) { this.telemetryRegistry = telemetryRegistry; } diff --git a/src/main/resources/config/sample_connections_config.yaml b/src/main/resources/config/sample_connections_config.yaml index 1c4f483b..face2c1a 100644 --- a/src/main/resources/config/sample_connections_config.yaml +++ b/src/main/resources/config/sample_connections_config.yaml @@ -8,7 +8,13 @@ connections: username: admin password: p@ssw0rd0 - id: spark_1 + type: jdbc driver: 
org.apache.hive.jdbc.HiveDriver url: jdbc:hive2://127.0.0.1:10001 username: admin password: p@ssw0rd1 +- id: spark_2 + type: spark + url: spark://127.0.0.1:7077 + config: + spark.worker.timeout: "60" diff --git a/src/main/resources/schemas/connections_config.json b/src/main/resources/schemas/connections_config.json index 789c9870..ccbbe071 100644 --- a/src/main/resources/schemas/connections_config.json +++ b/src/main/resources/schemas/connections_config.json @@ -1,6 +1,63 @@ { "$schema": "https://json-schema.org/draft/2020-12/schema", - "$id": "https://example.com/schemas/lst-bench/connections_config.json", + "definitions": { + "connection": { + "type": "object", + "title": "Connection configuration", + "anyOf": [ { + "$ref": "#/definitions/connection/definitions/jdbc_connection" + }, { + "$ref": "#/definitions/connection/definitions/spark_connection" + } ], + "properties": { + "type": { + "type": "string", + "title": "Connection type (default: jdbc)" + }, + "id": { + "type": "string", + "title": "Identifier for the connection" + }, + "url": { + "type": "string", + "title": "Connection URL" + } + }, + "required": [ "id", "url" ], + "definitions": { + "jdbc_connection": { + "type": "object", + "title": "JDBC connection configuration", + "properties": { + "driver": { + "type": "string", + "title": "JDBC driver class name" + }, + "username": { + "type": "string", + "title": "Username for authentication" + }, + "password": { + "type": "string", + "title": "Password for authentication" + } + }, + "required": [ "driver" ] + }, + "spark_connection": { + "type": "object", + "title": "Spark connection configuration", + "properties": { + "config": { + "type": "object", + "title": "Configuration parameter values", + "description": "Map of configuration parameter name-value pairs for Spark" + } + } + } + } + } + }, "type": "object", "title": "Schema for connections configuration input file", "required": [ "version", "connections" ], @@ -13,31 +70,7 @@ "type": "array", "title": "List of JDBC connection configurations", "items": { - "type": "object", - "title": "Connection configuration", - "required": [ "id", "driver", "url" ], - "properties": { - "id": { - "type": "string", - "title": "Identifier for the connection" - }, - "driver": { - "type": "string", - "title": "JDBC driver class name" - }, - "url": { - "type": "string", - "title": "JDBC connection URL" - }, - "username": { - "type": "string", - "title": "Username for authentication" - }, - "password": { - "type": "string", - "title": "Password for authentication" - } - } + "$ref": "#/definitions/connection" } } } diff --git a/src/main/resources/schemas/experiment_config.json b/src/main/resources/schemas/experiment_config.json index fb559d66..5b26cc2c 100644 --- a/src/main/resources/schemas/experiment_config.json +++ b/src/main/resources/schemas/experiment_config.json @@ -1,6 +1,5 @@ { "$schema": "https://json-schema.org/draft/2020-12/schema", - "$id": "https://example.com/schemas/lst-bench/experiment_config.json", "type": "object", "title": "Schema for experiment configuration input file", "required": [ "version", "id", "repetitions" ], diff --git a/src/main/resources/schemas/task_library.json b/src/main/resources/schemas/task_library.json index 50d112cd..03dd185a 100644 --- a/src/main/resources/schemas/task_library.json +++ b/src/main/resources/schemas/task_library.json @@ -1,6 +1,5 @@ { "$schema": "https://json-schema.org/draft/2020-12/schema", - "$id": "https://example.com/schemas/lst-bench/task_library.json", "type": "object", "title": 
"Schema for task library definition file", "required": [ "version", "task_templates" ], diff --git a/src/main/resources/schemas/telemetry_config.json b/src/main/resources/schemas/telemetry_config.json index bc9e7185..e9ced2ff 100644 --- a/src/main/resources/schemas/telemetry_config.json +++ b/src/main/resources/schemas/telemetry_config.json @@ -1,6 +1,5 @@ { "$schema": "https://json-schema.org/draft/2020-12/schema", - "$id": "https://example.com/schemas/lst-bench/telemetry_config.json", "type": "object", "title": "Schema for telemetry configuration input file", "required": [ "version", "connection", "execute_ddl", "ddl_file", "insert_file" ], @@ -10,31 +9,7 @@ "title": "File format version" }, "connection": { - "type": "object", - "title": "Connection configuration", - "required": [ "id", "driver", "url" ], - "properties": { - "id": { - "type": "string", - "title": "Identifier for the connection" - }, - "driver": { - "type": "string", - "title": "JDBC driver class name" - }, - "url": { - "type": "string", - "title": "JDBC connection URL" - }, - "username": { - "type": "string", - "title": "Username for authentication" - }, - "password": { - "type": "string", - "title": "Password for authentication" - } - } + "$ref": "resource:/schemas/connections_config.json#/definitions/connection" }, "execute_ddl": { "type": "boolean", diff --git a/src/main/resources/schemas/workload.json b/src/main/resources/schemas/workload.json index ff818b69..8056a3a8 100644 --- a/src/main/resources/schemas/workload.json +++ b/src/main/resources/schemas/workload.json @@ -1,6 +1,5 @@ { "$schema": "https://json-schema.org/draft/2020-12/schema", - "$id": "https://example.com/schemas/lst-bench/workload.json", "type": "object", "title": "Schema for workload definition file", "required": [ "version", "id", "phases" ], diff --git a/src/test/java/com/microsoft/lst_bench/DriverSparkTest.java b/src/test/java/com/microsoft/lst_bench/DriverSparkTest.java index 13e6d208..48133966 100644 --- a/src/test/java/com/microsoft/lst_bench/DriverSparkTest.java +++ b/src/test/java/com/microsoft/lst_bench/DriverSparkTest.java @@ -15,8 +15,24 @@ */ package com.microsoft.lst_bench; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; +import com.microsoft.lst_bench.input.TaskLibrary; +import com.microsoft.lst_bench.input.Workload; +import com.microsoft.lst_bench.input.config.ConnectionsConfig; +import com.microsoft.lst_bench.input.config.ExperimentConfig; +import com.microsoft.lst_bench.input.config.ImmutableExperimentConfig; +import com.microsoft.lst_bench.input.config.TelemetryConfig; +import java.io.File; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.io.TempDir; /** Unit test for LST-Bench driver running on Spark. 
*/ @EnabledIfSystemProperty(named = "lst-bench.test.db", matches = "spark") @@ -24,55 +40,148 @@ public class DriverSparkTest { @Test @EnabledIfSystemProperty(named = "lst-bench.test.lst", matches = "delta") - public void testTPCDSW0Delta() throws Exception { - Driver.main( - new String[] { - "-c", - "src/test/resources/config/spark/connections_config.yaml", - "-e", - "src/test/resources/config/spark/experiment_config_delta.yaml", - "-t", - "src/test/resources/config/spark/telemetry_config.yaml", - "-l", - "src/main/resources/config/tpcds/task_library.yaml", - "-w", - "src/test/resources/config/spark/w_all_tpcds_delta.yaml" - }); + @EnabledIfSystemProperty(named = "lst-bench.test.connection", matches = "jdbc") + public void testJDBCTPCDSAllTasksDelta() throws Exception { + runDriver( + "src/test/resources/config/spark/jdbc_connection_config.yaml", + "src/test/resources/config/spark/experiment_config_delta.yaml", + "src/test/resources/config/spark/telemetry_config.yaml", + "src/main/resources/config/tpcds/task_library.yaml", + "src/test/resources/config/spark/w_all_tpcds_delta.yaml"); } @Test @EnabledIfSystemProperty(named = "lst-bench.test.lst", matches = "hudi") - public void testTPCDSW0Hudi() throws Exception { - Driver.main( - new String[] { - "-c", - "src/test/resources/config/spark/connections_config.yaml", - "-e", - "src/test/resources/config/spark/experiment_config_hudi.yaml", - "-t", - "src/test/resources/config/spark/telemetry_config.yaml", - "-l", - "src/main/resources/config/tpcds/task_library.yaml", - "-w", - "src/test/resources/config/spark/w_all_tpcds_hudi.yaml" - }); + @EnabledIfSystemProperty(named = "lst-bench.test.connection", matches = "jdbc") + public void testJDBCTPCDSAllTasksHudi() throws Exception { + runDriver( + "src/test/resources/config/spark/jdbc_connection_config.yaml", + "src/test/resources/config/spark/experiment_config_hudi.yaml", + "src/test/resources/config/spark/telemetry_config.yaml", + "src/main/resources/config/tpcds/task_library.yaml", + "src/test/resources/config/spark/w_all_tpcds_hudi.yaml"); } @Test @EnabledIfSystemProperty(named = "lst-bench.test.lst", matches = "iceberg") - public void testTPCDSW0Iceberg() throws Exception { - Driver.main( - new String[] { - "-c", - "src/test/resources/config/spark/connections_config.yaml", - "-e", - "src/test/resources/config/spark/experiment_config_iceberg.yaml", - "-t", - "src/test/resources/config/spark/telemetry_config.yaml", - "-l", - "src/main/resources/config/tpcds/task_library.yaml", - "-w", - "src/test/resources/config/spark/w_all_tpcds_iceberg.yaml" - }); + @EnabledIfSystemProperty(named = "lst-bench.test.connection", matches = "jdbc") + public void testJDBCTPCDSAllTasksIceberg() throws Exception { + runDriver( + "src/test/resources/config/spark/jdbc_connection_config.yaml", + "src/test/resources/config/spark/experiment_config_iceberg.yaml", + "src/test/resources/config/spark/telemetry_config.yaml", + "src/main/resources/config/tpcds/task_library.yaml", + "src/test/resources/config/spark/w_all_tpcds_iceberg.yaml"); + } + + private void runDriver(String arg0, String arg1, String arg2, String arg3, String arg4) + throws Exception { + Driver.main(new String[] {"-c", arg0, "-e", arg1, "-t", arg2, "-l", arg3, "-w", arg4}); + } + + @Test + @EnabledIfSystemProperty(named = "lst-bench.test.lst", matches = "delta") + @EnabledIfSystemProperty(named = "lst-bench.test.connection", matches = "spark") + public void testSparkSessionDelta(@TempDir Path tempDir) throws Exception { + testSparkSession( + 
"src/main/resources/config/tpcds/task_library.yaml", + "src/test/resources/config/spark/w_all_tpcds_single_session_delta.yaml", + "src/test/resources/config/spark/spark_connection_config_delta.yaml", + "src/test/resources/config/spark/experiment_config_delta.yaml", + "src/test/resources/config/spark/telemetry_config.yaml", + tempDir); + } + + @Test + @EnabledIfSystemProperty(named = "lst-bench.test.lst", matches = "hudi") + @EnabledIfSystemProperty(named = "lst-bench.test.connection", matches = "spark") + public void testSparkSessionHudi(@TempDir Path tempDir) throws Exception { + testSparkSession( + "src/main/resources/config/tpcds/task_library.yaml", + "src/test/resources/config/spark/w_all_tpcds_single_session_hudi.yaml", + "src/test/resources/config/spark/spark_connection_config_hudi.yaml", + "src/test/resources/config/spark/experiment_config_hudi.yaml", + "src/test/resources/config/spark/telemetry_config.yaml", + tempDir); + } + + @Test + @EnabledIfSystemProperty(named = "lst-bench.test.lst", matches = "iceberg") + @EnabledIfSystemProperty(named = "lst-bench.test.connection", matches = "spark") + public void testSparkSessionIceberg(@TempDir Path tempDir) throws Exception { + testSparkSession( + "src/main/resources/config/tpcds/task_library.yaml", + "src/test/resources/config/spark/w_all_tpcds_single_session_iceberg.yaml", + "src/test/resources/config/spark/spark_connection_config_iceberg.yaml", + "src/test/resources/config/spark/experiment_config_iceberg.yaml", + "src/test/resources/config/spark/telemetry_config.yaml", + tempDir); + } + + private void testSparkSession( + String arg0, String arg1, String arg2, String arg3, String arg4, Path tempDir) + throws Exception { + // Create Java objects from input files + final ObjectMapper mapper = new YAMLMapper(); + TaskLibrary taskLibrary = mapper.readValue(new File(arg0), TaskLibrary.class); + Workload workload = mapper.readValue(new File(arg1), Workload.class); + ConnectionsConfig connectionsConfig = mapper.readValue(new File(arg2), ConnectionsConfig.class); + ExperimentConfig experimentConfig = mapper.readValue(new File(arg3), ExperimentConfig.class); + TelemetryConfig telemetryConfig = mapper.readValue(new File(arg4), TelemetryConfig.class); + + // Setup path + experimentConfig = ingestTempDir(experimentConfig, tempDir); + createTempDirs( + Path.of( + Objects.requireNonNull(experimentConfig.getParameterValues()) + .get("external_data_path") + .toString())); + + // Run driver + Driver.run(taskLibrary, workload, connectionsConfig, experimentConfig, telemetryConfig); + } + + private ExperimentConfig ingestTempDir(ExperimentConfig experimentConfig, Path tempDir) { + Map parameterValues = + new HashMap<>(Objects.requireNonNull(experimentConfig.getParameterValues())); + parameterValues.compute("external_data_path", (k, value) -> tempDir.toString() + value); + parameterValues.compute("data_path", (k, value) -> tempDir.toString() + value); + return ImmutableExperimentConfig.builder() + .from(experimentConfig) + .parameterValues(parameterValues) + .build(); + } + + private void createTempDirs(Path tempDir) { + List tableDirs = + Arrays.asList( + "/call_center", + "/catalog_page", + "/catalog_returns", + "/catalog_sales", + "/customer", + "/customer_address", + "/customer_demographics", + "/date_dim", + "/household_demographics", + "/income_band", + "/inventory", + "/item", + "/promotion", + "/reason", + "/ship_mode", + "/store", + "/store_returns", + "/store_sales", + "/time_dim", + "/warehouse", + "/web_page", + "/web_returns", + "/web_sales", 
+ "/web_site"); + for (String tableDir : tableDirs) { + File dir = new File(tempDir + tableDir); + dir.mkdirs(); + } } } diff --git a/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java b/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java index 71cc709e..864605f4 100644 --- a/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java +++ b/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java @@ -17,16 +17,17 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; +import com.microsoft.lst_bench.client.ConnectionManager; +import com.microsoft.lst_bench.input.BenchmarkObjectFactory; import com.microsoft.lst_bench.input.ImmutableTaskLibrary; import com.microsoft.lst_bench.input.ImmutableWorkload; import com.microsoft.lst_bench.input.TaskLibrary; import com.microsoft.lst_bench.input.Workload; import com.microsoft.lst_bench.input.config.ExperimentConfig; -import com.microsoft.lst_bench.input.config.ImmutableConnectionConfig; import com.microsoft.lst_bench.input.config.ImmutableExperimentConfig; +import com.microsoft.lst_bench.input.config.ImmutableJDBCConnectionConfig; import com.microsoft.lst_bench.input.config.TelemetryConfig; -import com.microsoft.lst_bench.sql.ConnectionManager; -import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry; +import com.microsoft.lst_bench.telemetry.SQLTelemetryRegistry; import java.io.File; import java.net.URL; import java.util.HashMap; @@ -66,7 +67,7 @@ void testNoOpSetup() throws Exception { TaskLibrary taskLibrary = ImmutableTaskLibrary.builder().version(1).build(); Workload workload = ImmutableWorkload.builder().id("telemetryTest").version(1).build(); - var config = BenchmarkConfig.from(experimentConfig, taskLibrary, workload); + var config = BenchmarkObjectFactory.benchmarkConfig(experimentConfig, taskLibrary, workload); URL telemetryConfigFile = getClass().getClassLoader().getResource("./config/spark/telemetry_config.yaml"); @@ -76,14 +77,14 @@ void testNoOpSetup() throws Exception { mapper.readValue(new File(telemetryConfigFile.getFile()), TelemetryConfig.class); var uniqueTelemetryDbName = - ImmutableConnectionConfig.builder() + ImmutableJDBCConnectionConfig.builder() .from(telemetryConfig.getConnection()) .url("jdbc:duckdb:./" + telemetryDbFileName) .build(); - final JDBCTelemetryRegistry telemetryRegistry = - new JDBCTelemetryRegistry( - ConnectionManager.from(uniqueTelemetryDbName), + final SQLTelemetryRegistry telemetryRegistry = + new SQLTelemetryRegistry( + BenchmarkObjectFactory.connectionManager(uniqueTelemetryDbName), telemetryConfig.isExecuteDDL(), telemetryConfig.getDDLFile(), telemetryConfig.getInsertFile(), diff --git a/src/test/java/com/microsoft/lst_bench/input/ParserTest.java b/src/test/java/com/microsoft/lst_bench/input/ParserTest.java index da18d6ff..983d827f 100644 --- a/src/test/java/com/microsoft/lst_bench/input/ParserTest.java +++ b/src/test/java/com/microsoft/lst_bench/input/ParserTest.java @@ -17,9 +17,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; -import com.microsoft.lst_bench.input.config.ConnectionConfig; import com.microsoft.lst_bench.input.config.ConnectionsConfig; import com.microsoft.lst_bench.input.config.ExperimentConfig; +import com.microsoft.lst_bench.input.config.JDBCConnectionConfig; +import com.microsoft.lst_bench.input.config.SparkConnectionConfig; import 
-    final JDBCTelemetryRegistry telemetryRegistry =
-        new JDBCTelemetryRegistry(
-            ConnectionManager.from(uniqueTelemetryDbName),
+    final SQLTelemetryRegistry telemetryRegistry =
+        new SQLTelemetryRegistry(
+            BenchmarkObjectFactory.connectionManager(uniqueTelemetryDbName),
             telemetryConfig.isExecuteDDL(),
             telemetryConfig.getDDLFile(),
             telemetryConfig.getInsertFile(),
diff --git a/src/test/java/com/microsoft/lst_bench/input/ParserTest.java b/src/test/java/com/microsoft/lst_bench/input/ParserTest.java
index da18d6ff..983d827f 100644
--- a/src/test/java/com/microsoft/lst_bench/input/ParserTest.java
+++ b/src/test/java/com/microsoft/lst_bench/input/ParserTest.java
@@ -17,9 +17,10 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
-import com.microsoft.lst_bench.input.config.ConnectionConfig;
 import com.microsoft.lst_bench.input.config.ConnectionsConfig;
 import com.microsoft.lst_bench.input.config.ExperimentConfig;
+import com.microsoft.lst_bench.input.config.JDBCConnectionConfig;
+import com.microsoft.lst_bench.input.config.SparkConnectionConfig;
 import com.microsoft.lst_bench.input.config.TelemetryConfig;
 import java.io.File;
 import java.io.IOException;
@@ -88,19 +89,26 @@ public void testParseConnectionConfig() throws IOException {
         mapper.readValue(
             new File(CONFIG_PATH + "sample_connections_config.yaml"), ConnectionsConfig.class);
     Assertions.assertEquals(1, connectionsConfig.getVersion());
-    Assertions.assertEquals(2, connectionsConfig.getConnections().size());
-    ConnectionConfig connection0 = connectionsConfig.getConnections().get(0);
+    Assertions.assertEquals(3, connectionsConfig.getConnections().size());
+    JDBCConnectionConfig connection0 =
+        (JDBCConnectionConfig) connectionsConfig.getConnections().get(0);
     Assertions.assertEquals("spark_0", connection0.getId());
     Assertions.assertEquals("org.apache.hive.jdbc.HiveDriver", connection0.getDriver());
     Assertions.assertEquals("jdbc:hive2://127.0.0.1:10000", connection0.getUrl());
     Assertions.assertEquals("admin", connection0.getUsername());
     Assertions.assertEquals("p@ssw0rd0", connection0.getPassword());
-    ConnectionConfig connection1 = connectionsConfig.getConnections().get(1);
+    JDBCConnectionConfig connection1 =
+        (JDBCConnectionConfig) connectionsConfig.getConnections().get(1);
     Assertions.assertEquals("spark_1", connection1.getId());
     Assertions.assertEquals("org.apache.hive.jdbc.HiveDriver", connection1.getDriver());
     Assertions.assertEquals("jdbc:hive2://127.0.0.1:10001", connection1.getUrl());
     Assertions.assertEquals("admin", connection1.getUsername());
     Assertions.assertEquals("p@ssw0rd1", connection1.getPassword());
+    SparkConnectionConfig connection2 =
+        (SparkConnectionConfig) connectionsConfig.getConnections().get(2);
+    Assertions.assertEquals("spark_2", connection2.getId());
+    Assertions.assertEquals("spark://127.0.0.1:7077", connection2.getUrl());
+    // TODO
   }
 
   @Test
@@ -546,11 +554,12 @@ public void testParseTelemetryConfig() throws IOException {
         new File(CONFIG_PATH + "sample_telemetry_config.yaml"), TelemetryConfig.class);
     Assertions.assertEquals(1, telemetryConfig.getVersion());
     Assertions.assertNotNull(telemetryConfig.getConnection());
-    Assertions.assertEquals("duckdb_0", telemetryConfig.getConnection().getId());
-    Assertions.assertEquals("org.duckdb.DuckDBDriver", telemetryConfig.getConnection().getDriver());
-    Assertions.assertEquals("jdbc:duckdb:./telemetry", telemetryConfig.getConnection().getUrl());
-    Assertions.assertNull(telemetryConfig.getConnection().getUsername());
-    Assertions.assertNull(telemetryConfig.getConnection().getPassword());
+    JDBCConnectionConfig connectionConfig = (JDBCConnectionConfig) telemetryConfig.getConnection();
+    Assertions.assertEquals("duckdb_0", connectionConfig.getId());
+    Assertions.assertEquals("org.duckdb.DuckDBDriver", connectionConfig.getDriver());
+    Assertions.assertEquals("jdbc:duckdb:./telemetry", connectionConfig.getUrl());
+    Assertions.assertNull(connectionConfig.getUsername());
+    Assertions.assertNull(connectionConfig.getPassword());
     Assertions.assertEquals(Boolean.TRUE, telemetryConfig.isExecuteDDL());
     Assertions.assertEquals(
         "src/main/resources/scripts/logging/duckdb/ddl.sql", telemetryConfig.getDDLFile());
diff --git a/src/test/java/com/microsoft/lst_bench/input/ValidationTest.java b/src/test/java/com/microsoft/lst_bench/input/ValidationTest.java
index 12f781de..dc26988e 100644
--- a/src/test/java/com/microsoft/lst_bench/input/ValidationTest.java
+++ b/src/test/java/com/microsoft/lst_bench/input/ValidationTest.java
@@ -89,8 +89,29 @@ public void testValidationExperimentConfig() throws IOException {
         0,
         errorsFromPOJO.size(),
() -> "Errors found in validation: " + errorsFromPOJO); } - @Test - public void testValidationConnectionsConfig() throws IOException { + @ParameterizedTest + @EnabledOnOs({OS.LINUX, OS.MAC}) + @ValueSource( + strings = { + "src/main/resources/config/sample_connections_config.yaml", + "src/test/resources/config/validation/connections_config_test0.yaml" + }) + public void testValidationConnectionsConfigUnix(String configFilePath) throws IOException { + testValidationConnectionsConfig(configFilePath); + } + + @ParameterizedTest + @EnabledOnOs({OS.WINDOWS}) + @ValueSource( + strings = { + "src\\main\\resources\\config\\sample_connections_config.yaml", + "src\\test\\resources\\config\\validation\\connections_config_test0.yaml" + }) + public void testValidationConnectionsConfigWin(String configFilePath) throws IOException { + testValidationConnectionsConfig(configFilePath); + } + + private void testValidationConnectionsConfig(String configFilePath) throws IOException { ObjectMapper mapper = new YAMLMapper(); // Read schema JsonSchemaFactory factory = @@ -101,16 +122,13 @@ public void testValidationConnectionsConfig() throws IOException { factory.getSchema( Files.newInputStream(Paths.get(SCHEMAS_PATH + "connections_config.json"))); // Validate YAML file contents - JsonNode jsonNodeDirect = - mapper.readTree( - Files.newInputStream(Paths.get(CONFIG_PATH + "sample_connections_config.yaml"))); + JsonNode jsonNodeDirect = mapper.readTree(Files.newInputStream(Paths.get(configFilePath))); Set errorsFromFile = schema.validate(jsonNodeDirect); Assertions.assertEquals( 0, errorsFromFile.size(), () -> "Errors found in validation: " + errorsFromFile); // Validate YAML generated from POJO object ConnectionsConfig connectionsConfig = - mapper.readValue( - new File(CONFIG_PATH + "sample_connections_config.yaml"), ConnectionsConfig.class); + mapper.readValue(new File(configFilePath), ConnectionsConfig.class); JsonNode jsonNodeObject = mapper.convertValue(connectionsConfig, JsonNode.class); Set errorsFromPOJO = schema.validate(jsonNodeObject); Assertions.assertEquals( @@ -158,26 +176,8 @@ public void testValidationTaskLibrary() throws IOException { "src/main/resources/config/tpcds/wp3_rw_concurrency.yaml", "src/main/resources/config/tpcds/wp4_time_travel.yaml" }) - public void testValidationWorkload(String workloadFilePath) throws IOException { - ObjectMapper mapper = new YAMLMapper(); - // Read schema - JsonSchemaFactory factory = - JsonSchemaFactory.builder(JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V202012)) - .objectMapper(mapper) - .build(); - JsonSchema schema = - factory.getSchema(Files.newInputStream(Paths.get(SCHEMAS_PATH + "workload.json"))); - // Validate YAML file contents - JsonNode jsonNodeDirect = mapper.readTree(Files.newInputStream(Paths.get(workloadFilePath))); - Set errorsFromFile = schema.validate(jsonNodeDirect); - Assertions.assertEquals( - 0, errorsFromFile.size(), () -> "Errors found in validation: " + errorsFromFile); - // Validate YAML generated from POJO object - Workload workload = mapper.readValue(new File(workloadFilePath), Workload.class); - JsonNode jsonNodeObject = mapper.convertValue(workload, JsonNode.class); - Set errorsFromPOJO = schema.validate(jsonNodeObject); - Assertions.assertEquals( - 0, errorsFromPOJO.size(), () -> "Errors found in validation: " + errorsFromPOJO); + public void testValidationWorkloadUnix(String workloadFilePath) throws IOException { + testValidationWorkload(workloadFilePath); } @ParameterizedTest @@ -193,6 +193,10 @@ public void 
   public void testValidationWorkloadWin(String workloadFilePath) throws IOException {
+    testValidationWorkload(workloadFilePath);
+  }
+
+  private void testValidationWorkload(String workloadFilePath) throws IOException {
     ObjectMapper mapper = new YAMLMapper();
     // Read schema
     JsonSchemaFactory factory =
diff --git a/src/test/resources/config/spark/connections_config.yaml b/src/test/resources/config/spark/jdbc_connection_config.yaml
similarity index 100%
rename from src/test/resources/config/spark/connections_config.yaml
rename to src/test/resources/config/spark/jdbc_connection_config.yaml
diff --git a/src/test/resources/config/spark/spark_connection_config_delta.yaml b/src/test/resources/config/spark/spark_connection_config_delta.yaml
new file mode 100644
index 00000000..6bf47e47
--- /dev/null
+++ b/src/test/resources/config/spark/spark_connection_config_delta.yaml
@@ -0,0 +1,11 @@
+# Description: Connections Configuration
+---
+version: 1
+connections:
+- id: spark_0
+  type: spark
+  url: local[*]
+  config:
+    spark.sql.catalog.spark_catalog: org.apache.spark.sql.delta.catalog.DeltaCatalog
+    spark.sql.extensions: io.delta.sql.DeltaSparkSessionExtension
+
diff --git a/src/test/resources/config/spark/spark_connection_config_hudi.yaml b/src/test/resources/config/spark/spark_connection_config_hudi.yaml
new file mode 100644
index 00000000..d9314ffe
--- /dev/null
+++ b/src/test/resources/config/spark/spark_connection_config_hudi.yaml
@@ -0,0 +1,10 @@
+# Description: Connections Configuration
+---
+version: 1
+connections:
+- id: spark_0
+  type: spark
+  url: local[*]
+  config:
+    spark.sql.catalog.spark_catalog: org.apache.spark.sql.hudi.catalog.HoodieCatalog
+    spark.sql.extensions: org.apache.spark.sql.hudi.HoodieSparkSessionExtension
diff --git a/src/test/resources/config/spark/spark_connection_config_iceberg.yaml b/src/test/resources/config/spark/spark_connection_config_iceberg.yaml
new file mode 100644
index 00000000..1bd4f305
--- /dev/null
+++ b/src/test/resources/config/spark/spark_connection_config_iceberg.yaml
@@ -0,0 +1,10 @@
+# Description: Connections Configuration
+---
+version: 1
+connections:
+- id: spark_0
+  type: spark
+  url: local[*]
+  config:
+    spark.sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
+    spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
diff --git a/src/test/resources/config/spark/w_all_tpcds_single_session_delta.yaml b/src/test/resources/config/spark/w_all_tpcds_single_session_delta.yaml
new file mode 100644
index 00000000..dd602b53
--- /dev/null
+++ b/src/test/resources/config/spark/w_all_tpcds_single_session_delta.yaml
@@ -0,0 +1,15 @@
+# Description: Workload for test: All task types, TPC-DS, Delta
+---
+version: 1
+id: w_all_tpcds_single_session
+phases:
+- id: all
+  sessions:
+  - tasks:
+    - template_id: setup
+    - template_id: setup_data_maintenance
+    - template_id: init
+    - template_id: build
+    - template_id: single_user
+    - template_id: data_maintenance_delta
+    - template_id: optimize_delta
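+# Note: all task templates run in one session here, so with the Spark
+# connection type the whole TPC-DS flow shares a single SparkSession and its
+# catalog/extension configuration.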
diff --git a/src/test/resources/config/spark/w_all_tpcds_single_session_hudi.yaml b/src/test/resources/config/spark/w_all_tpcds_single_session_hudi.yaml
new file mode 100644
index 00000000..1a45170e
--- /dev/null
+++ b/src/test/resources/config/spark/w_all_tpcds_single_session_hudi.yaml
@@ -0,0 +1,18 @@
+# Description: Workload for test: All task types, TPC-DS, Hudi
+---
+version: 1
+id: w_all_tpcds_single_session
+phases:
+- id: all
+  sessions:
+  - tasks:
+    - template_id: setup
+    - template_id: setup_data_maintenance
+    - template_id: init
+    - template_id: build
+      replace_regex:
+      - pattern: '(?i)varchar\(.*\)|char\(.*\)'
+        replacement: 'string'
+    - template_id: single_user
+    - template_id: data_maintenance_hudi
+    - template_id: optimize_hudi
diff --git a/src/test/resources/config/spark/w_all_tpcds_single_session_iceberg.yaml b/src/test/resources/config/spark/w_all_tpcds_single_session_iceberg.yaml
new file mode 100644
index 00000000..d29d2f49
--- /dev/null
+++ b/src/test/resources/config/spark/w_all_tpcds_single_session_iceberg.yaml
@@ -0,0 +1,15 @@
+# Description: Workload for test: All task types, TPC-DS, Iceberg
+---
+version: 1
+id: w_all_tpcds_single_session
+phases:
+- id: all
+  sessions:
+  - tasks:
+    - template_id: setup
+    - template_id: setup_data_maintenance
+    - template_id: init
+    - template_id: build
+    - template_id: single_user
+    - template_id: data_maintenance_iceberg
+    - template_id: optimize_iceberg
diff --git a/src/test/resources/config/validation/connections_config_test0.yaml b/src/test/resources/config/validation/connections_config_test0.yaml
new file mode 100644
index 00000000..aba8e4e5
--- /dev/null
+++ b/src/test/resources/config/validation/connections_config_test0.yaml
@@ -0,0 +1,21 @@
+# Description: Connections Configuration
+---
+version: 1
+connections:
+- id: spark_0
+  driver: org.apache.hive.jdbc.HiveDriver
+  url: jdbc:hive2://127.0.0.1:10000
+  username: admin
+  password: p@ssw0rd0
+- id: spark_1
+  type: jdbc
+  driver: org.apache.hive.jdbc.HiveDriver
+  url: jdbc:hive2://127.0.0.1:10001
+- id: spark_2
+  type: spark
+  url: spark://127.0.0.1:7077
+- id: spark_3
+  type: spark
+  url: spark://127.0.0.1:7078
+  config:
+    spark.worker.timeout: "60"