Skip to content

Commit

Permalink
Fix monitor bug (DataLinkDC#2626)
Browse files Browse the repository at this point in the history
* fix some monitor bug

* fix some monitor bug

* formate code
  • Loading branch information
gaoyan1998 authored Dec 12, 2023
1 parent 2343bd8 commit 8079fd6
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,12 @@
import org.dinky.data.vo.MetricsVO;
import org.dinky.utils.PaimonUtil;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -59,30 +55,21 @@ public static MetricsContextHolder getInstances() {

public void sendAsync(String key, MetricsVO o) {
CompletableFuture.runAsync(() -> {
metricsVOS.add(o);
long duration = System.currentTimeMillis() - lastDumpTime;
synchronized (metricsVOS) {
if (metricsVOS.size() > 1000 || duration > 1000 * 5) {
writeMetrics(metricsVOS);
metricsVOS.clear();
}
}
String topic = StrFormatter.format("{}/{}", SseTopic.METRICS.getValue(), key);
SseSessionContextHolder.sendTopic(topic, o);
});
}

private static synchronized void writeMetrics(List<MetricsVO> metricsList) {
ObjectUtil.clone(metricsList).forEach(metrics -> {
LocalDateTime now = metrics.getHeartTime();
metrics.setDate(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
metrics.setContent(JSONUtil.toJsonStr(metrics.getContent()));
});
try {
PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, metricsList, MetricsVO.class);
} catch (Exception e) {
// todo 此处最好发个告警
log.error("write metrics error", e);
}
metricsVOS.add(o);
long duration = System.currentTimeMillis() - lastDumpTime;
synchronized (metricsVOS) {
if (metricsVOS.size() > 1000 || duration > 1000 * 5) {
PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, metricsVOS, MetricsVO.class);
metricsVOS.clear();
}
}
String topic = StrFormatter.format("{}/{}", SseTopic.METRICS.getValue(), key);
SseSessionContextHolder.sendTopic(topic, o);
})
.whenComplete((v, t) -> {
if (t != null) {
log.error("send metrics async error", t);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave)
JobDataDto jobDataDto = jobInfoDetail.getJobDataDto();
String oldStatus = jobInstance.getStatus();

// Local mode without rest port parameter unable to monitor
// Cluster information is missing and cannot be monitored
if (Asserts.isNull(jobInfoDetail.getClusterInstance())) {
jobInstance.setStatus(JobStatus.UNKNOWN.getValue());
jobInstanceService.updateById(jobInstance);
return true;
}
// Update the value of JobData from the flink api while ignoring the null value to prevent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.dinky.data.vo.MetricsVO;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -47,6 +48,7 @@ public static void refresh() {
metrics.setContent(metricsTotal);
metrics.setHeartTime(now);
metrics.setModel(MetricsType.LOCAL.getType());
metrics.setDate(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
MetricsContextHolder.getInstances().sendAsync(metrics.getModel(), metrics);

log.debug("Collecting jvm information ends.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class MonitorServiceImpl extends ServiceImpl<MetricsMapper, Metrics> impl
private final Executor scheduleRefreshMonitorDataExecutor;

@Override
public List<MetricsVO> getData(Date startTime, Date endTime, List<String> jobIds) {
public List<MetricsVO> getData(Date startTime, Date endTime, List<String> models) {
endTime = Opt.ofNullable(endTime).orElse(DateUtil.date());
Timestamp startTS = Timestamp.fromLocalDateTime(DateUtil.toLocalDateTime(startTime));
Timestamp endTS = Timestamp.fromLocalDateTime(DateUtil.toLocalDateTime(endTime));
Expand All @@ -78,7 +78,7 @@ public List<MetricsVO> getData(Date startTime, Date endTime, List<String> jobIds
Predicate greaterOrEqual = p.greaterOrEqual(0, startTS);
Predicate lessOrEqual = p.lessOrEqual(0, endTS);
Predicate local =
p.in(1, jobIds.stream().map(BinaryString::fromString).collect(Collectors.toList()));
p.in(1, models.stream().map(BinaryString::fromString).collect(Collectors.toList()));
return CollUtil.newArrayList(local, greaterOrEqual, lessOrEqual);
};
List<MetricsVO> metricsVOList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

package org.dinky.data.metrics;

import java.io.Serializable;

import cn.hutool.core.lang.Singleton;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class MetricsTotal {
public class MetricsTotal implements Serializable {

public static volatile MetricsTotal instance = Singleton.get(MetricsTotal.class);

Expand Down

0 comments on commit 8079fd6

Please sign in to comment.