diff --git a/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java b/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java index ecb0578a1c..2bb49dedbf 100644 --- a/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java +++ b/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java @@ -24,16 +24,12 @@ import org.dinky.data.vo.MetricsVO; import org.dinky.utils.PaimonUtil; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import cn.hutool.core.text.StrFormatter; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.json.JSONUtil; import lombok.extern.slf4j.Slf4j; /** @@ -59,30 +55,21 @@ public static MetricsContextHolder getInstances() { public void sendAsync(String key, MetricsVO o) { CompletableFuture.runAsync(() -> { - metricsVOS.add(o); - long duration = System.currentTimeMillis() - lastDumpTime; - synchronized (metricsVOS) { - if (metricsVOS.size() > 1000 || duration > 1000 * 5) { - writeMetrics(metricsVOS); - metricsVOS.clear(); - } - } - String topic = StrFormatter.format("{}/{}", SseTopic.METRICS.getValue(), key); - SseSessionContextHolder.sendTopic(topic, o); - }); - } - - private static synchronized void writeMetrics(List metricsList) { - ObjectUtil.clone(metricsList).forEach(metrics -> { - LocalDateTime now = metrics.getHeartTime(); - metrics.setDate(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - metrics.setContent(JSONUtil.toJsonStr(metrics.getContent())); - }); - try { - PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, metricsList, MetricsVO.class); - } catch (Exception e) { - // todo 此处最好发个告警 - log.error("write metrics error", e); - } + metricsVOS.add(o); + long duration = System.currentTimeMillis() - lastDumpTime; + synchronized (metricsVOS) { + if (metricsVOS.size() > 1000 || duration > 1000 * 5) { + PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, metricsVOS, MetricsVO.class); + metricsVOS.clear(); + } + } + String topic = StrFormatter.format("{}/{}", SseTopic.METRICS.getValue(), key); + SseSessionContextHolder.sendTopic(topic, o); + }) + .whenComplete((v, t) -> { + if (t != null) { + log.error("send metrics async error", t); + } + }); } } diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java index e236d27db3..c1d28374f7 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java @@ -95,8 +95,10 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave) JobDataDto jobDataDto = jobInfoDetail.getJobDataDto(); String oldStatus = jobInstance.getStatus(); - // Local mode without rest port parameter unable to monitor + // Cluster information is missing and cannot be monitored if (Asserts.isNull(jobInfoDetail.getClusterInstance())) { + jobInstance.setStatus(JobStatus.UNKNOWN.getValue()); + jobInstanceService.updateById(jobInstance); return true; } // Update the value of JobData from the flink api while ignoring the null value to prevent diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/SystemMetricsHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/SystemMetricsHandler.java index 0ad409e962..50df79d261 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/SystemMetricsHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/SystemMetricsHandler.java @@ -28,6 +28,7 @@ import org.dinky.data.vo.MetricsVO; import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import lombok.extern.slf4j.Slf4j; @@ -47,6 +48,7 @@ public static void refresh() { metrics.setContent(metricsTotal); metrics.setHeartTime(now); metrics.setModel(MetricsType.LOCAL.getType()); + metrics.setDate(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); MetricsContextHolder.getInstances().sendAsync(metrics.getModel(), metrics); log.debug("Collecting jvm information ends."); diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/MonitorServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/MonitorServiceImpl.java index 6aa73a3fc1..27dc0cba3f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/MonitorServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/MonitorServiceImpl.java @@ -65,7 +65,7 @@ public class MonitorServiceImpl extends ServiceImpl impl private final Executor scheduleRefreshMonitorDataExecutor; @Override - public List getData(Date startTime, Date endTime, List jobIds) { + public List getData(Date startTime, Date endTime, List models) { endTime = Opt.ofNullable(endTime).orElse(DateUtil.date()); Timestamp startTS = Timestamp.fromLocalDateTime(DateUtil.toLocalDateTime(startTime)); Timestamp endTS = Timestamp.fromLocalDateTime(DateUtil.toLocalDateTime(endTime)); @@ -78,7 +78,7 @@ public List getData(Date startTime, Date endTime, List jobIds Predicate greaterOrEqual = p.greaterOrEqual(0, startTS); Predicate lessOrEqual = p.lessOrEqual(0, endTS); Predicate local = - p.in(1, jobIds.stream().map(BinaryString::fromString).collect(Collectors.toList())); + p.in(1, models.stream().map(BinaryString::fromString).collect(Collectors.toList())); return CollUtil.newArrayList(local, greaterOrEqual, lessOrEqual); }; List metricsVOList = diff --git a/dinky-common/src/main/java/org/dinky/data/metrics/MetricsTotal.java b/dinky-common/src/main/java/org/dinky/data/metrics/MetricsTotal.java index 7d5d2962b7..e8903c41c2 100644 --- a/dinky-common/src/main/java/org/dinky/data/metrics/MetricsTotal.java +++ b/dinky-common/src/main/java/org/dinky/data/metrics/MetricsTotal.java @@ -19,13 +19,15 @@ package org.dinky.data.metrics; +import java.io.Serializable; + import cn.hutool.core.lang.Singleton; import lombok.Getter; import lombok.Setter; @Getter @Setter -public class MetricsTotal { +public class MetricsTotal implements Serializable { public static volatile MetricsTotal instance = Singleton.get(MetricsTotal.class);