From 8c0f428c58a20249a8e04ac5694b38ef60a660eb Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Wed, 3 Jul 2024 08:42:02 -0400 Subject: [PATCH] DBZ-8016 Introduce `lowercase` and `uppercase` naming strategies --- .../jdbc/JdbcSinkConnectorConfig.java | 35 +++++++++++++++++-- .../naming/LowercaseColumnNamingStrategy.java | 19 ++++++++++ .../naming/LowercaseTableNamingStrategy.java | 23 ++++++++++++ .../naming/UppercaseColumnNamingStrategy.java | 19 ++++++++++ .../naming/UppercaseTableNamingStrategy.java | 23 ++++++++++++ 5 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 src/main/java/io/debezium/connector/jdbc/naming/LowercaseColumnNamingStrategy.java create mode 100644 src/main/java/io/debezium/connector/jdbc/naming/LowercaseTableNamingStrategy.java create mode 100644 src/main/java/io/debezium/connector/jdbc/naming/UppercaseColumnNamingStrategy.java create mode 100644 src/main/java/io/debezium/connector/jdbc/naming/UppercaseTableNamingStrategy.java diff --git a/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java b/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java index a4c89ffe..ea150c2f 100644 --- a/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java +++ b/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java @@ -23,12 +23,17 @@ import io.debezium.config.EnumeratedValue; import io.debezium.config.Field; import io.debezium.config.Field.ValidationOutput; +import io.debezium.config.Instantiator; import io.debezium.connector.jdbc.filter.FieldFilterFactory; import io.debezium.connector.jdbc.filter.FieldFilterFactory.FieldNameFilter; import io.debezium.connector.jdbc.naming.ColumnNamingStrategy; import io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy; import io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy; +import io.debezium.connector.jdbc.naming.LowercaseColumnNamingStrategy; +import io.debezium.connector.jdbc.naming.LowercaseTableNamingStrategy; import io.debezium.connector.jdbc.naming.TableNamingStrategy; +import io.debezium.connector.jdbc.naming.UppercaseColumnNamingStrategy; +import io.debezium.connector.jdbc.naming.UppercaseTableNamingStrategy; import io.debezium.util.Strings; /** @@ -533,8 +538,8 @@ public JdbcSinkConnectorConfig(Map props) { this.schemaEvolutionMode = SchemaEvolutionMode.parse(config.getString(SCHEMA_EVOLUTION)); this.quoteIdentifiers = config.getBoolean(QUOTE_IDENTIFIERS_FIELD); // this.dataTypeMapping = Strings.setOf(config.getString(DATA_TYPE_MAPPING_FIELD), String::new); - this.tableNamingStrategy = config.getInstance(TABLE_NAMING_STRATEGY_FIELD, TableNamingStrategy.class); - this.columnNamingStrategy = config.getInstance(COLUMN_NAMING_STRATEGY_FIELD, ColumnNamingStrategy.class); + this.tableNamingStrategy = resolveTableNamingStrategy(config); + this.columnNamingStrategy = resolveColumnNamingStrategy(config); this.databaseTimezone = config.getString(DATABASE_TIME_ZONE_FIELD); this.postgresPostgisSchema = config.getString(POSTGRES_POSTGIS_SCHEMA_FIELD); this.sqlServerIdentityInsert = config.getBoolean(SQLSERVER_IDENTITY_INSERT_FIELD); @@ -711,4 +716,30 @@ private static int validateDeleteEnabled(Configuration config, Field field, Vali } return 0; } + + private static ColumnNamingStrategy resolveColumnNamingStrategy(Configuration config) { + final String className = config.getString(COLUMN_NAMING_STRATEGY_FIELD); + if (!Strings.isNullOrEmpty(className)) { + if (className.equalsIgnoreCase("lowercase")) { + return Instantiator.getInstance(LowercaseColumnNamingStrategy.class.getName()); + } + else if (className.equalsIgnoreCase("uppercase")) { + return Instantiator.getInstance(UppercaseColumnNamingStrategy.class.getName()); + } + } + return config.getInstance(COLUMN_NAMING_STRATEGY_FIELD, ColumnNamingStrategy.class); + } + + private static TableNamingStrategy resolveTableNamingStrategy(Configuration config) { + final String className = config.getString(TABLE_NAMING_STRATEGY_FIELD); + if (!Strings.isNullOrEmpty(className)) { + if (className.equalsIgnoreCase("lowercase")) { + return Instantiator.getInstance(LowercaseTableNamingStrategy.class.getName()); + } + else if (className.equalsIgnoreCase("uppercase")) { + return Instantiator.getInstance(UppercaseTableNamingStrategy.class.getName()); + } + } + return config.getInstance(TABLE_NAMING_STRATEGY_FIELD, TableNamingStrategy.class); + } } diff --git a/src/main/java/io/debezium/connector/jdbc/naming/LowercaseColumnNamingStrategy.java b/src/main/java/io/debezium/connector/jdbc/naming/LowercaseColumnNamingStrategy.java new file mode 100644 index 00000000..26f55936 --- /dev/null +++ b/src/main/java/io/debezium/connector/jdbc/naming/LowercaseColumnNamingStrategy.java @@ -0,0 +1,19 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.jdbc.naming; + +/** + * The lower-case implementation of the {@link ColumnNamingStrategy} that simply returns the field's + * name as the column name in the destination table but in all lower-case. + * + * @author Chris Cranford + */ +public class LowercaseColumnNamingStrategy implements ColumnNamingStrategy { + @Override + public String resolveColumnName(String fieldName) { + return fieldName.toLowerCase(); + } +} diff --git a/src/main/java/io/debezium/connector/jdbc/naming/LowercaseTableNamingStrategy.java b/src/main/java/io/debezium/connector/jdbc/naming/LowercaseTableNamingStrategy.java new file mode 100644 index 00000000..d09fa2ce --- /dev/null +++ b/src/main/java/io/debezium/connector/jdbc/naming/LowercaseTableNamingStrategy.java @@ -0,0 +1,23 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.jdbc.naming; + +import org.apache.kafka.connect.sink.SinkRecord; + +import io.debezium.connector.jdbc.JdbcSinkConnectorConfig; + +/** + * A lower-case implementation of the {@link DefaultTableNamingStrategy} where the computed table name + * will also be returned in all lower-case. + * + * @author Chris Cranford + */ +public class LowercaseTableNamingStrategy extends DefaultTableNamingStrategy { + @Override + public String resolveTableName(JdbcSinkConnectorConfig config, SinkRecord record) { + return super.resolveTableName(config, record).toLowerCase(); + } +} diff --git a/src/main/java/io/debezium/connector/jdbc/naming/UppercaseColumnNamingStrategy.java b/src/main/java/io/debezium/connector/jdbc/naming/UppercaseColumnNamingStrategy.java new file mode 100644 index 00000000..bfce0400 --- /dev/null +++ b/src/main/java/io/debezium/connector/jdbc/naming/UppercaseColumnNamingStrategy.java @@ -0,0 +1,19 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.jdbc.naming; + +/** + * The upper-case implementation of the {@link ColumnNamingStrategy} that simply returns the field's + * name as the column name in the destination table but in all upper-case. + * + * @author Chris Cranford + */ +public class UppercaseColumnNamingStrategy implements ColumnNamingStrategy { + @Override + public String resolveColumnName(String fieldName) { + return fieldName.toUpperCase(); + } +} diff --git a/src/main/java/io/debezium/connector/jdbc/naming/UppercaseTableNamingStrategy.java b/src/main/java/io/debezium/connector/jdbc/naming/UppercaseTableNamingStrategy.java new file mode 100644 index 00000000..7603576e --- /dev/null +++ b/src/main/java/io/debezium/connector/jdbc/naming/UppercaseTableNamingStrategy.java @@ -0,0 +1,23 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.jdbc.naming; + +import org.apache.kafka.connect.sink.SinkRecord; + +import io.debezium.connector.jdbc.JdbcSinkConnectorConfig; + +/** + * An upper-case implementation of the {@link DefaultTableNamingStrategy} where the computed table name + * will also be returned in all upper-case. + * + * @author Chris Cranford + */ +public class UppercaseTableNamingStrategy extends DefaultTableNamingStrategy { + @Override + public String resolveTableName(JdbcSinkConnectorConfig config, SinkRecord record) { + return super.resolveTableName(config, record).toUpperCase(); + } +}