Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

function connect by rs list #20

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ test-output
# Logs
*.log.*

# vscode
.vscode

33 changes: 32 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,42 @@ Import the dependency for your maven project:
```

The code demo:
use configurl
``` java
// 1. initail ObTableClient
ObTableClient obTableClient = new ObTableClient();
obTableClient.setFullUserName("full_user_name");
obTableClient.setParamURL("param_url");
obTableClient.setParamURL("your configurl + database=xxx"); // e.g. http://ip:port/services?Action=ObRootServiceInfo&ObRegion=ocp&database=test
obTableClient.setPassword("password");
obTableClient.setSysUserName("sys_user_name");
obTableClient.setSysPassword("sys_user_passwd");
obTableClient.init();

// 2. single execute
// return affectedRows
obTableClient.insert("test_varchar_table", "foo", new String[] { "c2" }, new String[] { "bar" });
// return Map<String, Object>
obTableClient.get("test_varchar_table", "foo", new String[] { "c2" });
// return affectedRows
obTableClient.delete("test_varchar_table", "foo");

// 3. batch execute
TableBatchOps batchOps = obTableClient.batch("test_varchar_table");
batchOps.insert("foo", new String[] { "c2" }, new String[] { "bar" });
batchOps.get("foo", new String[] { "c2" });
batchOps.delete("foo");

List<Object> results = batchOps.execute();
// the results include 3 item: 1. affectedRows; 2. Map; 3. affectedRows.
```
use rs list
``` java
// 1. initail ObTableClient
ObTableClient obTableClient = new ObTableClient();
// if use rs_list ;please set Full user name format user@tenant e.g. root@sys
obTableClient.setFullUserName("full_user_name");
obTableClient.setRsList("your rs_list"); // e.g. 172.16.0.80:2882:2881;172.16.0.82:2882:2881;172.16.0.86:2882:2881
obTableClient.setaAppRegion("your app region"); // e.g. region
obTableClient.setPassword("password");
obTableClient.setSysUserName("sys_user_name");
obTableClient.setSysPassword("sys_user_passwd");
Expand Down
145 changes: 145 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,23 @@ public class ObTableClient extends AbstractObTableClient implements Lifecycle {
private AtomicInteger tableEntryRefreshContinuousFailureCount = new AtomicInteger(
0);
private String dataSourceName;

private String paramURL;
/*
* rsList >> paramURL
*/
private String rsList;
private Boolean useRsList;
/*
* when use rs list need to asset region
*/
private String appRegion;
/*
* user name
* Standard format: user@tenant#cluster
* NonStandard format: cluster:tenant:user
* when use rs_list
* Standard format: user@tenant
*/
private String fullUserName;
private String userName;
Expand Down Expand Up @@ -293,8 +305,83 @@ private void initProperties() {
rpcLoginTimeout = parseToInt(RPC_LOGIN_TIMEOUT.getKey(), rpcLoginTimeout);
}

private void initMetadataFromRS() throws Exception {

List<ObServerAddr> rsList = new ArrayList<ObServerAddr>();
for (String serverStr : getRsList().split(":")){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set port?

ObServerAddr serverAddr = new ObServerAddr();
serverAddr.setAddress(serverStr);
rsList.add(serverAddr);
}

List<ObServerAddr> servers = new ArrayList<ObServerAddr>();
ConcurrentHashMap<ObServerAddr, ObTable> tableRoster = new ConcurrentHashMap<ObServerAddr, ObTable>();

TableEntryKey rootServerKey = new TableEntryKey(clusterName, tenantName,
OCEANBASE_DATABASE, ALL_DUMMY_TABLE);


TableEntry tableEntry = loadTableEntryRandomly(rsList,//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to reuse the logic below, as the code is mainly the same with the original method.

rootServerKey,//
tableEntryAcquireConnectTimeout,//
tableEntryAcquireSocketTimeout, sysUA);

List<ReplicaLocation> replicaLocations = tableEntry.getTableLocation()
.getReplicaLocations();
for (ReplicaLocation replicaLocation : replicaLocations) {
ObServerInfo info = replicaLocation.getInfo();
ObServerAddr addr = replicaLocation.getAddr();
if (!info.isActive()) {
logger.warn("will not init location {} because status is {}", addr.toString(),
info.getStatus());
continue;
}

// 忽略初始化建连失败,否则client会初始化失败,导致应用无法启动的问题
// 初始化建连失败(可能性较小),如果后面这台server恢复,数据路由失败,就会重新刷新metadata
// 在失败100次后(RUNTIME_CONTINUOUS_FAILURE_CEILING),重新刷新建连
// 本地cache 1小时超时后(SERVER_ADDRESS_CACHING_TIMEOUT),重新刷新建连
// 应急可以直接observer切主
try {
ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) //
.setLoginInfo(tenantName, userName, password, database) //
.setProperties(getProperties()).build();
tableRoster.put(addr, obTable);
servers.add(addr);
} catch (Exception e) {
logger
.warn(
"The addr{}:{} failed to put into table roster, the node status may be wrong, Ignore",
addr.getIp(), addr.getSvrPort());
}
}
this.tableRoster = tableRoster;
this.serverRoster.reset(servers);

// Get Server LDC info for weak read consistency.
if (StringUtil.isEmpty(currentIDC)) {
currentIDC = ZoneUtil.getCurrentIDC();
}
List<ObServerLdcItem> ldcServers = getServerLdc(serverRoster,
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);
this.serverRoster.resetServerLdc(ObServerLdcLocation.buildLdcLocation(ldcServers,
currentIDC, getAppRegion()));

if (logger.isInfoEnabled()) {
logger.info("finish refresh serverRoster: {}", serverRoster);
}

this.lastRefreshMetadataTimestamp = System.currentTimeMillis();
}

private void initMetadata() throws Exception {

if (getUseRsList().equals(true)){
initMetadataFromRS();
return;
}

this.ocpModel = loadOcpModel(paramURL, dataSourceName, rsListAcquireConnectTimeout,
rsListAcquireReadTimeout, rsListAcquireTryTimes, rsListAcquireRetryInterval);

Expand Down Expand Up @@ -529,6 +616,11 @@ public void syncRefreshMetadata() throws Exception {
lastRefreshMetadataTimestamp, dataSourceName, paramURL);
}

if (getUseRsList().equals(true)){
initMetadataFromRS();
return;
}

this.ocpModel = loadOcpModel(paramURL, //
dataSourceName,//
rsListAcquireConnectTimeout,//
Expand Down Expand Up @@ -1365,6 +1457,59 @@ public String getParamURL() {
return paramURL;
}

/**
* Get param url
* @return param url
*/
public String getRsList() {
return rsList;
}

/**
* Get useRsList
* @return useRsList
*/
public Boolean getUseRsList() {
return useRsList;
}

/**
* Get param appRegion
* @return param appRegion
*/
public String getAppRegion() {
return appRegion;
}

/**
* Set appRegion .
* @param appRegion app region
* @throws IllegalArgumentException if appRegion invalid
*/
public void setAppRegion(String region) throws IllegalArgumentException{
if (StringUtils.isBlank(region)) {
throw new IllegalArgumentException(String.format("region is empty, region=%s", region));
}
this.appRegion = region;
}

/**
* Set rs list.
* @param rsList rs list
* @throws IllegalArgumentException if rsList invalid
*/
public void setRsList(String rsList) throws IllegalArgumentException {
if (StringUtils.isBlank(rsList)) {
throw new IllegalArgumentException(String.format("rsList is empty, rsList=%s", rsList));
}
int paramIndex = rsList.indexOf(':');
if (-1 == paramIndex || (paramIndex + 1) == paramURL.length()) {
throw new IllegalArgumentException(String.format(
"invalid rs list, parameters are not set. rs_list=%s", rsList));
}
this.rsList = rsList;
this.useRsList = Boolean.valueOf(true);
}
/**
* Set param url.
* @param paramURL param url
Expand Down