Skip to content

Commit

Permalink
[Improvement] Optimize paimon to store monitoring data, causing threa…
Browse files Browse the repository at this point in the history
…ds not to be released (DataLinkDC#3547)

Signed-off-by: Zzm0809 <[email protected]>
Co-authored-by: Pandas886 <[email protected]>
Co-authored-by: Zzm0809 <[email protected]>
Co-authored-by: Zzm0809 <[email protected]>
  • Loading branch information
4 people authored Jun 4, 2024
1 parent 295eb6c commit 383babb
Show file tree
Hide file tree
Showing 25 changed files with 214 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@
import org.dinky.utils.PaimonUtil;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import cn.hutool.core.text.StrFormatter;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -39,38 +48,48 @@
@Slf4j
public class MetricsContextHolder {

@Getter
protected static final MetricsContextHolder instance = new MetricsContextHolder();

public static MetricsContextHolder getInstances() {
return instance;
}
private final List<MetricsVO> metricsVOS = new CopyOnWriteArrayList<>();
private final AtomicLong lastDumpTime = new AtomicLong(System.currentTimeMillis());

/**
* Temporary cache monitoring information, mainly to prevent excessive buffering of write IO,
* when metricsVOS data reaches 1000 or the time exceeds 5 seconds
*/
private final List<MetricsVO> metricsVOS = Collections.synchronizedList(new ArrayList<>());
// Create a ThreadFactory with custom naming
ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("metrics-send-thread-%d").build();

private final Long lastDumpTime = System.currentTimeMillis();
// Create a custom ThreadPoolExecutor
ExecutorService pool = new ThreadPoolExecutor(
5, // Core pool size
10, // Maximum pool size, allows the pool to expand as needed
60L, // Keep alive time for idle threads
TimeUnit.SECONDS, // Unit of keep alive time
new LinkedBlockingQueue<Runnable>(10), // Use a larger queue to hold excess tasks
namedThreadFactory);

public void sendAsync(String key, MetricsVO o) {
CompletableFuture.runAsync(() -> {
Thread.currentThread().setContextClassLoader(MetricsContextHolder.class.getClassLoader());
metricsVOS.add(o);
long duration = System.currentTimeMillis() - lastDumpTime;
synchronized (metricsVOS) {
if (metricsVOS.size() > 1000 || duration > 1000 * 5) {
PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, metricsVOS, MetricsVO.class);
metricsVOS.clear();
}
}
String topic = StrFormatter.format("{}/{}", SseTopic.METRICS.getValue(), key);
SseSessionContextHolder.sendTopic(topic, o);
})
.whenComplete((v, t) -> {
if (t != null) {
log.error("send metrics async error", t);
}
});
Object content = o.getContent();
if (content == null
|| (content instanceof ConcurrentHashMap && ((ConcurrentHashMap<?, ?>) content).isEmpty())) {
return; // Return early to avoid unnecessary operations
}
pool.execute(() -> {
metricsVOS.add(o);
long current = System.currentTimeMillis();
long duration = current - lastDumpTime.get();
// Temporary cache monitoring information, mainly to prevent excessive buffering of write IO,
// when metricsVOS data reaches 1000 or the time exceeds 15 seconds
if (metricsVOS.size() >= 1000 || duration >= 15000) {
List<MetricsVO> snapshot;
synchronized (this) { // Enter synchronized block only when necessary
snapshot = new ArrayList<>(metricsVOS);
metricsVOS.clear();
lastDumpTime.set(current);
}
PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, snapshot, MetricsVO.class);
}
String topic = StrFormatter.format("{}/{}", SseTopic.METRICS.getValue(), key);
SseSessionContextHolder.sendTopic(topic, o); // Ensure only successfully added metrics are sent
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static void refeshAndWriteFlinkMetrics(
metricsVO.setHeartTime(LocalDateTime.now());
metricsVO.setModel(jobId);
metricsVO.setDate(TimeUtil.nowStr("yyyy-MM-dd"));
MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO);
MetricsContextHolder.getInstance().sendAsync(metricsVO.getModel(), metricsVO);
} catch (Exception e) {
log.error("Get and save Flink metrics error", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static void refresh() {
metrics.setHeartTime(now);
metrics.setModel(MetricsType.LOCAL.getType());
metrics.setDate(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
MetricsContextHolder.getInstances().sendAsync(metrics.getModel(), metrics);
MetricsContextHolder.getInstance().sendAsync(metrics.getModel(), metrics);

log.debug("Collecting jvm information ends.");
}
Expand Down
2 changes: 1 addition & 1 deletion dinky-web/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This project uses [Ant Design Pro](https://pro.ant.design) for initialization. H
## Environment preparation

| Environment | Version | Remarks |
|-------------|---------|---------|
| ----------- | ------- | ------- |
| node | 18+ | |
| npm | 10+ | |

Expand Down
8 changes: 4 additions & 4 deletions dinky-web/README_zh_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

## 环境准备

| 环境 | 版本 | 备注 |
|------|-----|----|
| node | 18+ | |
| npm | 10+ | |
| 环境 | 版本 | 备注 |
| ---- | ---- | ---- |
| node | 18+ | |
| npm | 10+ | |

> 自行配置相关环境变量
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ const DagDataNode = (props: any) => {
ratio > 0.75
? '#cf1322'
: ratio > 0.5
? '#d46b08'
: ratio > 0.25
? '#d4b106'
: '#3f8600',
? '#d46b08'
: ratio > 0.25
? '#d4b106'
: '#3f8600',
fontSize: 10
}}
/>
Expand All @@ -65,10 +65,10 @@ const DagDataNode = (props: any) => {
ratio > 0.75
? '#3f8600'
: ratio > 0.5
? '#d4b106'
: ratio > 0.25
? '#d46b08'
: '#cf1322',
? '#d4b106'
: ratio > 0.25
? '#d46b08'
: '#cf1322',
fontSize: 10
}}
/>
Expand Down
2 changes: 1 addition & 1 deletion dinky-web/src/components/Flink/OptionsSelect/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { l } from '@/utils/intl';
import { ProFormSelect } from '@ant-design/pro-components';
import { ProFormSelectProps } from '@ant-design/pro-form/es/components/Select';
import { Divider, Typography } from 'antd';
import React from "react";
import React from 'react';

const { Link } = Typography;

Expand Down
25 changes: 12 additions & 13 deletions dinky-web/src/components/Flink/UdfSelect/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,28 @@
*
*/

import {ProFormSelect} from '@ant-design/pro-components';
import {ProFormSelectProps} from '@ant-design/pro-form/es/components/Select';
import {Divider} from 'antd';
import React from "react";
import { ProFormSelect } from '@ant-design/pro-components';
import { ProFormSelectProps } from '@ant-design/pro-form/es/components/Select';
import { Divider } from 'antd';
import React from 'react';

export type FlinkUdfOptionsProps = ProFormSelectProps & {};

const FlinkUdfOptionsSelect = (props: FlinkUdfOptionsProps) => {

const renderTemplateDropDown = (item: any) => {
return (
<>
<Divider style={{ margin: '8px 0' }} />
{item}
</>
<>
<Divider style={{ margin: '8px 0' }} />
{item}
</>
);
};

return (
<ProFormSelect
{...props}
fieldProps={{ dropdownRender: (item) => renderTemplateDropDown(item), ...props.fieldProps}}
/>
<ProFormSelect
{...props}
fieldProps={{ dropdownRender: (item) => renderTemplateDropDown(item), ...props.fieldProps }}
/>
);
};

Expand Down
95 changes: 49 additions & 46 deletions dinky-web/src/components/LineageGraph/lineage-dag-ext.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -92,54 +92,57 @@ export default class LineageDagExt extends LineageDag {
edges: result.edges
};

setTimeout(() => {
let tmpEdges = result.edges;
result.edges = [];
// this.canvas.wrapper.style.visibility = 'hidden';
this.canvas.draw(result, () => {
this.canvas.relayout(
{
edges: tmpEdges.map((item) => {
return {
source: item.sourceNode,
target: item.targetNode
};
})
},
true
);
// this.canvas.wrapper.style.visibility = 'visible';
this.canvas.addEdges(tmpEdges, true);

let minimap = _.get(this, 'props.config.minimap', {});

const minimapCfg = _.assign({}, minimap.config, {
events: ['system.node.click', 'system.canvas.click']
});

if (minimap && minimap.enable) {
this.canvas.setMinimap(true, minimapCfg);
}
setTimeout(
() => {
let tmpEdges = result.edges;
result.edges = [];
// this.canvas.wrapper.style.visibility = 'hidden';
this.canvas.draw(result, () => {
this.canvas.relayout(
{
edges: tmpEdges.map((item) => {
return {
source: item.sourceNode,
target: item.targetNode
};
})
},
true
);
// this.canvas.wrapper.style.visibility = 'visible';
this.canvas.addEdges(tmpEdges, true);

let minimap = _.get(this, 'props.config.minimap', {});

const minimapCfg = _.assign({}, minimap.config, {
events: ['system.node.click', 'system.canvas.click']
});

if (minimap && minimap.enable) {
this.canvas.setMinimap(true, minimapCfg);
}

if (_.get(this, 'props.config.gridMode')) {
this.canvas.setGridMode(true, _.assign({}, _.get(this, 'props.config.gridMode', {})));
}
if (_.get(this, 'props.config.gridMode')) {
this.canvas.setGridMode(true, _.assign({}, _.get(this, 'props.config.gridMode', {})));
}

if (result.nodes.length !== 0) {
this.canvas.focusCenterWithAnimate();
this._isFirstFocus = true;
}
if (result.nodes.length !== 0) {
this.canvas.focusCenterWithAnimate();
this._isFirstFocus = true;
}

this.forceUpdate();
this.props.onLoaded && this.props.onLoaded(this.canvas);
});
this.canvas.on('system.node.click', (data) => {
let node = data.node;
this.canvas.focus(node.id);
});
this.canvas.on('system.canvas.click', () => {
this.canvas.unfocus();
});
}, _.get(this.props, 'config.delayDraw', 0));
this.forceUpdate();
this.props.onLoaded && this.props.onLoaded(this.canvas);
});
this.canvas.on('system.node.click', (data) => {
let node = data.node;
this.canvas.focus(node.id);
});
this.canvas.on('system.canvas.click', () => {
this.canvas.unfocus();
});
},
_.get(this.props, 'config.delayDraw', 0)
);
}
}
9 changes: 6 additions & 3 deletions dinky-web/src/locales/en-US/pages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,10 @@ export default {
'pages.datastudio.label.jobConfig.addConfig.params': 'parameters',
'pages.datastudio.label.jobConfig.addConfig.value': 'value',
'pages.datastudio.label.udf': 'Udf Item',
'pages.datastudio.label.udf.tip': 'Inject UDF item, Automatically add statement `create temporary function [functionName] as [className]` at the beginning of the SQL statement',
'pages.datastudio.label.udf.duplicate.tip': 'The class [className] selected this time already exists and duplicate injection is not allowed. Please reselect or cancel injection (delete and change line).',
'pages.datastudio.label.udf.tip':
'Inject UDF item, Automatically add statement `create temporary function [functionName] as [className]` at the beginning of the SQL statement',
'pages.datastudio.label.udf.duplicate.tip':
'The class [className] selected this time already exists and duplicate injection is not allowed. Please reselect or cancel injection (delete and change line).',
'pages.datastudio.label.udf.injectUdf': 'Inject UDF item',
'pages.datastudio.label.udf.name': 'function name',
'pages.datastudio.label.udf.className': 'class name',
Expand Down Expand Up @@ -1006,7 +1008,8 @@ export default {
'Support for a single or bulk upload. Strictly prohibited from uploading company data or\n other banned files.',
'rc.resource.filelist': 'File list',
'rc.resource.sync': 'Sync remote files',
'rc.resource.sync.confirm': 'Please note that this operation will delete all records in the database and will affect running jobs as well as corresponding resource files referenced in UDF management, resulting in job failure. And UDF cannot be used in UDF management Please operate with caution!! Please confirm if you want to continue?',
'rc.resource.sync.confirm':
'Please note that this operation will delete all records in the database and will affect running jobs as well as corresponding resource files referenced in UDF management, resulting in job failure. And UDF cannot be used in UDF management Please operate with caution!! Please confirm if you want to continue?',
'rc.resource.copy_to_add_custom_jar': 'Copy as ADD CUSTOMJAR syntax',
'rc.resource.copy_to_add_jar': 'Copy as ADD JAR syntax',
'rc.resource.copy_to_add_file': 'Copy as ADD FILE syntax',
Expand Down
9 changes: 6 additions & 3 deletions dinky-web/src/locales/zh-CN/pages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,10 @@ export default {
'pages.datastudio.label.jobConfig.addConfig.params': '参数',
'pages.datastudio.label.jobConfig.addConfig.value': '值',
'pages.datastudio.label.udf': '注入UDF算子',
'pages.datastudio.label.udf.tip': '注入UDF算子, 自动在所有语句前注入`create temporary function [functionName] as [className]` 语句',
'pages.datastudio.label.udf.duplicate.tip': '此次选择的类[className]已经存在,不允许重复注入,请重新选择,或者取消注入(删除改行即可)。',
'pages.datastudio.label.udf.tip':
'注入UDF算子, 自动在所有语句前注入`create temporary function [functionName] as [className]` 语句',
'pages.datastudio.label.udf.duplicate.tip':
'此次选择的类[className]已经存在,不允许重复注入,请重新选择,或者取消注入(删除改行即可)。',
'pages.datastudio.label.udf.injectUdf': '注入UDF',
'pages.datastudio.label.udf.name': '函数名称',
'pages.datastudio.label.udf.className': '类名',
Expand Down Expand Up @@ -962,7 +964,8 @@ export default {
'rc.resource.upload.tip2': '支持单个或批量上传。严禁上传公司数据或其他禁止上传的文件。',
'rc.resource.filelist': '文件列表',
'rc.resource.sync': '同步目录结构',
'rc.resource.sync.confirm': '请注意: 该操作会删除数据库内的所有记录,且会关系到运行中的作业,以及UDF管理中引用的对应资源文件.从而导致作业运行失败。以及在 UDF管理中的 UDF 无法被使用. 请谨慎操作!! 请确认是否继续? ',
'rc.resource.sync.confirm':
'请注意: 该操作会删除数据库内的所有记录,且会关系到运行中的作业,以及UDF管理中引用的对应资源文件.从而导致作业运行失败。以及在 UDF管理中的 UDF 无法被使用. 请谨慎操作!! 请确认是否继续? ',
'rc.resource.copy_to_add_custom_jar': '复制为 ADD CUSTOMJAR 语法',
'rc.resource.copy_to_add_jar': '复制为 ADD JAR 语法',
'rc.resource.copy_to_add_file': '复制为 ADD FILE 语法',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ const MenuList: React.FC = () => {
{sysMenuValue?.id && editOpen
? l('menu.edit')
: !sysMenuValue?.id && addedOpen && !isRootMenu
? l('right.menu.addSub')
: !sysMenuValue?.id && addedOpen && isRootMenu
? l('right.menu.createRoot')
: ''}
? l('right.menu.addSub')
: !sysMenuValue?.id && addedOpen && isRootMenu
? l('right.menu.createRoot')
: ''}
</>
);
};
Expand Down
Loading

0 comments on commit 383babb

Please sign in to comment.