Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge Obkv params2 to master #200

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,12 @@ void preCheck() {
ObTableOperationType lastType = operations.get(0).getOperationType();
if (returnOneResult && !ObGlobal.isReturnOneResultSupport()) {
throw new FeatureNotSupportedException(
"returnOneResult is not supported in this Observer version [" + ObGlobal.obVsnString() +"]");
"returnOneResult is not supported in this Observer version ["
+ ObGlobal.obVsnString() + "]");
} else if (returnOneResult
&& !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT
|| lastType == ObTableOperationType.PUT
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
&& !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT
|| lastType == ObTableOperationType.PUT
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
throw new IllegalArgumentException(
"returnOneResult only support multi-insert/put/replace/del");
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,13 @@ public static String getObVsnString(long version) {
getObVsnMajorPatch(version), getObVsnMinorPatch(version));
}

// todo: use OB_VERSION_4_3_4_0 after observer upgrade version
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove todo

public static boolean isLsOpSupport() {
return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0
|| OB_VERSION >= OB_VERSION_4_3_4_0;
}

// todo: use OB_VERSION_4_3_4_0 after observer upgrade version
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove todo

public static boolean isReturnOneResultSupport() {
return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0
|| OB_VERSION >= OB_VERSION_4_3_4_0;
Expand All @@ -97,6 +99,7 @@ public static boolean isReturnOneResultSupport() {

public static final long OB_VERSION_4_3_0_0 = calcVersion(4, (short) 3, (byte) 0, (byte) 0);

public static final long OB_VERSION_4_3_3_0 = calcVersion(4, (short) 3, (byte) 3, (byte) 0);
public static final long OB_VERSION_4_3_4_0 = calcVersion(4, (short) 3, (byte) 4, (byte) 0);

public static long OB_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,17 @@ public String getIndexTableName(final String dataTableName, final String indexNa
return indexTableName;
}

@Override
public void setRpcExecuteTimeout(int rpcExecuteTimeout) {
this.properties.put(RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(rpcExecuteTimeout));
this.rpcExecuteTimeout = rpcExecuteTimeout;
for (ObTable obTable : tableRoster.values()) {
if (obTable != null) {
obTable.setObTableExecuteTimeout(rpcExecuteTimeout);
}
}
}

public String constructIndexTableName(final String dataTableName, final String indexName)
throws Exception {
// construct index table name
Expand Down Expand Up @@ -3012,7 +3023,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
request.getTableName(),
((ObTableBatchOperationRequest) request).getBatchOperation(), this);
batchOps.setEntityType(request.getEntityType());
return new ObClusterTableBatchOps(batchOps).executeInternal();
return new ObClusterTableBatchOps(runtimeBatchExecutor, batchOps).executeInternal();
} else if (request instanceof ObTableQueryAndMutateRequest) {
ObTableQueryAndMutate tableQueryAndMutate = ((ObTableQueryAndMutateRequest) request)
.getTableQueryAndMutate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@

public class ObTableConnection {

private static final Logger LOGGER = TableClientLoggerFactory
.getLogger(ObTableConnection.class);
private static final Logger LOGGER = TableClientLoggerFactory
.getLogger(ObTableConnection.class);
private ObBytesString credential;
private long tenantId = 1; //默认值切勿不要随意改动
private long tenantId = 1; //默认值切勿不要随意改动
private Connection connection;
private final ObTable obTable;
private long uniqueId; // as trace0 in rpc header
private AtomicLong sequence; // as trace1 in rpc header
private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not
private AtomicBoolean isExpired = new AtomicBoolean(false);
private long uniqueId; // as trace0 in rpc header
private AtomicLong sequence; // as trace1 in rpc header
private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not
private AtomicBoolean isExpired = new AtomicBoolean(false);
private LocalDateTime lastConnectionTime;
private boolean loginWithConfigs = false;

public static long ipToLong(String strIp) {
String[] ip = strIp.split("\\.");
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
Expand All @@ -69,10 +70,10 @@ public void setExpired(boolean expired) {
isExpired.set(expired);
}


public void enableLoginWithConfigs() {
loginWithConfigs = true;
}

/*
* Ob table connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
throw new ObTableNeedFetchAllException(errMessage);
Expand All @@ -141,16 +141,16 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
}
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
throw new ObTableNeedFetchAllException(errMessage);
} else if (needFetchPartial(resultCode.getRcode())) {
throw new ObTableRoutingWrongException(errMessage);
} else {
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn.getObTable()
.getPort(), response.getHeader().getTraceId1(), response.getHeader()
.getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
.getObTable().getPort(), response.getHeader().getTraceId1(), response
.getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
}
}

Expand Down Expand Up @@ -190,25 +190,26 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
// schema changed
private boolean needFetchAll(int errorCode, int pcode) {
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
}

private boolean needFetchPartial(int errorCode) {
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,12 @@ public BatchOperation setReturnOneResult(boolean returnOneResult) {
public BatchOperationResult execute() throws Exception {
if (returnOneResult && !ObGlobal.isReturnOneResultSupport()) {
throw new FeatureNotSupportedException(
"returnOneResult is not supported in this Observer version [" + ObGlobal.obVsnString() +"]");
"returnOneResult is not supported in this Observer version ["
+ ObGlobal.obVsnString() + "]");
} else if (returnOneResult
&& !(isSameType && (lastType == ObTableOperationType.INSERT
|| lastType == ObTableOperationType.PUT
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
&& !(isSameType && (lastType == ObTableOperationType.INSERT
|| lastType == ObTableOperationType.PUT
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
throw new IllegalArgumentException(
"returnOneResult only support multi-insert/put/replace/del");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,7 @@ public enum ResultCodes {
OB_CLUSTER_NO_MATCH(-4666), //
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
OB_ERR_ZONE_NOT_EMPTY(-4668), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686),
OB_LS_NOT_EXIST(-4719), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
OB_TABLET_NOT_EXIST(-4725), //
OB_ERR_PARSER_INIT(-5000), //
OB_ERR_PARSE_SQL(-5001), //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
}
subObTable = client
.getTableWithPartId(indexTableName, partIdWithIndex.getLeft(),
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), false,
route).getRight().getObTable();
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
false, route).getRight().getObTable();
}
}
if (client.isOdpMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;

import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException;
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
import com.alipay.oceanbase.rpc.table.ObKVParams;
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationSingle;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
Expand Down Expand Up @@ -58,12 +61,14 @@ public class ObTableQuery extends AbstractPayload {
private long maxResultSize = -1;
private ObHTableFilter hTableFilter;

private static final byte[] HTABLE_FILTER_DUMMY_BYTES = new byte[] { 0x01, 0x00 };
private static final byte[] HTABLE_DUMMY_BYTES = new byte[] { 0x01, 0x00 };
private boolean isHbaseQuery = false;
private List<String> scanRangeColumns = new LinkedList<String>();

private List<ObTableAggregationSingle> aggregations = new LinkedList<>();

private ObKVParams obKVParams;

/*
* Check filter.
*/
Expand Down Expand Up @@ -149,8 +154,8 @@ public byte[] encode() {
len = (int) hTableFilter.getPayloadSize();
System.arraycopy(hTableFilter.encode(), 0, bytes, idx, len);
} else {
len = HTABLE_FILTER_DUMMY_BYTES.length;
System.arraycopy(HTABLE_FILTER_DUMMY_BYTES, 0, bytes, idx, len);
len = HTABLE_DUMMY_BYTES.length;
System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len);
}
idx += len;

Expand All @@ -173,6 +178,16 @@ public byte[] encode() {
idx += len;
}

if (isHbaseQuery && obKVParams != null) {
len = (int) obKVParams.getPayloadSize();
System.arraycopy(obKVParams.encode(), 0, bytes, idx, len);
idx += len;
} else {
len = HTABLE_DUMMY_BYTES.length;
System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len);
idx += len;
}

return bytes;
}

Expand Down Expand Up @@ -230,6 +245,10 @@ public Object decode(ByteBuf buf) {
String agg_column = Serialization.decodeVString(buf);
this.aggregations.add(new ObTableAggregationSingle(ObTableAggregationType.fromByte(agg_type), agg_column));
}
if (isHbaseQuery) {
obKVParams = new ObKVParams();
this.obKVParams.decode(buf);
}
return this;
}

Expand Down Expand Up @@ -259,7 +278,12 @@ public long getPayloadContentSize() {
if (isHbaseQuery) {
contentSize += hTableFilter.getPayloadSize();
} else {
contentSize += HTABLE_FILTER_DUMMY_BYTES.length;
contentSize += HTABLE_DUMMY_BYTES.length;
}
if (isHbaseQuery && obKVParams != null) {
contentSize += obKVParams.getPayloadSize();
} else {
contentSize += HTABLE_DUMMY_BYTES.length;
}
contentSize += Serialization.getNeedBytes(scanRangeColumns.size());
for (String scanRangeColumn : scanRangeColumns) {
Expand Down Expand Up @@ -466,4 +490,17 @@ public void setScanRangeColumns(String... scanRangeColumns) {
public void setScanRangeColumns(List<String> scanRangeColumns) {
this.scanRangeColumns = scanRangeColumns;
}

// This interface is just for OBKV-Hbase
public void setObKVParams(ObKVParams obKVParams) {
if (!(obKVParams.getObParamsBase() instanceof ObHBaseParams)) {
throw new FeatureNotSupportedException("only ObHBaseParams support currently");
}
this.isHbaseQuery = true;
this.obKVParams = obKVParams;
}

public ObKVParams getObKVParams() {
return obKVParams;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public long getTtlUs() {
public void setTtlUs(long ttlUs) {
this.ttlUs = ttlUs;
}

public String getConfigsStr() {
return configsStr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public abstract class AbstractObTable extends AbstractTable {

protected int nettyBlockingWaitInterval = NETTY_BLOCKING_WAIT_INTERVAL.getDefaultInt();

protected long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong();
protected long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong();

/*
* Get ob table connect try times.
Expand Down Expand Up @@ -165,5 +165,7 @@ public int getNettyBlockingWaitInterval() {
/*
* Get connection max expired time
*/
public long getConnMaxExpiredTime() { return maxConnExpiredTime; }
public long getConnMaxExpiredTime() {
return maxConnExpiredTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public abstract class AbstractObTableClient extends AbstractTable {
.getDefaultInt();
protected long slowQueryMonitorThreshold = SLOW_QUERY_MONITOR_THRESHOLD
.getDefaultLong();
protected Long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong();
protected Long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME
.getDefaultLong();

@Deprecated
/*
Expand Down
Loading
Loading