Skip to content

Commit

Permalink
Edge - minor improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
AndriiLandiak authored Feb 13, 2024
1 parent d909c4b commit d05a21d
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cloud.CloudEventType;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
Expand Down Expand Up @@ -244,6 +245,8 @@ void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) {
if (!persisted && request.isOneway() && sent) {
log.debug("[{}] RPC command response sent [{}][{}]!", deviceId, rpcId, requestId);
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
} else if (!persisted && request.isOneway() && !sent) {
saveRpcResponseToCloudQueue(msg, requestId, rpcId); // edge only
} else {
registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
}
Expand Down Expand Up @@ -1093,4 +1096,21 @@ void checkSessionsTimeout() {

}

// edge-only:
private void saveRpcResponseToCloudQueue(ToDeviceRpcRequestActorMsg msg, int requestId, UUID rpcId) {
ObjectNode body = JacksonUtil.newObjectNode();
body.put("requestId", requestId);
body.put("requestUUID", msg.getMsg().getId().toString());
body.put("rpcId", rpcId.toString());
body.put("serviceId", msg.getServiceId());
body.put("error", RpcError.NO_ACTIVE_CONNECTION.name());

try {
systemContext.getCloudEventService().saveCloudEventAsync(tenantId, CloudEventType.DEVICE, EdgeEventActionType.RPC_CALL,
deviceId, body, 0L).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public EdgeSettings findEdgeSettings(TenantId tenantId) {
attributesService.find(tenantId, tenantId, DataConstants.SERVER_SCOPE, DataConstants.EDGE_SETTINGS_ATTR_KEY).get();
if (attr.isPresent()) {
log.trace("Found current edge settings {}", attr.get().getValueAsString());
return JacksonUtil.OBJECT_MAPPER.readValue(attr.get().getValueAsString(), EdgeSettings.class);
return JacksonUtil.fromString(attr.get().getValueAsString(), EdgeSettings.class);
} else {
log.trace("Edge settings not found");
return null;
Expand All @@ -159,7 +159,7 @@ public EdgeSettings findEdgeSettings(TenantId tenantId) {
public ListenableFuture<List<String>> saveEdgeSettings(TenantId tenantId, EdgeSettings edgeSettings) {
try {
BaseAttributeKvEntry edgeSettingAttr =
new BaseAttributeKvEntry(new StringDataEntry(DataConstants.EDGE_SETTINGS_ATTR_KEY, JacksonUtil.OBJECT_MAPPER.writeValueAsString(edgeSettings)), System.currentTimeMillis());
new BaseAttributeKvEntry(new StringDataEntry(DataConstants.EDGE_SETTINGS_ATTR_KEY, JacksonUtil.toString(edgeSettings)), System.currentTimeMillis());
List<AttributeKvEntry> attributes =
Collections.singletonList(edgeSettingAttr);
return attributesService.save(tenantId, tenantId, DataConstants.SERVER_SCOPE, attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,30 +187,30 @@ private static void verifyWidgetBundles() {
.atMost(30, TimeUnit.SECONDS).
until(() -> {
try {
return edgeRestClient.getWidgetsBundles(new PageLink(100)).getTotalElements() == 25;
return edgeRestClient.getWidgetsBundles(new PageLink(100)).getTotalElements() == 26;
} catch (Throwable e) {
return false;
}
});

PageData<WidgetsBundle> pageData = edgeRestClient.getWidgetsBundles(new PageLink(100));

for (String widgetsBundlesAlias : pageData.getData().stream().map(WidgetsBundle::getAlias).collect(Collectors.toList())) {
for (WidgetsBundleId widgetsBundleId : pageData.getData().stream().map(WidgetsBundle::getId).collect(Collectors.toList())) {
Awaitility.await()
.pollInterval(1000, TimeUnit.MILLISECONDS)
.atMost(60, TimeUnit.SECONDS).
until(() -> {
try {
List<WidgetType> edgeBundleWidgetTypes = edgeRestClient.getBundleWidgetTypes(true, widgetsBundlesAlias);
List<WidgetType> cloudBundleWidgetTypes = cloudRestClient.getBundleWidgetTypes(true, widgetsBundlesAlias);
List<WidgetType> edgeBundleWidgetTypes = edgeRestClient.getBundleWidgetTypes(widgetsBundleId);
List<WidgetType> cloudBundleWidgetTypes = cloudRestClient.getBundleWidgetTypes(widgetsBundleId);
return cloudBundleWidgetTypes != null && edgeBundleWidgetTypes != null
&& edgeBundleWidgetTypes.size() == cloudBundleWidgetTypes.size();
} catch (Throwable e) {
return false;
}
});
List<WidgetType> edgeBundleWidgetTypes = edgeRestClient.getBundleWidgetTypes(true, widgetsBundlesAlias);
List<WidgetType> cloudBundleWidgetTypes = cloudRestClient.getBundleWidgetTypes(true, widgetsBundlesAlias);
List<WidgetType> edgeBundleWidgetTypes = edgeRestClient.getBundleWidgetTypes(widgetsBundleId);
List<WidgetType> cloudBundleWidgetTypes = cloudRestClient.getBundleWidgetTypes(widgetsBundleId);
Assert.assertNotNull("edgeBundleWidgetTypes can't be null", edgeBundleWidgetTypes);
Assert.assertNotNull("cloudBundleWidgetTypes can't be null", cloudBundleWidgetTypes);
}
Expand Down Expand Up @@ -245,8 +245,8 @@ private static RuleChainId updateRootRuleChain(RuleChainType ruleChainType, Stri
RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
ruleChainMetaData.setRuleChainId(rootRuleChainId);
ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt());
ruleChainMetaData.setNodes(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("nodes"), RuleNode[].class)));
ruleChainMetaData.setConnections(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class)));
ruleChainMetaData.setNodes(Arrays.asList(JacksonUtil.treeToValue(configuration.get("nodes"), RuleNode[].class)));
ruleChainMetaData.setConnections(Arrays.asList(JacksonUtil.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class)));
cloudRestClient.saveRuleChainMetaData(ruleChainMetaData);
return rootRuleChainId;
}
Expand Down Expand Up @@ -327,6 +327,7 @@ protected static void extendDeviceProfileData(DeviceProfile deviceProfile) {
List<DeviceProfileAlarm> alarms = new ArrayList<>();
DeviceProfileAlarm deviceProfileAlarm = new DeviceProfileAlarm();
deviceProfileAlarm.setAlarmType("High Temperature");
deviceProfileAlarm.setId("High Temperature");
AlarmRule alarmRule = new AlarmRule();
alarmRule.setAlarmDetails("Alarm Details");
AlarmCondition alarmCondition = new AlarmCondition();
Expand Down Expand Up @@ -668,11 +669,11 @@ protected Device saveAndAssignDeviceToEdge(String deviceType) {
}

protected List<AttributeKvEntry> sendAttributesUpdated(RestClient sourceRestClient, RestClient targetRestClient,
JsonObject attributesPayload, List<String> keys, String scope) throws Exception {
JsonObject attributesPayload, List<String> keys, String scope) {

Device device = saveAndAssignDeviceToEdge();

sourceRestClient.saveDeviceAttributes(device.getId(), scope, JacksonUtil.OBJECT_MAPPER.readTree(attributesPayload.toString()));
sourceRestClient.saveDeviceAttributes(device.getId(), scope, JacksonUtil.toJsonNode(attributesPayload.toString()));

Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package org.thingsboard.server.msa.edge;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -25,10 +28,14 @@
import org.thingsboard.rest.client.RestClient;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.alarm.AlarmCommentType;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.msa.AbstractContainerTest;
Expand All @@ -40,7 +47,7 @@
public class AlarmClientTest extends AbstractContainerTest {

@Test
public void testAlarms() throws Exception {
public void testAlarms() {
// create alarm
Device device = saveAndAssignDeviceToEdge(CUSTOM_DEVICE_PROFILE_NAME);

Expand All @@ -56,8 +63,8 @@ public void testAlarms() throws Exception {
telemetry.addProperty("temperature", 100);

ResponseEntity deviceTelemetryResponse = cloudRestClient.getRestTemplate()
.postForEntity(tbUrl + "/api/v1/{credentialsId}/telemetry",
JacksonUtil.OBJECT_MAPPER.readTree(telemetry.toString()),
.postForEntity(tbUrl + "/api/v1/" + accessToken + "/telemetry/",
JacksonUtil.toJsonNode(telemetry.toString()),
ResponseEntity.class,
accessToken);

Expand Down Expand Up @@ -123,16 +130,16 @@ public void sendAlarmToCloud() {
Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> cloudRestClient.getDeviceCredentialsByDeviceId(device.getId()).isPresent());
.until(() -> edgeRestClient.getDeviceCredentialsByDeviceId(device.getId()).isPresent());

DeviceCredentials deviceCredentials = cloudRestClient.getDeviceCredentialsByDeviceId(device.getId()).get();
DeviceCredentials deviceCredentials = edgeRestClient.getDeviceCredentialsByDeviceId(device.getId()).get();
String accessToken = deviceCredentials.getCredentialsId();

JsonObject telemetry = new JsonObject();
telemetry.addProperty("temperature", 100);

ResponseEntity deviceTelemetryResponse = cloudRestClient.getRestTemplate()
.postForEntity(tbUrl + "/api/v1/" + accessToken + "/telemetry",
ResponseEntity deviceTelemetryResponse = edgeRestClient.getRestTemplate()
.postForEntity(edgeUrl + "/api/v1/" + accessToken + "/telemetry",
JacksonUtil.toJsonNode(telemetry.toString()),
ResponseEntity.class);
Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful());
Expand Down Expand Up @@ -179,4 +186,145 @@ private Optional<AlarmInfo> getLatestAlarmByEntityIdFromCloud(EntityId entityId)
return getLatestAnyAlarmByEntityId(entityId, cloudRestClient);
}

@Test
public void testAlarmComments() {
// create alarm
Device device = saveAndAssignDeviceToEdge(CUSTOM_DEVICE_PROFILE_NAME);

Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> cloudRestClient.getDeviceCredentialsByDeviceId(device.getId()).isPresent());

DeviceCredentials deviceCredentials = cloudRestClient.getDeviceCredentialsByDeviceId(device.getId()).get();
String accessToken = deviceCredentials.getCredentialsId();

JsonObject telemetry = new JsonObject();
telemetry.addProperty("temperature", 100);

ResponseEntity deviceTelemetryResponse = cloudRestClient.getRestTemplate()
.postForEntity(tbUrl + "/api/v1/" + accessToken + "/telemetry/",
JacksonUtil.toJsonNode(telemetry.toString()),
ResponseEntity.class,
accessToken);

Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful());

Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> getLatestAlarmByEntityIdFromCloud(device.getId()).isPresent() && getLatestAlarmByEntityIdFromEdge(device.getId()).isPresent());

Alarm savedAlarm = getLatestAlarmByEntityIdFromCloud(device.getId()).get();

ObjectNode comment = JacksonUtil.newObjectNode().put("text", RandomStringUtils.randomAlphanumeric(10));
AlarmComment alarmComment = saveAlarmComment(savedAlarm.getId(), comment, cloudRestClient);
Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> edgeRestClient.getAlarmComments(savedAlarm.getId(), new PageLink(100)).getTotalElements() > 0 &&
edgeRestClient.getAlarmComments(savedAlarm.getId(), new PageLink(100)).getData().stream()
.anyMatch(ac -> ac.getComment().equals(alarmComment.getComment())
&& ac.getId().equals(alarmComment.getId())
&& ac.getAlarmId().equals(alarmComment.getAlarmId())));

comment = JacksonUtil.newObjectNode().put("text", RandomStringUtils.randomAlphanumeric(10));
alarmComment.setComment(comment);
AlarmComment updated = cloudRestClient.saveAlarmComment(savedAlarm.getId(), alarmComment);
Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> edgeRestClient.getAlarmComments(savedAlarm.getId(), new PageLink(100)).getTotalElements() > 0 &&
edgeRestClient.getAlarmComments(savedAlarm.getId(), new PageLink(100)).getData().stream()
.anyMatch(ac -> ac.getComment().get("text").equals(updated.getComment().get("text"))
&& ac.getComment().get("edited").asBoolean()
&& updated.getComment().get("edited").asBoolean()
&& ac.getId().equals(updated.getId())
&& ac.getAlarmId().equals(updated.getAlarmId())));

// delete alarm
cloudRestClient.deleteAlarm(savedAlarm.getId());
Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> getLatestAlarmByEntityIdFromEdge(device.getId()).isEmpty());

// cleanup
cloudRestClient.deleteDevice(device.getId());
}

@Test
public void sendAlarmCommentToCloud() {
// create alarm
Device device = saveAndAssignDeviceToEdge(CUSTOM_DEVICE_PROFILE_NAME);

Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> edgeRestClient.getDeviceCredentialsByDeviceId(device.getId()).isPresent());

DeviceCredentials deviceCredentials = edgeRestClient.getDeviceCredentialsByDeviceId(device.getId()).get();
String accessToken = deviceCredentials.getCredentialsId();

JsonObject telemetry = new JsonObject();
telemetry.addProperty("temperature", 100);

ResponseEntity deviceTelemetryResponse = edgeRestClient.getRestTemplate()
.postForEntity(edgeUrl + "/api/v1/" + accessToken + "/telemetry/",
JacksonUtil.toJsonNode(telemetry.toString()),
ResponseEntity.class,
accessToken);

Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful());

Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> getLatestAlarmByEntityIdFromCloud(device.getId()).isPresent() && getLatestAlarmByEntityIdFromEdge(device.getId()).isPresent());

Alarm savedAlarm = getLatestAlarmByEntityIdFromEdge(device.getId()).get();

ObjectNode comment = JacksonUtil.newObjectNode().put("text", RandomStringUtils.randomAlphanumeric(10));
AlarmComment alarmComment = saveAlarmComment(savedAlarm.getId(), comment, edgeRestClient);
Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> cloudRestClient.getAlarmComments(savedAlarm.getId(), new PageLink(100)).getData().stream()
.anyMatch(ac -> ac.getComment().equals(alarmComment.getComment())
&& ac.getId().equals(alarmComment.getId())
&& ac.getAlarmId().equals(alarmComment.getAlarmId())));

comment = JacksonUtil.newObjectNode().put("text", RandomStringUtils.randomAlphanumeric(10));
alarmComment.setComment(comment);
AlarmComment updated = edgeRestClient.saveAlarmComment(savedAlarm.getId(), alarmComment);
Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> edgeRestClient.getAlarmComments(savedAlarm.getId(), new PageLink(100)).getTotalElements() > 0 &&
edgeRestClient.getAlarmComments(savedAlarm.getId(), new PageLink(100)).getData().stream()
.anyMatch(ac -> ac.getComment().get("text").equals(updated.getComment().get("text"))
&& ac.getComment().get("edited").asBoolean()
&& updated.getComment().get("edited").asBoolean()
&& ac.getId().equals(updated.getId())
&& ac.getAlarmId().equals(updated.getAlarmId())));

// delete alarm
cloudRestClient.deleteAlarm(savedAlarm.getId());
Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> getLatestAlarmByEntityIdFromEdge(device.getId()).isEmpty());

// cleanup
cloudRestClient.deleteDevice(device.getId());
}

private AlarmComment saveAlarmComment(AlarmId alarmId, JsonNode comment, RestClient restClient) {
AlarmComment alarmComment = new AlarmComment();
alarmComment.setAlarmId(alarmId);
alarmComment.setType(AlarmCommentType.OTHER);
alarmComment.setComment(comment);
return restClient.saveAlarmComment(alarmId, alarmComment);
}

}
Loading

0 comments on commit d05a21d

Please sign in to comment.