Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dazhi obkv params #189

Open
wants to merge 16 commits into
base: obkv_params
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Import the dependency for your maven project:
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId>
<version>1.2.12</version>
<version>1.2.13</version>
</dependency>
```

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId>
<version>1.2.13-SNAPSHOT</version>
<version>1.2.14-SNAPSHOT</version>

<name>${project.groupId}:${project.artifactId}</name>
<description>OceanBase JavaClient for TableApi</description>
Expand Down Expand Up @@ -365,4 +365,4 @@
</build>
</profile>
</profiles>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ public TableQuery setMaxResultSize(long maxResultSize) {
return tableClientQuery.setMaxResultSize(maxResultSize);
}

@Override
public TableQuery setOperationTimeout(long operationTimeout) {
tableClientQuery.setOperationTimeout(operationTimeout);
return this;
}

/**
* Clear.
*/
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public static String getObVsnString(long version) {
}

public static boolean isLsOpSupport() {
return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0;
return (OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0) ||
(OB_VERSION >= OB_VERSION_4_3_4_0);
}

public static boolean isReturnOneResultSupport() {
Expand Down
409 changes: 283 additions & 126 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
Expand Down Expand Up @@ -111,9 +112,7 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
*/
@Override
public ObPayload newPayload(ObRpcPacketHeader header) {
throw new ObTableRoutingWrongException(
"Receive rerouting response packet. "
+ "Java client is not supported and need to Refresh table router entry");
return new ObTableApiMove();
}
}, //
OB_ERROR_PACKET(Pcodes.OB_ERROR_PACKET) {
Expand Down Expand Up @@ -161,9 +160,7 @@ public static ObTablePacketCode valueOf(short value) {
case Pcodes.OB_TABLE_API_LS_EXECUTE:
return OB_TABLE_API_LS_EXECUTE;
case Pcodes.OB_TABLE_API_MOVE:
throw new ObTableRoutingWrongException(
"Receive rerouting response packet. "
+ "Java client is not supported and need to Refresh table router entry");
return OB_TABLE_API_MOVE;
case Pcodes.OB_ERROR_PACKET:
return OB_ERROR_PACKET;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ private byte[] encodePayload(ObPayload payload) {
// compute checksum
rpcHeaderPacket.setChecksum(ObPureCrc32C.calculate(payloadContent));

// group id
rpcHeaderPacket.setGroupId(payload.getGroupId());

// 3. assemble and encode rpc packet
ObRpcPacket rpcPacket = new ObRpcPacket();
rpcPacket.setRpcPacketHeader(rpcHeaderPacket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ private void login() throws Exception {
request.setTenantName(obTable.getTenantName());
request.setUserName(obTable.getUserName());
request.setDatabaseName(obTable.getDatabase());
if (loginWithConfigs) {
// When the caller doesn't provide any parameters, configsMap is empty.
// In this case, we don't generate any JSON to avoid creating an empty object.
if (loginWithConfigs && !obTable.getConfigs().isEmpty()) {
JSONObject json = new JSONObject(obTable.getConfigs());
request.setConfigsStr(json.toJSONString());
loginWithConfigs = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
ObRpcResultCode resultCode = new ObRpcResultCode();
resultCode.decode(buf);
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
if (response.getHeader().isRoutingWrong()) {
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
Expand All @@ -139,7 +139,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
throw new ObTableNeedFetchAllException(errMessage);
}
}
if (resultCode.getRcode() != 0) {
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
Expand Down Expand Up @@ -186,7 +186,6 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
InvokeCallback invokeCallback) {
return new ObClientFuture(request.getId());
}


// schema changed
private boolean needFetchAll(int errorCode, int pcode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate;
import com.alipay.oceanbase.rpc.mutation.Row;
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation;
Expand Down Expand Up @@ -58,7 +59,7 @@ public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
this.checkExists = check_exists;
}

public Object[] getRowKey() {
public Row getRowKey() {
return insUp.getRowKey();
}

Expand All @@ -85,15 +86,15 @@ public MutationResult execute() throws Exception {

TableQuery query = client.query(tableName);
query.setFilter(filter);
Object[] rowKey = getRowKey();
Row rowKey = getRowKey();
List<ObNewRange> ranges = new ArrayList<>();
ObNewRange range = new ObNewRange();
range.setStartKey(ObRowKey.getInstance(insUp.getRowKey()));
range.setEndKey(ObRowKey.getInstance(insUp.getRowKey()));
range.setStartKey(ObRowKey.getInstance(insUp.getRowKey().getValues()));
range.setEndKey(ObRowKey.getInstance(insUp.getRowKey().getValues()));
ranges.add(range);
query.getObTableQuery().setKeyRanges(ranges);
ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.INSERT_OR_UPDATE,
insUp.getRowKey(), insUp.getColumns(), insUp.getValues());
insUp.getRowKey().getValues(), insUp.getColumns(), insUp.getValues());

return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation, false, true, checkExists));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*-
* #%L
* com.oceanbase:obkv-table-client
* %%
* Copyright (C) 2021 - 2023 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.direct_load;

import java.util.ArrayList;
import java.util.List;

import com.alipay.oceanbase.rpc.direct_load.exception.*;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.util.ObByteBuf;
import com.alipay.oceanbase.rpc.util.Serialization;

public class ObDirectLoadBucket {

/**
* buffer的格式如下
* +----------------+-----------+------+------+-----+
* | payload length | row count | row1 | row2 | ... |
* +----------------+-----------+------+------+-----+
*/

private static final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();
private final static int integerReservedSize = 5; // 预留5个字节用来编码Integer
private final static int reservedSize = integerReservedSize * 2; // 预留2个Integer
private final static int defaultBufferSize = 1 * 1024 * 1024; // 1M

private final int bufferSize;
private ArrayList<ObByteBuf> payloadBufferList = new ArrayList<ObByteBuf>(64);
private int totalRowCount = 0;

private ObByteBuf buffer = null;
private int currentRowCount = 0;
private Row row = new Row();

public ObDirectLoadBucket() {
bufferSize = defaultBufferSize;
}

public ObDirectLoadBucket(int bufferSize) {
this.bufferSize = bufferSize;
}

public boolean isEmpty() {
return (getRowNum() == 0);
}

public int getRowNum() {
return totalRowCount + currentRowCount;
}

@Override
public String toString() {
return String.format("{rowNum:%d}", getRowNum());
}

public void addRow(ObObj[] cells) throws ObDirectLoadException {
if (cells == null || cells.length == 0) {
logger.warn("cells cannot be null or empty, cells:" + cells);
throw new ObDirectLoadIllegalArgumentException("cells cannot be null or empty, cells:"
+ cells);
}
row.setCells(cells);
appendRow(row);
}

public void addRow(List<ObObj> cells) throws ObDirectLoadException {
if (cells == null || cells.isEmpty()) {
logger.warn("cells cannot be null or empty, cells:" + cells);
throw new ObDirectLoadIllegalArgumentException("cells cannot be null or empty, cells:"
+ cells);
}
row.setCells(cells);
appendRow(row);
}

private void appendRow(Row row) {
final int rowEncodedSize = row.getEncodedSize();
while (true) {
if (buffer == null) {
allocBuffer(rowEncodedSize);
} else if (buffer.writableBytes() < rowEncodedSize) {
sealBuffer();
} else {
row.encode(buffer);
++currentRowCount;
break;
}
}
}

private void allocBuffer(int encodedSize) {
final int needSize = encodedSize + reservedSize;
final int allocBufferSize = (needSize + bufferSize - 1) / bufferSize * bufferSize;
buffer = new ObByteBuf(allocBufferSize);
buffer.reserve(reservedSize);
}

private void sealBuffer() {
// 编码row count
encodeVi32(buffer.bytes, integerReservedSize, currentRowCount);
// 编码payload length
encodeVi32(buffer.bytes, 0, buffer.readableBytes() - integerReservedSize);
payloadBufferList.add(buffer);
totalRowCount += currentRowCount;
currentRowCount = 0;
buffer = null;
}

private void encodeVi32(byte[] buf, int pos, int value) {
// 前面的byte的高位都设置为1
for (int i = 0; i < integerReservedSize - 1; ++i, ++pos) {
buf[pos] = (byte) (value | 0x80);
value >>>= 7;
}
// 最后一个byte的高位设置为0
buf[pos] = (byte) (value & 0x7f);
}

public List<ObByteBuf> getPayloadBufferList() {
if (buffer != null) {
sealBuffer();
}
return payloadBufferList;
}

private static class Row {

private final long SeqNo = 0;
private ObObj[] cells = null;

public Row() {
}

public void setCells(ObObj[] cells) {
this.cells = cells;
}

public void setCells(List<ObObj> cells) {
this.cells = cells.toArray(new ObObj[0]);
}

/**
* Encode.
*/
public void encode(ObByteBuf buf) {
Serialization.encodeVi64(buf, SeqNo);
Serialization.encodeVi32(buf, cells.length);
for (int i = 0; i < cells.length; ++i) {
cells[i].encode(buf);
}
}

/**
* Get encoded size.
*/
public int getEncodedSize() {
int size = 0;
size += Serialization.getNeedBytes(SeqNo);
size += Serialization.getNeedBytes(cells.length);
for (int i = 0; i < cells.length; ++i) {
size += cells[i].getEncodedSize();
}
return size;
}

}

}
Loading