Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] deprecate batch interface setIsAtomic and modify releated test cases #60

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions example/simple-table-demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Notice
- We do not support across partition in batch operation.
- Batch operations is atomic. When one of the operation in a batch execute failed, the batch operation will be rollbacked.
- More details are shown in [Demo](https://github.com/oceanbase/obkv-table-client-java/blob/master/example/simple-table-demo/src/main/java/com/oceanbase/example/TableClient.java) `batch2` and `batch3`
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public static void main(String[] args) {
System.out.println("key_6 -> " + "c2:" + tableClient.get((long)6)[0] + ", c3:" + tableClient.get((long)6)[1]);
}

// batch:
batchOps = tableClient.batch2();
if (!batchOps) {
System.out.println("batch ops error");
}

// scan
boolean queryRet = tableClient.query();
if (!queryRet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,5 +292,58 @@ public boolean batch() {
}
}

// batch for single tablet: one of the the operation execute failed,
// batch will rollback in observer and throw related Exception
public boolean batch2() {
if (obTableClient == null) {
System.out.println("table client is null");
return false;
}
try {
TableBatchOps batchOps = obTableClient.batch(tableName);
batchOps.get((long)1, new String[]{"c2", "c3"});
batchOps.insert((long)5, new String[]{"c2", "c3"}, new Object[]{(int)5, "batch new c3_5"});
// insert a row with duplicated primary key, return error
batchOps.insert((long)5, new String[]{"c2", "c3"}, new Object[]{(int)5, "batch new c3_5"});
batchOps.update((long)5, new String[]{"c2"}, new Object[]{(int)55});
List<Object> retObj = batchOps.execute();
if (retObj.size() != 4) {
System.out.println("batch Ops error");
return false;
}
System.out.println("batch Ops success.");
return true;
} catch (Exception e) {
System.out.println("fail to execute batch ops: " + e);
return false;
}
}

/*
batch opertion do not support cross partition, it will throw ObTablePartitionConsistentException
*/
public boolean batch3() {
if (obTableClient == null) {
System.out.println("table client is null");
return false;
}
try {
tableName = "test_partition_table";
TableBatchOps batchOps = obTableClient.batch(tableName);
batchOps.insert((long)1, new String[]{"c2", "c3"}, new Object[]{(int)1, "batch new c3_5"});
batchOps.insert((long)2, new String[]{"c2", "c3"}, new Object[]{(int)2, "batch new c3_5"});
List<Object> retObj = batchOps.execute();
if (retObj.size() != 2) {
System.out.println("batch Ops error");
return false;
}
System.out.println("batch Ops success.");
return true;
} catch (ObTablePartitionConsistentException e) {
System.out.println("do not support across partition in batch operation" + e);
return false;
}
}

// todo: others op.
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class BatchOperation {
private Table client;
boolean withResult;
private List<Object> operations;
boolean isAtomic = false;
boolean isAtomic = true;

/*
* default constructor
Expand Down Expand Up @@ -96,6 +96,7 @@ public BatchOperation addOperation(List<Mutation> mutations) {
return this;
}

@Deprecated
public BatchOperation setIsAtomic(boolean isAtomic) {
this.isAtomic = isAtomic;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public abstract class AbstractTableBatchOps implements TableBatchOps {

protected String tableName;

protected boolean atomicOperation;
protected boolean atomicOperation = true;

protected ObTableEntityType entityType = ObTableEntityType.DYNAMIC;

Expand Down Expand Up @@ -111,6 +111,7 @@ public void setTableName(String tableName) {
* Set atomic operation.
*/
@Override
@Deprecated
public void setAtomicOperation(boolean atomicOperation) {
this.atomicOperation = atomicOperation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,10 @@ public Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableOperation>>>>
}
obTableOperations.getRight().add(new ObPair<Integer, ObTableOperation>(i, operation));
}

if (atomicOperation) {
if (partitionOperationsMap.size() > 1) {
throw new ObTablePartitionConsistentException(
"require atomic operation but found across partition may cause consistent problem ");
"require atomic operation but found across partition may cause consistent problem ");
}
}
return partitionOperationsMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public interface TableBatchOps {

String getTableName();

@Deprecated
void setAtomicOperation(boolean atomicOperation);

boolean isAtomicOperation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,6 @@ public void teardown() throws Exception {
@Test
public void testAtomic() {
TableBatchOps batchOps = obTableClient.batch("test_varchar_table");
// default: no atomic batch operation
try {
batchOps.clear();
batchOps.insert("abc-1", new String[] { "c2" }, new String[] { "bar-1" });
batchOps.get("abc-2", new String[] { "c2" });
batchOps.insert("abc-3", new String[] { "c2" }, new String[] { "bar-3" });
batchOps.insert(successKey, new String[] { "c2" }, new String[] { "bar-5" });
List<Object> results = batchOps.execute();
Assert.assertTrue(results.get(0) instanceof ObTableDuplicateKeyException);
Assert.assertEquals(((Map) results.get(1)).get("c2"), "xyz-2");
Assert.assertTrue(results.get(2) instanceof ObTableDuplicateKeyException);
Assert.assertEquals(results.get(3), 1L);
} catch (Exception ex) {
Assert.fail("hit exception:" + ex);
}

// atomic batch operation
try {
batchOps.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,15 @@ public int compare(Map<String, Object> o1, Map<String, Object> o2) {
@Test
public void testBatch() throws Exception {
long timeStamp = System.currentTimeMillis();
obTableClient.insert("testHash", new Object[] { timeStamp + 1L, "partition".getBytes(),
obTableClient.insert("testHash", new Object[] { timeStamp, "Q1".getBytes(),
timeStamp }, new String[] { "V" }, new Object[] { "value1L".getBytes() });
obTableClient.insert("testHash", new Object[] { timeStamp + 5L, "partition".getBytes(),
obTableClient.insert("testHash", new Object[] { timeStamp, "Q2".getBytes(),
timeStamp }, new String[] { "V" }, new Object[] { "value1L".getBytes() });
TableBatchOps tableBatchOps = obTableClient.batch("testHash");
tableBatchOps.delete(new Object[] { timeStamp + 1L, "partition".getBytes(), timeStamp });
tableBatchOps.insert(new Object[] { timeStamp + 3L, "partition".getBytes(), timeStamp },
tableBatchOps.delete(new Object[] { timeStamp, "Q1".getBytes(), timeStamp });
tableBatchOps.insert(new Object[] { timeStamp, "Q3".getBytes(), timeStamp },
new String[] { "V" }, new Object[] { "value2".getBytes() });
tableBatchOps.replace(new Object[] { timeStamp + 5L, "partition".getBytes(), timeStamp },
tableBatchOps.replace(new Object[] { timeStamp, "Q2".getBytes(), timeStamp },
new String[] { "V" }, new Object[] { "value2".getBytes() });
List<Object> batchResult = tableBatchOps.execute();
Assert.assertEquals(3, batchResult.size());
Expand All @@ -376,30 +376,30 @@ public void testBatch() throws Exception {
Assert.assertEquals(2L, batchResult.get(2));

Map<String, Object> getResult = obTableClient.get("testHash", new Object[] {
timeStamp + 1L, "partition".getBytes(), timeStamp }, new String[] { "K", "Q", "T",
timeStamp, "Q1".getBytes(), timeStamp }, new String[] { "K", "Q", "T",
"V" });

Assert.assertEquals(0, getResult.size());

getResult = obTableClient.get("testHash",
new Object[] { timeStamp + 3L, "partition".getBytes(), timeStamp }, new String[] { "K",
new Object[] { timeStamp, "Q2".getBytes(), timeStamp }, new String[] { "K",
"Q", "T", "V" });

Assert.assertEquals(4, getResult.size());

Assert.assertEquals(timeStamp + 3L, getResult.get("K"));
Assert.assertEquals("partition", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("K"));
Assert.assertEquals("Q2", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("T"));
Assert.assertEquals("value2", new String((byte[]) getResult.get("V")));

getResult = obTableClient.get("testHash",
new Object[] { timeStamp + 5L, "partition".getBytes(), timeStamp }, new String[] { "K",
new Object[] { timeStamp, "Q3".getBytes(), timeStamp }, new String[] { "K",
"Q", "T", "V" });

Assert.assertEquals(4, getResult.size());

Assert.assertEquals(timeStamp + 5L, getResult.get("K"));
Assert.assertEquals("partition", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("K"));
Assert.assertEquals("Q3", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("T"));
Assert.assertEquals("value2", new String((byte[]) getResult.get("V")));
}
Expand All @@ -408,15 +408,15 @@ public void testBatch() throws Exception {
public void testBatchConcurrent() throws Exception {
long timeStamp = System.currentTimeMillis();
obTableClient.setRuntimeBatchExecutor(Executors.newFixedThreadPool(3));
obTableClient.insert("testHash", new Object[] { timeStamp + 1L, "partition".getBytes(),
obTableClient.insert("testHash", new Object[] { timeStamp, "Q1".getBytes(),
timeStamp }, new String[] { "V" }, new Object[] { "value1L".getBytes() });
obTableClient.insert("testHash", new Object[] { timeStamp + 5L, "partition".getBytes(),
obTableClient.insert("testHash", new Object[] { timeStamp, "Q2".getBytes(),
timeStamp }, new String[] { "V" }, new Object[] { "value1L".getBytes() });
TableBatchOps tableBatchOps = obTableClient.batch("testHash");
tableBatchOps.delete(new Object[] { timeStamp + 1L, "partition".getBytes(), timeStamp });
tableBatchOps.insert(new Object[] { timeStamp + 3L, "partition".getBytes(), timeStamp },
tableBatchOps.delete(new Object[] { timeStamp, "Q1".getBytes(), timeStamp });
tableBatchOps.insert(new Object[] { timeStamp, "Q3".getBytes(), timeStamp },
new String[] { "V" }, new Object[] { "value2".getBytes() });
tableBatchOps.replace(new Object[] { timeStamp + 5L, "partition".getBytes(), timeStamp },
tableBatchOps.replace(new Object[] { timeStamp, "Q2".getBytes(), timeStamp },
new String[] { "V" }, new Object[] { "value2".getBytes() });
List<Object> batchResult = tableBatchOps.execute();
Assert.assertEquals(3, batchResult.size());
Expand All @@ -425,30 +425,30 @@ public void testBatchConcurrent() throws Exception {
Assert.assertEquals(2L, batchResult.get(2));

Map<String, Object> getResult = obTableClient.get("testHash", new Object[] {
timeStamp + 1L, "partition".getBytes(), timeStamp }, new String[] { "K", "Q", "T",
timeStamp, "Q1".getBytes(), timeStamp }, new String[] { "K", "Q", "T",
"V" });

Assert.assertEquals(0, getResult.size());

getResult = obTableClient.get("testHash",
new Object[] { timeStamp + 3L, "partition".getBytes(), timeStamp }, new String[] { "K",
new Object[] { timeStamp, "Q2".getBytes(), timeStamp }, new String[] { "K",
"Q", "T", "V" });

Assert.assertEquals(4, getResult.size());

Assert.assertEquals(timeStamp + 3L, getResult.get("K"));
Assert.assertEquals("partition", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("K"));
Assert.assertEquals("Q2", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("T"));
Assert.assertEquals("value2", new String((byte[]) getResult.get("V")));

getResult = obTableClient.get("testHash",
new Object[] { timeStamp + 5L, "partition".getBytes(), timeStamp }, new String[] { "K",
new Object[] { timeStamp, "Q3".getBytes(), timeStamp }, new String[] { "K",
"Q", "T", "V" });

Assert.assertEquals(4, getResult.size());

Assert.assertEquals(timeStamp + 5L, getResult.get("K"));
Assert.assertEquals("partition", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("K"));
Assert.assertEquals("Q3", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("T"));
Assert.assertEquals("value2", new String((byte[]) getResult.get("V")));
}
Expand All @@ -458,15 +458,15 @@ public void testBatchConcurrentWithPriority() throws Exception {
long timeStamp = System.currentTimeMillis();
ThreadLocalMap.setProcessHighPriority();
obTableClient.setRuntimeBatchExecutor(Executors.newFixedThreadPool(3));
obTableClient.insert("testHash", new Object[] { timeStamp + 1L, "partition".getBytes(),
obTableClient.insert("testHash", new Object[] { timeStamp, "Q1".getBytes(),
timeStamp }, new String[] { "V" }, new Object[] { "value1L".getBytes() });
obTableClient.insert("testHash", new Object[] { timeStamp + 5L, "partition".getBytes(),
obTableClient.insert("testHash", new Object[] { timeStamp, "Q2".getBytes(),
timeStamp }, new String[] { "V" }, new Object[] { "value1L".getBytes() });
TableBatchOps tableBatchOps = obTableClient.batch("testHash");
tableBatchOps.delete(new Object[] { timeStamp + 1L, "partition".getBytes(), timeStamp });
tableBatchOps.insert(new Object[] { timeStamp + 3L, "partition".getBytes(), timeStamp },
tableBatchOps.delete(new Object[] { timeStamp, "Q1".getBytes(), timeStamp });
tableBatchOps.insert(new Object[] { timeStamp, "Q3".getBytes(), timeStamp },
new String[] { "V" }, new Object[] { "value2".getBytes() });
tableBatchOps.replace(new Object[] { timeStamp + 5L, "partition".getBytes(), timeStamp },
tableBatchOps.replace(new Object[] { timeStamp, "Q2".getBytes(), timeStamp },
new String[] { "V" }, new Object[] { "value2".getBytes() });
List<Object> batchResult = tableBatchOps.execute();
Assert.assertEquals(3, batchResult.size());
Expand All @@ -475,30 +475,30 @@ public void testBatchConcurrentWithPriority() throws Exception {
Assert.assertEquals(2L, batchResult.get(2));

Map<String, Object> getResult = obTableClient.get("testHash", new Object[] {
timeStamp + 1L, "partition".getBytes(), timeStamp }, new String[] { "K", "Q", "T",
timeStamp, "Q1".getBytes(), timeStamp }, new String[] { "K", "Q", "T",
"V" });

Assert.assertEquals(0, getResult.size());

getResult = obTableClient.get("testHash",
new Object[] { timeStamp + 3L, "partition".getBytes(), timeStamp }, new String[] { "K",
new Object[] { timeStamp, "Q2".getBytes(), timeStamp }, new String[] { "K",
"Q", "T", "V" });

Assert.assertEquals(4, getResult.size());

Assert.assertEquals(timeStamp + 3L, getResult.get("K"));
Assert.assertEquals("partition", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("K"));
Assert.assertEquals("Q2", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("T"));
Assert.assertEquals("value2", new String((byte[]) getResult.get("V")));

getResult = obTableClient.get("testHash",
new Object[] { timeStamp + 5L, "partition".getBytes(), timeStamp }, new String[] { "K",
new Object[] { timeStamp , "Q3".getBytes(), timeStamp }, new String[] { "K",
"Q", "T", "V" });

Assert.assertEquals(4, getResult.size());

Assert.assertEquals(timeStamp + 5L, getResult.get("K"));
Assert.assertEquals("partition", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("K"));
Assert.assertEquals("Q3", new String((byte[]) getResult.get("Q")));
Assert.assertEquals(timeStamp, getResult.get("T"));
Assert.assertEquals("value2", new String((byte[]) getResult.get("V")));
}
Expand Down
Loading