blob: 93053f4073197ff7e9a40c398d5c2765e9eb6580 [file] [log] [blame]
import copy
import dbus
import dbus.mainloop.glib
import errno
import gobject
import json
import os
import sys
FIRMWARE_VERSION_UUID = u'00002a26-0000-1000-8000-00805f9b34fb'
HARDWARE_REV_UUID = u'00002a27-0000-1000-8000-00805f9b34fb'
SERIAL_NUMBER_UUID = u'00002a25-0000-1000-8000-00805f9b34fb'
# Inventory information for the remote control in question.
rcu = {}
orig = {}
status = {'nqueries': 0}
def TimeoutInventory():
def CheckIfCallbacksDone():
"""Decrement count of outstanding queries."""
status['nqueries'] -= 1
if status['nqueries'] <= 0:
def FirmwareVersionCallback(value):
"""Render the firmware version string like 'T0055.12 G001E.04'."""
rcu['firmware'] = ''.join([chr(byte) for byte in value])
def HardwareVersionCallback(value):
"""Render the Google Part Number string like 'GPN#07081865-01'.
The GPN string contains a trailing NUL, which is annoying so we suppress it.
rcu['hardware'] = ''.join([chr(byte) for byte in value if byte != 0])
def SerialNumberCallback(value):
"""Render the serial number.
The Serial number is a series of integers.
The first is always '2', not sure what it means.
Then (all numbers are decimal):
YY == last two digits of the year, like 15 for 2015.
WW == week number, where 51 would be the last week of the year.
DD == day of the week, 0-6.
LL == "line number" in the factory.
XX XX XX XX XX == an incrementing digit.
The sticker on the remote inserts a space as the third character, so this:
2, 14, 51, 2, 0, 0, 0, 20, 0, 0
is printed as '21451 20002000'. We match this formatting.
array = [int(byte) for byte in value]
array.insert(3, ' ')
rcu['serial'] = ''.join([str(byte) for byte in array])
def DBusErrorHandler(error):
sys.stderr.write('D-Bus call failed: ' + str(error))
def GetDeviceByAddress(remote):
"""Look up a device by its BDADDR, like 5C:31:3E:08:25:44."""
bus = dbus.SystemBus()
om = dbus.Interface(bus.get_object('org.bluez', '/'),
objects = om.GetManagedObjects()
for path, interfaces in objects.iteritems():
if 'org.bluez.Device1' in interfaces:
device = interfaces['org.bluez.Device1']
if device.get('Address', 'noaddress').lower() == remote.lower():
return device
return {}
def GetGattService(device):
"""Return the General GATT service for device."""
bus = dbus.SystemBus()
service_path = ''
for s in device.get('GattServices', []):
if 'service0010' in s:
service_path = s
if not service_path:
return None
service = bus.get_object('org.bluez', service_path)
p = 'org.freedesktop.DBus.Properties'
return service.GetAll('org.bluez.GattService1', dbus_interface=p)
def CallbackIfInteresting(uuid):
"""If we are interested in this UUID, return the callback for it."""
return FirmwareVersionCallback
elif uuid == HARDWARE_REV_UUID:
return HardwareVersionCallback
elif uuid == SERIAL_NUMBER_UUID:
return SerialNumberCallback
def GetRcuGattCharacteristics(service):
"""Send queries for the characteristics we are interested in."""
bus = dbus.SystemBus()
characteristics = service.get('Characteristics', [])
for path in characteristics:
c = bus.get_object('org.bluez', path)
props = c.GetAll('org.bluez.GattCharacteristic1',
uuid = props.get('UUID', None)
callback = CallbackIfInteresting(uuid)
if callback:
status['nqueries'] += 1
def ReadFromJson(filename):
with open(filename) as f:
return json.load(f)
except IOError as e:
if e.errno != errno.ENOENT:
return {}
def MakeDirectoryIfNotExist(directory):
except OSError as e:
if e.errno != errno.EEXIST:
def WriteToJsonIfChanged(filename):
"""Write the contents of 'rcu' to filename if they have changed."""
if rcu == orig:
with open(filename + '.tmp', 'w') as f:
json.dump(rcu, f, sort_keys=True)
os.rename(filename + '.tmp', filename)
def usage():
sys.stderr.write('usage: %s addr jsonfile' % sys.argv[0])
sys.stderr.write(' where addr is a Bluetooth device BDADDR')
sys.stderr.write(' and jsonfile is the path to a JSON file to update')
if __name__ == '__main__':
if len(sys.argv) != 3:
remote = sys.argv[1]
jsonfile = sys.argv[2]
directory = os.path.dirname(jsonfile)
# Bluetooth is kindof flakey. We want to always retain information we've
# retrieved from a remote, and supplement it with any additional information
# we find this time (or when things change, like a firmware update).
# We don't want to start afresh each time the remote appears and expect to
# always reliably retrieve its inventory information.
rcu = copy.copy(orig)
gobject.timeout_add(20000, TimeoutInventory)
device = GetDeviceByAddress(remote)
if 'Name' in device:
rcu['model'] = device['Name']
if 'Address' in device:
rcu['bdaddr'] = device['Address']
# Get more information from BLE remotes like GFRM200/GFRM210.
service = GetGattService(device)
if service:
mainloop = gobject.MainLoop()