diff --git a/hsweb-system/hsweb-system-database-manager/hsweb-system-database-manager-local/src/main/java/org/hswebframework/web/datasource/manager/simple/DefaultLocalTransactionExecutor.java b/hsweb-system/hsweb-system-database-manager/hsweb-system-database-manager-local/src/main/java/org/hswebframework/web/datasource/manager/simple/DefaultLocalTransactionExecutor.java index e2637f786..694751274 100644 --- a/hsweb-system/hsweb-system-database-manager/hsweb-system-database-manager-local/src/main/java/org/hswebframework/web/datasource/manager/simple/DefaultLocalTransactionExecutor.java +++ b/hsweb-system/hsweb-system-database-manager/hsweb-system-database-manager-local/src/main/java/org/hswebframework/web/datasource/manager/simple/DefaultLocalTransactionExecutor.java @@ -1,5 +1,6 @@ package org.hswebframework.web.datasource.manager.simple; +import lombok.SneakyThrows; import org.hswebframework.ezorm.rdb.executor.SqlExecutor; import org.hswebframework.web.database.manager.SqlExecuteRequest; import org.hswebframework.web.database.manager.SqlExecuteResult; @@ -16,9 +17,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -31,7 +30,7 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private Queue executionQueue = new LinkedList<>(); + private BlockingQueue executionQueue = new LinkedBlockingQueue<>(); private SqlExecutor sqlExecutor; @@ -51,9 +50,6 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor { private volatile boolean running = false; - /* 线程循环开始等待sql进入的时候执行一次,sql进入的时候执行一次,然后唤醒线程开始执行sql */ - private CyclicBarrier waitToReady = new CyclicBarrier(2); - private CountDownLatch waitClose = new CountDownLatch(1); public DefaultLocalTransactionExecutor(SqlExecutor sqlExecutor, String transactionId, String datasourceId, TransactionTemplate transactionTemplate) { @@ -82,11 +78,8 @@ public void commit() { protected void waitToClose() { try { + executionQueue.add(new NullExecution()); logger.debug("wait transaction {} close", transactionId); - if (!running) { - //先唤醒执行,继续执行任务 - waitToReady.await(); - } //等待执行结束 waitClose.await(); } catch (Exception e) { @@ -135,14 +128,20 @@ public void run() { } while (!shutdown) { logger.debug("wait sql execute request {}", transactionId); + Execution execution; if (transactionTemplate.getTimeout() > 0) { - waitToReady.await(transactionTemplate.getTimeout(), TimeUnit.MILLISECONDS);//等待有新的sql进来 + execution = executionQueue.poll(transactionTemplate.getTimeout(), TimeUnit.MILLISECONDS);//等待有新的sql进来 + if (execution == null) { + throw new TimeoutException("事务[" + transactionId + "]超时"); + } } else { - waitToReady.await(); + execution = executionQueue.take(); + } + if (execution instanceof NullExecution) { + continue; } - waitToReady.reset();//重置,下一次循环继续等待 //执行sql - doExecute(); + doExecute(execution); } } catch (Exception e) { tryRollback();//回滚 @@ -164,41 +163,38 @@ public void run() { } } - protected void doExecute() { - Execution execution; - while ((execution = executionQueue.poll()) != null) { - Execution finalE = execution; - running = true; - logger.debug("start execute sql {}", transactionId); - try { - List requests = execution.request.getSql() - .stream() - .map(sqlInfo -> { - try { - if (finalE.datasourceId != null) { - DataSourceHolder.switcher().use(finalE.datasourceId); - } else { - DataSourceHolder.switcher().useDefault(); - } - //执行sql - return sqlRequestExecutor.apply(sqlExecutor, sqlInfo); - } catch (Exception e) { - return SqlExecuteResult.builder() - .result(e.getMessage()) - .sqlInfo(sqlInfo) - .success(false) - .build(); + @SneakyThrows + protected void doExecute(Execution execution) { + running = true; + logger.debug("start execute sql {}", transactionId); + try { + List requests = execution.request.getSql() + .stream() + .map(sqlInfo -> { + try { + if (execution.datasourceId != null) { + DataSourceHolder.switcher().use(execution.datasourceId); + } else { + DataSourceHolder.switcher().useDefault(); } - }) - .collect(Collectors.toList()); - //通过回调返回执行结果 - execution.callback.accept(requests); - } catch (Exception e) { - execution.onError.accept(e); - return; - } + //执行sql + return sqlRequestExecutor.apply(sqlExecutor, sqlInfo); + } catch (Exception e) { + return SqlExecuteResult.builder() + .result(e.getMessage()) + .sqlInfo(sqlInfo) + .success(false) + .build(); + } + }) + .collect(Collectors.toList()); + //通过回调返回执行结果 + execution.callback.accept(requests); + } catch (Exception e) { + execution.onError.accept(e); + } finally { + running = false; } - running = false; } @Override @@ -220,20 +216,18 @@ public List execute(SqlExecuteRequest request) throws Exceptio sqlExecuteResults.clear(); countDownLatch.countDown(); }; - execution.onError = (e) -> { - countDownLatch.countDown(); - }; + execution.onError = (e) -> countDownLatch.countDown(); logger.debug("submit sql execute job {}", transactionId); executionQueue.add(execution); - //当前没有在执行sql,说明现在正在等待新的sql进入,唤醒之 - if (!running) { - waitToReady.await(); - } //等待sql执行完毕 countDownLatch.await(); return results; } + private class NullExecution extends Execution { + + } + protected class Execution { protected String datasourceId; diff --git a/hsweb-system/hsweb-system-database-manager/hsweb-system-database-manager-starter/src/test/java/org/hswebframework/web/datasource/manager/simple/SimpleDatabaseManagerServiceTest.java b/hsweb-system/hsweb-system-database-manager/hsweb-system-database-manager-starter/src/test/java/org/hswebframework/web/datasource/manager/simple/SimpleDatabaseManagerServiceTest.java index 83a2e00ff..fe8a2b01d 100644 --- a/hsweb-system/hsweb-system-database-manager/hsweb-system-database-manager-starter/src/test/java/org/hswebframework/web/datasource/manager/simple/SimpleDatabaseManagerServiceTest.java +++ b/hsweb-system/hsweb-system-database-manager/hsweb-system-database-manager-starter/src/test/java/org/hswebframework/web/datasource/manager/simple/SimpleDatabaseManagerServiceTest.java @@ -16,6 +16,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -53,13 +54,14 @@ public void testExecuteSql() throws Exception { new Thread(() -> { try { databaseManagerService.execute(id, request); + Thread.sleep(100); } catch (Exception e) { - throw new RuntimeException(); + throw new RuntimeException(e); } countDownLatch.countDown(); }).start(); } - countDownLatch.await(); + countDownLatch.await(30, TimeUnit.SECONDS); sqlInfo = new SqlInfo(); sqlInfo.setSql("select *,name as \"NAME\",1 as \"\" from t_test "); @@ -71,10 +73,9 @@ public void testExecuteSql() throws Exception { // System.out.println(JSON.toJSONString(results)); - Assert.assertTrue(sqlExecutor.list("select * from t_test").isEmpty()); + Assert.assertTrue(sqlExecutor.list("select * from t_test").isEmpty()); databaseManagerService.rollback(id); - Thread.sleep(2000); Assert.assertTrue(sqlExecutor.list("select * from t_test").isEmpty());