blob: 4d1df2f41a1654f6184fceb5d76e8c5618916aaa [file] [log] [blame]
#!/usr/bin/env python3
import dbus
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
import sys
from dbus.mainloop.glib import DBusGMainLoop
bus = None
mainloop = None
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
# The objects that we interact with.
hr_service = None
hr_msrmt_chrc = None
body_snsr_loc_chrc = None
hr_ctrl_pt_chrc = None
def generic_error_cb(error):
print('D-Bus call failed: ' + str(error))
mainloop.quit()
def body_sensor_val_to_str(val):
if val == 0:
return 'Other'
if val == 1:
return 'Chest'
if val == 2:
return 'Wrist'
if val == 3:
return 'Finger'
if val == 4:
return 'Hand'
if val == 5:
return 'Ear Lobe'
if val == 6:
return 'Foot'
return 'Reserved value'
def sensor_contact_val_to_str(val):
if val == 0 or val == 1:
return 'not supported'
if val == 2:
return 'no contact detected'
if val == 3:
return 'contact detected'
return 'invalid value'
def body_sensor_val_cb(value):
if len(value) != 1:
print('Invalid body sensor location value: ' + repr(value))
return
print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
def hr_msrmt_start_notify_cb():
print('HR Measurement notifications enabled')
def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
if iface != GATT_CHRC_IFACE:
return
if not len(changed_props):
return
value = changed_props.get('Value', None)
if not value:
return
print('New HR Measurement')
flags = value[0]
value_format = flags & 0x01
sc_status = (flags >> 1) & 0x03
ee_status = flags & 0x08
if value_format == 0x00:
hr_msrmt = value[1]
next_ind = 2
else:
hr_msrmt = value[1] | (value[2] << 8)
next_ind = 3
print('\tHR: ' + str(int(hr_msrmt)))
print('\tSensor Contact status: ' +
sensor_contact_val_to_str(sc_status))
if ee_status:
print('\tEnergy Expended: ' + str(int(value[next_ind])))
def start_client():
# Read the Body Sensor Location value and print it asynchronously.
body_snsr_loc_chrc[0].ReadValue({}, reply_handler=body_sensor_val_cb,
error_handler=generic_error_cb,
dbus_interface=GATT_CHRC_IFACE)
# Listen to PropertiesChanged signals from the Heart Measurement
# Characteristic.
hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
hr_msrmt_changed_cb)
# Subscribe to Heart Rate Measurement notifications.
hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
error_handler=generic_error_cb,
dbus_interface=GATT_CHRC_IFACE)
def process_chrc(chrc_path):
chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
dbus_interface=DBUS_PROP_IFACE)
uuid = chrc_props['UUID']
if uuid == HR_MSRMT_UUID:
global hr_msrmt_chrc
hr_msrmt_chrc = (chrc, chrc_props)
elif uuid == BODY_SNSR_LOC_UUID:
global body_snsr_loc_chrc
body_snsr_loc_chrc = (chrc, chrc_props)
elif uuid == HR_CTRL_PT_UUID:
global hr_ctrl_pt_chrc
hr_ctrl_pt_chrc = (chrc, chrc_props)
else:
print('Unrecognized characteristic: ' + uuid)
return True
def process_hr_service(service_path, chrc_paths):
service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
service_props = service.GetAll(GATT_SERVICE_IFACE,
dbus_interface=DBUS_PROP_IFACE)
uuid = service_props['UUID']
if uuid != HR_SVC_UUID:
return False
print('Heart Rate Service found: ' + service_path)
# Process the characteristics.
for chrc_path in chrc_paths:
process_chrc(chrc_path)
global hr_service
hr_service = (service, service_props, service_path)
return True
def interfaces_removed_cb(object_path, interfaces):
if not hr_service:
return
if object_path == hr_service[2]:
print('Service was removed')
mainloop.quit()
def main():
# Set up the main loop.
DBusGMainLoop(set_as_default=True)
global bus
bus = dbus.SystemBus()
global mainloop
mainloop = GObject.MainLoop()
om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
objects = om.GetManagedObjects()
chrcs = []
for path, interfaces in objects.items():
if GATT_CHRC_IFACE not in interfaces.keys():
continue
chrcs.append(path)
for path, interfaces in objects.items():
if GATT_SERVICE_IFACE not in interfaces.keys():
continue
chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
if process_hr_service(path, chrc_paths):
break
if not hr_service:
print('No Heart Rate Service found')
sys.exit(1)
start_client()
mainloop.run()
if __name__ == '__main__':
main()