Skip to content

Commit

Permalink
release v1.2.2
Browse files Browse the repository at this point in the history
release v1.2.2
  • Loading branch information
xlfjcg committed May 27, 2022
1 parent c7cc949 commit 38879d2
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 55 deletions.
37 changes: 24 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ limitations under the License.
<modelVersion>4.0.0</modelVersion>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.1_flink-1.14_${scala.version}</version>
<version>1.2.2_flink-1.14_${scala.version}</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
Expand Down Expand Up @@ -86,6 +86,11 @@ limitations under the License.
<scope>provided</scope>
</dependency>
<!-- other -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
Expand Down Expand Up @@ -126,18 +131,6 @@ limitations under the License.
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<version>1.48</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-thrift-sdk</artifactId>
Expand Down Expand Up @@ -173,6 +166,19 @@ limitations under the License.
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<version>1.48</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down Expand Up @@ -266,6 +272,10 @@ limitations under the License.
<pattern>com.google.flatbuffers</pattern>
<shadedPattern>com.starrocks.shade.com.google.flatbuffers</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>com.starrocks.shade.com.google.common</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson.core</pattern>
<shadedPattern>com.starrocks.shade.com.fasterxml.jackson.core</shadedPattern>
Expand All @@ -288,6 +298,7 @@ limitations under the License.
<include>io.netty:netty-buffer</include>
<include>io.netty:netty-common</include>
<include>com.google.flatbuffers:flatbuffers-java</include>
<include>com.google.guava:guava</include>
<include>com.fasterxml.jackson.core:jackson-annotations</include>
<include>com.fasterxml.jackson.core:jackson-core</include>
<include>com.fasterxml.jackson.core:jackson-databind</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.constraints.UniqueConstraint;

Expand Down Expand Up @@ -113,19 +112,19 @@ public StarRocksSinkManager(StarRocksSinkOptions sinkOptions, TableSchema flinkS
this.starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider, sinkOptions.getDatabaseName(), sinkOptions.getTableName());
// validate table structure
typesMap = new HashMap<>();
typesMap.put("bigint", Lists.newArrayList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("largeint", Lists.newArrayList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("char", Lists.newArrayList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
typesMap.put("date", Lists.newArrayList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
typesMap.put("datetime", Lists.newArrayList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
typesMap.put("decimal", Lists.newArrayList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
typesMap.put("double", Lists.newArrayList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
typesMap.put("float", Lists.newArrayList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
typesMap.put("int", Lists.newArrayList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("tinyint", Lists.newArrayList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
typesMap.put("smallint", Lists.newArrayList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("varchar", Lists.newArrayList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
typesMap.put("string", Lists.newArrayList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
typesMap.put("datetime", Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
typesMap.put("smallint", Arrays.asList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
typesMap.put("string", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
validateTableStructure(flinkSchema);
String version = this.starrocksQueryVisitor.getStarRocksVersion();
this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@

import java.io.StringWriter;

import org.apache.flink.calcite.shaded.com.google.common.base.Strings;

public class StarRocksDelimiterParser {

private static final String HEX_STRING = "0123456789ABCDEF";

public static String parse(String sp, String dSp) throws RuntimeException {
if (Strings.isNullOrEmpty(sp)) {
if (sp == null || sp.length() == 0) {
return dSp;
}
if (!sp.toUpperCase().startsWith("\\X")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.alibaba.fastjson.serializer.SerializeWriter;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
Expand Down Expand Up @@ -145,7 +145,7 @@ private Object typeConvertion(LogicalType type, RowData record, int pos) {
return convertNestedMap(record.getMap(pos), type);
case ROW:
RowType rType = (RowType)type;
Map<String, Object> m = Maps.newHashMap();
Map<String, Object> m = new HashMap<>();
RowData row = record.getRow(pos, rType.getFieldCount());
rType.getFields().parallelStream().forEach(f -> m.put(f.getName(), typeConvertion(f.getType(), row, rType.getFieldIndex(f.getName()))));
return m;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.connector.flink.table.sink;

import com.google.common.base.Strings;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
Expand All @@ -29,9 +30,9 @@
import org.apache.flink.table.data.binary.NestedRowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.InstantiationUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public enum StreamLoadFormat {

public StarRocksSinkOptions(ReadableConfig options, Map<String, String> optionsMap) {
this.tableOptions = options;
this.tableOptionsMap = optionsMap;
// Can not promise the input parameter optionsMap is serializable. Use the HashMap to copy the data.
this.tableOptionsMap = new HashMap<>(optionsMap);
parseSinkStreamLoadProperties();
this.validate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.connector.flink.table.source;

import com.google.common.base.Strings;
import com.starrocks.connector.flink.table.source.struct.ColunmRichInfo;
import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
import com.starrocks.connector.flink.table.source.struct.QueryInfo;
Expand All @@ -22,7 +23,6 @@
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;

import com.google.common.base.Strings;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@

package com.starrocks.connector.flink.manager.sink;

import java.util.ArrayList;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema.Builder;

Expand All @@ -28,6 +24,7 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -58,7 +55,7 @@ public void testValidateTableStructure() {
new Expectations(){
{
v.getTableColumnsMetaData();
result = Lists.newArrayList();
result = new ArrayList<>();
}
};
String exMsg = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

package com.starrocks.connector.flink.row.sink;

import com.google.common.base.Strings;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;

import org.junit.Test;

import mockit.Injectable;
Expand All @@ -29,8 +30,6 @@

import com.alibaba.fastjson.JSON;
import com.starrocks.connector.flink.StarRocksSinkBaseTest;
import com.starrocks.connector.flink.row.sink.StarRocksGenericRowTransformer;
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;

public class StarRocksGenericRowTransformerTest extends StarRocksSinkBaseTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package com.starrocks.connector.flink.row.sink;

import com.google.common.base.Strings;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.junit.Test;

import mockit.Injectable;
Expand All @@ -32,8 +32,6 @@

import com.alibaba.fastjson.JSON;
import com.starrocks.connector.flink.StarRocksSinkBaseTest;
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@

package com.starrocks.connector.flink.table.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.starrocks.connector.flink.StarRocksSinkBaseTest;
import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
Expand Down

0 comments on commit 38879d2

Please sign in to comment.