diff --git a/pom.xml b/pom.xml index c2766c12..790db4e8 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,7 @@ limitations under the License. 4.0.0 com.starrocks flink-connector-starrocks - 1.2.1_flink-1.14_${scala.version} + 1.2.2_flink-1.14_${scala.version} 1.8 1.8 @@ -86,6 +86,11 @@ limitations under the License. provided + + com.google.guava + guava + 31.1-jre + commons-codec commons-codec @@ -126,18 +131,6 @@ limitations under the License. mysql-connector-java 5.1.49 - - org.jmockit - jmockit - 1.48 - test - - - junit - junit - 4.12 - test - com.starrocks starrocks-thrift-sdk @@ -173,6 +166,19 @@ limitations under the License. + + + org.jmockit + jmockit + 1.48 + test + + + junit + junit + 4.12 + test + @@ -266,6 +272,10 @@ limitations under the License. com.google.flatbuffers com.starrocks.shade.com.google.flatbuffers + + com.google.common + com.starrocks.shade.com.google.common + com.fasterxml.jackson.core com.starrocks.shade.com.fasterxml.jackson.core @@ -288,6 +298,7 @@ limitations under the License. io.netty:netty-buffer io.netty:netty-common com.google.flatbuffers:flatbuffers-java + com.google.guava:guava com.fasterxml.jackson.core:jackson-annotations com.fasterxml.jackson.core:jackson-core com.fasterxml.jackson.core:jackson-databind diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java index 8c9e5fcb..bbdb8d8f 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java @@ -46,7 +46,6 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; import org.apache.flink.util.concurrent.ExecutorThreadFactory; -import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.constraints.UniqueConstraint; @@ -113,19 +112,19 @@ public StarRocksSinkManager(StarRocksSinkOptions sinkOptions, TableSchema flinkS this.starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider, sinkOptions.getDatabaseName(), sinkOptions.getTableName()); // validate table structure typesMap = new HashMap<>(); - typesMap.put("bigint", Lists.newArrayList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY)); - typesMap.put("largeint", Lists.newArrayList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY)); - typesMap.put("char", Lists.newArrayList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR)); - typesMap.put("date", Lists.newArrayList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR)); - typesMap.put("datetime", Lists.newArrayList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR)); - typesMap.put("decimal", Lists.newArrayList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT)); - typesMap.put("double", Lists.newArrayList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER)); - typesMap.put("float", Lists.newArrayList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER)); - typesMap.put("int", Lists.newArrayList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY)); - typesMap.put("tinyint", Lists.newArrayList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN)); - typesMap.put("smallint", Lists.newArrayList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY)); - typesMap.put("varchar", Lists.newArrayList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW)); - typesMap.put("string", Lists.newArrayList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW)); + typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY)); + typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY)); + typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR)); + typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR)); + typesMap.put("datetime", Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR)); + typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT)); + typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER)); + typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER)); + typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY)); + typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN)); + typesMap.put("smallint", Arrays.asList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY)); + typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW)); + typesMap.put("string", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW)); validateTableStructure(flinkSchema); String version = this.starrocksQueryVisitor.getStarRocksVersion(); this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor( diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksDelimiterParser.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksDelimiterParser.java index 2a43316b..27784c58 100644 --- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksDelimiterParser.java +++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksDelimiterParser.java @@ -16,14 +16,12 @@ import java.io.StringWriter; -import org.apache.flink.calcite.shaded.com.google.common.base.Strings; - public class StarRocksDelimiterParser { private static final String HEX_STRING = "0123456789ABCDEF"; public static String parse(String sp, String dSp) throws RuntimeException { - if (Strings.isNullOrEmpty(sp)) { + if (sp == null || sp.length() == 0) { return dSp; } if (!sp.toUpperCase().startsWith("\\X")) { diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java index b204e16c..adc30116 100644 --- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java +++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java @@ -32,11 +32,11 @@ import com.alibaba.fastjson.serializer.SerializeConfig; import com.alibaba.fastjson.serializer.SerializeWriter; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.calcite.shaded.com.google.common.collect.Maps; -import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; @@ -145,7 +145,7 @@ private Object typeConvertion(LogicalType type, RowData record, int pos) { return convertNestedMap(record.getMap(pos), type); case ROW: RowType rType = (RowType)type; - Map m = Maps.newHashMap(); + Map m = new HashMap<>(); RowData row = record.getRow(pos, rType.getFieldCount()); rType.getFields().parallelStream().forEach(f -> m.put(f.getName(), typeConvertion(f.getType(), row, rType.getFieldIndex(f.getName())))); return m; diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java index 20ad8e17..4a40f991 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java @@ -14,6 +14,7 @@ package com.starrocks.connector.flink.table.sink; +import com.google.common.base.Strings; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; @@ -29,9 +30,9 @@ import org.apache.flink.table.data.binary.NestedRowData; import org.apache.flink.types.RowKind; import org.apache.flink.util.InstantiationUtil; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.flink.calcite.shaded.com.google.common.base.Strings; import net.sf.jsqlparser.parser.CCJSqlParserUtil; import net.sf.jsqlparser.statement.Statement; import net.sf.jsqlparser.statement.alter.Alter; diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index a9ee6f06..44715d20 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -92,7 +92,8 @@ public enum StreamLoadFormat { public StarRocksSinkOptions(ReadableConfig options, Map optionsMap) { this.tableOptions = options; - this.tableOptionsMap = optionsMap; + // Can not promise the input parameter optionsMap is serializable. Use the HashMap to copy the data. + this.tableOptionsMap = new HashMap<>(optionsMap); parseSinkStreamLoadProperties(); this.validate(); } diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java index 997ee8e0..92a65fb5 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java @@ -14,6 +14,7 @@ package com.starrocks.connector.flink.table.source; +import com.google.common.base.Strings; import com.starrocks.connector.flink.table.source.struct.ColunmRichInfo; import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets; import com.starrocks.connector.flink.table.source.struct.QueryInfo; @@ -22,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.calcite.shaded.com.google.common.base.Strings; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; diff --git a/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java b/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java index 99ca2884..dfde47bf 100644 --- a/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java +++ b/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java @@ -17,8 +17,8 @@ import java.nio.file.Path; import java.nio.file.Paths; +import com.google.common.base.Strings; import org.apache.flink.api.java.utils.MultipleParameterTool; -import org.apache.flink.calcite.shaded.com.google.common.base.Strings; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; diff --git a/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java b/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java index 5573d4a0..bafbeaa1 100644 --- a/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java +++ b/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java @@ -14,10 +14,6 @@ package com.starrocks.connector.flink.manager.sink; -import java.util.ArrayList; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.calcite.shaded.com.google.common.base.Strings; -import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema.Builder; @@ -28,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -58,7 +55,7 @@ public void testValidateTableStructure() { new Expectations(){ { v.getTableColumnsMetaData(); - result = Lists.newArrayList(); + result = new ArrayList<>(); } }; String exMsg = ""; diff --git a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformerTest.java b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformerTest.java index dde6ff57..78e341d6 100644 --- a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformerTest.java +++ b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformerTest.java @@ -14,8 +14,9 @@ package com.starrocks.connector.flink.row.sink; +import com.google.common.base.Strings; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.calcite.shaded.com.google.common.base.Strings; + import org.junit.Test; import mockit.Injectable; @@ -29,8 +30,6 @@ import com.alibaba.fastjson.JSON; import com.starrocks.connector.flink.StarRocksSinkBaseTest; -import com.starrocks.connector.flink.row.sink.StarRocksGenericRowTransformer; -import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory; public class StarRocksGenericRowTransformerTest extends StarRocksSinkBaseTest { diff --git a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformerTest.java b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformerTest.java index 5d916b9d..aa100d9f 100644 --- a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformerTest.java +++ b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformerTest.java @@ -14,9 +14,9 @@ package com.starrocks.connector.flink.row.sink; +import com.google.common.base.Strings; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.calcite.shaded.com.google.common.base.Strings; import org.junit.Test; import mockit.Injectable; @@ -32,8 +32,6 @@ import com.alibaba.fastjson.JSON; import com.starrocks.connector.flink.StarRocksSinkBaseTest; -import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory; -import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; diff --git a/src/test/java/com/starrocks/connector/flink/table/sink/StarRocksStreamLoadVisitorTest.java b/src/test/java/com/starrocks/connector/flink/table/sink/StarRocksStreamLoadVisitorTest.java index 2d0c81e1..c95f04d5 100644 --- a/src/test/java/com/starrocks/connector/flink/table/sink/StarRocksStreamLoadVisitorTest.java +++ b/src/test/java/com/starrocks/connector/flink/table/sink/StarRocksStreamLoadVisitorTest.java @@ -14,19 +14,12 @@ package com.starrocks.connector.flink.table.sink; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; -import org.apache.http.HttpEntity; -import org.apache.http.entity.ByteArrayEntity; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import com.starrocks.connector.flink.StarRocksSinkBaseTest; import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;