blob: 393f79c72cf6b26e96f4c4ddc865ae4a1846bb13 [file] [log] [blame] [edit]
#!/usr/bin/python
from __future__ import absolute_import, print_function, unicode_literals
'''
Cycling Speed and Cadence test script
'''
from optparse import OptionParser, make_option
import sys
import dbus
import dbus.service
import dbus.mainloop.glib
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
import bluezutils
BUS_NAME = 'org.bluez'
CYCLINGSPEED_MANAGER_INTERFACE = 'org.bluez.CyclingSpeedManager1'
CYCLINGSPEED_WATCHER_INTERFACE = 'org.bluez.CyclingSpeedWatcher1'
CYCLINGSPEED_INTERFACE = 'org.bluez.CyclingSpeed1'
class MeasurementQ:
def __init__(self, wrap_v):
self._now = [None, None]
self._prev = [None, None]
self._wrap_v = wrap_v
def can_calc(self):
return ((self._now[0] is not None)
and (self._now[1] is not None)
and (self._prev[0] is not None)
and (self._prev[1] is not None))
def delta_v(self):
delta = self._now[0] - self._prev[0]
if (delta < 0) and (self._wrap_v):
delta = delta + 65536
return delta
def delta_t(self):
delta = self._now[1] - self._prev[1]
if delta < 0:
delta = delta + 65536
return delta
def put(self, data):
self._prev = self._now
self._now = data
class Watcher(dbus.service.Object):
_wheel = MeasurementQ(False)
_crank = MeasurementQ(True)
_circumference = None
def enable_calc(self, v):
self._circumference = v
@dbus.service.method(CYCLINGSPEED_WATCHER_INTERFACE,
in_signature="oa{sv}", out_signature="")
def MeasurementReceived(self, device, measure):
print("Measurement received from %s" % device)
rev = None
evt = None
if "WheelRevolutions" in measure:
rev = measure["WheelRevolutions"]
print("WheelRevolutions: ", measure["WheelRevolutions"])
if "LastWheelEventTime" in measure:
evt = measure["LastWheelEventTime"]
print("LastWheelEventTime: ", measure["LastWheelEventTime"])
self._wheel.put( [rev, evt] )
rev = None
evt = None
if "CrankRevolutions" in measure:
rev = measure["CrankRevolutions"]
print("CrankRevolutions: ", measure["CrankRevolutions"])
if "LastCrankEventTime" in measure:
evt = measure["LastCrankEventTime"]
print("LastCrankEventTime: ", measure["LastCrankEventTime"])
self._crank.put( [rev, evt] )
if self._circumference is None:
return
if self._wheel.can_calc():
delta_v = self._wheel.delta_v()
delta_t = self._wheel.delta_t()
if (delta_v >= 0) and (delta_t > 0):
speed = delta_v * self._circumference * 1024 / delta_t # mm/s
speed = speed * 0.0036 # mm/s -> km/h
print("(calculated) Speed: %.2f km/h" % speed)
if self._crank.can_calc():
delta_v = self._crank.delta_v()
delta_t = self._crank.delta_t()
if delta_t > 0:
cadence = delta_v * 1024 / delta_t
print("(calculated) Cadence: %d rpm" % cadence)
def properties_changed(interface, changed, invalidated):
if "Location" in changed:
print("Sensor location: %s" % changed["Location"])
if __name__ == "__main__":
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
option_list = [
make_option("-i", "--adapter", action="store",
type="string", dest="adapter"),
make_option("-b", "--device", action="store",
type="string", dest="address"),
make_option("-c", "--circumference", action="store",
type="int", dest="circumference"),
]
parser = OptionParser(option_list=option_list)
(options, args) = parser.parse_args()
if not options.address:
print("Usage: %s [-i <adapter>] -b <bdaddr> [-c <value>] [cmd]" % (sys.argv[0]))
print("Possible commands:")
print("\tShowSupportedLocations")
print("\tSetLocation <location>")
print("\tSetCumulativeWheelRevolutions <value>")
sys.exit(1)
managed_objects = bluezutils.get_managed_objects()
adapter = bluezutils.find_adapter_in_objects(managed_objects,
options.adapter)
adapter_path = adapter.object_path
device = bluezutils.find_device_in_objects(managed_objects,
options.address,
options.adapter)
device_path = device.object_path
cscmanager = dbus.Interface(bus.get_object(BUS_NAME, adapter_path),
CYCLINGSPEED_MANAGER_INTERFACE)
watcher_path = "/test/watcher"
watcher = Watcher(bus, watcher_path)
if options.circumference:
watcher.enable_calc(options.circumference)
cscmanager.RegisterWatcher(watcher_path)
csc = dbus.Interface(bus.get_object(BUS_NAME, device_path),
CYCLINGSPEED_INTERFACE)
bus.add_signal_receiver(properties_changed, bus_name=BUS_NAME,
path=device_path,
dbus_interface="org.freedesktop.DBus.Properties",
signal_name="PropertiesChanged")
device_prop = dbus.Interface(bus.get_object(BUS_NAME, device_path),
"org.freedesktop.DBus.Properties")
properties = device_prop.GetAll(CYCLINGSPEED_INTERFACE)
if "Location" in properties:
print("Sensor location: %s" % properties["Location"])
else:
print("Sensor location is not supported")
if len(args) > 0:
if args[0] == "ShowSupportedLocations":
if properties["MultipleLocationsSupported"]:
print("Supported locations: ", properties["SupportedLocations"])
else:
print("Multiple sensor locations not supported")
elif args[0] == "SetLocation":
if properties["MultipleLocationsSupported"]:
device_prop.Set(CYCLINGSPEED_INTERFACE, "Location", args[1])
else:
print("Multiple sensor locations not supported")
elif args[0] == "SetCumulativeWheelRevolutions":
if properties["WheelRevolutionDataSupported"]:
csc.SetCumulativeWheelRevolutions(dbus.UInt32(args[1]))
else:
print("Wheel revolution data not supported")
else:
print("Unknown command")
sys.exit(1)
mainloop = GObject.MainLoop()
mainloop.run()