diff --git a/dinky-admin/src/main/java/org/dinky/aop/UdfClassLoaderAspect.java b/dinky-admin/src/main/java/org/dinky/aop/UdfClassLoaderAspect.java index ab7a98f157..696ea88ef2 100644 --- a/dinky-admin/src/main/java/org/dinky/aop/UdfClassLoaderAspect.java +++ b/dinky-admin/src/main/java/org/dinky/aop/UdfClassLoaderAspect.java @@ -55,10 +55,9 @@ public Object round(ProceedingJoinPoint proceedingJoinPoint) { proceed = proceedingJoinPoint.proceed(); } catch (Throwable e) { if (!(e instanceof DinkyException)) { - throw new DinkyException(e); + throw new RuntimeException(e); } - e.printStackTrace(); - throw (DinkyException) e; + throw (RuntimeException) e; } finally { if (contextClassLoader != Thread.currentThread().getContextClassLoader()) { Thread.currentThread().setContextClassLoader(contextClassLoader); diff --git a/dinky-admin/src/main/java/org/dinky/context/SseSessionContextHolder.java b/dinky-admin/src/main/java/org/dinky/context/SseSessionContextHolder.java index 3d4835c7ac..748b8b0637 100644 --- a/dinky-admin/src/main/java/org/dinky/context/SseSessionContextHolder.java +++ b/dinky-admin/src/main/java/org/dinky/context/SseSessionContextHolder.java @@ -19,6 +19,7 @@ package org.dinky.context; +import org.dinky.daemon.pool.ScheduleThreadPool; import org.dinky.data.constant.SseConstant; import org.dinky.data.exception.BusException; import org.dinky.data.vo.SseDataVo; @@ -30,15 +31,39 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.springframework.scheduling.support.PeriodicTrigger; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import lombok.Data; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j +@Data public class SseSessionContextHolder { + + @Getter private static final Map sessionMap = new ConcurrentHashMap<>(); + public static void init(ScheduleThreadPool schedulePool) { + log.info("start init sse heart schedule task"); + PeriodicTrigger trigger = new PeriodicTrigger(9 * 1000L); + schedulePool.addSchedule( + SseSessionContextHolder.class.toString(), SseSessionContextHolder::checkHeart, trigger); + } + + public static void checkHeart() { + sessionMap.forEach((sessionKey, topicSubscriber) -> { + try { + SseDataVo data = new SseDataVo(sessionKey, SseConstant.HEART_TOPIC, "heart"); + sendSse(sessionKey, data); + } catch (Exception e) { + log.error("Error sending sse data:{}", e.getMessage()); + onError(topicSubscriber.getEmitter(), sessionKey, e); + } + }); + } + /** * Subscribes a session to the topics. * A session can subscribe to multiple topics @@ -65,21 +90,20 @@ public static Set subscribeTopic(String sessionId, List topics) * @param sessionKey The session key of the new session. * @return The SseEmitter for the session. */ - public static SseEmitter connectSession(String sessionKey) { + public static synchronized SseEmitter connectSession(String sessionKey) { log.debug("New session wants to connect: {}", sessionKey); - log.warn("Session key already exists: {},replace it", sessionKey); - SseEmitter sseEmitter = new SseEmitter(60 * 1000L * 10); - sseEmitter.onError(err -> onError(sessionKey, err)); - sseEmitter.onTimeout(() -> onTimeout(sessionKey)); - sseEmitter.onCompletion(() -> onCompletion(sessionKey)); + SseEmitter sseEmitter = new SseEmitter(30 * 60 * 1000L); + sseEmitter.onError(err -> onError(sseEmitter, sessionKey, err)); + sseEmitter.onTimeout(() -> onTimeout(sseEmitter)); + sseEmitter.onCompletion(() -> onCompletion(sseEmitter, sessionKey)); + sessionMap.put(sessionKey, TopicSubscriber.of(sseEmitter)); try { // Set the client reconnection interval, 0 to reconnect immediately sseEmitter.send(SseEmitter.event().reconnectTime(1000)); } catch (IOException e) { throw new RuntimeException(e); } - sessionMap.put(sessionKey, TopicSubscriber.of(sseEmitter)); return sseEmitter; } @@ -95,28 +119,12 @@ public static boolean exists(String sessionKey) { /** * Handles the timeout event for a session. - * - * @param sessionKey The session key of the timed-out session. */ - public static void onTimeout(String sessionKey) { - log.debug("Type: SseSession Timeout, Session ID: {}", sessionKey); - closeSse(sessionKey); - } - - /** - * Closes the SseEmitter for a session. - * - * @param sessionKey The session key of the session to close. - */ - public static void closeSse(String sessionKey) { - if (exists(sessionKey)) { - try { - sessionMap.get(sessionKey).getEmitter().complete(); - } catch (Exception e) { - log.warn("Failed to complete sseEmitter, Session Key: {}, Error: {}", sessionKey, e.getMessage()); - } finally { - sessionMap.remove(sessionKey); - } + public static void onTimeout(SseEmitter sseEmitter) { + try { + sseEmitter.complete(); + } catch (Exception e) { + log.warn("Failed to complete sseEmitter, Error: {}", e.getMessage()); } } @@ -126,16 +134,14 @@ public static void closeSse(String sessionKey) { * @param sessionKey The session key of the session with the error. * @param throwable The throwable representing the error. */ - public static void onError(String sessionKey, Throwable throwable) { - log.error("Type: SseSession [{}] Error, Message: {}", sessionKey, throwable.getMessage()); - if (exists(sessionKey)) { - try { - sessionMap.get(sessionKey).getEmitter().completeWithError(throwable); - } catch (Exception e) { - log.error("Failed to complete Sse With Error: {}", e.getMessage()); - } + private static void onError(SseEmitter sseEmitter, String sessionKey, Throwable throwable) { + log.debug("Type: SseSession [{}] Error, Message: {}", sessionKey, throwable.getMessage()); + try { + sseEmitter.completeWithError(throwable); + } catch (Exception e) { + log.debug("Failed to complete Sse With Error: {}", e.getMessage()); } - sessionMap.remove(sessionKey); + onCompletion(sseEmitter, sessionKey); } /** @@ -143,9 +149,14 @@ public static void onError(String sessionKey, Throwable throwable) { * * @param sessionKey The session key of the completed session. */ - public static void onCompletion(String sessionKey) { + private static synchronized void onCompletion(SseEmitter sseEmitter, String sessionKey) { log.debug("Type: SseSession Completion, Session ID: {}", sessionKey); - closeSse(sessionKey); + if (exists(sessionKey)) { + SseEmitter emitter = sessionMap.get(sessionKey).getEmitter(); + if (emitter == sseEmitter) { + sessionMap.remove(sessionKey); + } + } } /** @@ -162,7 +173,7 @@ public static void sendTopic(String topic, Object content) { sendSse(sessionKey, data); } catch (Exception e) { log.error("Error sending sse data:{}", e.getMessage()); - onError(sessionKey, e); + onError(topicSubscriber.getEmitter(), sessionKey, e); } } }); diff --git a/dinky-admin/src/main/java/org/dinky/controller/SseController.java b/dinky-admin/src/main/java/org/dinky/controller/SseController.java index 382b41d90a..eaec9bccc6 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/SseController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/SseController.java @@ -34,6 +34,7 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import cn.dev33.satoken.annotation.SaCheckLogin; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; @@ -43,6 +44,7 @@ @Api(tags = "SSE Controller") @RequestMapping("/api/sse") @RequiredArgsConstructor +@SaCheckLogin public class SseController { @PostMapping(value = "/subscribeTopic") diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java index a5605b1f59..e525cebf21 100644 --- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java @@ -20,6 +20,7 @@ package org.dinky.init; import org.dinky.assertion.Asserts; +import org.dinky.context.SseSessionContextHolder; import org.dinky.context.TenantContextHolder; import org.dinky.daemon.pool.FlinkJobThreadPool; import org.dinky.daemon.pool.ScheduleThreadPool; @@ -154,7 +155,7 @@ private void initDaemon() { Configuration sysGatherTiming = sysConfig.getMetricsSysGatherTiming(); Consumer> metricsListener = c -> { c.addChangeEvent(x -> { - schedule.removeSchedule(sysMetricsTask); + schedule.removeSchedule(sysMetricsTask.getType()); PeriodicTrigger trigger = new PeriodicTrigger(sysGatherTiming.getValue()); if (metricsSysEnable.getValue()) schedule.addSchedule(sysMetricsTask, trigger); }); @@ -175,6 +176,7 @@ private void initDaemon() { DaemonTask daemonTask = DaemonTask.build(config); flinkJobThreadPool.execute(daemonTask); } + SseSessionContextHolder.init(schedule); } /** diff --git a/dinky-common/src/main/java/org/dinky/data/constant/SseConstant.java b/dinky-common/src/main/java/org/dinky/data/constant/SseConstant.java index 47237860aa..e6610ee17d 100644 --- a/dinky-common/src/main/java/org/dinky/data/constant/SseConstant.java +++ b/dinky-common/src/main/java/org/dinky/data/constant/SseConstant.java @@ -24,4 +24,6 @@ public class SseConstant { * Sse label for front to reconnect session */ public static final String SSE_SESSION_INVALID = "SESSION_INVALID"; + + public static final String HEART_TOPIC = "HEART_BEAT"; } diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/pool/ScheduleThreadPool.java b/dinky-daemon/src/main/java/org/dinky/daemon/pool/ScheduleThreadPool.java index dfc37143cb..1f29715fbe 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/pool/ScheduleThreadPool.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/pool/ScheduleThreadPool.java @@ -41,8 +41,13 @@ public void addSchedule(DaemonTask task, Trigger trigger) { getScheduleMap().put(task.getType(), schedule); } - public void removeSchedule(DaemonTask task) { - ScheduledFuture scheduledFuture = getScheduleMap().get(task.getType()); + public void addSchedule(String type, Runnable task, Trigger trigger) { + ScheduledFuture schedule = threadPoolTaskScheduler.schedule(task, trigger); + getScheduleMap().put(type, schedule); + } + + public void removeSchedule(String type) { + ScheduledFuture scheduledFuture = getScheduleMap().get(type); if (scheduledFuture != null) { scheduledFuture.cancel(true); } diff --git a/dinky-web/src/models/Sse.tsx b/dinky-web/src/models/Sse.tsx index 7bfd0ae811..1ba39151dd 100644 --- a/dinky-web/src/models/Sse.tsx +++ b/dinky-web/src/models/Sse.tsx @@ -35,9 +35,18 @@ export type SubscriberData = { export default () => { const uuidRef = useRef(uuidv4()); + const lastHeartTime = useRef(0); const subscriberRef = useRef([]); const [eventSource, setEventSource] = useState(); + const reconnectSse = () => { + lastHeartTime.current = Date.now(); + uuidRef.current = uuidv4(); + const sseUrl = 'api/sse/connect?sessionKey=' + uuidRef.current; + eventSource?.close(); + setEventSource(new EventSource(sseUrl)); + }; + const subscribe = async () => { const topics: string[] = []; subscriberRef.current.forEach((sub) => topics.push(...sub.topic)); @@ -50,15 +59,13 @@ export default () => { }) .catch((e) => ErrorMessage(e)); }; - const reconnectSse = () => { - uuidRef.current = uuidv4(); - const sseUrl = 'api/sse/connect?sessionKey=' + uuidRef.current; - eventSource?.close(); - setEventSource(new EventSource(sseUrl)); - }; useEffect(() => { - reconnectSse(); + setInterval(() => { + if (Date.now() - lastHeartTime.current > 20 * 1000) { + reconnectSse(); + } + }, 5000); }, []); useEffect(() => { @@ -67,9 +74,13 @@ export default () => { eventSource.onmessage = (e) => { try { const data: SseData = JSON.parse(e.data); - subscriberRef.current - .filter((sub) => sub.topic.includes(data.topic)) - .forEach((sub) => sub.call(data)); + if (data.topic === 'HEART_BEAT') { + lastHeartTime.current = Date.now(); + } else { + subscriberRef.current + .filter((sub) => sub.topic.includes(data.topic)) + .forEach((sub) => sub.call(data)); + } } catch (e: any) { ErrorMessage(e); } diff --git a/dinky-web/src/pages/Other/Login/index.tsx b/dinky-web/src/pages/Other/Login/index.tsx index 1c32c35762..32c3cf0b98 100644 --- a/dinky-web/src/pages/Other/Login/index.tsx +++ b/dinky-web/src/pages/Other/Login/index.tsx @@ -43,8 +43,6 @@ const Login: React.FC = () => { const [localStorageOfToken, setLocalStorageOfToken] = useLocalStorage('token', ''); - const { reconnectSse } = useModel('Sse', (model: any) => ({ reconnectSse: model.reconnectSse })); - const containerClassName = useEmotionCss(() => { return { display: 'flex', @@ -99,7 +97,6 @@ const Login: React.FC = () => { /** * Redirect to home page && reconnect Global Sse */ - reconnectSse(); gotoRedirectUrl(); } else { ErrorMessage(l('login.chooseTenantFailed'));