| #!/usr/bin/python |
| |
| from __future__ import absolute_import, print_function, unicode_literals |
| |
| from gi.repository import GObject |
| |
| import sys |
| import dbus |
| import dbus.service |
| import dbus.mainloop.glib |
| from optparse import OptionParser |
| |
| bus = None |
| device_obj = None |
| dev_path = None |
| |
| def ask(prompt): |
| try: |
| return raw_input(prompt) |
| except: |
| return input(prompt) |
| |
| def set_trusted(path): |
| props = dbus.Interface(bus.get_object("org.bluez", path), |
| "org.freedesktop.DBus.Properties") |
| props.Set("org.bluez.Device", "Trusted", True) |
| |
| def dev_connect(path): |
| dev = dbus.Interface(bus.get_object("org.bluez", path), |
| "org.bluez.Device") |
| dev.Connect() |
| |
| class Rejected(dbus.DBusException): |
| _dbus_error_name = "org.bluez.Error.Rejected" |
| |
| class Agent(dbus.service.Object): |
| exit_on_release = True |
| |
| def set_exit_on_release(self, exit_on_release): |
| self.exit_on_release = exit_on_release |
| |
| @dbus.service.method("org.bluez.Agent", |
| in_signature="", out_signature="") |
| def Release(self): |
| print("Release") |
| if self.exit_on_release: |
| mainloop.quit() |
| |
| @dbus.service.method("org.bluez.Agent", |
| in_signature="os", out_signature="") |
| def Authorize(self, device, uuid): |
| print("Authorize (%s, %s)" % (device, uuid)) |
| authorize = ask("Authorize connection (yes/no): ") |
| if (authorize == "yes"): |
| return |
| raise Rejected("Connection rejected by user") |
| |
| @dbus.service.method("org.bluez.Agent", |
| in_signature="o", out_signature="s") |
| def RequestPinCode(self, device): |
| print("RequestPinCode (%s)" % (device)) |
| set_trusted(device) |
| return ask("Enter PIN Code: ") |
| |
| @dbus.service.method("org.bluez.Agent", |
| in_signature="o", out_signature="u") |
| def RequestPasskey(self, device): |
| print("RequestPasskey (%s)" % (device)) |
| set_trusted(device) |
| passkey = ask("Enter passkey: ") |
| return dbus.UInt32(passkey) |
| |
| @dbus.service.method("org.bluez.Agent", |
| in_signature="ouq", out_signature="") |
| def DisplayPasskey(self, device, passkey, entered): |
| print("DisplayPasskey (%s, %06u entered %u)" % |
| (device, passkey, entered)) |
| |
| @dbus.service.method("org.bluez.Agent", |
| in_signature="os", out_signature="") |
| def DisplayPinCode(self, device, pincode): |
| print("DisplayPinCode (%s, %s)" % (device, pincode)) |
| |
| @dbus.service.method("org.bluez.Agent", |
| in_signature="ou", out_signature="") |
| def RequestConfirmation(self, device, passkey): |
| print("RequestConfirmation (%s, %06d)" % (device, passkey)) |
| confirm = ask("Confirm passkey (yes/no): ") |
| if (confirm == "yes"): |
| set_trusted(device) |
| return |
| raise Rejected("Passkey doesn't match") |
| |
| @dbus.service.method("org.bluez.Agent", |
| in_signature="s", out_signature="") |
| def ConfirmModeChange(self, mode): |
| print("ConfirmModeChange (%s)" % (mode)) |
| authorize = ask("Authorize mode change (yes/no): ") |
| if (authorize == "yes"): |
| return |
| raise Rejected("Mode change by user") |
| |
| @dbus.service.method("org.bluez.Agent", |
| in_signature="", out_signature="") |
| def Cancel(self): |
| print("Cancel") |
| |
| def pair_reply(): |
| print("Device paired") |
| set_trusted(dev_path) |
| dev_connect(dev_path) |
| mainloop.quit() |
| |
| def pair_error(error): |
| err_name = error.get_dbus_name() |
| if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj: |
| print("Timed out. Cancelling pairing") |
| device_obj.CancelPairing() |
| else: |
| print("Creating device failed: %s" % (error)) |
| |
| |
| mainloop.quit() |
| |
| if __name__ == '__main__': |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| bus = dbus.SystemBus() |
| manager = dbus.Interface(bus.get_object("org.bluez", "/"), |
| "org.bluez.Manager") |
| |
| capability = "KeyboardDisplay" |
| |
| parser = OptionParser() |
| parser.add_option("-c", "--capability", action="store", |
| type="string", dest="capability") |
| parser.add_option("-t", "--timeout", action="store", |
| type="int", dest="timeout", |
| default=60000) |
| (options, args) = parser.parse_args() |
| if options.capability: |
| capability = options.capability |
| |
| if len(args) > 0: |
| path = manager.FindAdapter(args[0]) |
| else: |
| path = manager.DefaultAdapter() |
| |
| adapter = dbus.Interface(bus.get_object("org.bluez", path), |
| "org.bluez.Adapter") |
| |
| path = "/test/agent" |
| agent = Agent(bus, path) |
| |
| mainloop = GObject.MainLoop() |
| |
| if len(args) > 1: |
| dev_path = adapter.FindDevice(args[1]) |
| device = dbus.Interface(bus.get_object("org.bluez", dev_path), |
| "org.bluez.Device") |
| |
| agent.set_exit_on_release(False) |
| device.Pair(path, capability, timeout=options.timeout, |
| reply_handler=pair_reply, |
| error_handler=pair_error) |
| device_obj = device |
| else: |
| adapter.RegisterAgent(path, capability) |
| print("Agent registered") |
| |
| mainloop.run() |
| |
| #adapter.UnregisterAgent(path) |
| #print("Agent unregistered") |