From b94fd136e556157d763a6c88da16e7063dc8283f Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Fri, 24 May 2024 10:03:04 -0400 Subject: [PATCH] DBZ-7874 Consolidate the MariaDB and MySQL dialects --- .../dialect/mysql/MariaDbDatabaseDialect.java | 111 +----------------- .../dialect/mysql/MySqlDatabaseDialect.java | 2 +- 2 files changed, 2 insertions(+), 111 deletions(-) diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MariaDbDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MariaDbDatabaseDialect.java index d214406f..48bef79c 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MariaDbDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MariaDbDatabaseDialect.java @@ -5,18 +5,9 @@ */ package io.debezium.connector.jdbc.dialect.mysql; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.TemporalAccessor; -import java.util.Arrays; import java.util.List; -import java.util.Optional; import org.hibernate.SessionFactory; -import org.hibernate.StatelessSession; import org.hibernate.dialect.Dialect; import org.hibernate.dialect.MariaDBDialect; import org.slf4j.Logger; @@ -26,30 +17,17 @@ import io.debezium.connector.jdbc.SinkRecordDescriptor; import io.debezium.connector.jdbc.dialect.DatabaseDialect; import io.debezium.connector.jdbc.dialect.DatabaseDialectProvider; -import io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect; import io.debezium.connector.jdbc.dialect.SqlStatementBuilder; import io.debezium.connector.jdbc.relational.TableDescriptor; -import io.debezium.time.ZonedTimestamp; -import io.debezium.util.Strings; /** * A {@link DatabaseDialect} implementation for MariaDB. * */ -public class MariaDbDatabaseDialect extends GeneralDatabaseDialect { +public class MariaDbDatabaseDialect extends MySqlDatabaseDialect { private static final Logger LOGGER = LoggerFactory.getLogger(MariaDbDatabaseDialect.class); - private static final List NO_DEFAULT_VALUE_TYPES = Arrays.asList( - "tinytext", "mediumtext", "longtext", "text", "tinyblob", "mediumblob", "longblob"); - - private static final DateTimeFormatter ISO_LOCAL_DATE_TIME_WITH_SPACE = new DateTimeFormatterBuilder() - .parseCaseInsensitive() - .append(DateTimeFormatter.ISO_LOCAL_DATE) - .appendLiteral(' ') - .append(DateTimeFormatter.ISO_LOCAL_TIME) - .toFormatter(); - public static class MariaDbDatabaseDialectProvider implements DatabaseDialectProvider { @Override public boolean supports(Dialect dialect) { @@ -68,85 +46,8 @@ public DatabaseDialect instantiate(JdbcSinkConnectorConfig config, SessionFactor } } - private final boolean connectionTimeZoneSet; - private MariaDbDatabaseDialect(JdbcSinkConnectorConfig config, SessionFactory sessionFactory) { super(config, sessionFactory); - try (StatelessSession session = sessionFactory.openStatelessSession()) { - this.connectionTimeZoneSet = session.doReturningWork(connection -> connection.getMetaData().getURL().contains("connectionTimeZone=")); - } - } - - @Override - protected Optional getDatabaseTimeZoneQuery() { - return Optional.of("SELECT @@global.time_zone, @@session.time_zone"); - } - - @Override - protected String getDatabaseTimeZoneQueryResult(ResultSet rs) throws SQLException { - return rs.getString(1) + " (global), " + rs.getString(2) + " (system)"; - } - - @Override - public boolean isTimeZoneSet() { - return connectionTimeZoneSet || super.isTimeZoneSet(); - } - - @Override - public boolean shouldBindTimeWithTimeZoneAsDatabaseTimeZone() { - return true; - } - - @Override - protected void registerTypes() { - super.registerTypes(); - - registerType(BooleanType.INSTANCE); - registerType(BitType.INSTANCE); - registerType(BytesType.INSTANCE); - registerType(EnumType.INSTANCE); - registerType(SetType.INSTANCE); - registerType(MediumIntType.INSTANCE); - registerType(IntegerType.INSTANCE); - registerType(TinyIntType.INSTANCE); - registerType(YearType.INSTANCE); - registerType(JsonType.INSTANCE); - registerType(MapToJsonType.INSTANCE); - registerType(GeometryType.INSTANCE); - registerType(PointType.INSTANCE); - registerType(ZonedTimestampType.INSTANCE); - registerType(ZonedTimeType.INSTANCE); - } - - @Override - public int getMaxVarcharLengthInKey() { - return 255; - } - - @Override - public String getFormattedTime(TemporalAccessor value) { - return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_TIME.format(value)); - } - - @Override - public String getFormattedDateTime(TemporalAccessor value) { - return String.format("'%s'", ISO_LOCAL_DATE_TIME_WITH_SPACE.format(value)); - } - - @Override - public String getFormattedTimestamp(TemporalAccessor value) { - return String.format("'%s'", ISO_LOCAL_DATE_TIME_WITH_SPACE.format(value)); - } - - @Override - public String getFormattedTimestampWithTimeZone(String value) { - final ZonedDateTime zonedDateTime = ZonedDateTime.parse(value, ZonedTimestamp.FORMATTER); - return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(zonedDateTime)); - } - - @Override - public String getAlterTablePrefix() { - return "ADD COLUMN ("; } /* @@ -177,14 +78,4 @@ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor rec return builder.build(); } - @Override - protected void addColumnDefaultValue(SinkRecordDescriptor.FieldDescriptor field, StringBuilder columnSpec) { - final String fieldType = field.getTypeName(); - if (!Strings.isNullOrBlank(fieldType)) { - if (NO_DEFAULT_VALUE_TYPES.contains(fieldType.toLowerCase())) { - return; - } - } - super.addColumnDefaultValue(field, columnSpec); - } } diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java index 2c4f5633..e2c438db 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java @@ -66,7 +66,7 @@ public DatabaseDialect instantiate(JdbcSinkConnectorConfig config, SessionFactor private final boolean connectionTimeZoneSet; - private MySqlDatabaseDialect(JdbcSinkConnectorConfig config, SessionFactory sessionFactory) { + protected MySqlDatabaseDialect(JdbcSinkConnectorConfig config, SessionFactory sessionFactory) { super(config, sessionFactory); try (StatelessSession session = sessionFactory.openStatelessSession()) {