| #!/usr/bin/python -u |
| |
| from __future__ import absolute_import, print_function, unicode_literals |
| |
| import dbus |
| import dbus.exceptions |
| import dbus.service |
| import dbus.mainloop.glib |
| import os |
| import subprocess |
| import sys |
| try: |
| from gi.repository import GObject |
| except ImportError: |
| import gobject as GObject |
| |
| BLUEZ_BUS = 'org.bluez' |
| BLUEZ_ROOT_OBJ = '/org/bluez' |
| AGENT_INTF = 'org.bluez.Agent1' |
| AGENT_MGR_INTF = 'org.bluez.AgentManager1' |
| DEVICE_INTF = 'org.bluez.Device1' |
| OBJECT_MGR_INTF = 'org.freedesktop.DBus.ObjectManager' |
| PROPERTY_INTF = 'org.freedesktop.DBus.Properties' |
| AGENT_PATH = '/com/google/gfiber/agent' |
| AGENT_CAPABILITY = 'NoInputNoOutput' |
| DEV_NAME_GFRM100 = 'GFRM100' |
| DEV_NAME_GFRM200 = 'GFRM200' |
| DEV_NAME_GFRM210 = 'GFRM210' |
| # Early GFRM210s identified themselves as GFRM201 |
| DEV_NAME_GFRM201 = 'GFRM201' |
| DEV_NAME_TIARC = 'HID AdvRemote' |
| DEV_OUI_TIARC = '90:59:AF' |
| |
| |
| g = {'bus': None, 'agent': None, 'adaptdir': None} |
| |
| |
| def dev_trusted(path): |
| bus = g['bus'] |
| obj = bus.get_object(BLUEZ_BUS, path) |
| props = dbus.Interface(obj, PROPERTY_INTF) |
| props.Set(DEVICE_INTF, "Trusted", True) |
| |
| class Rejected(dbus.DBusException): |
| _dbus_error_name = "org.bluez.Error.Rejected" |
| |
| class Agent(dbus.service.Object): |
| exit_on_release = True |
| |
| def set_exit_on_release(self, exit_on_release): |
| self.exit_on_release = exit_on_release |
| |
| @dbus.service.method(AGENT_INTF, in_signature="", out_signature="") |
| def Release(self): |
| print("Release") |
| if self.exit_on_release: |
| mainloop.quit() |
| |
| @dbus.service.method(AGENT_INTF, in_signature="os", out_signature="") |
| def AuthorizeService(self, device, uuid): |
| print("AuthorizeService (%s, %s)" % (device, uuid)) |
| raise Rejected("Not implemented") |
| |
| @dbus.service.method(AGENT_INTF, in_signature="o", out_signature="s") |
| def RequestPinCode(self, device): |
| print("RequestPinCode (%s)" % (device)) |
| dev_trusted(device) |
| return '0000' |
| |
| @dbus.service.method(AGENT_INTF, in_signature="o", out_signature="u") |
| def RequestPasskey(self, device): |
| print("RequestPasskey (%s)" % (device)) |
| dev_trusted(device) |
| return dbus.UInt32('0000') |
| |
| @dbus.service.method(AGENT_INTF, in_signature="ouq", out_signature="") |
| def DisplayPasskey(self, device, passkey, entered): |
| print("DisplayPasskey (%s, %06u entered %u)" % |
| (device, passkey, entered)) |
| |
| @dbus.service.method(AGENT_INTF, in_signature="os", out_signature="") |
| def DisplayPinCode(self, device, pincode): |
| print("DisplayPinCode (%s, %s)" % (device, pincode)) |
| |
| @dbus.service.method(AGENT_INTF, in_signature="ou", out_signature="") |
| def RequestConfirmation(self, device, passkey): |
| print("RequestConfirmation (%s, %06d)" % (device, passkey)) |
| raise Rejected("Not implemented") |
| |
| @dbus.service.method(AGENT_INTF, in_signature="o", out_signature="") |
| def RequestAuthorization(self, device): |
| print("RequestAuthorization (%s)" % (device)) |
| raise Rejected("Not implemented") |
| |
| @dbus.service.method(AGENT_INTF, in_signature="", out_signature="") |
| def Cancel(self): |
| print("Cancel") |
| |
| def register_agent(): |
| bus = g['bus'] |
| obj = bus.get_object(BLUEZ_BUS, BLUEZ_ROOT_OBJ) |
| mgr = dbus.Interface(obj, AGENT_MGR_INTF) |
| mgr.RegisterAgent(AGENT_PATH, AGENT_CAPABILITY) |
| mgr.RequestDefaultAgent(AGENT_PATH) |
| print("Agent registered") |
| |
| def unregister_agent(): |
| bus = g['bus'] |
| obj = bus.get_object(BLUEZ_BUS, BLUEZ_ROOT_OBJ) |
| mgr = dbus.Interface(obj, AGENT_MGR_INTF) |
| mgr.UnregisterAgent(AGENT_PATH) |
| print("Agent unregistered") |
| |
| def set_le_adv_ind_filter(adap_path, dev_addr): |
| name = adap_path.split('/')[-1] |
| path = "/sys/kernel/debug/bluetooth/%s/le_adv_ind_filter" % name |
| try: |
| f = open(path, 'w') |
| f.write(dev_addr) |
| f.close() |
| except IOError as e: |
| print("%s" % e) |
| |
| def dev_pair_and_connect(path): |
| bus = g['bus'] |
| obj = bus.get_object(BLUEZ_BUS, path) |
| dev = dbus.Interface(obj, DEVICE_INTF) |
| props = dbus.Interface(obj, PROPERTY_INTF) |
| adap_path = props.Get(DEVICE_INTF, "Adapter") |
| paired = props.Get(DEVICE_INTF, "Paired") |
| |
| if paired == True: |
| print("%s is already paired" % (path)) |
| return |
| |
| def dev_pair_reply(): |
| print("Device pairing completed for %s" % path) |
| set_le_adv_ind_filter(adap_path, '00:00:00:00:00:00') |
| props.Set(DEVICE_INTF, "Trusted", True) |
| dev.Connect() |
| |
| def dev_pair_error(error): |
| print("Device pairing failed for %s" % path) |
| set_le_adv_ind_filter(adap_path, '00:00:00:00:00:00') |
| err_name = error.get_dbus_name() |
| print("Device pairing error: %s" % err_name) |
| dev.CancelPairing() |
| |
| dev_addr = props.Get(DEVICE_INTF, "Address") |
| set_le_adv_ind_filter(adap_path, dev_addr) |
| dev.Pair(reply_handler=dev_pair_reply, error_handler=dev_pair_error, timeout=15.0) |
| |
| def get_name_and_addr(path): |
| bus = g['bus'] |
| obj = bus.get_object(BLUEZ_BUS, path) |
| props = dbus.Interface(obj, PROPERTY_INTF) |
| addr = props.Get(DEVICE_INTF, "Address") |
| try: |
| name = props.Get(DEVICE_INTF, "Name") |
| except dbus.exceptions.DBusException: |
| name = "" |
| return (name, addr) |
| |
| def is_gfiber_remote(name): |
| return (name == DEV_NAME_GFRM100 or |
| name == DEV_NAME_GFRM200 or |
| name == DEV_NAME_GFRM210 or |
| name == DEV_NAME_GFRM201) |
| |
| def interfaces_added(path, interfaces): |
| if not DEVICE_INTF in interfaces: |
| return |
| (name, addr) = get_name_and_addr(path) |
| print("Discovered %s [%s] [%s]" % (path, addr, name)) |
| if (is_gfiber_remote(name) or |
| name == DEV_NAME_TIARC or |
| addr.startswith(DEV_OUI_TIARC)): |
| print("Pair with %s [%s] [%s]" % (path, addr, name)) |
| dev_pair_and_connect(path) |
| |
| def properties_changed(interface, changed, invalidated, path): |
| if interface != DEVICE_INTF: |
| return |
| connected = changed.get('Connected', False) |
| if not connected: |
| return |
| (name, addr) = get_name_and_addr(path) |
| if is_gfiber_remote(name): |
| jsonfile = os.path.join(g['adaptdir'], addr, 'gfiber-inventory') |
| subprocess.call(["gfrm-inventory", addr, jsonfile]) |
| |
| |
| def main(): |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| bus = dbus.SystemBus() |
| g['bus'] = bus |
| g['agent'] = Agent(bus, AGENT_PATH) |
| |
| if len(sys.argv) == 1: |
| print("usage: %s adaptpath" % sys.argv[0]) |
| print(" where adaptpath is the adaptor directory like " |
| "/user/bluez/lib/bluetooth/F4:F5:E8:80:XX:XX") |
| sys.exit(1) |
| g['adaptdir'] = sys.argv[1] |
| |
| register_agent() |
| bus.add_signal_receiver(interfaces_added, bus_name=BLUEZ_BUS, |
| dbus_interface=OBJECT_MGR_INTF, |
| signal_name="InterfacesAdded") |
| bus.add_signal_receiver(properties_changed, |
| dbus_interface=PROPERTY_INTF, |
| signal_name="PropertiesChanged", |
| arg0=DEVICE_INTF, path_keyword="path") |
| |
| mainloop = GObject.MainLoop() |
| mainloop.run() |
| |
| bus.remove_signal_receiver(interfaces_added, bus_name=BLUEZ_BUS, |
| dbus_interface=OBJECT_MGR_INTF, |
| signal_name="InterfacesAdded") |
| bus.remove_signal_receiver(properties_changed, |
| dbus_interface=PROPERTY_INTF, |
| signal_name="PropertiesChanged", |
| arg0=DEVICE_INTF, path_keyword="path") |
| unregister_agent() |
| |
| if __name__ == '__main__': |
| main() |