Commit
add_be_host_mapping_list_config
Signed-off-by: ico01 <[email protected]>
ico01 committed Jun 12, 2024
1 parent 846f1e4 commit 93997f4
Showing 5 changed files with 69 additions and 22 deletions.
29 changes: 15 additions & 14 deletions docs/connector-read.md
@@ -155,21 +155,22 @@ This section describes the parameters you need to configure when you use the Spa

The following parameters apply to all three reading methods: Spark SQL, Spark DataFrame, and Spark RDD.

| Parameter | Default value | Description |
| ------------------------------------ | ----------------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| starrocks.fenodes                    | None              | The HTTP URL of the FE in your StarRocks cluster. Format: `<fe_host>:<fe_http_port>`. You can specify multiple URLs, which must be separated by a comma (,). |
| starrocks.be.host.mapping.list       | None              | The list of mappings between the external addresses (IPs or domain names) and the internal addresses of the StarRocks BE nodes. Often used when StarRocks is deployed in Kubernetes. Format: `<external_ip>:<port>,<internal_service_name>:<port>`. Separate multiple mappings with semicolons (;). |
| starrocks.table.identifier | None | The name of the StarRocks table. Format: `<database_name>.<table_name>`. |
| starrocks.request.retries            | 3                 | The maximum number of times that Spark can retry sending a read request to StarRocks. |
| starrocks.request.connect.timeout.ms | 30000             | The maximum amount of time after which a read request sent to StarRocks times out. Unit: milliseconds. |
| starrocks.request.read.timeout.ms    | 30000             | The maximum amount of time after which reading the response to a request sent to StarRocks times out. Unit: milliseconds. |
| starrocks.request.query.timeout.s    | 3600              | The maximum amount of time after which a query of data from StarRocks times out. Unit: seconds. The default timeout period is 1 hour. `-1` means that no timeout period is specified. |
| starrocks.request.tablet.size | Integer.MAX_VALUE | The number of StarRocks tablets grouped into each Spark RDD partition. A smaller value of this parameter indicates that a larger number of Spark RDD partitions will be generated. A larger number of Spark RDD partitions means higher parallelism on Spark but greater pressure on StarRocks. |
| starrocks.batch.size | 4096 | The maximum number of rows that can be read from BEs at a time. Increasing the value of this parameter can reduce the number of connections established between Spark and StarRocks, thereby mitigating extra time overheads caused by network latency. |
| starrocks.exec.mem.limit | 2147483648 | The maximum amount of memory allowed per query. Unit: bytes. The default memory limit is 2 GB. |
| starrocks.deserialize.arrow.async | false | Specifies whether to support asynchronously converting the Arrow memory format to RowBatches required for the iteration of the Spark connector. |
| starrocks.deserialize.queue.size | 64 | The size of the internal queue that holds tasks for asynchronously converting the Arrow memory format to RowBatches. This parameter is valid when `starrocks.deserialize.arrow.async` is set to `true`. |
| starrocks.filter.query | None | The condition based on which you want to filter data on StarRocks. You can specify multiple filter conditions, which must be joined by `and`. StarRocks filters the data from the StarRocks table based on the specified filter conditions before the data is read by Spark. |
| starrocks.timezone | Default timezone of JVM | Supported since 1.1.1. The timezone used to convert StarRocks `DATETIME` to Spark `TimestampType`. The default is the timezone of JVM returned by `ZoneId#systemDefault()`. The format could be a timezone name such as `Asia/Shanghai`, or a zone offset such as `+08:00`. |
| starrocks.batch.size | 4096 | The maximum number of rows that can be read from BEs at a time. Increasing the value of this parameter can reduce the number of connections established between Spark and StarRocks, thereby mitigating extra time overheads caused by network latency. |
| starrocks.exec.mem.limit | 2147483648 | The maximum amount of memory allowed per query. Unit: bytes. The default memory limit is 2 GB. |
| starrocks.deserialize.arrow.async | false | Specifies whether to support asynchronously converting the Arrow memory format to RowBatches required for the iteration of the Spark connector. |
| starrocks.deserialize.queue.size | 64 | The size of the internal queue that holds tasks for asynchronously converting the Arrow memory format to RowBatches. This parameter is valid when `starrocks.deserialize.arrow.async` is set to `true`. |
| starrocks.filter.query | None | The condition based on which you want to filter data on StarRocks. You can specify multiple filter conditions, which must be joined by `and`. StarRocks filters the data from the StarRocks table based on the specified filter conditions before the data is read by Spark. |
| starrocks.timezone | Default timezone of JVM | Supported since 1.1.1. The timezone used to convert StarRocks `DATETIME` to Spark `TimestampType`. The default is the timezone of JVM returned by `ZoneId#systemDefault()`. The format could be a timezone name such as `Asia/Shanghai`, or a zone offset such as `+08:00`. |
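To make the `starrocks.be.host.mapping.list` format concrete, here is a minimal standalone sketch, not connector code: the class and method names are invented for illustration. It resolves an internal BE address to its external counterpart in the same way the connector interprets the mapping list.

```java
import java.util.HashMap;
import java.util.Map;

public class BeHostMappingExample {

    // Parses "<external_host>:<port>,<internal_host>:<port>;..." into a map
    // keyed by the internal address, mirroring starrocks.be.host.mapping.list.
    static Map<String, String> parseMappingList(String mappingList) {
        Map<String, String> map = new HashMap<>();
        for (String entry : mappingList.split(";")) {
            String[] pair = entry.split(",");
            map.put(pair[1].trim(), pair[0].trim());
        }
        return map;
    }

    // Returns the external address for an internal BE address; fails if unmapped.
    static String resolveBeAddress(String internalAddress, String mappingList) {
        String external = parseMappingList(mappingList).get(internalAddress);
        if (external == null) {
            throw new IllegalStateException("No mapping for BE address " + internalAddress);
        }
        return external;
    }

    public static void main(String[] args) {
        // One mapping per entry: external address first, then the internal service address.
        String mappingList = "starrocks.example.com:9060,be-0.be-svc.default:9060;"
                + "starrocks.example.com:9061,be-1.be-svc.default:9060";
        System.out.println(resolveBeAddress("be-0.be-svc.default:9060", mappingList));
    }
}
```

Note that the lookup key is the internal address the BE reports, which is why the connector raises an error when a BE address is absent from the list.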
### Parameters for Spark SQL and Spark DataFrame
@@ -23,6 +23,8 @@ public interface ConfigurationOptions {
// starrocks fe node address
String STARROCKS_FENODES = "starrocks.fenodes";

String STARROCKS_BE_HOST_MAPPING_LIST = "starrocks.be.host.mapping.list";

String STARROCKS_DEFAULT_CLUSTER = "default_cluster";

String STARROCKS_TIMEZONE = "starrocks.timezone";
@@ -21,10 +21,15 @@

import static com.starrocks.connector.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;

import com.starrocks.connector.spark.cfg.ConfigurationOptions;
import com.starrocks.connector.spark.cfg.Settings;
import com.starrocks.connector.spark.exception.IllegalArgumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
* Represents a StarRocks BE address.
*/
@@ -34,12 +39,30 @@ public class Routing {
private String host;
private int port;

public Routing(String routing, Settings settings) throws IllegalArgumentException {
    parseRouting(routing, settings);
}

private void parseRouting(String routing, Settings settings) throws IllegalArgumentException {
logger.debug("Parse StarRocks BE address: '{}'.", routing);
String beHostMappingList = settings.getProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, "");
if (!beHostMappingList.isEmpty()) {
    // Each entry maps an external address to an internal BE address:
    // <external_host>:<port>,<internal_host>:<port>;...
    Map<String, String> mappingMap = new HashMap<>();
    for (String beHostMappingInfo : beHostMappingList.split(";")) {
        String[] mapping = beHostMappingInfo.split(",");
        if (mapping.length != 2) {
            throw new RuntimeException("Invalid BE host mapping entry: " + beHostMappingInfo);
        }
        mappingMap.put(mapping[1].trim(), mapping[0].trim());
    }
    if (!mappingMap.containsKey(routing)) {
        throw new RuntimeException("Cannot find BE node '" + routing + "' in the BE host mapping list");
    }
    routing = mappingMap.get(routing);
    logger.info("Query data from BE using mapped address {}", routing);
} else {
    logger.info("Query data from BE using address {}", routing);
}

String[] hostPort = routing.split(":");
if (hostPort.length != 2) {
logger.error("Format of StarRocks BE address '{}' is illegal.", routing);
@@ -48,7 +48,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
protected val logger = Logger.getLogger(classOf[ScalaValueReader])

protected val timeZone = ZoneId.of(settings.getProperty(STARROCKS_TIMEZONE, ZoneId.systemDefault.toString))
protected val client = new BackendClient(new Routing(partition.getBeAddress, settings), settings)
protected var offset = 0
protected var eos: AtomicBoolean = new AtomicBoolean(false)
protected var rowBatch: RowBatch = _
@@ -21,6 +21,9 @@

import static org.hamcrest.core.StringStartsWith.startsWith;

import com.starrocks.connector.spark.cfg.ConfigurationOptions;
import com.starrocks.connector.spark.cfg.PropertiesSettings;
import com.starrocks.connector.spark.cfg.Settings;
import com.starrocks.connector.spark.exception.IllegalArgumentException;

import org.junit.Assert;
@@ -32,18 +35,36 @@ public class TestRouting {
@Rule
public ExpectedException thrown = ExpectedException.none();


@Test
public void testRoutingNoBeMappingList() throws Exception {
    Settings settings = new PropertiesSettings();
    Routing r1 = new Routing("10.11.12.13:1234", settings);
Assert.assertEquals("10.11.12.13", r1.getHost());
Assert.assertEquals(1234, r1.getPort());

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(startsWith("argument "));
new Routing("10.11.12.13:wxyz", settings);

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(startsWith("Parse "));
new Routing("10.11.12.13", settings);
}

@Test
public void testRoutingBeMappingList() throws Exception {
Settings settings = new PropertiesSettings();
String mappingList = "20.11.12.13:6666,10.11.12.13:1234;21.11.12.13:5555,11.11.12.13:1234";
settings.setProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, mappingList);

Routing r1 = new Routing("10.11.12.13:1234", settings);
Assert.assertEquals("20.11.12.13", r1.getHost());
Assert.assertEquals(6666, r1.getPort());

Routing r2 = new Routing("11.11.12.13:1234", settings);
Assert.assertEquals("21.11.12.13", r2.getHost());
Assert.assertEquals(5555, r2.getPort());

}
}

