-
Notifications
You must be signed in to change notification settings - Fork 30
/
PollutionBridge.java
100 lines (81 loc) · 4.64 KB
/
PollutionBridge.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// camel-k: language=java property=file:application.properties
// camel-k: dependency=mvn:org.amqphub.quarkus:quarkus-qpid-jms
// camel-k: dependency=github:openshift-integration:camel-k-example-event-streaming
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.camel.model.dataformat.JsonLibrary;
import com.redhat.integration.common.Alert;
import com.redhat.integration.pollution.PollutionData;
public class PollutionBridge extends RouteBuilder {
private static final Logger LOG = LoggerFactory.getLogger(PollutionBridge.class);
public void configure() throws Exception {
final String unsafeHeader = "unsafe";
final String unsafeTypeHeader = "unsafe-type";
final String cityHeader = "city";
final String SHORT_TERM = "short term";
final String LONG_TERM = "long term";
from("kafka:pm-data?brokers={{kafka.bootstrap.address}}&groupId=pmbrige&autoOffsetReset=earliest")
.unmarshal().json(JsonLibrary.Jackson, PollutionData.class)
.process(exchange -> {
final String TEXT_FORMAT =
"City %s exceeds the maximum safe levels for %s exposure: %f.";
String text = null;
PollutionData pollutionData = exchange.getMessage().getBody(PollutionData.class);
LOG.info("Processing pollution data for city {} ", pollutionData.getCity());
Alert alert = new Alert();
if (pollutionData.getParameter().equals("pm10")) {
if (pollutionData.getValue() > 25.0) {
LOG.info("City {} exceeds the maximum safe levels for PM 10 exposure",
pollutionData.getCity());
exchange.getMessage().setHeader(unsafeHeader, true);
if (pollutionData.getValue() > 50.0) {
exchange.getMessage().setHeader(unsafeTypeHeader, SHORT_TERM);
alert.setSeverity("red");
} else {
exchange.getMessage().setHeader(unsafeTypeHeader, LONG_TERM);
alert.setSeverity("yellow");
}
}
text = String.format(TEXT_FORMAT, pollutionData.getCity(), "PM 10",
pollutionData.getValue());
}
if (pollutionData.getParameter().equals("pm25")) {
if (pollutionData.getValue() > 8.0) {
LOG.info("City {} exceeds the maximum safe levels for PM 2.5 exposure",
pollutionData.getCity());
exchange.getMessage().setHeader(unsafeHeader, true);
if (pollutionData.getValue() > 25.0) {
exchange.getMessage().setHeader(unsafeTypeHeader, SHORT_TERM);
alert.setSeverity("red");
} else {
exchange.getMessage().setHeader(unsafeTypeHeader, LONG_TERM);
alert.setSeverity("yellow");
}
}
text = String.format(TEXT_FORMAT, pollutionData.getCity(), "PM 10",
pollutionData.getValue());
}
alert.setText(text);
exchange.getMessage().setBody(alert);
exchange.getMessage().setHeader(cityHeader, pollutionData.getCity());
})
.marshal().json()
.convertBodyTo(String.class)
.choice()
.when(header(unsafeHeader).isEqualTo(true))
.wireTap("direct:timeline")
.choice()
.when(header(unsafeTypeHeader).isEqualTo(SHORT_TERM))
.to("jms://queue:alarms?timeToLive={{messaging.ttl.alarms}}")
.when(header(unsafeTypeHeader).isEqualTo(LONG_TERM))
.to("jms://queue:notifications?timeToLive={{messaging.ttl.notifications}}")
.otherwise()
.log("Unexpected data: ${body}")
.endChoice()
.endChoice();
from("direct:timeline")
.log("${body}")
.to("kafka:timeline-data?brokers={{kafka.bootstrap.address}}");
}
}