Skip to content

Commit

Permalink
优化sql执行逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Feb 28, 2019
1 parent f899f1b commit db5efc3
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.hswebframework.web.datasource.manager.simple;

import lombok.SneakyThrows;
import org.hswebframework.ezorm.rdb.executor.SqlExecutor;
import org.hswebframework.web.database.manager.SqlExecuteRequest;
import org.hswebframework.web.database.manager.SqlExecuteResult;
Expand All @@ -16,9 +17,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand All @@ -31,7 +30,7 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private Queue<Execution> executionQueue = new LinkedList<>();
private BlockingQueue<Execution> executionQueue = new LinkedBlockingQueue<>();

private SqlExecutor sqlExecutor;

Expand All @@ -51,9 +50,6 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {

private volatile boolean running = false;

/* 线程循环开始等待sql进入的时候执行一次,sql进入的时候执行一次,然后唤醒线程开始执行sql */
private CyclicBarrier waitToReady = new CyclicBarrier(2);

private CountDownLatch waitClose = new CountDownLatch(1);

public DefaultLocalTransactionExecutor(SqlExecutor sqlExecutor, String transactionId, String datasourceId, TransactionTemplate transactionTemplate) {
Expand Down Expand Up @@ -82,11 +78,8 @@ public void commit() {

protected void waitToClose() {
try {
executionQueue.add(new NullExecution());
logger.debug("wait transaction {} close", transactionId);
if (!running) {
//先唤醒执行,继续执行任务
waitToReady.await();
}
//等待执行结束
waitClose.await();
} catch (Exception e) {
Expand Down Expand Up @@ -135,14 +128,20 @@ public void run() {
}
while (!shutdown) {
logger.debug("wait sql execute request {}", transactionId);
Execution execution;
if (transactionTemplate.getTimeout() > 0) {
waitToReady.await(transactionTemplate.getTimeout(), TimeUnit.MILLISECONDS);//等待有新的sql进来
execution = executionQueue.poll(transactionTemplate.getTimeout(), TimeUnit.MILLISECONDS);//等待有新的sql进来
if (execution == null) {
throw new TimeoutException("事务[" + transactionId + "]超时");
}
} else {
waitToReady.await();
execution = executionQueue.take();
}
if (execution instanceof NullExecution) {
continue;
}
waitToReady.reset();//重置,下一次循环继续等待
//执行sql
doExecute();
doExecute(execution);
}
} catch (Exception e) {
tryRollback();//回滚
Expand All @@ -164,41 +163,38 @@ public void run() {
}
}

protected void doExecute() {
Execution execution;
while ((execution = executionQueue.poll()) != null) {
Execution finalE = execution;
running = true;
logger.debug("start execute sql {}", transactionId);
try {
List<SqlExecuteResult> requests = execution.request.getSql()
.stream()
.map(sqlInfo -> {
try {
if (finalE.datasourceId != null) {
DataSourceHolder.switcher().use(finalE.datasourceId);
} else {
DataSourceHolder.switcher().useDefault();
}
//执行sql
return sqlRequestExecutor.apply(sqlExecutor, sqlInfo);
} catch (Exception e) {
return SqlExecuteResult.builder()
.result(e.getMessage())
.sqlInfo(sqlInfo)
.success(false)
.build();
@SneakyThrows
protected void doExecute(Execution execution) {
running = true;
logger.debug("start execute sql {}", transactionId);
try {
List<SqlExecuteResult> requests = execution.request.getSql()
.stream()
.map(sqlInfo -> {
try {
if (execution.datasourceId != null) {
DataSourceHolder.switcher().use(execution.datasourceId);
} else {
DataSourceHolder.switcher().useDefault();
}
})
.collect(Collectors.toList());
//通过回调返回执行结果
execution.callback.accept(requests);
} catch (Exception e) {
execution.onError.accept(e);
return;
}
//执行sql
return sqlRequestExecutor.apply(sqlExecutor, sqlInfo);
} catch (Exception e) {
return SqlExecuteResult.builder()
.result(e.getMessage())
.sqlInfo(sqlInfo)
.success(false)
.build();
}
})
.collect(Collectors.toList());
//通过回调返回执行结果
execution.callback.accept(requests);
} catch (Exception e) {
execution.onError.accept(e);
} finally {
running = false;
}
running = false;
}

@Override
Expand All @@ -220,20 +216,18 @@ public List<SqlExecuteResult> execute(SqlExecuteRequest request) throws Exceptio
sqlExecuteResults.clear();
countDownLatch.countDown();
};
execution.onError = (e) -> {
countDownLatch.countDown();
};
execution.onError = (e) -> countDownLatch.countDown();
logger.debug("submit sql execute job {}", transactionId);
executionQueue.add(execution);
//当前没有在执行sql,说明现在正在等待新的sql进入,唤醒之
if (!running) {
waitToReady.await();
}
//等待sql执行完毕
countDownLatch.await();
return results;
}

private class NullExecution extends Execution {

}

protected class Execution {

protected String datasourceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -53,13 +54,14 @@ public void testExecuteSql() throws Exception {
new Thread(() -> {
try {
databaseManagerService.execute(id, request);
Thread.sleep(100);
} catch (Exception e) {
throw new RuntimeException();
throw new RuntimeException(e);
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
countDownLatch.await(30, TimeUnit.SECONDS);

sqlInfo = new SqlInfo();
sqlInfo.setSql("select *,name as \"NAME\",1 as \"\" from t_test ");
Expand All @@ -71,10 +73,9 @@ public void testExecuteSql() throws Exception {

// System.out.println(JSON.toJSONString(results));

Assert.assertTrue(sqlExecutor.list("select * from t_test").isEmpty());
Assert.assertTrue(sqlExecutor.list("select * from t_test").isEmpty());

databaseManagerService.rollback(id);
Thread.sleep(2000);
Assert.assertTrue(sqlExecutor.list("select * from t_test").isEmpty());


Expand Down

0 comments on commit db5efc3

Please sign in to comment.