Skip to content

Commit

Permalink
[Refactor] Refactor alert module. (#3023)
Browse files Browse the repository at this point in the history
* [Refactor] Refactor alert module.

* address comment
  • Loading branch information
GOODBOY008 authored Sep 10, 2023
1 parent 15df598 commit 4390d06
Show file tree
Hide file tree
Showing 18 changed files with 146 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
Expand All @@ -51,13 +51,14 @@
@Tag(name = "ALERT_TAG")
@Slf4j
@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/flink/alert")
public class AlertController {

@Autowired private AlertConfigService alertConfigService;
private final AlertConfigService alertConfigService;

@Autowired private AlertService alertService;
private final AlertService alertService;

@Operation(summary = "Create alert config")
@PostMapping(value = "/add")
Expand Down Expand Up @@ -126,8 +127,6 @@ public RestResponse sendAlert(Long id) throws AlertException {
DateUtils.format(date, DateUtils.fullFormat(), TimeZone.getDefault()));
alertTemplate.setEndTime(DateUtils.format(date, DateUtils.fullFormat(), TimeZone.getDefault()));
alertTemplate.setDuration("");
boolean alert =
alertService.alert(AlertConfigParams.of(alertConfigService.getById(id)), alertTemplate);
return RestResponse.success(alert);
return RestResponse.success(alertService.alert(id, alertTemplate));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class Application implements Serializable {

/** alert id */
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer alertId;
private Long alertId;

private String args;
/** application module */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public class FlinkCluster implements Serializable {
private Date endTime;

@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer alertId;
private Long alertId;

private transient Integer allJobs = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@

package org.apache.streampark.console.core.enums;

import org.apache.streampark.console.core.service.alert.AlertNotifyService;
import org.apache.streampark.console.core.service.alert.impl.DingTalkAlertNotifyServiceImpl;
import org.apache.streampark.console.core.service.alert.impl.EmailAlertNotifyServiceImpl;
import org.apache.streampark.console.core.service.alert.impl.HttpCallbackAlertNotifyServiceImpl;
import org.apache.streampark.console.core.service.alert.impl.LarkAlertNotifyServiceImpl;
import org.apache.streampark.console.core.service.alert.impl.WeComAlertNotifyServiceImpl;

import com.fasterxml.jackson.annotation.JsonValue;
import lombok.Getter;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
Expand All @@ -27,27 +35,32 @@
import java.util.Map;

/** The AlertType enum represents different types of alerts that can be used for notifications. */
@Getter
public enum AlertType {

/** Email */
EMAIL(1),
EMAIL(1, EmailAlertNotifyServiceImpl.class),

/** Ding talk */
DING_TALK(2),
DING_TALK(2, DingTalkAlertNotifyServiceImpl.class),

/** WeChat work */
WE_COM(4),
WE_COM(4, WeComAlertNotifyServiceImpl.class),

/** Http callback */
HTTP_CALLBACK(8),
HTTP_CALLBACK(8, HttpCallbackAlertNotifyServiceImpl.class),

/** Lark */
LARK(16);
LARK(16, LarkAlertNotifyServiceImpl.class);

/** The empty level */
private static final Integer EMPTY_LEVEL = 0;

/** Get the alert type by the code */
private final Integer code;
@JsonValue private final Integer code;

/** Holds the reference to a Class object. */
private final Class<? extends AlertNotifyService> clazz;

/** A cache map used to quickly get the alert type from an integer code */
private static final Map<Integer, AlertType> CACHE_MAP = createCacheMap();
Expand All @@ -60,13 +73,9 @@ private static Map<Integer, AlertType> createCacheMap() {
return Collections.unmodifiableMap(map);
}

AlertType(Integer code) {
AlertType(Integer code, Class<? extends AlertNotifyService> clazz) {
this.code = code;
}

@JsonValue
public int getCode() {
return this.code;
this.clazz = clazz;
}

public static List<AlertType> decode(Integer level) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,19 @@
import org.apache.streampark.console.core.bean.AlertConfigParams;
import org.apache.streampark.console.core.bean.AlertTemplate;

/**
* This interface defines a service for sending alert notifications, it has multiple
* implementations.
*/
public interface AlertNotifyService {

/**
* Performs an alert with the given alert configuration parameters and alert template.
*
* @param alertConfig alert configuration parameters.
* @param template alert template to use.
* @return true if the alert was successfully triggered, false otherwise.
* @throws AlertException if an error occurs while performing the alert.
*/
boolean doAlert(AlertConfigParams alertConfig, AlertTemplate template) throws AlertException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,17 @@

package org.apache.streampark.console.core.service.alert;

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.core.bean.AlertConfigParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.CheckPointStatus;
import org.apache.streampark.console.core.enums.FlinkAppState;

/** The AlertService interface represents a service for sending alert. */
public interface AlertService {

void alert(Application application, CheckPointStatus checkPointStatus);

void alert(Application application, FlinkAppState appState);

void alert(FlinkCluster flinkCluster, ClusterState clusterState);

boolean alert(AlertConfigParams params, AlertTemplate alertTemplate) throws AlertException;
/**
* Sends an alert based on the given alert configuration ID and alert template.
*
* @param alertConfigId the ID of the alert configuration
* @param alertTemplate the alert template to use for generating the alert content
* @return true if the alert is sent successfully, false otherwise
*/
boolean alert(Long alertConfigId, AlertTemplate alertTemplate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class AlertConfigServiceImpl extends ServiceImpl<AlertConfigMapper, Alert
@Override
public IPage<AlertConfigParams> page(AlertConfigParams params, RestRequest request) {
// build query conditions
LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper();
LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(params.getUserId() != null, AlertConfig::getUserId, params.getUserId());

Page<AlertConfig> page = new MybatisPager<AlertConfig>().getDefaultPage(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,127 +17,83 @@

package org.apache.streampark.console.core.service.alert.impl;

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.base.util.SpringContextUtils;
import org.apache.streampark.console.core.bean.AlertConfigParams;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.AlertConfig;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.AlertType;
import org.apache.streampark.console.core.enums.CheckPointStatus;
import org.apache.streampark.console.core.enums.FlinkAppState;
import org.apache.streampark.console.core.service.alert.AlertConfigService;
import org.apache.streampark.console.core.service.alert.AlertNotifyService;
import org.apache.streampark.console.core.service.alert.AlertService;

import org.apache.flink.api.java.tuple.Tuple2;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.List;

@Slf4j
@Service
@RequiredArgsConstructor
public class AlertServiceImpl implements AlertService {
@Autowired private AlertConfigService alertConfigService;

@Override
public void alert(Application application, CheckPointStatus checkPointStatus) {
AlertTemplate alertTemplate = AlertTemplate.of(application, checkPointStatus);
alert(application.getAlertId(), alertTemplate);
}

@Override
public void alert(Application application, FlinkAppState appState) {
AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
alert(application.getAlertId(), alertTemplate);
}
private final AlertConfigService alertConfigService;

@Override
public void alert(FlinkCluster flinkCluster, ClusterState clusterState) {
AlertTemplate alertTemplate = AlertTemplate.of(flinkCluster, clusterState);
alert(flinkCluster.getAlertId(), alertTemplate);
}
public boolean alert(Long alertConfigId, AlertTemplate alertTemplate) {

private void alert(Integer alertId, AlertTemplate alertTemplate) {
if (alertId == null) {
return;
if (alertConfigId == null) {
log.warn("alertConfigId is null");
return false;
}
AlertConfig alertConfig = alertConfigService.getById(alertId);
AlertConfig alertConfig = alertConfigService.getById(alertConfigId);
try {
alert(AlertConfigParams.of(alertConfig), alertTemplate);
AlertConfigParams params = AlertConfigParams.of(alertConfig);
List<AlertType> alertTypes = AlertType.decode(params.getAlertType());
if (CollectionUtils.isEmpty(alertTypes)) {
return true;
}
// No use thread pool, ensure that the alarm can be sent successfully
Tuple2<Boolean, AlertException> reduce =
alertTypes.stream()
.map(
alertType -> {
try {
boolean alertRes =
SpringContextUtils.getBean(alertType.getClazz())
.doAlert(params, alertTemplate);
return new Tuple2<Boolean, AlertException>(alertRes, null);
} catch (AlertException e) {
return new Tuple2<>(false, e);
}
})
.reduce(
new Tuple2<>(true, null),
(tp1, tp2) -> {
boolean alertResult = tp1.f0 & tp2.f0;
if (tp1.f1 == null && tp2.f1 == null) {
return new Tuple2<>(tp1.f0 & tp2.f0, null);
}
if (tp1.f1 != null && tp2.f1 != null) {
// merge multiple exception, and keep the details of the first exception
AlertException alertException =
new AlertException(
tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(), tp1.f1);
return new Tuple2<>(alertResult, alertException);
}
return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 : tp1.f1);
});
if (reduce.f1 != null) {
throw reduce.f1;
}

return reduce.f0;
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

@Override
public boolean alert(AlertConfigParams params, AlertTemplate alertTemplate)
throws AlertException {
List<AlertType> alertTypes = AlertType.decode(params.getAlertType());
if (CollectionUtils.isEmpty(alertTypes)) {
return true;
}
// No use thread pool, ensure that the alarm can be sent successfully
Tuple2<Boolean, AlertException> reduce =
alertTypes.stream()
.map(
alertType -> {
try {
Class<? extends AlertNotifyService> notifyServiceClass =
getAlertServiceImpl(alertType);
Utils.notNull(notifyServiceClass);
boolean alertRes =
SpringContextUtils.getBean(notifyServiceClass)
.doAlert(params, alertTemplate);
return new Tuple2<Boolean, AlertException>(alertRes, null);
} catch (AlertException e) {
return new Tuple2<>(false, e);
}
})
.reduce(
new Tuple2<>(true, null),
(tp1, tp2) -> {
boolean alertResult = tp1.f0 & tp2.f0;
if (tp1.f1 == null && tp2.f1 == null) {
return new Tuple2<>(tp1.f0 & tp2.f0, null);
}
if (tp1.f1 != null && tp2.f1 != null) {
// merge multiple exception, and keep the details of the first exception
AlertException alertException =
new AlertException(
tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(), tp1.f1);
return new Tuple2<>(alertResult, alertException);
}
return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 : tp1.f1);
});
if (reduce.f1 != null) {
throw reduce.f1;
}

return reduce.f0;
}

private Class<? extends AlertNotifyService> getAlertServiceImpl(AlertType alertType) {
switch (alertType) {
case EMAIL:
return EmailAlertNotifyServiceImpl.class;
case DING_TALK:
return DingTalkAlertNotifyServiceImpl.class;
case WE_COM:
return WeComAlertNotifyServiceImpl.class;
case LARK:
return LarkAlertNotifyServiceImpl.class;
case HTTP_CALLBACK:
return HttpCallbackAlertNotifyServiceImpl.class;
default:
return null;
}
return false;
}
}
Loading

0 comments on commit 4390d06

Please sign in to comment.