-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Restructure the modules splitting them out for reuse
- Loading branch information
Showing
18 changed files
with
371 additions
and
167 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import time | ||
|
||
from standalone_modules.rpi.device import RPIDevice | ||
from standalone_modules.shed_pi_module_utils.base_protocol import BaseProtocol | ||
from standalone_modules.shed_pi_module_utils.data_submission import ( | ||
ReadingSubmissionService, | ||
) | ||
from standalone_modules.shed_pi_module_utils.utils import check_arch_is_arm, logger | ||
|
||
TIME_TO_SLEEP = 60 # time in seconds | ||
|
||
|
||
class DeviceProtocol(BaseProtocol): | ||
def __init__(self, submission_service: ReadingSubmissionService): | ||
# Installed modules | ||
self.rpi_device = RPIDevice( | ||
submission_service=submission_service, | ||
device_module_id=None, | ||
cpu_module_id=None, | ||
) | ||
self.submission_delay = TIME_TO_SLEEP | ||
|
||
def stop(self): | ||
return False | ||
|
||
def startup(self): | ||
self.rpi_device.submit_device_startup() | ||
|
||
def run(self): | ||
while not self.stop(): | ||
self.rpi_device.submit_reading() | ||
|
||
time.sleep(self.submission_delay) | ||
|
||
def shutdown(self): | ||
self.rpi_device.submit_device_shutdown() | ||
|
||
|
||
def main(): | ||
if not check_arch_is_arm(): | ||
logger.error("only rasbian os supported") | ||
return | ||
|
||
submission_service = ReadingSubmissionService() | ||
device = DeviceProtocol(submission_service=submission_service) | ||
|
||
try: | ||
device.startup() | ||
device.run() | ||
finally: | ||
device.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from unittest.mock import Mock | ||
|
||
import pytest | ||
|
||
from shedpi_hub_dashboard.models import DeviceModuleReading | ||
from shedpi_hub_dashboard.tests.utils.factories import ( | ||
DeviceModuleFactory, | ||
) | ||
from standalone_modules.rpi.device_protocol import ( | ||
DeviceProtocol, | ||
) | ||
from standalone_modules.shed_pi_module_utils.data_submission import ( | ||
ReadingSubmissionService, | ||
) | ||
|
||
|
||
@pytest.mark.django_db | ||
def test_device_protocol(live_server): | ||
# Submission service | ||
submission_service = ReadingSubmissionService() | ||
submission_service.base_url = live_server.url | ||
# Device Protocol | ||
device_protocol = DeviceProtocol(submission_service=submission_service) | ||
# Override the loop timer for the test to end instantly | ||
device_protocol.submission_delay = 0 | ||
device_protocol.stop = Mock(side_effect=[False, True]) | ||
# RPI CPU temp probe | ||
rpi_schema = { | ||
"$id": "https://example.com/person.schema.json", | ||
"$schema": "https://json-schema.org/draft/2020-12/schema", | ||
"title": "Reading", | ||
"type": "object", | ||
"properties": { | ||
"temperature": {"type": "string", "description": "The Temperature"}, | ||
}, | ||
} | ||
rpi_cpu_temp = DeviceModuleFactory(schema=rpi_schema) | ||
device_protocol.rpi_device.device_module_id = rpi_cpu_temp.id | ||
device_protocol.rpi_device.get_cpu_temp = Mock(return_value=10.0) | ||
|
||
device_protocol.run() | ||
|
||
# Check that the data was submitted | ||
assert DeviceModuleReading.objects.filter(device_module=rpi_cpu_temp).count() == 1 |
Empty file.
59 changes: 59 additions & 0 deletions
59
standalone_modules/shed_pi_example_device_installation/device_protocol.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import time | ||
|
||
from standalone_modules.rpi.device import RPIDevice | ||
from standalone_modules.shed_pi_module_utils.base_protocol import BaseProtocol | ||
from standalone_modules.shed_pi_module_utils.data_submission import ( | ||
ReadingSubmissionService, | ||
) | ||
from standalone_modules.shed_pi_module_utils.utils import check_arch_is_arm, logger | ||
from standalone_modules.temperature_module.temperature_probe import TempProbe | ||
|
||
TIME_TO_SLEEP = 60 # time in seconds | ||
|
||
|
||
class DeviceProtocol(BaseProtocol): | ||
def __init__(self, submission_service: ReadingSubmissionService): | ||
# Installed modules | ||
self.temp_probe = TempProbe(submission_service=submission_service) | ||
self.rpi_device = RPIDevice( | ||
submission_service=submission_service, | ||
device_module_id=None, | ||
cpu_module_id=None, | ||
) | ||
self.submission_delay = TIME_TO_SLEEP | ||
|
||
def stop(self): | ||
return False | ||
|
||
def startup(self): | ||
self.rpi_device.submit_device_startup() | ||
|
||
def run(self): | ||
while not self.stop(): | ||
# TODO: Would be nice to be able to bundle multiple calls into 1, less of an issue initially | ||
self.temp_probe.submit_reading() | ||
self.rpi_device.submit_reading() | ||
|
||
time.sleep(self.submission_delay) | ||
|
||
def shutdown(self): | ||
self.rpi_device.submit_device_shutdown() | ||
|
||
|
||
def main(): | ||
if not check_arch_is_arm(): | ||
logger.error("only rasbian os supported") | ||
return | ||
|
||
submission_service = ReadingSubmissionService() | ||
device = DeviceProtocol(submission_service=submission_service) | ||
|
||
try: | ||
device.startup() | ||
device.run() | ||
finally: | ||
device.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
Empty file.
Empty file.
64 changes: 64 additions & 0 deletions
64
standalone_modules/shed_pi_example_device_installation/tests/unit/test_device_protocol.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
from unittest.mock import Mock, patch | ||
|
||
import pytest | ||
|
||
from shedpi_hub_dashboard.models import DeviceModuleReading | ||
from shedpi_hub_dashboard.tests.utils.factories import ( | ||
DeviceModuleFactory, | ||
) | ||
from standalone_modules.shed_pi_example_device_installation.device_protocol import ( | ||
DeviceProtocol, | ||
) | ||
from standalone_modules.shed_pi_module_utils.data_submission import ( | ||
ReadingSubmissionService, | ||
) | ||
|
||
|
||
@patch("standalone_modules.temperature_module.temperature_probe.Path") | ||
@pytest.mark.django_db | ||
def test_device_protocol(mocked_path, live_server): | ||
# Submission service | ||
submission_service = ReadingSubmissionService() | ||
submission_service.base_url = live_server.url | ||
# Device Protocol | ||
device_protocol = DeviceProtocol(submission_service=submission_service) | ||
# Override the loop timer for the test to end instantly | ||
device_protocol.submission_delay = 0 | ||
device_protocol.stop = Mock(side_effect=[False, True]) | ||
# Temp probe | ||
schema = { | ||
"$id": "https://example.com/person.schema.json", | ||
"$schema": "https://json-schema.org/draft/2020-12/schema", | ||
"title": "Reading", | ||
"type": "object", | ||
"properties": { | ||
"temperature": {"type": "string", "description": "The Temperature"}, | ||
}, | ||
} | ||
temp_probe = DeviceModuleFactory(schema=schema) | ||
device_protocol.temp_probe.device_id = temp_probe.id | ||
device_protocol.temp_probe.read_temp_raw = Mock( | ||
return_value=[ | ||
"YES", | ||
"t=12345", | ||
] | ||
) | ||
# RPI CPU temp probe | ||
rpi_schema = { | ||
"$id": "https://example.com/person.schema.json", | ||
"$schema": "https://json-schema.org/draft/2020-12/schema", | ||
"title": "Reading", | ||
"type": "object", | ||
"properties": { | ||
"temperature": {"type": "string", "description": "The Temperature"}, | ||
}, | ||
} | ||
rpi_cpu_temp = DeviceModuleFactory(schema=rpi_schema) | ||
device_protocol.rpi_device.device_module_id = rpi_cpu_temp.id | ||
device_protocol.rpi_device.get_cpu_temp = Mock(return_value=10.0) | ||
|
||
device_protocol.run() | ||
|
||
# Check that the data was submitted | ||
assert DeviceModuleReading.objects.filter(device_module=rpi_cpu_temp).count() == 1 | ||
assert DeviceModuleReading.objects.filter(device_module=temp_probe).count() == 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from standalone_modules.shed_pi_module_utils.data_submission import ( | ||
ReadingSubmissionService, | ||
) | ||
|
||
|
||
class BaseProtocol: | ||
def __init__(self, submission_service: ReadingSubmissionService): | ||
self.submission_service = submission_service | ||
|
||
def stop(self): | ||
... | ||
|
||
def startup(self): | ||
... | ||
|
||
def run(self): | ||
raise NotImplementedError | ||
|
||
def shutdown(self): | ||
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import time | ||
|
||
from standalone_modules.shed_pi_module_utils.base_protocol import BaseProtocol | ||
from standalone_modules.shed_pi_module_utils.data_submission import ( | ||
ReadingSubmissionService, | ||
) | ||
from standalone_modules.shed_pi_module_utils.utils import check_arch_is_arm, logger | ||
from standalone_modules.temperature_module.temperature_probe import TempProbe | ||
|
||
TIME_TO_SLEEP = 60 # time in seconds | ||
|
||
|
||
class DeviceProtocol(BaseProtocol): | ||
def __init__(self, submission_service: ReadingSubmissionService): | ||
# Installed modules | ||
self.temp_probe = TempProbe(submission_service=submission_service) | ||
self.submission_delay = TIME_TO_SLEEP | ||
|
||
def stop(self): | ||
return False | ||
|
||
def run(self): | ||
while not self.stop(): | ||
self.temp_probe.submit_reading() | ||
|
||
time.sleep(self.submission_delay) | ||
|
||
|
||
def main(): | ||
if not check_arch_is_arm(): | ||
logger.error("only rasbian os supported") | ||
return | ||
|
||
submission_service = ReadingSubmissionService() | ||
device = DeviceProtocol(submission_service=submission_service) | ||
|
||
try: | ||
device.startup() | ||
device.run() | ||
finally: | ||
device.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
Oops, something went wrong.