Skip to content

Commit

Permalink
Add sse heart 3 (DataLinkDC#3391)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 authored Apr 12, 2024
1 parent 72cfc1f commit 3ebd880
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ public Object round(ProceedingJoinPoint proceedingJoinPoint) {
proceed = proceedingJoinPoint.proceed();
} catch (Throwable e) {
if (!(e instanceof DinkyException)) {
throw new DinkyException(e);
throw new RuntimeException(e);
}
e.printStackTrace();
throw (DinkyException) e;
throw (RuntimeException) e;
} finally {
if (contextClassLoader != Thread.currentThread().getContextClassLoader()) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.context;

import org.dinky.daemon.pool.ScheduleThreadPool;
import org.dinky.data.constant.SseConstant;
import org.dinky.data.exception.BusException;
import org.dinky.data.vo.SseDataVo;
Expand All @@ -30,15 +31,39 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Data
public class SseSessionContextHolder {

@Getter
private static final Map<String, TopicSubscriber> sessionMap = new ConcurrentHashMap<>();

public static void init(ScheduleThreadPool schedulePool) {
log.info("start init sse heart schedule task");
PeriodicTrigger trigger = new PeriodicTrigger(9 * 1000L);
schedulePool.addSchedule(
SseSessionContextHolder.class.toString(), SseSessionContextHolder::checkHeart, trigger);
}

public static void checkHeart() {
sessionMap.forEach((sessionKey, topicSubscriber) -> {
try {
SseDataVo data = new SseDataVo(sessionKey, SseConstant.HEART_TOPIC, "heart");
sendSse(sessionKey, data);
} catch (Exception e) {
log.error("Error sending sse data:{}", e.getMessage());
onError(topicSubscriber.getEmitter(), sessionKey, e);
}
});
}

/**
* Subscribes a session to the topics.
* A session can subscribe to multiple topics
Expand All @@ -65,21 +90,20 @@ public static Set<String> subscribeTopic(String sessionId, List<String> topics)
* @param sessionKey The session key of the new session.
* @return The SseEmitter for the session.
*/
public static SseEmitter connectSession(String sessionKey) {
public static synchronized SseEmitter connectSession(String sessionKey) {
log.debug("New session wants to connect: {}", sessionKey);
log.warn("Session key already exists: {},replace it", sessionKey);

SseEmitter sseEmitter = new SseEmitter(60 * 1000L * 10);
sseEmitter.onError(err -> onError(sessionKey, err));
sseEmitter.onTimeout(() -> onTimeout(sessionKey));
sseEmitter.onCompletion(() -> onCompletion(sessionKey));
SseEmitter sseEmitter = new SseEmitter(30 * 60 * 1000L);
sseEmitter.onError(err -> onError(sseEmitter, sessionKey, err));
sseEmitter.onTimeout(() -> onTimeout(sseEmitter));
sseEmitter.onCompletion(() -> onCompletion(sseEmitter, sessionKey));
sessionMap.put(sessionKey, TopicSubscriber.of(sseEmitter));
try {
// Set the client reconnection interval, 0 to reconnect immediately
sseEmitter.send(SseEmitter.event().reconnectTime(1000));
} catch (IOException e) {
throw new RuntimeException(e);
}
sessionMap.put(sessionKey, TopicSubscriber.of(sseEmitter));
return sseEmitter;
}

Expand All @@ -95,28 +119,12 @@ public static boolean exists(String sessionKey) {

/**
* Handles the timeout event for a session.
*
* @param sessionKey The session key of the timed-out session.
*/
public static void onTimeout(String sessionKey) {
log.debug("Type: SseSession Timeout, Session ID: {}", sessionKey);
closeSse(sessionKey);
}

/**
* Closes the SseEmitter for a session.
*
* @param sessionKey The session key of the session to close.
*/
public static void closeSse(String sessionKey) {
if (exists(sessionKey)) {
try {
sessionMap.get(sessionKey).getEmitter().complete();
} catch (Exception e) {
log.warn("Failed to complete sseEmitter, Session Key: {}, Error: {}", sessionKey, e.getMessage());
} finally {
sessionMap.remove(sessionKey);
}
public static void onTimeout(SseEmitter sseEmitter) {
try {
sseEmitter.complete();
} catch (Exception e) {
log.warn("Failed to complete sseEmitter, Error: {}", e.getMessage());
}
}

Expand All @@ -126,26 +134,29 @@ public static void closeSse(String sessionKey) {
* @param sessionKey The session key of the session with the error.
* @param throwable The throwable representing the error.
*/
public static void onError(String sessionKey, Throwable throwable) {
log.error("Type: SseSession [{}] Error, Message: {}", sessionKey, throwable.getMessage());
if (exists(sessionKey)) {
try {
sessionMap.get(sessionKey).getEmitter().completeWithError(throwable);
} catch (Exception e) {
log.error("Failed to complete Sse With Error: {}", e.getMessage());
}
private static void onError(SseEmitter sseEmitter, String sessionKey, Throwable throwable) {
log.debug("Type: SseSession [{}] Error, Message: {}", sessionKey, throwable.getMessage());
try {
sseEmitter.completeWithError(throwable);
} catch (Exception e) {
log.debug("Failed to complete Sse With Error: {}", e.getMessage());
}
sessionMap.remove(sessionKey);
onCompletion(sseEmitter, sessionKey);
}

/**
* Handles the completion event for a session.
*
* @param sessionKey The session key of the completed session.
*/
public static void onCompletion(String sessionKey) {
private static synchronized void onCompletion(SseEmitter sseEmitter, String sessionKey) {
log.debug("Type: SseSession Completion, Session ID: {}", sessionKey);
closeSse(sessionKey);
if (exists(sessionKey)) {
SseEmitter emitter = sessionMap.get(sessionKey).getEmitter();
if (emitter == sseEmitter) {
sessionMap.remove(sessionKey);
}
}
}

/**
Expand All @@ -162,7 +173,7 @@ public static void sendTopic(String topic, Object content) {
sendSse(sessionKey, data);
} catch (Exception e) {
log.error("Error sending sse data:{}", e.getMessage());
onError(sessionKey, e);
onError(topicSubscriber.getEmitter(), sessionKey, e);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import cn.dev33.satoken.annotation.SaCheckLogin;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
Expand All @@ -43,6 +44,7 @@
@Api(tags = "SSE Controller")
@RequestMapping("/api/sse")
@RequiredArgsConstructor
@SaCheckLogin
public class SseController {

@PostMapping(value = "/subscribeTopic")
Expand Down
4 changes: 3 additions & 1 deletion dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.init;

import org.dinky.assertion.Asserts;
import org.dinky.context.SseSessionContextHolder;
import org.dinky.context.TenantContextHolder;
import org.dinky.daemon.pool.FlinkJobThreadPool;
import org.dinky.daemon.pool.ScheduleThreadPool;
Expand Down Expand Up @@ -154,7 +155,7 @@ private void initDaemon() {
Configuration<Integer> sysGatherTiming = sysConfig.getMetricsSysGatherTiming();
Consumer<Configuration<?>> metricsListener = c -> {
c.addChangeEvent(x -> {
schedule.removeSchedule(sysMetricsTask);
schedule.removeSchedule(sysMetricsTask.getType());
PeriodicTrigger trigger = new PeriodicTrigger(sysGatherTiming.getValue());
if (metricsSysEnable.getValue()) schedule.addSchedule(sysMetricsTask, trigger);
});
Expand All @@ -175,6 +176,7 @@ private void initDaemon() {
DaemonTask daemonTask = DaemonTask.build(config);
flinkJobThreadPool.execute(daemonTask);
}
SseSessionContextHolder.init(schedule);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ public class SseConstant {
* Sse label for front to reconnect session
*/
public static final String SSE_SESSION_INVALID = "SESSION_INVALID";

public static final String HEART_TOPIC = "HEART_BEAT";
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ public void addSchedule(DaemonTask task, Trigger trigger) {
getScheduleMap().put(task.getType(), schedule);
}

public void removeSchedule(DaemonTask task) {
ScheduledFuture<?> scheduledFuture = getScheduleMap().get(task.getType());
public void addSchedule(String type, Runnable task, Trigger trigger) {
ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(task, trigger);
getScheduleMap().put(type, schedule);
}

public void removeSchedule(String type) {
ScheduledFuture<?> scheduledFuture = getScheduleMap().get(type);
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
Expand Down
31 changes: 21 additions & 10 deletions dinky-web/src/models/Sse.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,18 @@ export type SubscriberData = {

export default () => {
const uuidRef = useRef<string>(uuidv4());
const lastHeartTime = useRef<number>(0);
const subscriberRef = useRef<SubscriberData[]>([]);
const [eventSource, setEventSource] = useState<EventSource>();

const reconnectSse = () => {
lastHeartTime.current = Date.now();
uuidRef.current = uuidv4();
const sseUrl = 'api/sse/connect?sessionKey=' + uuidRef.current;
eventSource?.close();
setEventSource(new EventSource(sseUrl));
};

const subscribe = async () => {
const topics: string[] = [];
subscriberRef.current.forEach((sub) => topics.push(...sub.topic));
Expand All @@ -50,15 +59,13 @@ export default () => {
})
.catch((e) => ErrorMessage(e));
};
const reconnectSse = () => {
uuidRef.current = uuidv4();
const sseUrl = 'api/sse/connect?sessionKey=' + uuidRef.current;
eventSource?.close();
setEventSource(new EventSource(sseUrl));
};

useEffect(() => {
reconnectSse();
setInterval(() => {
if (Date.now() - lastHeartTime.current > 20 * 1000) {
reconnectSse();
}
}, 5000);
}, []);

useEffect(() => {
Expand All @@ -67,9 +74,13 @@ export default () => {
eventSource.onmessage = (e) => {
try {
const data: SseData = JSON.parse(e.data);
subscriberRef.current
.filter((sub) => sub.topic.includes(data.topic))
.forEach((sub) => sub.call(data));
if (data.topic === 'HEART_BEAT') {
lastHeartTime.current = Date.now();
} else {
subscriberRef.current
.filter((sub) => sub.topic.includes(data.topic))
.forEach((sub) => sub.call(data));
}
} catch (e: any) {
ErrorMessage(e);
}
Expand Down
3 changes: 0 additions & 3 deletions dinky-web/src/pages/Other/Login/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ const Login: React.FC = () => {

const [localStorageOfToken, setLocalStorageOfToken] = useLocalStorage('token', '');

const { reconnectSse } = useModel('Sse', (model: any) => ({ reconnectSse: model.reconnectSse }));

const containerClassName = useEmotionCss(() => {
return {
display: 'flex',
Expand Down Expand Up @@ -99,7 +97,6 @@ const Login: React.FC = () => {
/**
* Redirect to home page && reconnect Global Sse
*/
reconnectSse();
gotoRedirectUrl();
} else {
ErrorMessage(l('login.chooseTenantFailed'));
Expand Down

0 comments on commit 3ebd880

Please sign in to comment.