diff --git a/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java b/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java index f27c3ed8..15e1c1a4 100644 --- a/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java +++ b/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java @@ -145,7 +145,7 @@ private void genFlinkRows() { ColumnRichInfo richInfo = columnRichInfos.get(selectedColumns[i].getColumnIndexInFlinkTable()); nullable = richInfo.getDataType().getLogicalType().isNullable(); LogicalTypeRoot flinkTypeRoot = richInfo.getDataType().getLogicalType().getTypeRoot(); - String srType = DataUtil.clearBracket(column.getType()); + String srType = column.getType().isPresent() ? DataUtil.clearBracket(column.getType().get()) : null; if (Const.DataTypeRelationMap.containsKey(flinkTypeRoot) && Const.DataTypeRelationMap.get(flinkTypeRoot).containsKey(srType)) { translators = Const.DataTypeRelationMap.get(flinkTypeRoot).get(srType); diff --git a/src/main/java/com/starrocks/connector/flink/table/source/struct/Column.java b/src/main/java/com/starrocks/connector/flink/table/source/struct/Column.java index 57f118a5..921f3a89 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/struct/Column.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/struct/Column.java @@ -14,8 +14,13 @@ package com.starrocks.connector.flink.table.source.struct; +import javax.annotation.Nullable; + +import java.util.Optional; + public class Column { private String name; + @Nullable private String type; private String comment; private int precision; @@ -24,7 +29,7 @@ public class Column { public Column() { } - public Column(String name, String type, String comment, int precision, int scale) { + public Column(String name, @Nullable String type, String comment, int precision, int scale) { this.name = name; this.type = type; this.comment = comment; @@ -40,8 +45,8 @@ public void setName(String name) { this.name = name; } - public String getType() { - return type; + public Optional getType() { + return Optional.ofNullable(type); } public void setType(String type) { diff --git a/src/main/java/com/starrocks/connector/flink/table/source/struct/StarRocksSchema.java b/src/main/java/com/starrocks/connector/flink/table/source/struct/StarRocksSchema.java index 93241591..b75e0ebc 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/struct/StarRocksSchema.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/struct/StarRocksSchema.java @@ -67,8 +67,9 @@ public int size() { public static StarRocksSchema genSchema(List tscanColumnDescs) { StarRocksSchema schema = new StarRocksSchema(); - for (TScanColumnDesc tscanColumnDesc : tscanColumnDescs) { - schema.put(tscanColumnDesc.getName(), tscanColumnDesc.getType().name(), "", 0, 0); + for (TScanColumnDesc desc : tscanColumnDescs) { + // The type for some columns may be null, such as json column + schema.put(desc.getName(), desc.getType() == null ? null : desc.getType().name(), "", 0, 0); } return schema; } diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index dc486fd0..31852a0b 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -18,8 +18,6 @@ package com.starrocks.connector.flink.it.sink; -import com.starrocks.connector.flink.it.StarRocksITTestBase; -import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; @@ -33,6 +31,10 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; + +import com.starrocks.connector.flink.it.StarRocksITTestBase; +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -620,4 +622,106 @@ public void testUnalignedTypes() throws Exception { List> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName); verifyResult(Collections.singletonList(Arrays.asList(1, "123", "{\"key\": 1, \"value\": 2}")), actualData); } + + @Test + public void testJsonType() throws Exception { + String tableName = createJsonTable("testJsonType"); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + StreamTableEnvironment tEnv; + tEnv = StreamTableEnvironment.create(env); + DataStream dataStream = + env.fromElements( + Row.ofKind(RowKind.INSERT, 1, 1.0, "{\"a\": 1, \"b\": true}"), + Row.ofKind(RowKind.INSERT, 2, 2.0, "{\"a\": 2, \"b\": false}")); + Table table = tEnv.fromChangelogStream(dataStream, Schema.newBuilder().primaryKey("f0").build(), ChangelogMode.all()); + tEnv.createTemporaryView("src", table); + + StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder() + .withProperty("jdbc-url", getJdbcUrl()) + .withProperty("load-url", getHttpUrls()) + .withProperty("database-name", DB_NAME) + .withProperty("table-name", tableName) + .withProperty("username", "root") + .withProperty("password", "") + .build(); + + String createSinkSQL = "CREATE TABLE sink(" + + "c0 INT," + + "c1 DOUBLE," + + "c2 STRING," + + "PRIMARY KEY (`c0`) NOT ENFORCED" + + ") WITH ( " + + "'connector' = 'starrocks'," + + "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," + + "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," + + "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," + + "'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," + + "'database-name' = '" + DB_NAME + "'," + + "'table-name' = '" + sinkOptions.getTableName() + "'," + + "'username' = '" + sinkOptions.getUsername() + "'," + + "'password' = '" + sinkOptions.getPassword() + "'" + + ")"; + tEnv.executeSql(createSinkSQL); + tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await(); + + String createSrcSQL = "CREATE TABLE sr_src(" + + "c0 INT," + + "c1 DOUBLE," + + "c2 STRING," + + "PRIMARY KEY (`c0`) NOT ENFORCED" + + ") WITH ( " + + "'connector' = 'starrocks'," + + "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," + + "'scan-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," + + "'database-name' = '" + DB_NAME + "'," + + "'table-name' = '" + sinkOptions.getTableName() + "'," + + "'username' = '" + sinkOptions.getUsername() + "'," + + "'password' = '" + sinkOptions.getPassword() + "'" + + ")"; + tEnv.executeSql(createSrcSQL); + List> actualData; + try (CloseableIterator result = tEnv.executeSql("SELECT * FROM sr_src").collect()) { + actualData = collectResult(result); + } + + List> expectedData = Arrays.asList( + Arrays.asList(1, 1.0, "{\"a\": 1, \"b\": true}"), + Arrays.asList(2, 2.0, "{\"a\": 2, \"b\": false}") + ); + + verifyResult(expectedData, actualData); + } + + private List> collectResult(CloseableIterator result) { + List> data = new ArrayList<>(); + while (result.hasNext()) { + Row row = result.next(); + List list = new ArrayList<>(); + for (int i = 0; i < row.getArity(); i++) { + list.add(row.getField(i).toString()); + } + data.add(list); + } + return data; + } + + private String createJsonTable(String tablePrefix) throws Exception { + String tableName = tablePrefix + "_" + genRandomUuid(); + String createStarRocksTable = + String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 INT," + + "c1 DOUBLE," + + "c2 JSON" + + ") ENGINE = OLAP " + + "PRIMARY KEY(c0) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + DB_NAME, tableName); + executeSrSQL(createStarRocksTable); + return tableName; + } }