| #!/usr/bin/python -u |
| |
| from __future__ import absolute_import, print_function, unicode_literals |
| |
| import dbus |
| import dbus.exceptions |
| import dbus.service |
| import dbus.mainloop.glib |
| try: |
| from gi.repository import GObject |
| except ImportError: |
| import gobject as GObject |
| |
| BLUEZ_BUS = 'org.bluez' |
| BLUEZ_ROOT_OBJ = '/org/bluez' |
| AGENT_INTF = 'org.bluez.Agent1' |
| AGENT_MGR_INTF = 'org.bluez.AgentManager1' |
| DEVICE_INTF = 'org.bluez.Device1' |
| OBJECT_MGR_INTF = 'org.freedesktop.DBus.ObjectManager' |
| PROPERTY_INTF = 'org.freedesktop.DBus.Properties' |
| AGENT_PATH = '/com/google/gfiber/agent' |
| AGENT_CAPABILITY = 'NoInputNoOutput' |
| DEV_NAME_GFRM100 = 'GFRM100' |
| DEV_NAME_GFRM200 = 'GFRM200' |
| DEV_NAME_TIARC = 'HID AdvRemote' |
| DEV_OUI_TIARC = '90:59:AF' |
| |
| bus = None |
| agent = None |
| |
| def dev_trusted(path): |
| obj = bus.get_object(BLUEZ_BUS, path) |
| props = dbus.Interface(obj, PROPERTY_INTF) |
| props.Set(DEVICE_INTF, "Trusted", True) |
| |
| class Rejected(dbus.DBusException): |
| _dbus_error_name = "org.bluez.Error.Rejected" |
| |
| class Agent(dbus.service.Object): |
| exit_on_release = True |
| |
| def set_exit_on_release(self, exit_on_release): |
| self.exit_on_release = exit_on_release |
| |
| @dbus.service.method(AGENT_INTF, in_signature="", out_signature="") |
| def Release(self): |
| print("Release") |
| if self.exit_on_release: |
| mainloop.quit() |
| |
| @dbus.service.method(AGENT_INTF, in_signature="os", out_signature="") |
| def AuthorizeService(self, device, uuid): |
| print("AuthorizeService (%s, %s)" % (device, uuid)) |
| raise Rejected("Not implemented") |
| |
| @dbus.service.method(AGENT_INTF, in_signature="o", out_signature="s") |
| def RequestPinCode(self, device): |
| print("RequestPinCode (%s)" % (device)) |
| dev_trusted(device) |
| return '0000' |
| |
| @dbus.service.method(AGENT_INTF, in_signature="o", out_signature="u") |
| def RequestPasskey(self, device): |
| print("RequestPasskey (%s)" % (device)) |
| dev_trusted(device) |
| return dbus.UInt32('0000') |
| |
| @dbus.service.method(AGENT_INTF, in_signature="ouq", out_signature="") |
| def DisplayPasskey(self, device, passkey, entered): |
| print("DisplayPasskey (%s, %06u entered %u)" % |
| (device, passkey, entered)) |
| |
| @dbus.service.method(AGENT_INTF, in_signature="os", out_signature="") |
| def DisplayPinCode(self, device, pincode): |
| print("DisplayPinCode (%s, %s)" % (device, pincode)) |
| |
| @dbus.service.method(AGENT_INTF, in_signature="ou", out_signature="") |
| def RequestConfirmation(self, device, passkey): |
| print("RequestConfirmation (%s, %06d)" % (device, passkey)) |
| raise Rejected("Not implemented") |
| |
| @dbus.service.method(AGENT_INTF, in_signature="o", out_signature="") |
| def RequestAuthorization(self, device): |
| print("RequestAuthorization (%s)" % (device)) |
| raise Rejected("Not implemented") |
| |
| @dbus.service.method(AGENT_INTF, in_signature="", out_signature="") |
| def Cancel(self): |
| print("Cancel") |
| |
| def register_agent(): |
| obj = bus.get_object(BLUEZ_BUS, BLUEZ_ROOT_OBJ) |
| mgr = dbus.Interface(obj, AGENT_MGR_INTF) |
| mgr.RegisterAgent(AGENT_PATH, AGENT_CAPABILITY) |
| mgr.RequestDefaultAgent(AGENT_PATH) |
| print("Agent registered") |
| |
| def unregister_agent(): |
| obj = bus.get_object(BLUEZ_BUS, BLUEZ_ROOT_OBJ) |
| mgr = dbus.Interface(obj, AGENT_MGR_INTF) |
| mgr.UnregisterAgent(AGENT_PATH) |
| print("Agent unregistered") |
| |
| def dev_pair_and_connect(path): |
| obj = bus.get_object(BLUEZ_BUS, path) |
| dev = dbus.Interface(obj, DEVICE_INTF) |
| props = dbus.Interface(obj, PROPERTY_INTF) |
| paired = props.Get(DEVICE_INTF, "Paired") |
| if paired == True: |
| print("%s is already paired" % (path)) |
| return |
| dev.Pair() |
| props.Set(DEVICE_INTF, "Trusted", True) |
| dev.Connect() |
| |
| def interfaces_added(path, interfaces): |
| if not DEVICE_INTF in interfaces: |
| return |
| obj = bus.get_object(BLUEZ_BUS, path) |
| props = dbus.Interface(obj, PROPERTY_INTF) |
| addr = props.Get(DEVICE_INTF, "Address") |
| try: |
| name = props.Get(DEVICE_INTF, "Name") |
| except dbus.exceptions.DBusException: |
| name = '' |
| print("Discovered %s [%s] [%s]" % (path, addr, name)) |
| |
| if (name == DEV_NAME_GFRM100 or |
| name == DEV_NAME_GFRM200 or |
| name == DEV_NAME_TIARC or |
| addr.startswith(DEV_OUI_TIARC)): |
| print("Pair with %s [%s] [%s]" % (path, addr, name)) |
| dev_pair_and_connect(path) |
| |
| def main(): |
| global bus |
| global agent |
| |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| bus = dbus.SystemBus() |
| agent = Agent(bus, AGENT_PATH) |
| |
| register_agent() |
| bus.add_signal_receiver(interfaces_added, bus_name=BLUEZ_BUS, |
| dbus_interface=OBJECT_MGR_INTF, |
| signal_name="InterfacesAdded") |
| |
| mainloop = GObject.MainLoop() |
| mainloop.run() |
| |
| bus.remove_signal_receiver(interfaces_added, bus_name=BLUEZ_BUS, |
| dbus_interface=OBJECT_MGR_INTF, |
| signal_name="InterfacesAdded") |
| unregister_agent() |
| |
| if __name__ == '__main__': |
| main() |