Skip to content

Commit

Permalink
[Enhancement] Support to read json column (StarRocks#334)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Jan 29, 2024
1 parent 1049070 commit 3242e74
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private void genFlinkRows() {
ColumnRichInfo richInfo = columnRichInfos.get(selectedColumns[i].getColumnIndexInFlinkTable());
nullable = richInfo.getDataType().getLogicalType().isNullable();
LogicalTypeRoot flinkTypeRoot = richInfo.getDataType().getLogicalType().getTypeRoot();
String srType = DataUtil.clearBracket(column.getType());
String srType = column.getType().isPresent() ? DataUtil.clearBracket(column.getType().get()) : null;
if (Const.DataTypeRelationMap.containsKey(flinkTypeRoot)
&& Const.DataTypeRelationMap.get(flinkTypeRoot).containsKey(srType)) {
translators = Const.DataTypeRelationMap.get(flinkTypeRoot).get(srType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@

package com.starrocks.connector.flink.table.source.struct;

import javax.annotation.Nullable;

import java.util.Optional;

public class Column {
private String name;
@Nullable
private String type;
private String comment;
private int precision;
Expand All @@ -24,7 +29,7 @@ public class Column {
public Column() {
}

public Column(String name, String type, String comment, int precision, int scale) {
public Column(String name, @Nullable String type, String comment, int precision, int scale) {
this.name = name;
this.type = type;
this.comment = comment;
Expand All @@ -40,8 +45,8 @@ public void setName(String name) {
this.name = name;
}

public String getType() {
return type;
public Optional<String> getType() {
return Optional.ofNullable(type);
}

public void setType(String type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public int size() {

/**
 * Builds a {@link StarRocksSchema} from the given scan column descriptors by
 * calling {@code schema.put(...)} once per descriptor with the descriptor's
 * name and the name of its type.
 *
 * <p>The trailing {@code "", 0, 0} arguments are presumably the comment,
 * precision and scale of the column (matching the {@code Column} constructor
 * order) -- TODO confirm against {@code StarRocksSchema#put}.
 *
 * <p>NOTE(review): this span is a rendered diff hunk; the first
 * {@code for}/{@code put} pair looks like the removed version and the second
 * pair (with the null check on {@code getType()}) like its replacement --
 * confirm against the real file.
 *
 * @param tscanColumnDescs column descriptors to convert
 * @return the populated schema
 */
public static StarRocksSchema genSchema(List<TScanColumnDesc> tscanColumnDescs) {
StarRocksSchema schema = new StarRocksSchema();
for (TScanColumnDesc tscanColumnDesc : tscanColumnDescs) {
schema.put(tscanColumnDesc.getName(), tscanColumnDesc.getType().name(), "", 0, 0);
for (TScanColumnDesc desc : tscanColumnDescs) {
// The type for some columns may be null, such as json column
schema.put(desc.getName(), desc.getType() == null ? null : desc.getType().name(), "", 0, 0);
}
return schema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package com.starrocks.connector.flink.it.sink;

import com.starrocks.connector.flink.it.StarRocksITTestBase;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
Expand All @@ -33,6 +31,10 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;

import com.starrocks.connector.flink.it.StarRocksITTestBase;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -620,4 +622,106 @@ public void testUnalignedTypes() throws Exception {
List<List<Object>> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName);
verifyResult(Collections.singletonList(Arrays.asList(1, "123", "{\"key\": 1, \"value\": 2}")), actualData);
}

/**
 * Round-trip test for a StarRocks JSON column: writes two rows through the
 * Flink sink table, reads them back through the Flink source table, and
 * verifies the collected rows against the expected values.
 */
@Test
public void testJsonType() throws Exception {
    final String tableName = createJsonTable("testJsonType");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Two INSERT rows; the third field carries the JSON document as a string.
    Row firstRow = Row.ofKind(RowKind.INSERT, 1, 1.0, "{\"a\": 1, \"b\": true}");
    Row secondRow = Row.ofKind(RowKind.INSERT, 2, 2.0, "{\"a\": 2, \"b\": false}");
    DataStream<Row> input = env.fromElements(firstRow, secondRow);
    Schema keyedSchema = Schema.newBuilder().primaryKey("f0").build();
    Table srcTable = tEnv.fromChangelogStream(input, keyedSchema, ChangelogMode.all());
    tEnv.createTemporaryView("src", srcTable);

    StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", getJdbcUrl())
            .withProperty("load-url", getHttpUrls())
            .withProperty("database-name", DB_NAME)
            .withProperty("table-name", tableName)
            .withProperty("username", "root")
            .withProperty("password", "")
            .build();

    // Pieces shared by the sink and the source DDL.
    final String jdbcUrl = sinkOptions.getJdbcUrl();
    final String joinedUrls = String.join(";", sinkOptions.getLoadUrlList());
    final String columnDefs =
            "c0 INT," +
            "c1 DOUBLE," +
            "c2 STRING," +
            "PRIMARY KEY (`c0`) NOT ENFORCED";
    final String tableAndCredentials =
            "'database-name' = '" + DB_NAME + "'," +
            "'table-name' = '" + sinkOptions.getTableName() + "'," +
            "'username' = '" + sinkOptions.getUsername() + "'," +
            "'password' = '" + sinkOptions.getPassword() + "'";

    // Write path: Flink view -> StarRocks sink table.
    final String sinkVersion = isSinkV2 ? "V2" : "V1";
    final String useNewSinkApi = newSinkApi ? "true" : "false";
    String sinkDdl = "CREATE TABLE sink(" +
            columnDefs +
            ") WITH ( " +
            "'connector' = 'starrocks'," +
            "'jdbc-url'='" + jdbcUrl + "'," +
            "'load-url'='" + joinedUrls + "'," +
            "'sink.version' = '" + sinkVersion + "'," +
            "'sink.use.new-sink-api' = '" + useNewSinkApi + "'," +
            tableAndCredentials +
            ")";
    tEnv.executeSql(sinkDdl);
    tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await();

    // Read path: StarRocks source table -> collected rows.
    String sourceDdl = "CREATE TABLE sr_src(" +
            columnDefs +
            ") WITH ( " +
            "'connector' = 'starrocks'," +
            "'jdbc-url'='" + jdbcUrl + "'," +
            "'scan-url'='" + joinedUrls + "'," +
            tableAndCredentials +
            ")";
    tEnv.executeSql(sourceDdl);

    List<List<Object>> actualData;
    try (CloseableIterator<Row> rows = tEnv.executeSql("SELECT * FROM sr_src").collect()) {
        actualData = collectResult(rows);
    }

    List<Object> expectedFirst = Arrays.asList(1, 1.0, "{\"a\": 1, \"b\": true}");
    List<Object> expectedSecond = Arrays.asList(2, 2.0, "{\"a\": 2, \"b\": false}");
    List<List<Object>> expectedData = Arrays.asList(expectedFirst, expectedSecond);

    verifyResult(expectedData, actualData);
}

/**
 * Drains the given iterator and converts every row into a list holding the
 * string form of each field, in field order.
 *
 * <p>A field whose value is {@code null} (e.g. a SQL NULL column) is kept as a
 * {@code null} entry instead of triggering a {@link NullPointerException}.
 *
 * @param result iterator over the rows to collect; the caller remains
 *               responsible for closing it
 * @return one list per row, in iteration order
 */
private List<List<Object>> collectResult(CloseableIterator<Row> result) {
    List<List<Object>> data = new ArrayList<>();
    while (result.hasNext()) {
        Row row = result.next();
        int arity = row.getArity();
        List<Object> fields = new ArrayList<>(arity);
        for (int i = 0; i < arity; i++) {
            Object field = row.getField(i);
            // Row#getField may return null; calling toString() on it would throw.
            fields.add(field == null ? null : field.toString());
        }
        data.add(fields);
    }
    return data;
}

/**
 * Creates a StarRocks primary-key table with an INT key, a DOUBLE column and a
 * JSON column, named {@code tablePrefix} plus a random suffix.
 *
 * @param tablePrefix prefix of the generated table name
 * @return the name of the created table
 * @throws Exception if executing the DDL fails
 */
private String createJsonTable(String tablePrefix) throws Exception {
    final String uniqueTableName = tablePrefix + "_" + genRandomUuid();

    // DDL template; the two placeholders are the database and the table name.
    final String ddlTemplate =
            "CREATE TABLE `%s`.`%s` ("
                    + "c0 INT,"
                    + "c1 DOUBLE,"
                    + "c2 JSON"
                    + ") ENGINE = OLAP "
                    + "PRIMARY KEY(c0) "
                    + "DISTRIBUTED BY HASH (c0) BUCKETS 8 "
                    + "PROPERTIES ("
                    + "\"replication_num\" = \"1\""
                    + ")";

    executeSrSQL(String.format(ddlTemplate, DB_NAME, uniqueTableName));
    return uniqueTableName;
}
}

0 comments on commit 3242e74

Please sign in to comment.