diff --git a/docs/connector-read.md b/docs/connector-read.md
index fab85f2..6d57ae7 100644
--- a/docs/connector-read.md
+++ b/docs/connector-read.md
@@ -155,21 +155,35 @@ This section describes the parameters you need to configure when you use the Spa
 
 The following parameters apply to all three reading methods: Spark SQL, Spark DataFrame, and Spark RDD.
 
-| Parameter | Default value | Description |
-| ------------------------------------ | ----------------- | ------------------------------------------------------------ |
-| starrocks.fenodes | None | The HTTP URL of the FE in your StarRocks cluster. Format `<fe_host>:<fe_http_port>`. You can specify multiple URLs, which must be separated by a comma (,). |
-| starrocks.table.identifier | None | The name of the StarRocks table. Format: `<database_name>.<table_name>`. |
-| starrocks.request.retries | 3 | The maximum number of times that Spark can retry to send a read request o StarRocks. |
-| starrocks.request.connect.timeout.ms | 30000 | The maximum amount of time after which a read request sent to StarRocks times out. |
-| starrocks.request.read.timeout.ms | 30000 | The maximum amount of time after which the reading for a request sent to StarRocks times out. |
-| starrocks.request.query.timeout.s | 3600 | The maximum amount of time after which a query of data from StarRocks times out. The default timeout period is 1 hour. `-1` means that no timeout period is specified. |
+| Parameter | Default value | Description |
+| ------------------------------------ | ----------------- | ------------------------------------------------------------ |
+| starrocks.fenodes | None | The HTTP URL of the FE in your StarRocks cluster. Format: `<fe_host>:<fe_http_port>`. You can specify multiple URLs, which must be separated by a comma (,). |
+| starrocks.be.host.mapping.list | None | The list of mappings between the external IPs or domain names and the internal addresses of the StarRocks BE nodes. Often needed when StarRocks is deployed in Kubernetes. Format: `<external_host>:<external_port>,<internal_host>:<internal_port>;...`. |
+| starrocks.table.identifier | None | The name of the StarRocks table. Format: `<database_name>.<table_name>`. |
+| starrocks.request.retries | 3 | The maximum number of times that Spark can retry to send a read request to StarRocks. |
+| starrocks.request.connect.timeout.ms | 30000 | The maximum amount of time after which a read request sent to StarRocks times out. Unit: ms. |
+| starrocks.request.read.timeout.ms | 30000 | The maximum amount of time after which the reading for a request sent to StarRocks times out. Unit: ms. |
+| starrocks.request.query.timeout.s | 3600 | The maximum amount of time after which a query of data from StarRocks times out. The default timeout period is 1 hour. `-1` means that no timeout period is specified. |
 | starrocks.request.tablet.size | Integer.MAX_VALUE | The number of StarRocks tablets grouped into each Spark RDD partition. A smaller value of this parameter indicates that a larger number of Spark RDD partitions will be generated. A larger number of Spark RDD partitions means higher parallelism on Spark but greater pressure on StarRocks. |
-| starrocks.batch.size | 4096 | The maximum number of rows that can be read from BEs at a time. Increasing the value of this parameter can reduce the number of connections established between Spark and StarRocks, thereby mitigating extra time overheads caused by network latency. |
-| starrocks.exec.mem.limit | 2147483648 | The maximum amount of memory allowed per query. Unit: bytes. The default memory limit is 2 GB. |
-| starrocks.deserialize.arrow.async | false | Specifies whether to support asynchronously converting the Arrow memory format to RowBatches required for the iteration of the Spark connector. |
-| starrocks.deserialize.queue.size | 64 | The size of the internal queue that holds tasks for asynchronously converting the Arrow memory format to RowBatches. This parameter is valid when `starrocks.deserialize.arrow.async` is set to `true`. |
-| starrocks.filter.query | None | The condition based on which you want to filter data on StarRocks. You can specify multiple filter conditions, which must be joined by `and`. StarRocks filters the data from the StarRocks table based on the specified filter conditions before the data is read by Spark. |
-| starrocks.timezone | Default timezone of JVM | Supported since 1.1.1. The timezone used to convert StarRocks `DATETIME` to Spark `TimestampType`. The default is the timezone of JVM returned by `ZoneId#systemDefault()`. The format could be a timezone name such as `Asia/Shanghai`, or a zone offset such as `+08:00`. |
+| starrocks.batch.size | 4096 | The maximum number of rows that can be read from BEs at a time. Increasing the value of this parameter can reduce the number of connections established between Spark and StarRocks, thereby mitigating extra time overheads caused by network latency. |
+| starrocks.exec.mem.limit | 2147483648 | The maximum amount of memory allowed per query. Unit: bytes. The default memory limit is 2 GB. |
+| starrocks.deserialize.arrow.async | false | Specifies whether to support asynchronously converting the Arrow memory format to RowBatches required for the iteration of the Spark connector. |
+| starrocks.deserialize.queue.size | 64 | The size of the internal queue that holds tasks for asynchronously converting the Arrow memory format to RowBatches. This parameter is valid when `starrocks.deserialize.arrow.async` is set to `true`. |
+| starrocks.filter.query | None | The condition based on which you want to filter data on StarRocks. You can specify multiple filter conditions, which must be joined by `and`. StarRocks filters the data from the StarRocks table based on the specified filter conditions before the data is read by Spark. |
+| starrocks.timezone | Default timezone of JVM | Supported since 1.1.1. The timezone used to convert StarRocks `DATETIME` to Spark `TimestampType`. The default is the timezone of JVM returned by `ZoneId#systemDefault()`. The format could be a timezone name such as `Asia/Shanghai`, or a zone offset such as `+08:00`. |
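+
+The following is a minimal sketch of how these parameters can be passed when you read data by using the Spark DataFrame method. The FE address, table name, and mapping values are placeholders, so replace them with the values of your own cluster:
+
+```scala
+// Hypothetical addresses for illustration only.
+val starrocksSparkDF = spark.read.format("starrocks")
+  .option("starrocks.fenodes", "fe_host:8030")
+  .option("starrocks.table.identifier", "example_db.example_table")
+  // Map the internal BE address reported by the FE to an address that is
+  // reachable from Spark, for example when StarRocks runs inside Kubernetes.
+  .option("starrocks.be.host.mapping.list", "external_host:9060,internal_host:9060")
+  .load()
+```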
 
 ### Parameters for Spark SQL and Spark DataFrame
 
diff --git a/src/main/java/com/starrocks/connector/spark/cfg/ConfigurationOptions.java b/src/main/java/com/starrocks/connector/spark/cfg/ConfigurationOptions.java
index 2f96f98..298c7e6 100644
--- a/src/main/java/com/starrocks/connector/spark/cfg/ConfigurationOptions.java
+++ b/src/main/java/com/starrocks/connector/spark/cfg/ConfigurationOptions.java
@@ -23,6 +23,8 @@ public interface ConfigurationOptions {
     // starrocks fe node address
     String STARROCKS_FENODES = "starrocks.fenodes";
 
+    String STARROCKS_BE_HOST_MAPPING_LIST = "starrocks.be.host.mapping.list";
+
     String STARROCKS_DEFAULT_CLUSTER = "default_cluster";
 
     String STARROCKS_TIMEZONE = "starrocks.timezone";
diff --git a/src/main/java/com/starrocks/connector/spark/serialization/Routing.java b/src/main/java/com/starrocks/connector/spark/serialization/Routing.java
index 0637a94..5481e20 100644
--- a/src/main/java/com/starrocks/connector/spark/serialization/Routing.java
+++ b/src/main/java/com/starrocks/connector/spark/serialization/Routing.java
@@ -21,10 +21,15 @@
 
 import static com.starrocks.connector.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
 
+import com.starrocks.connector.spark.cfg.ConfigurationOptions;
+import com.starrocks.connector.spark.cfg.Settings;
 import com.starrocks.connector.spark.exception.IllegalArgumentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * present an StarRocks BE address.
  */
@@ -34,12 +39,31 @@ public class Routing {
     private String host;
     private int port;
 
-    public Routing(String routing) throws IllegalArgumentException {
-        parseRouting(routing);
+    public Routing(String routing, Settings settings) throws IllegalArgumentException {
+        parseRouting(routing, settings);
     }
 
-    private void parseRouting(String routing) throws IllegalArgumentException {
+    private void parseRouting(String routing, Settings settings) throws IllegalArgumentException {
         logger.debug("Parse StarRocks BE address: '{}'.", routing);
+        String beHostMappingList = settings.getProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, "");
+        if (beHostMappingList.length() > 0) {
+            Map<String, String> mappingMap = new HashMap<>();
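+            // Each entry maps an internal BE address (the one reported by the FE)
+            // to an external one: "<external_host>:<external_port>,<internal_host>:<internal_port>;...".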
+            String[] beHostMappingInfos = beHostMappingList.split(";");
+            for (String beHostMappingInfo : beHostMappingInfos) {
+                String[] mapping = beHostMappingInfo.split(",");
+                mappingMap.put(mapping[1].trim(), mapping[0].trim());
+            }
+            if (!mappingMap.containsKey(routing)) {
+                throw new RuntimeException("Cannot find the BE node '" + routing + "' in the BE host mapping list.");
+            }
+            routing = mappingMap.get(routing);
+            logger.info("Query data from BE by using the mapped address {}.", routing);
+        } else {
+            logger.info("Query data from BE by using the address {}.", routing);
+        }
+
         String[] hostPort = routing.split(":");
         if (hostPort.length != 2) {
             logger.error("Format of StarRocks BE address '{}' is illegal.", routing);
diff --git a/src/main/scala/com/starrocks/connector/spark/rdd/ScalaValueReader.scala b/src/main/scala/com/starrocks/connector/spark/rdd/ScalaValueReader.scala
index 89d6bd0..495e24c 100644
--- a/src/main/scala/com/starrocks/connector/spark/rdd/ScalaValueReader.scala
+++ b/src/main/scala/com/starrocks/connector/spark/rdd/ScalaValueReader.scala
@@ -48,7 +48,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
   protected val logger = Logger.getLogger(classOf[ScalaValueReader])
   protected val timeZone = ZoneId.of(settings.getProperty(STARROCKS_TIMEZONE,
     ZoneId.systemDefault.toString))
-  protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
+  protected val client = new BackendClient(new Routing(partition.getBeAddress, settings), settings)
   protected var offset = 0
   protected var eos: AtomicBoolean = new AtomicBoolean(false)
   protected var rowBatch: RowBatch = _
diff --git a/src/test/java/com/starrocks/connector/spark/serialization/TestRouting.java b/src/test/java/com/starrocks/connector/spark/serialization/TestRouting.java
index bc01b69..077139d 100644
--- a/src/test/java/com/starrocks/connector/spark/serialization/TestRouting.java
+++ b/src/test/java/com/starrocks/connector/spark/serialization/TestRouting.java
@@ -21,6 +21,9 @@
 
 import static org.hamcrest.core.StringStartsWith.startsWith;
 
+import com.starrocks.connector.spark.cfg.ConfigurationOptions;
+import com.starrocks.connector.spark.cfg.PropertiesSettings;
+import com.starrocks.connector.spark.cfg.Settings;
 import com.starrocks.connector.spark.exception.IllegalArgumentException;
 
 import org.junit.Assert;
@@ -32,18 +35,36 @@ public class TestRouting {
     @Rule
     public ExpectedException thrown = ExpectedException.none();
 
+
     @Test
-    public void testRouting() throws Exception {
-        Routing r1 = new Routing("10.11.12.13:1234");
+    public void testRoutingNoBeMappingList() throws Exception {
+        Settings settings = new PropertiesSettings();
+        Routing r1 = new Routing("10.11.12.13:1234", settings);
         Assert.assertEquals("10.11.12.13", r1.getHost());
         Assert.assertEquals(1234, r1.getPort());
 
         thrown.expect(IllegalArgumentException.class);
         thrown.expectMessage(startsWith("argument "));
-        new Routing("10.11.12.13:wxyz");
+        new Routing("10.11.12.13:wxyz", settings);
 
         thrown.expect(IllegalArgumentException.class);
         thrown.expectMessage(startsWith("Parse "));
-        new Routing("10.11.12.13");
+        new Routing("10.11.12.13", settings);
+    }
+
+    @Test
+    public void testRoutingBeMappingList() throws Exception {
+        Settings settings = new PropertiesSettings();
+        String mappingList = "20.11.12.13:6666,10.11.12.13:1234;21.11.12.13:5555,11.11.12.13:1234";
+        settings.setProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, mappingList);
+
+        Routing r1 = new Routing("10.11.12.13:1234", settings);
+        Assert.assertEquals("20.11.12.13", r1.getHost());
+        Assert.assertEquals(6666, r1.getPort());
+
+        Routing r2 = new Routing("11.11.12.13:1234", settings);
+        Assert.assertEquals("21.11.12.13", r2.getHost());
+        Assert.assertEquals(5555, r2.getPort());
+    }
 }