forked from baby-pilot/aws-iot-core-traffic-simulation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lab4_emulator_client_updated.py
127 lines (103 loc) · 4.59 KB
/
lab4_emulator_client_updated.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Import SDK packages
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
import json
import pandas as pd
import numpy as np
import settings
# Number of simulated devices; one MQTT client is created per id in
# range(device_count). The value comes from the local settings module.
device_count = settings.DEVICE_COUNT
# Topic every client publishes its CO2 readings to.
PUBLISH_TOPIC = "data/send/co2"
# Topic every client subscribes to for replies.
SUBSCRIBE_TOPIC = "data/receive/co2"
# Path template for the per-device dataset; "{}" is filled with the device id.
# Modify this to point at your data.
data_path = "data/vehicle{}.csv"
# Path templates for the per-device certificate and private key; both "{}"
# placeholders are filled with the device id. Modify these to point at your
# certificates.
certificate_formatter = "./certs/device_{}/device_{}.certificate.pem"
key_formatter = "./certs/device_{}/device_{}.private.key"
class MQTTClient:
    """MQTT connection for one simulated device.

    Wraps an ``AWSIoTMQTTClient`` configured for certificate-based
    authentication and offers helpers to subscribe to a topic and to publish
    CO2 readings as JSON payloads.
    """

    def __init__(self, device_id, cert, key):
        """Configure the underlying MQTT client (does not connect).

        Args:
            device_id: Identifier of the simulated device. Stored as a string
                and used as the MQTT client id.
            cert: Path to the device certificate file.
            key: Path to the device private key file.
        """
        # For certificate based connection
        self.device_id = str(device_id)
        self.state = 0
        self.client = AWSIoTMQTTClient(self.device_id)
        # TODO 2: modify your broker address
        self.client.configureEndpoint(settings.AWS_ENDPOINT, 8883)
        # Note the argument order: root CA, private key, then certificate.
        self.client.configureCredentials("certs/aws_root_ca.pem", key, cert)
        self.client.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
        self.client.configureDrainingFrequency(2)  # Draining: 2 Hz
        self.client.configureConnectDisconnectTimeout(10)  # 10 sec
        self.client.configureMQTTOperationTimeout(5)  # 5 sec
        self.client.onMessage = self.customOnMessage

    def customOnMessage(self, *args):
        """Print the reading addressed to this device; ignore anything else.

        The callback is invoked with a varying number of positional
        arguments, so the message object is assumed to be the last one.

        Args:
            *args: Callback arguments; ``args[-1]`` must expose ``payload``
                (bytes) and ``topic``.
        """
        message = args[-1]
        # MQTT is a binary protocol, so the payload arrives as bytes. Every
        # client listens on the same topic, so a payload that is not valid
        # UTF-8 JSON is treated like a message for somebody else instead of
        # raising inside the SDK's callback.
        try:
            message_payload = json.loads(message.payload.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            return
        # Valid JSON that is not an object (e.g. a list) has no device_id.
        if not isinstance(message_payload, dict):
            return
        # Ignore messages not destined for this device.
        if self.device_id != str(message_payload.get("device_id")):
            return
        print("client {} received the computed current max reading {} from topic {}".format(self.device_id, message_payload.get("co2_reading"), message.topic))

    # Suback callback
    def customSubackCallback(self, mid, data):
        """Subscription acknowledgement hook; intentionally a no-op."""
        pass

    # Puback callback
    def customPubackCallback(self, mid):
        """Publish acknowledgement hook; intentionally a no-op."""
        pass

    def subscribe(self, topic):
        """Asynchronously subscribe to ``topic`` at the configured QoS.

        Incoming messages are delivered through ``self.client.onMessage``
        (set in ``__init__``), so no per-subscription callback is passed.
        """
        self.client.subscribeAsync(topic, settings.QUALITY_OF_SERVICE, ackCallback=self.customSubackCallback)

    def publish(self, co2_reading):
        """Asynchronously publish one CO2 reading to ``PUBLISH_TOPIC``.

        Args:
            co2_reading: The reading to send; see ``craftPayload``.
        """
        self.client.publishAsync(PUBLISH_TOPIC, self.craftPayload(co2_reading), settings.QUALITY_OF_SERVICE, ackCallback=self.customPubackCallback)

    def craftPayload(self, co2_reading):
        """Return the JSON string payload for one reading.

        Args:
            co2_reading: The CO2 value. NumPy scalars (as produced by pandas)
                are converted to native Python numbers first, because
                ``json.dumps`` cannot serialize e.g. ``numpy.int64``.

        Returns:
            A JSON object string with keys ``co2_reading`` and ``device_id``
            (the device id as an integer).
        """
        if isinstance(co2_reading, np.generic):
            co2_reading = co2_reading.item()
        payload = {
            "co2_reading": co2_reading,
            "device_id": int(self.device_id),
        }
        return json.dumps(payload)
def read_csv_row(counter, device_id):
    """Return the ``vehicle_CO2`` value of one row of a device's dataset.

    The CSV at ``data_path`` (formatted with ``device_id``) is loaded on each
    call, and ``counter`` is wrapped around the number of rows so the data
    repeats from the first row once the end is reached.

    Args:
        counter: Running count of rows already consumed for this device.
        device_id: Device whose CSV file should be read.

    Returns:
        The ``vehicle_CO2`` entry of the selected row.
    """
    frame = pd.read_csv(data_path.format(device_id), header=0)
    wrapped_index = counter % len(frame)
    # Select the row first, then the column, exactly as before.
    return frame.iloc[wrapped_index]['vehicle_CO2']
# Create, connect and subscribe one MQTT client per simulated device.
print("Initializing MQTTClients...")
clients = []
for device_id in range(device_count):
    # Both placeholders in each template take the device id.
    cert_path = certificate_formatter.format(device_id, device_id)
    key_path = key_formatter.format(device_id, device_id)
    client = MQTTClient(device_id, cert_path, key_path)
    client.client.connect()
    client.subscribe(SUBSCRIBE_TOPIC)
    clients.append(client)
"""
The below program allows us to send in one row at a time to the cloud, activated by key press, corresponding to device ID
The cloud (lambda at the edge) returns the maximum CO2 levels encountered thus far for the respective car (i.e. device ID)
Once the max number of rows is reached for a car, it starts to loop from zero again. This simulates a continuous data stream from the car.
"""
# Rows sent so far, one slot per device. Sized from device_count so that
# every id accepted below has a slot.
counter = [0] * device_count
while True:
    print("Enter device id to simulate. Or press d to exit simulation.")
    x = input()
    # isdecimal() accepts exactly the characters int() can parse; isdigit()
    # also accepts e.g. superscript digits, which make int() raise.
    if x.isdecimal():
        device = int(x)
        if device not in range(device_count):
            print("Not a valid device")
            continue
        # Send the next row for this device and advance its position.
        co2_reading = read_csv_row(counter[device], device)
        clients[device].publish(co2_reading)
        counter[device] += 1
    elif x == "d":
        for c in clients:
            c.client.disconnect()
        print("All devices disconnected")
        # Terminate without relying on the interactive-only exit() helper.
        raise SystemExit
    else:
        print("wrong key pressed")
    # Pause before prompting again (presumably so the asynchronous reply can
    # be printed first -- confirm against the intended behaviour).
    time.sleep(3)