From 65dda9dcf1c55257ec9027848a3a2c6cf80a2537 Mon Sep 17 00:00:00 2001 From: beliefer Date: Wed, 2 Oct 2024 17:46:34 +0800 Subject: [PATCH] [SPARK-49856][SQL] Refactor the compileExpression of JdbcDialect to simplify the subclasses --- .../org/apache/spark/sql/jdbc/DB2Dialect.scala | 14 +------------- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 16 +++------------- .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 9 +++++++-- .../spark/sql/jdbc/MsSqlServerDialect.scala | 13 +------------ .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 14 ++------------ .../apache/spark/sql/jdbc/OracleDialect.scala | 14 +------------- 6 files changed, 15 insertions(+), 65 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index f7cf70ac957ba..634f5de02e87c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -20,14 +20,11 @@ package org.apache.spark.sql.jdbc import java.sql.{SQLException, Types} import java.util.Locale -import scala.util.control.NonFatal - import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.types._ @@ -72,16 +69,7 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe } } - override def compileExpression(expr: Expression): Option[String] = { - val db2SQLBuilder = new DB2SQLBuilder() - try { - Some(db2SQLBuilder.build(expr)) - } catch { - case NonFatal(e) => - logWarning("Error 
occurs while compiling V2 expression", e) - None - } - } + override def jdbcSQLBuilder(): JDBCSQLBuilder = new DB2SQLBuilder() override def getCatalystType( sqlType: Int, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 3ece44ece9e6a..c7e9d645346a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -23,7 +23,6 @@ import java.util.Locale import java.util.concurrent.ConcurrentHashMap import scala.jdk.CollectionConverters._ -import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils @@ -33,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSu import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.catalog.index.TableIndex -import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference} +import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType} @@ -247,17 +246,6 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError { super.classifyException(e, errorClass, messageParameters, description) } - override def compileExpression(expr: Expression): Option[String] = { - val h2SQLBuilder = new H2SQLBuilder() - try { - Some(h2SQLBuilder.build(expr)) - } catch { - case NonFatal(e) => - logWarning("Error occurs while compiling V2 expression", e) - None - } - } - class H2SQLBuilder extends JDBCSQLBuilder { override def visitAggregateFunction( @@ -303,6 +291,8 @@ private[sql] case class H2Dialect() extends JdbcDialect 
with NoLegacyJDBCError { } } + override def jdbcSQLBuilder(): JDBCSQLBuilder = new H2SQLBuilder() + override def supportsLimit: Boolean = true override def supportsOffset: Boolean = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 290665020f883..4f2660e7b53a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -476,6 +476,12 @@ abstract class JdbcDialect extends Serializable with Logging { @Since("3.3.0") def isSupportedFunction(funcName: String): Boolean = false + /** + * The [[JDBCSQLBuilder]] that matches the database dialect. + */ + @Since("4.0.0") + protected[jdbc] def jdbcSQLBuilder(): JDBCSQLBuilder = new JDBCSQLBuilder() + /** * Converts V2 expression to String representing a SQL expression. * @param expr The V2 expression to be converted. @@ -483,9 +489,8 @@ abstract class JdbcDialect extends Serializable with Logging { @Since("3.3.0") def compileExpression(expr: Expression): Option[String] = { - val jdbcSQLBuilder = new JDBCSQLBuilder() try { - Some(jdbcSQLBuilder.build(expr)) + Some(jdbcSQLBuilder().build(expr)) } catch { case NonFatal(e) => logWarning("Error occurs while compiling V2 expression", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 369f710edccf0..2c8eaf3a29214 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.jdbc import java.sql.SQLException import java.util.Locale -import scala.util.control.NonFatal - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import 
org.apache.spark.sql.connector.catalog.Identifier @@ -100,16 +98,7 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr } } - override def compileExpression(expr: Expression): Option[String] = { - val msSqlServerSQLBuilder = new MsSqlServerSQLBuilder() - try { - Some(msSqlServerSQLBuilder.build(expr)) - } catch { - case NonFatal(e) => - logWarning("Error occurs while compiling V2 expression", e) - None - } - } + override def jdbcSQLBuilder(): JDBCSQLBuilder = new MsSqlServerSQLBuilder() override def getCatalystType( sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 785bf5b13aa78..e69bbd09a6477 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -22,7 +22,6 @@ import java.util import java.util.Locale import scala.collection.mutable.ArrayBuilder -import scala.util.control.NonFatal import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.AnalysisException @@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.index.TableIndex -import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection} +import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference, NullOrdering, SortDirection} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.types._ @@ -115,16 +114,7 @@ private case class 
MySQLDialect() extends JdbcDialect with SQLConfHelper with No } } - override def compileExpression(expr: Expression): Option[String] = { - val mysqlSQLBuilder = new MySQLSQLBuilder() - try { - Some(mysqlSQLBuilder.build(expr)) - } catch { - case NonFatal(e) => - logWarning("Error occurs while compiling V2 expression", e) - None - } - } + override def jdbcSQLBuilder(): JDBCSQLBuilder = new MySQLSQLBuilder() override def getCatalystType( sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index 6175b5f659932..b2e6563a1f4a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -20,12 +20,9 @@ package org.apache.spark.sql.jdbc import java.sql.{Date, SQLException, Timestamp, Types} import java.util.Locale -import scala.util.control.NonFatal - import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper -import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.jdbc.OracleDialect._ @@ -64,16 +61,7 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N } } - override def compileExpression(expr: Expression): Option[String] = { - val oracleSQLBuilder = new OracleSQLBuilder() - try { - Some(oracleSQLBuilder.build(expr)) - } catch { - case NonFatal(e) => - logWarning("Error occurs while compiling V2 expression", e) - None - } - } + override def jdbcSQLBuilder(): JDBCSQLBuilder = new OracleSQLBuilder() override def getCatalystType( sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = {