| #!/usr/bin/python |
| |
| import copy |
| import dbus |
| import dbus.mainloop.glib |
| import errno |
| import gobject |
| import json |
| import os |
| import sys |
| |
| |
# UUIDs of the GATT characteristics this script reads from the remote.
# CallbackIfInteresting() maps each one to the callback that stores its value.
FIRMWARE_VERSION_UUID = u'00002a26-0000-1000-8000-00805f9b34fb'
HARDWARE_REV_UUID = u'00002a27-0000-1000-8000-00805f9b34fb'
SERIAL_NUMBER_UUID = u'00002a25-0000-1000-8000-00805f9b34fb'


# Inventory information for the remote control in question.
rcu = {}
# Inventory as loaded from the JSON file; compared against 'rcu' to decide
# whether the file needs to be rewritten.
orig = {}
# 'nqueries' counts characteristic reads that have been issued but whose
# reply has not been handled yet.
status = {'nqueries': 0}
| |
| |
def TimeoutInventory():
    """Stop the main loop; registered as the inventory timeout callback.

    Relies on the module-level name 'mainloop' assigned by the script's
    __main__ section.
    """
    mainloop.quit()
| |
| |
def CheckIfCallbacksDone():
    """Note one finished query; quit the main loop once none remain."""
    remaining = status['nqueries'] - 1
    status['nqueries'] = remaining
    if remaining <= 0:
        mainloop.quit()
| |
| |
def FirmwareVersionCallback(value):
    """Store the firmware version, like 'T0055.12 G001E.04', in rcu.

    Args:
      value: iterable of byte values; each one is converted with chr().
    """
    text = ''.join(chr(b) for b in value)
    rcu['firmware'] = text
    CheckIfCallbacksDone()
| |
| |
def HardwareVersionCallback(value):
    """Store the Google Part Number, like 'GPN#07081865-01', in rcu.

    The GPN string arrives with a trailing NUL, which is annoying, so every
    zero byte in the value is dropped before the string is built.

    Args:
      value: iterable of byte values; each non-zero one is converted with
        chr().
    """
    kept = [chr(b) for b in value if b != 0]
    rcu['hardware'] = ''.join(kept)
    CheckIfCallbacksDone()
| |
| |
def SerialNumberCallback(value):
    """Store the serial number, formatted like the sticker, in rcu.

    The serial number is a series of integers.
    The first is always '2', not sure what it means.
    Then (all numbers are decimal):
    YY == last two digits of the year, like 15 for 2015.
    WW == week number, where 51 would be the last week of the year.
    DD == day of the week, 0-6.
    LL == "line number" in the factory.
    XX XX XX XX XX == an incrementing digit.

    The sticker on the remote has a space after the third number, so this:
      2, 14, 51, 2, 0, 0, 0, 20, 0, 0
    is printed as '21451 20002000'. We match this formatting.

    Args:
      value: iterable of values convertible with int().
    """
    pieces = [str(int(b)) for b in value]
    # list.insert() appends when fewer than three numbers were received.
    pieces.insert(3, ' ')
    rcu['serial'] = ''.join(pieces)
    CheckIfCallbacksDone()
| |
| |
def DBusErrorHandler(error):
    """Report a failed asynchronous D-Bus call and stop the main loop.

    Passed as error_handler to the characteristic reads. The message is
    newline-terminated so that it does not run into later stderr output.

    Args:
      error: the error object delivered by D-Bus; rendered with str().
    """
    sys.stderr.write('D-Bus call failed: ' + str(error) + '\n')
    mainloop.quit()
| |
| |
def GetDeviceByAddress(remote):
    """Look up a device by its BDADDR, like 5C:31:3E:08:25:44.

    Args:
      remote: the address to look for; compared case-insensitively.

    Returns:
      The org.bluez.Device1 property dictionary of the first managed object
      whose 'Address' matches, or an empty dict if there is none.
    """
    bus = dbus.SystemBus()
    om = dbus.Interface(bus.get_object('org.bluez', '/'),
                        'org.freedesktop.DBus.ObjectManager')
    wanted = remote.lower()
    # Only the interface dictionaries are needed, not the object paths.
    # values() works on both Python 2 and Python 3 dictionaries.
    for interfaces in om.GetManagedObjects().values():
        if 'org.bluez.Device1' not in interfaces:
            continue
        device = interfaces['org.bluez.Device1']
        if device.get('Address', 'noaddress').lower() == wanted:
            return device
    return {}
| |
| |
def GetGattService(device):
    """Return the General GATT service for device.

    Args:
      device: dictionary of device properties; its 'GattServices' entry, if
        present, is iterated as a list of service object paths.

    Returns:
      The org.bluez.GattService1 properties of the service whose path
      contains 'service0010' (the last one, if several match), or None when
      the device lists no such service.
    """
    service_path = ''
    for candidate in device.get('GattServices', []):
        if 'service0010' in candidate:
            service_path = candidate
    if not service_path:
        return None
    # The bus is only needed once there is a service to query.
    bus = dbus.SystemBus()
    service = bus.get_object('org.bluez', service_path)
    p = 'org.freedesktop.DBus.Properties'
    return service.GetAll('org.bluez.GattService1', dbus_interface=p)
| |
| |
def CallbackIfInteresting(uuid):
    """Return the callback that handles this UUID, or None if there is none.

    Args:
      uuid: a characteristic UUID (may be None).
    """
    interesting = (
        (FIRMWARE_VERSION_UUID, FirmwareVersionCallback),
        (HARDWARE_REV_UUID, HardwareVersionCallback),
        (SERIAL_NUMBER_UUID, SerialNumberCallback),
    )
    for known_uuid, handler in interesting:
        if uuid == known_uuid:
            return handler
    return None
| |
| |
def GetRcuGattCharacteristics(service):
    """Issue asynchronous reads for the characteristics we care about.

    Every read that is issued bumps status['nqueries']; the reply is
    delivered to the matching callback, errors go to DBusErrorHandler.

    Args:
      service: dictionary of GATT service properties; its 'Characteristics'
        entry, if present, is iterated as a list of object paths.
    """
    system_bus = dbus.SystemBus()
    for char_path in service.get('Characteristics', []):
        char_obj = system_bus.get_object('org.bluez', char_path)
        char_props = char_obj.GetAll(
            'org.bluez.GattCharacteristic1',
            dbus_interface='org.freedesktop.DBus.Properties')
        handler = CallbackIfInteresting(char_props.get('UUID', None))
        if not handler:
            continue
        status['nqueries'] += 1
        char_obj.ReadValue(reply_handler=handler,
                           error_handler=DBusErrorHandler,
                           dbus_interface='org.bluez.GattCharacteristic1')
| |
| |
def ReadFromJson(filename):
    """Load previously saved inventory information from filename.

    A missing file is normal (first run) and yields an empty dict. A file
    that cannot be parsed, or that does not hold a JSON object, is reported
    on stderr and also yields an empty dict, so that a damaged file does not
    make every later run fail; it is replaced the next time the inventory is
    written.

    Args:
      filename: path of the JSON file to read.

    Returns:
      The dictionary stored in the file, or {} as described above.

    Raises:
      IOError: the file exists but could not be read.
    """
    try:
        with open(filename) as f:
            data = json.load(f)
    except IOError as e:
        if e.errno != errno.ENOENT:
            raise
        return {}
    except ValueError as e:
        # json reports malformed input as ValueError (or a subclass of it).
        sys.stderr.write('Ignoring unparseable %s: %s\n' % (filename, e))
        return {}
    if not isinstance(data, dict):
        sys.stderr.write('Ignoring %s: not a JSON object\n' % filename)
        return {}
    return data
| |
| |
def MakeDirectoryIfNotExist(directory):
    """Create directory, and any missing parents, unless it already exists.

    Args:
      directory: path to create. An empty string, which is what
        os.path.dirname() returns for a bare file name, means the current
        directory and is a no-op.

    Raises:
      OSError: creation failed for a reason other than the path existing.
    """
    if not directory:
        # os.makedirs('') would fail with ENOENT.
        return
    try:
        os.makedirs(directory)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
| |
| |
def WriteToJsonIfChanged(filename):
    """Save 'rcu' to filename, but only if it differs from 'orig'.

    The data is written to filename + '.tmp' first and then renamed over
    filename.

    Args:
      filename: path of the JSON file to update.
    """
    if rcu != orig:
        tmp_name = filename + '.tmp'
        with open(tmp_name, 'w') as out:
            json.dump(rcu, out, sort_keys=True)
        os.rename(tmp_name, filename)
| |
| |
def usage():
    """Print the command-line synopsis to stderr and exit with status 1.

    Each line of the message is newline-terminated so the three parts are
    not printed as one run-on line.
    """
    sys.stderr.write('usage: %s addr jsonfile\n' % sys.argv[0])
    sys.stderr.write(' where addr is a Bluetooth device BDADDR\n')
    sys.stderr.write(' and jsonfile is the path to a JSON file to update\n')
    sys.exit(1)
| |
| |
if __name__ == '__main__':
    # Script entry point: argv[1] is the BDADDR of the remote, argv[2] the
    # JSON file holding its inventory.
    if len(sys.argv) != 3:
        usage()

    remote = sys.argv[1]
    jsonfile = sys.argv[2]

    directory = os.path.dirname(jsonfile)
    MakeDirectoryIfNotExist(directory)

    # Bluetooth is kind of flaky. We want to always retain information we've
    # retrieved from a remote, and supplement it with any additional
    # information we find this time (or when things change, like a firmware
    # update).
    #
    # We don't want to start afresh each time the remote appears and expect to
    # always reliably retrieve its inventory information.
    orig.update(ReadFromJson(jsonfile))
    rcu = copy.copy(orig)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    # Created before any query is issued: the callbacks refer to the
    # module-level name 'mainloop'.
    mainloop = gobject.MainLoop()
    # Give up after 20 seconds if some replies never arrive.
    gobject.timeout_add(20000, TimeoutInventory)
    device = GetDeviceByAddress(remote)
    if 'Name' in device:
        rcu['model'] = device['Name']
    if 'Address' in device:
        rcu['bdaddr'] = device['Address']

    # Get more information from BLE remotes like GFRM200/GFRM210.
    service = GetGattService(device)
    if service:
        GetRcuGattCharacteristics(service)

    # Only wait when at least one read was actually issued. With nothing
    # outstanding no callback would ever stop the loop, and the script would
    # sit idle until the 20 second timeout fired.
    if status['nqueries'] > 0:
        mainloop.run()

    WriteToJsonIfChanged(jsonfile)