Skip to content

Commit

Permalink
add_be_host_mapping_list_config
Browse files Browse the repository at this point in the history
Signed-off-by: ico01 <[email protected]>
  • Loading branch information
ico01 committed Jun 12, 2024
1 parent 846f1e4 commit ce86cc1
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public interface ConfigurationOptions {
// starrocks fe node address
String STARROCKS_FENODES = "starrocks.fenodes";

String STARROCKS_BE_HOST_MAPPING_LIST = "starrocks.be.host.mapping.list";

String STARROCKS_DEFAULT_CLUSTER = "default_cluster";

String STARROCKS_TIMEZONE = "starrocks.timezone";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@

import static com.starrocks.connector.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;

import com.starrocks.connector.spark.cfg.ConfigurationOptions;
import com.starrocks.connector.spark.cfg.Settings;
import com.starrocks.connector.spark.exception.IllegalArgumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
* present an StarRocks BE address.
*/
Expand All @@ -34,12 +39,30 @@ public class Routing {
private String host;
private int port;

public Routing(String routing) throws IllegalArgumentException {
parseRouting(routing);
public Routing(String routing, Settings settings) throws IllegalArgumentException {
parseRouting(routing, settings);
}

private void parseRouting(String routing) throws IllegalArgumentException {
private void parseRouting(String routing, Settings settings) throws IllegalArgumentException {
logger.debug("Parse StarRocks BE address: '{}'.", routing);
String beHostMappingList = settings.getProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, "");
if (beHostMappingList.length() > 0) {
String list = beHostMappingList;
Map<String, String> mappingMap = new HashMap<>();
String[] beHostMappingInfos = list.split(";");
for (String beHostMappingInfo : beHostMappingInfos) {
String[] mapping = beHostMappingInfo.split(",");
mappingMap.put(mapping[1].trim(), mapping[0].trim());
}
if (!mappingMap.containsKey(routing)) {
throw new RuntimeException("Not find be node info from the be port mappping list");
}
routing = mappingMap.get(routing);
logger.info("query data from be by using be-hostname {}", routing);
} else {
logger.info("query data from be by using be-ip {}", routing);
}

String[] hostPort = routing.split(":");
if (hostPort.length != 2) {
logger.error("Format of StarRocks BE address '{}' is illegal.", routing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
protected val logger = Logger.getLogger(classOf[ScalaValueReader])

protected val timeZone = ZoneId.of(settings.getProperty(STARROCKS_TIMEZONE, ZoneId.systemDefault.toString))
protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
protected val client = new BackendClient(new Routing(partition.getBeAddress, settings), settings)
protected var offset = 0
protected var eos: AtomicBoolean = new AtomicBoolean(false)
protected var rowBatch: RowBatch = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import static org.hamcrest.core.StringStartsWith.startsWith;

import com.starrocks.connector.spark.cfg.ConfigurationOptions;
import com.starrocks.connector.spark.cfg.PropertiesSettings;
import com.starrocks.connector.spark.cfg.Settings;
import com.starrocks.connector.spark.exception.IllegalArgumentException;

import org.junit.Assert;
Expand All @@ -32,18 +35,36 @@ public class TestRouting {
@Rule
public ExpectedException thrown = ExpectedException.none();


@Test
public void testRouting() throws Exception {
Routing r1 = new Routing("10.11.12.13:1234");
public void testRoutingNoBeMappingList() throws Exception {
Settings settings = new PropertiesSettings();
Routing r1 = new Routing("10.11.12.13:1234", settings);
Assert.assertEquals("10.11.12.13", r1.getHost());
Assert.assertEquals(1234, r1.getPort());

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(startsWith("argument "));
new Routing("10.11.12.13:wxyz");
new Routing("10.11.12.13:wxyz", settings);

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(startsWith("Parse "));
new Routing("10.11.12.13");
new Routing("10.11.12.13", settings);
}

@Test
public void testRoutingBeMappingList() throws Exception {
Settings settings = new PropertiesSettings();
String mappingList = "20.11.12.13:6666,10.11.12.13:1234;21.11.12.13:5555,11.11.12.13:1234";
settings.setProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, mappingList);

Routing r1 = new Routing("10.11.12.13:1234", settings);
Assert.assertEquals("20.11.12.13", r1.getHost());
Assert.assertEquals(6666, r1.getPort());

Routing r2 = new Routing("11.11.12.13:1234", settings);
Assert.assertEquals("21.11.12.13", r2.getHost());
Assert.assertEquals(5555, r2.getPort());

}
}

0 comments on commit ce86cc1

Please sign in to comment.