Skip to content

Commit

Permalink
Edge - alarm comment support (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndriiLandiak authored Jan 23, 2024
1 parent cfc5635 commit 4145c22
Show file tree
Hide file tree
Showing 35 changed files with 376 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.springframework.transaction.event.TransactionalEventListener;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.cloud.CloudEventType;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.relation.EntityRelation;
Expand All @@ -37,8 +39,6 @@
import java.util.Arrays;
import java.util.List;

import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType;

/**
* This event listener does not support async event processing because relay on ThreadLocal
* Another possible approach is to implement a special annotation and a bunch of classes similar to TransactionalApplicationListener
Expand Down Expand Up @@ -87,13 +87,17 @@ public void handleEvent(SaveEntityEvent<?> event) {
return;
}
try {
if (event.getEntityId() != null && !saveEventSupportableEntityTypes.contains(event.getEntityId().getEntityType())) {
if (event.getEntityId() != null && !saveEventSupportableEntityTypes.contains(event.getEntityId().getEntityType())
&& !(event.getEntity() instanceof AlarmComment)) {
return;
}
log.trace("SaveEntityEvent called: {}", event);
EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED;
boolean isCreated = Boolean.TRUE.equals(event.getCreated());
String body = getBodyMsgForEntityEvent(event.getEntity());
CloudEventType cloudEventType = getCloudEventTypeForEntityEvent(event.getEntity());
EdgeEventActionType action = getActionForEntityEvent(event.getEntity(), isCreated);
tbClusterService.sendNotificationMsgToCloud(event.getTenantId(), event.getEntityId(),
null, null, action);
body, cloudEventType, action);
} catch (Exception e) {
log.error("failed to process SaveEntityEvent: {}", event);
}
Expand All @@ -105,12 +109,15 @@ public void handleEvent(DeleteEntityEvent<?> event) {
return;
}
try {
if (event.getEntityId() != null && !supportableEntityTypes.contains(event.getEntityId().getEntityType())) {
if (event.getEntityId() != null && !supportableEntityTypes.contains(event.getEntityId().getEntityType())
&& !(event.getEntity() instanceof AlarmComment)) {
return;
}
log.trace("DeleteEntityEvent called: {}", event);
CloudEventType type = getCloudEventTypeForEntityEvent(event.getEntity());
EdgeEventActionType actionType = getEdgeEventActionTypeForEntityEvent(event.getEntity());
tbClusterService.sendNotificationMsgToCloud(event.getTenantId(), event.getEntityId(),
JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED);
JacksonUtil.toString(event.getEntity()), type, actionType);
} catch (Exception e) {
log.error("failed to process DeleteEntityEvent: {}", event);
}
Expand All @@ -127,7 +134,7 @@ public void handleEvent(ActionEntityEvent event) {
}
log.trace("ActionEntityEvent called: {}", event);
tbClusterService.sendNotificationMsgToCloud(event.getTenantId(), event.getEntityId(),
event.getBody(), null, edgeTypeByActionType(event.getActionType()));
event.getBody(), null, EdgeUtils.getEdgeEventActionTypeByActionType(event.getActionType()));
} catch (Exception e) {
log.error("failed to process ActionEntityEvent: {}", event);
}
Expand All @@ -150,9 +157,37 @@ public void handleEvent(RelationActionEvent event) {
}
log.trace("RelationActionEvent called: {}", event);
tbClusterService.sendNotificationMsgToCloud(event.getTenantId(), null,
JacksonUtil.toString(event.getRelation()), CloudEventType.RELATION, edgeTypeByActionType(event.getActionType()));
JacksonUtil.toString(event.getRelation()), CloudEventType.RELATION, EdgeUtils.getEdgeEventActionTypeByActionType(event.getActionType()));
} catch (Exception e) {
log.error("failed to process RelationActionEvent: {}", event);
}
}

private CloudEventType getCloudEventTypeForEntityEvent(Object entity) {
if (entity instanceof AlarmComment) {
return CloudEventType.ALARM_COMMENT;
}
return null;
}

private EdgeEventActionType getEdgeEventActionTypeForEntityEvent(Object entity) {
if (entity instanceof AlarmComment) {
return EdgeEventActionType.DELETED_COMMENT;
}
return EdgeEventActionType.DELETED;
}

private String getBodyMsgForEntityEvent(Object entity) {
if (entity instanceof AlarmComment) {
return JacksonUtil.toString(entity);
}
return null;
}

private EdgeEventActionType getActionForEntityEvent(Object entity, boolean isCreated) {
if (entity instanceof AlarmComment) {
return isCreated ? EdgeEventActionType.ADDED_COMMENT : EdgeEventActionType.UPDATED_COMMENT;
}
return isCreated ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,9 @@ private List<UplinkMsg> convertToUplinkMsgsPack(List<CloudEvent> cloudEvents) {
case RELATION_DELETED:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
case ADDED_COMMENT:
case UPDATED_COMMENT:
case DELETED_COMMENT:
uplinkMsg = convertEntityEventToUplink(this.tenantId, cloudEvent);
break;
case ATTRIBUTES_UPDATED:
Expand Down Expand Up @@ -458,6 +461,8 @@ private UplinkMsg convertEntityEventToUplink(TenantId tenantId, CloudEvent cloud
return deviceProfileProcessor.convertDeviceProfileEventToUplink(cloudEvent, edgeVersion);
case ALARM:
return alarmProcessor.convertAlarmEventToUplink(cloudEvent, edgeVersion);
case ALARM_COMMENT:
return alarmProcessor.convertAlarmCommentEventToUplink(cloudEvent, edgeVersion);
case ASSET:
return assetProcessor.convertAssetEventToUplink(cloudEvent, edgeVersion);
case ASSET_PROFILE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.CloudUtils;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.cloud.CloudEventType;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.AlarmId;
Expand Down Expand Up @@ -95,6 +96,9 @@ public void pushNotificationToCloud(TransportProtos.CloudNotificationMsgProto cl
case RELATION:
future = processRelation(tenantId, cloudNotificationMsg);
break;
case ALARM_COMMENT:
future = processAlarmComment(tenantId, cloudNotificationMsg);
break;
default:
log.warn("Cloud event type [{}] is not designed to be pushed to cloud", cloudEventType);
future = Futures.immediateFuture(null);
Expand All @@ -121,7 +125,7 @@ private void callBackFailure(TransportProtos.CloudNotificationMsgProto cloudNoti
}


private ListenableFuture<Void> processEntity(TenantId tenantId, TransportProtos.CloudNotificationMsgProto cloudNotificationMsg) throws Exception {
private ListenableFuture<Void> processEntity(TenantId tenantId, TransportProtos.CloudNotificationMsgProto cloudNotificationMsg) {
EdgeEventActionType cloudEventActionType = EdgeEventActionType.valueOf(cloudNotificationMsg.getCloudEventAction());
CloudEventType cloudEventType = CloudEventType.valueOf(cloudNotificationMsg.getCloudEventType());
EntityId entityId = EntityIdFactory.getByCloudEventTypeAndUuid(cloudEventType, new UUID(cloudNotificationMsg.getEntityIdMSB(), cloudNotificationMsg.getEntityIdLSB()));
Expand All @@ -138,45 +142,37 @@ private ListenableFuture<Void> processEntity(TenantId tenantId, TransportProtos.
}
}

private ListenableFuture<Void> processAlarm(TenantId tenantId, TransportProtos.CloudNotificationMsgProto cloudNotificationMsg) throws Exception {
private ListenableFuture<Void> processAlarm(TenantId tenantId, TransportProtos.CloudNotificationMsgProto cloudNotificationMsg) {
EdgeEventActionType actionType = EdgeEventActionType.valueOf(cloudNotificationMsg.getCloudEventAction());
AlarmId alarmId = new AlarmId(new UUID(cloudNotificationMsg.getEntityIdMSB(), cloudNotificationMsg.getEntityIdLSB()));
switch (actionType) {
case DELETED:
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(cloudNotificationMsg.getEntityBody(), Alarm.class);
return cloudEventService.saveCloudEventAsync(tenantId,
CloudEventType.ALARM,
actionType,
alarmId,
JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm),
0L);
default:
ListenableFuture<Alarm> future = alarmService.findAlarmByIdAsync(tenantId, alarmId);
return Futures.transformAsync(future, alarm -> {
if (alarm != null) {
CloudEventType cloudEventType = CloudUtils.getCloudEventTypeByEntityType(alarm.getOriginator().getEntityType());
if (cloudEventType != null) {
return cloudEventService.saveCloudEventAsync(tenantId,
CloudEventType.ALARM,
EdgeEventActionType.valueOf(cloudNotificationMsg.getCloudEventAction()),
alarmId,
null,
0L);
}
}
return Futures.immediateFuture(null);
}, dbCallBackExecutor);
if (EdgeEventActionType.DELETED.equals(actionType)) {
Alarm deletedAlarm = JacksonUtil.fromString(cloudNotificationMsg.getEntityBody(), Alarm.class);
return cloudEventService.saveCloudEventAsync(tenantId, CloudEventType.ALARM, actionType, alarmId, JacksonUtil.valueToTree(deletedAlarm), 0L);
}
ListenableFuture<Alarm> future = alarmService.findAlarmByIdAsync(tenantId, alarmId);
return Futures.transformAsync(future, alarm -> {
if (alarm != null) {
CloudEventType cloudEventType = CloudUtils.getCloudEventTypeByEntityType(alarm.getOriginator().getEntityType());
if (cloudEventType != null) {
return cloudEventService.saveCloudEventAsync(tenantId, CloudEventType.ALARM, EdgeEventActionType.valueOf(cloudNotificationMsg.getCloudEventAction()), alarmId, null, 0L);
}
}
return Futures.immediateFuture(null);
}, dbCallBackExecutor);
}

public ListenableFuture<Void> processAlarmComment(TenantId tenantId, TransportProtos.CloudNotificationMsgProto cloudNotificationMsg) {
EdgeEventActionType actionType = EdgeEventActionType.valueOf(cloudNotificationMsg.getCloudEventAction());
AlarmId alarmId = new AlarmId(new UUID(cloudNotificationMsg.getEntityIdMSB(), cloudNotificationMsg.getEntityIdLSB()));
AlarmComment alarmComment = JacksonUtil.fromString(cloudNotificationMsg.getEntityBody(), AlarmComment.class);
if (alarmComment == null) {
return Futures.immediateFuture(null);
}
return cloudEventService.saveCloudEventAsync(tenantId, CloudEventType.ALARM_COMMENT, actionType, alarmId, JacksonUtil.valueToTree(alarmComment), 0L);
}

private ListenableFuture<Void> processRelation(TenantId tenantId, TransportProtos.CloudNotificationMsgProto cloudNotificationMsg) throws Exception {
EntityRelation relation = JacksonUtil.OBJECT_MAPPER.readValue(cloudNotificationMsg.getEntityBody(), EntityRelation.class);
return cloudEventService.saveCloudEventAsync(tenantId,
CloudEventType.RELATION,
EdgeEventActionType.valueOf(cloudNotificationMsg.getCloudEventAction()),
null,
JacksonUtil.OBJECT_MAPPER.valueToTree(relation),
0L);
private ListenableFuture<Void> processRelation(TenantId tenantId, TransportProtos.CloudNotificationMsgProto cloudNotificationMsg) {
EntityRelation relation = JacksonUtil.fromString(cloudNotificationMsg.getEntityBody(), EntityRelation.class);
return cloudEventService.saveCloudEventAsync(tenantId, CloudEventType.RELATION, EdgeEventActionType.valueOf(cloudNotificationMsg.getCloudEventAction()), null, JacksonUtil.valueToTree(relation), 0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.cloud.CloudEventService;
import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg;
Expand Down Expand Up @@ -239,6 +240,11 @@ public ListenableFuture<List<Void>> processDownlinkMsg(TenantId tenantId,
result.add(alarmProcessor.processAlarmMsgFromCloud(tenantId, alarmUpdateMsg));
}
}
if (downlinkMsg.getAlarmCommentUpdateMsgCount() > 0) {
for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : downlinkMsg.getAlarmCommentUpdateMsgList()) {
result.add(alarmProcessor.processAlarmCommentMsgFromCloud(tenantId, alarmCommentUpdateMsg));
}
}
if (downlinkMsg.getCustomerUpdateMsgCount() > 0) {
for (CustomerUpdateMsg customerUpdateMsg : downlinkMsg.getCustomerUpdateMsgList()) {
sequenceDependencyLock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.cloud.CloudEvent;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructor;
import org.thingsboard.server.service.edge.rpc.processor.alarm.BaseAlarmProcessor;

@Component
Expand All @@ -43,6 +47,15 @@ public ListenableFuture<Void> processAlarmMsgFromCloud(TenantId tenantId, AlarmU
}
}

public ListenableFuture<Void> processAlarmCommentMsgFromCloud(TenantId tenantId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) {
try {
cloudSynchronizationManager.getSync().set(true);
return processAlarmCommentMsg(tenantId, alarmCommentUpdateMsg);
} finally {
cloudSynchronizationManager.getSync().remove();
}
}

public UplinkMsg convertAlarmEventToUplink(CloudEvent cloudEvent, EdgeVersion edgeVersion) {
AlarmUpdateMsg alarmUpdateMsg =
convertAlarmEventToAlarmMsg(cloudEvent.getTenantId(), cloudEvent.getEntityId(), cloudEvent.getAction(), cloudEvent.getEntityBody(), edgeVersion);
Expand All @@ -55,6 +68,24 @@ public UplinkMsg convertAlarmEventToUplink(CloudEvent cloudEvent, EdgeVersion ed
return null;
}

public UplinkMsg convertAlarmCommentEventToUplink(CloudEvent cloudEvent, EdgeVersion edgeVersion) {
UpdateMsgType msgType = getUpdateMsgType(cloudEvent.getAction());
AlarmComment alarmComment;
switch (cloudEvent.getAction()) {
case ADDED_COMMENT:
case UPDATED_COMMENT:
case DELETED_COMMENT:
alarmComment = JacksonUtil.convertValue(cloudEvent.getEntityBody(), AlarmComment.class);
return UplinkMsg.newBuilder()
.setUplinkMsgId(EdgeUtils.nextPositiveInt())
.addAlarmCommentUpdateMsg(((AlarmMsgConstructor) alarmMsgConstructorFactory
.getMsgConstructorByEdgeVersion(edgeVersion)).constructAlarmCommentUpdatedMsg(msgType, alarmComment))
.build();
default:
return null;
}
}

@Override
protected EntityId getAlarmOriginatorFromMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
Alarm alarm = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private ListenableFuture<Void> processDeviceRpcRequestFromCloud(TenantId tenantI
private void reply(ToDeviceRpcRequest rpcRequest, int requestId, FromDeviceRpcResponse response) {
try {
Optional<RpcError> rpcError = response.getError();
ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode();
ObjectNode body = JacksonUtil.newObjectNode();
body.put("requestUUID", rpcRequest.getId().toString());
body.put("expirationTime", rpcRequest.getExpirationTime());
body.put("oneway", rpcRequest.isOneway());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public UplinkMsg convertRelationRequestEventToUplink(CloudEvent cloudEvent) {
public UplinkMsg convertRelationEventToUplink(CloudEvent cloudEvent, EdgeVersion edgeVersion) {
UplinkMsg msg = null;
UpdateMsgType msgType = getUpdateMsgType(cloudEvent.getAction());
EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(cloudEvent.getEntityBody(), EntityRelation.class);
EntityRelation entityRelation = JacksonUtil.convertValue(cloudEvent.getEntityBody(), EntityRelation.class);
if (entityRelation != null) {
RelationUpdateMsg relationUpdateMsg = ((RelationMsgConstructor) relationMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion))
.constructRelationUpdatedMsg(msgType, entityRelation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ public Tenant save(Tenant tenant) throws Exception {
Tenant oldTenant = !created ? tenantService.findTenantById(tenant.getId()) : null;

Tenant savedTenant = checkNotNull(tenantService.saveTenant(tenant));
/* edge: DefaultRuleChains and DefaultEdgeRuleChains are configured on Cloud
if (created) {
installScripts.createDefaultRuleChains(savedTenant.getId());
installScripts.createDefaultEdgeRuleChains(savedTenant.getId());
installScripts.createDefaultTenantDashboards(savedTenant.getId(), null);
}
*/
tenantProfileCache.evict(savedTenant.getId());
notificationEntityService.notifyCreateOrUpdateTenant(savedTenant, created ?
ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
Expand Down
Loading

0 comments on commit 4145c22

Please sign in to comment.