diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index 36a5780a..ff40e11f 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -425,7 +425,7 @@ public void testEnableExactlyOnceLabelGen() throws Exception { Map options = new HashMap<>(); options.put("sink.semantic", "exactly-once"); options.put("sink.label-prefix", "test_label"); - options.put("sink.enable.exactly-once.label-gen", "true"); + options.put("sink.exactly-once.enable-label-gen", "true"); String checkpointDir = temporaryFolder.newFolder().toURI().toString(); testConfigurationBase(options, env -> { @@ -464,7 +464,7 @@ public void testAbortLingeringTransactionsWithCheckNum() throws Exception { Map options = new HashMap<>(); options.put("sink.semantic", "exactly-once"); options.put("sink.label-prefix", "test_label"); - options.put("sink.abort.check-num-txns", "10"); + options.put("sink.exactly-once.check-num-lingering-txn", "10"); String checkpointDir = temporaryFolder.newFolder().toURI().toString(); testConfigurationBase(options, env -> { @@ -478,6 +478,14 @@ public void testAbortLingeringTransactionsWithCheckNum() throws Exception { ); } + @Test + public void testCsvFormatWithColumnSeparatorAndRowDelimiter() throws Exception { + Map map = new HashMap<>(); + map.put("sink.properties.column_separator", "\\x01"); + map.put("sink.properties.row_delimiter", "\\0x2"); + testConfigurationBase(map, env -> null); + } + @Test public void testJsonFormat() throws Exception { if (isSinkV2) { @@ -492,7 +500,7 @@ public void testJsonFormat() throws Exception { } private void testConfigurationBase(Map options, Function setFlinkEnv) throws Exception { - String tableName = createPkTable("testAtLeastOnceBase"); + String tableName = createPkTable("testConfigurationBase"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); setFlinkEnv.apply(env); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DelimiterParser.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DelimiterParser.java new file mode 100644 index 00000000..ae6fa84a --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DelimiterParser.java @@ -0,0 +1,79 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream; + +import java.io.StringWriter; + +/** + * Parse the delimiter which contains escape characters. The logic is same as + * that in StarRocks, See class Delimiter of StarRocks FE. + */ +public class DelimiterParser { + + private static final String HEX_STRING = "0123456789ABCDEF"; + + private static byte[] hexStrToBytes(String hexStr) { + String upperHexStr = hexStr.toUpperCase(); + int length = upperHexStr.length() / 2; + char[] hexChars = upperHexStr.toCharArray(); + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + int pos = i * 2; + bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | (0xff & charToByte(hexChars[pos + 1]))); + } + return bytes; + } + + private static byte charToByte(char c) { + return (byte) HEX_STRING.indexOf(c); + } + + public static String convertDelimiter(String originStr) { + if (originStr == null || originStr.isEmpty()) { + throw new RuntimeException("The delimiter can't be null or empty"); + } + + if (originStr.toUpperCase().startsWith("\\X") || originStr.toUpperCase().startsWith("0X")) { + String hexStr = originStr.substring(2); + // check hex str + if (hexStr.isEmpty()) { + throw new RuntimeException("Invalid delimiter '" + originStr + ": empty hex string"); + } + if (hexStr.length() % 2 != 0) { + throw new RuntimeException("Invalid delimiter '" + originStr + ": hex length must be a even number"); + } + for (char hexChar : hexStr.toUpperCase().toCharArray()) { + if (HEX_STRING.indexOf(hexChar) == -1) { + throw new RuntimeException("Invalid delimiter '" + originStr + "': invalid hex format"); + } + } + + // transform to delimiter + StringWriter writer = new StringWriter(); + for (byte b : hexStrToBytes(hexStr)) { + writer.append((char) b); + } + return writer.toString(); + } else { + return originStr; + } + } +} diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/DelimiterParserTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/DelimiterParserTest.java new file mode 100644 index 00000000..ec6e5367 --- /dev/null +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/DelimiterParserTest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream; + +import org.junit.Assert; +import org.junit.Test; + +public class DelimiterParserTest { + + @Test + public void testNormal() { + Assert.assertEquals("\n", DelimiterParser.convertDelimiter("\n")); + Assert.assertEquals("\1", DelimiterParser.convertDelimiter("\\x01")); + Assert.assertEquals("\0\1", DelimiterParser.convertDelimiter("\\x0001")); + Assert.assertEquals("|", DelimiterParser.convertDelimiter("|")); + Assert.assertEquals("\\|", DelimiterParser.convertDelimiter("\\|")); + } + + @Test(expected = RuntimeException.class) + public void testHexFormatError() { + DelimiterParser.convertDelimiter("\\x0g"); + } + + @Test(expected = RuntimeException.class) + public void testHexLengthError() { + DelimiterParser.convertDelimiter("\\x011"); + } +}