-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
30a5b57
commit 58eb3b7
Showing
4 changed files
with
397 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .holman import TapTimerManager, TapTimerManagerListener, TapTimer, TapTimerListener |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
""" | ||
Module for managing Holman Bluetooth tap timers. | ||
""" | ||
import gatt | ||
|
||
|
||
class TapTimerManager(gatt.DeviceManager): | ||
""" | ||
Entry point for managing and discovering Holman ``TapTimer``s. | ||
""" | ||
|
||
def __init__(self, adapter_name='hci0'): | ||
""" | ||
Instantiates a ``TapTimerManager`` | ||
:param adapter_name: name of Bluetooth adapter used by this | ||
tap timer manager | ||
""" | ||
super().__init__(adapter_name) | ||
self.listener = None | ||
self.discovered_tap_timers = {} | ||
|
||
def tap_timers(self): | ||
""" | ||
Returns all known Holman tap timers. | ||
""" | ||
return self.devices() | ||
|
||
def start_discovery(self, service_uuids=None): | ||
""" | ||
Starts a Bluetooth discovery for Holman tap timers. | ||
Assign a `TapTimerManagerListener` to the `listener` attribute | ||
to collect discovered Holmans. | ||
""" | ||
super().start_discovery(service_uuids=TapTimer.SERVICE_UUIDS) | ||
|
||
def make_device(self, mac_address): | ||
device = gatt.Device( | ||
mac_address=mac_address, manager=self, managed=False) | ||
if device.alias() != 'Tap Timer': | ||
return None | ||
return TapTimer(mac_address=mac_address, manager=self) | ||
|
||
def device_discovered(self, device): | ||
super().device_discovered(device) | ||
if device.mac_address in self.discovered_tap_timers: | ||
return | ||
self.discovered_tap_timers[device.mac_address] = device | ||
if self.listener is not None: | ||
self.listener.tap_timer_discovered(device) | ||
|
||
|
||
class TapTimerManagerListener(object): | ||
""" | ||
Base class for receiving discovery events from ``TapTimerManager``. | ||
Assign an instance of your subclass to the ``listener`` attribute | ||
of your ``TapTimerManager`` to receive discovery events. | ||
""" | ||
|
||
def tap_timer_discovered(self, tap_timer): | ||
""" | ||
This method gets called once for each Holman tap timer | ||
discovered nearby. | ||
:param tap_timer: the Holman tap timer that was discovered | ||
""" | ||
pass | ||
|
||
|
||
class TapTimer(gatt.Device): | ||
""" | ||
This class represents a Holman tap timer. | ||
Obtain instances of this class by using the discovery mechanism of | ||
``TapTimerManager`` or by manually creating an instance. | ||
Assign an instance of ``TapTimerListener`` to the ``listener`` | ||
attribute to receive all Holman related events such as connection, | ||
disconnection and user input. | ||
:param adapter_name: name of Bluetooth adapter that can connect to | ||
this tap timer | ||
:param mac_address: MAC address of this Holman tap timer | ||
:param listener: instance of ``TapTimerListener`` that will be | ||
notified with all events | ||
""" | ||
|
||
HOLMAN_SERVICE_UUID = '0a75f000-f9ad-467a-e564-3c19163ad543' | ||
STATE_CHARACTERISTIC_UUID = '0000f004-0000-1000-8000-00805f9b34fb' | ||
MANUAL_CHARACTERISTIC_UUID = '0000f006-0000-1000-8000-00805f9b34fb' | ||
|
||
SERVICE_UUIDS = [ | ||
HOLMAN_SERVICE_UUID] | ||
|
||
def __init__(self, mac_address, manager): | ||
""" | ||
Create an instance with given Bluetooth adapter name and MAC | ||
address. | ||
:param mac_address: MAC address of Holman tap timer with | ||
format: ``AA:BB:CC:DD:EE:FF`` | ||
:param manager: reference to the `TapTimerManager` that manages | ||
this tap timer | ||
""" | ||
super().__init__(mac_address=mac_address, manager=manager) | ||
|
||
self.listener = None | ||
self._battery_level = None | ||
self._manual_characteristic = None | ||
self._state_characteristic = None | ||
self._state = bytes([0]) | ||
|
||
def connect(self): | ||
""" | ||
Tries to connect to this Holman tap timer and blocks until it | ||
has connected or failed to connect. | ||
Notifies ``listener`` as soon has the connection has succeeded | ||
or failed. | ||
""" | ||
if self.listener: | ||
self.listener.started_connecting() | ||
super().connect() | ||
|
||
def connect_failed(self, error): | ||
super().connect_failed(error) | ||
if self.listener: | ||
self.listener.connect_failed(error) | ||
|
||
def disconnect(self): | ||
""" | ||
Disconnects this Holman tap timer if connected. | ||
Notifies ``listener`` as soon as Holman was disconnected. | ||
""" | ||
if self.listener: | ||
self.listener.started_disconnecting() | ||
self._refresh_state() | ||
super().disconnect() | ||
|
||
def disconnect_succeeded(self): | ||
super().disconnect_succeeded() | ||
if self.listener: | ||
self.listener.disconnect_succeeded() | ||
|
||
def services_resolved(self): | ||
super().services_resolved() | ||
|
||
holman_service = next(( | ||
service for service in self.services | ||
if service.uuid == self.HOLMAN_SERVICE_UUID), None) | ||
if holman_service is None: | ||
if self.listener: | ||
# TODO: Use proper exception subclass | ||
self.listener.connect_failed( | ||
Exception("Holman GATT service missing")) | ||
return | ||
|
||
self._manual_characteristic = next(( | ||
char for char in holman_service.characteristics | ||
if char.uuid == self.MANUAL_CHARACTERISTIC_UUID), None) | ||
if self._manual_characteristic is None: | ||
# TODO: Use proper exception subclass | ||
self.listener.connect_failed( | ||
Exception( | ||
"Holman GATT characteristic %s missing", | ||
self.MANUAL_CHARACTERISTIC_UUID)) | ||
return | ||
|
||
self._state_characteristic = next(( | ||
char for char in holman_service.characteristics | ||
if char.uuid == self.STATE_CHARACTERISTIC_UUID), None) | ||
if self._state_characteristic is None: | ||
# TODO: Use proper exception subclass | ||
self.listener.connect_failed( | ||
Exception( | ||
"Holman GATT characteristic %s missing", | ||
self.STATE_CHARACTERISTIC_UUID)) | ||
return | ||
self._refresh_state() | ||
|
||
# TODO: Only fire connected event when we read the firmware | ||
# version or battery value as in other SDKs | ||
if self.listener: | ||
self.listener.connect_succeeded() | ||
|
||
def battery_level(self): | ||
# TODO: determine which byte in state is used for battery level | ||
return self._battery_level | ||
|
||
def _refresh_state(self): | ||
self._state = self._state_characteristic.read_value() | ||
|
||
@property | ||
def is_on(self): | ||
"""Return true if tap is running.""" | ||
self._refresh_state() | ||
return self._state[-1] == 1 | ||
|
||
@property | ||
def name(self): | ||
"""The name of the tap timer.""" | ||
return str(self.alias()) | ||
|
||
def start(self, runtime=1): | ||
""" | ||
Turn on the tap for ```runtime``` minutes. | ||
:param runtime: the number of minutes to run the tap | ||
""" | ||
runtime = 255 if runtime > 255 else runtime | ||
if self._manual_characteristic: | ||
value = bytes([0x01, 0x00, 0x00, runtime]) | ||
self._manual_characteristic.write_value(value) | ||
|
||
def stop(self): | ||
"""Turn off the tap.""" | ||
if self._manual_characteristic: | ||
value = bytes([0x00, 0x00, 0x00, 0x00]) | ||
self._manual_characteristic.write_value(value) | ||
|
||
def characteristic_write_value_succeeded(self, characteristic): | ||
self._refresh_state() | ||
|
||
def characteristic_write_value_failed(self, characteristic, error): | ||
self._refresh_state() | ||
|
||
|
||
class TapTimerListener: | ||
""" | ||
Base class of listeners for a ``HolmanTapTimer`` with empty handler | ||
implementations. | ||
""" | ||
def received_gesture_event(self, event): | ||
pass | ||
|
||
def started_connecting(self): | ||
pass | ||
|
||
def connect_succeeded(self): | ||
pass | ||
|
||
def connect_failed(self, error): | ||
pass | ||
|
||
def started_disconnecting(self): | ||
pass | ||
|
||
def disconnect_succeeded(self): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import sys | ||
from argparse import ArgumentParser | ||
import holman | ||
|
||
tap_timer_manager = None | ||
|
||
|
||
class TapTimerPrintListener(holman.TapTimerListener): | ||
""" | ||
An implementation of ``TapTimerListener`` that prints each event. | ||
""" | ||
def __init__(self, tap_timer): | ||
self.tap_timer = tap_timer | ||
|
||
def started_connecting(self): | ||
self.print("connecting...") | ||
|
||
def connect_succeeded(self): | ||
self.print("connected") | ||
|
||
def connect_failed(self, error): | ||
self.print("connect failed: " + str(error)) | ||
|
||
def started_disconnecting(self): | ||
self.print("disconnecting...") | ||
|
||
def disconnect_succeeded(self): | ||
self.print("disconnected") | ||
|
||
def print(self, string): | ||
print("Holman tap timer " + self.tap_timer.mac_address + " " + string) | ||
|
||
|
||
class TapTimerTestListener(TapTimerPrintListener): | ||
def __init__(self, tap_timer, auto_reconnect=False): | ||
super().__init__(tap_timer) | ||
self.auto_reconnect = auto_reconnect | ||
|
||
def connect_failed(self, error): | ||
super().connect_failed(error) | ||
tap_timer_manager.stop() | ||
sys.exit(0) | ||
|
||
def disconnect_succeeded(self): | ||
super().disconnect_succeeded() | ||
|
||
if self.auto_reconnect: | ||
# Reconnect as soon as Holman was disconnected | ||
print("Disconnected, reconnecting...") | ||
self.tap_timer.connect() | ||
else: | ||
tap_timer_manager.stop() | ||
sys.exit(0) | ||
|
||
|
||
class TapTimerManagerPrintListener(holman.TapTimerManagerListener): | ||
def tap_timer_discovered(self, tap_timer): | ||
print("Discovered Holman tap timer", tap_timer.mac_address) | ||
|
||
|
||
def main(): | ||
arg_parser = ArgumentParser(description="Holman Tap Timer Demo") | ||
arg_parser.add_argument( | ||
'--adapter', | ||
default='hci0', | ||
help="Name of Bluetooth adapter, defaults to 'hci0'") | ||
arg_commands_group = arg_parser.add_mutually_exclusive_group(required=True) | ||
arg_commands_group.add_argument( | ||
'--discover', | ||
action='store_true', | ||
help="Lists all nearby Holman tap timers") | ||
arg_commands_group.add_argument( | ||
'--known', | ||
action='store_true', | ||
help="Lists all known Holman tap timers") | ||
arg_commands_group.add_argument( | ||
'--connect', | ||
metavar='address', | ||
type=str, | ||
help="Connect to a Holman tap timer with a given MAC address") | ||
arg_commands_group.add_argument( | ||
'--auto', | ||
metavar='address', | ||
type=str, | ||
help="Connect and automatically reconnect to a Holman tap timer with a given MAC address") | ||
arg_commands_group.add_argument( | ||
'--disconnect', | ||
metavar='address', | ||
type=str, | ||
help="Disconnect a Holman tap timer with a given MAC address") | ||
args = arg_parser.parse_args() | ||
|
||
global tap_timer_manager | ||
tap_timer_manager = holman.TapTimerManager(adapter_name=args.adapter) | ||
|
||
if args.discover: | ||
tap_timer_manager.listener = TapTimerManagerPrintListener() | ||
tap_timer_manager.start_discovery() | ||
if args.known: | ||
for tap_timer in tap_timer_manager.tap_timers(): | ||
print("[%s] %s" % (tap_timer.mac_address, tap_timer.alias())) | ||
return | ||
elif args.connect: | ||
tap_timer = holman.TapTimer(mac_address=args.connect, manager=tap_timer_manager) | ||
tap_timer.listener = TapTimerTestListener(tap_timer=tap_timer) | ||
tap_timer.connect() | ||
elif args.auto: | ||
tap_timer = holman.TapTimer(mac_address=args.auto, manager=tap_timer_manager) | ||
tap_timer.listener = TapTimerTestListener(tap_timer=tap_timer, auto_reconnect=True) | ||
tap_timer.connect() | ||
elif args.disconnect: | ||
tap_timer = holman.TapTimer(mac_address=args.disconnect, manager=tap_timer_manager) | ||
tap_timer.disconnect() | ||
return | ||
|
||
print("Terminate with Ctrl+C") | ||
try: | ||
tap_timer_manager.run() | ||
except KeyboardInterrupt: | ||
pass | ||
|
||
if __name__ == '__main__': | ||
main() |
Oops, something went wrong.