| #!/usr/bin/python |
| |
| from __future__ import absolute_import, print_function, unicode_literals |
| |
| import sys |
| import dbus |
| import dbus.service |
| import dbus.mainloop.glib |
| try: |
| from gi.repository import GObject |
| except ImportError: |
| import gobject as GObject |
| |
# Bus name of the OBEX service and the object path used to reach its
# agent manager.
BUS_NAME = "org.bluez.obex"
PATH = "/org/bluez/obex"

# D-Bus interface names used by this script; each one lives in the same
# dotted namespace as BUS_NAME.
AGENT_MANAGER_INTERFACE = BUS_NAME + ".AgentManager1"
AGENT_INTERFACE = BUS_NAME + ".Agent1"
TRANSFER_INTERFACE = BUS_NAME + ".Transfer1"
| |
def ask(prompt):
    """Show *prompt* on the terminal and return the line the user typed.

    Uses the ``raw_input`` builtin where it exists (Python 2) and falls
    back to ``input`` where it does not (Python 3).

    Only the missing-name case selects the fallback. End-of-file, an
    interrupt, or any other error raised while reading propagates to the
    caller rather than being swallowed and followed by a second prompt.
    """
    # Keep the try body to the name lookup alone so that errors raised by
    # the read itself are never mistaken for "raw_input is unavailable".
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    return read_line(prompt)
| |
class Agent(dbus.service.Object):
    """D-Bus object that asks the terminal user to authorize each push.

    Exports ``AuthorizePush`` and ``Cancel`` on the interface named by
    ``AGENT_INTERFACE``.

    ``pending_auth`` is True only while ``AuthorizePush`` is waiting for
    the user's answer; code outside the class reads it to decide whether
    there is an authorization to cancel.
    """

    def __init__(self, conn=None, obj_path=None):
        """Export the agent on *conn* at *obj_path*.

        Both arguments are passed straight to ``dbus.service.Object``.
        """
        dbus.service.Object.__init__(self, conn, obj_path)
        # Keep the connection so AuthorizePush can look up the transfer
        # object itself instead of relying on a module-level variable.
        self._bus = conn
        self.pending_auth = False

    @dbus.service.method(AGENT_INTERFACE, in_signature="o",
                         out_signature="s")
    def AuthorizePush(self, path):
        """Ask the user whether the transfer at object *path* may proceed.

        Reads the transfer's properties over D-Bus, prompts with the
        object path and the transfer's ``Name``, and returns that
        ``Name`` as the method's string reply unless the user declines.

        Raises:
            dbus.DBusException: with error name
                ``org.bluez.obex.Error.Rejected`` when the answer is
                ``n`` or ``N``. Any other answer, including an empty
                one, authorizes the transfer.
        """
        # Fall back to the shared session bus when the agent was created
        # without a connection.
        conn = self._bus if self._bus is not None else dbus.SessionBus()
        transfer = dbus.Interface(conn.get_object(BUS_NAME, path),
                                  'org.freedesktop.DBus.Properties')
        properties = transfer.GetAll(TRANSFER_INTERFACE)
        name = properties['Name']

        self.pending_auth = True
        auth = ask("Authorize (%s, %s) (Y/n):" % (path, name))
        self.pending_auth = False

        if auth in ("n", "N"):
            # Pass the error name via ``name=`` so it is used as the
            # D-Bus error name of the reply rather than being embedded
            # in the human-readable message.
            raise dbus.DBusException(
                "Not Authorized",
                name="org.bluez.obex.Error.Rejected")

        return name

    @dbus.service.method(AGENT_INTERFACE, in_signature="",
                         out_signature="")
    def Cancel(self):
        """Report that the authorization was canceled and clear the flag."""
        print("Authorization Canceled")
        self.pending_auth = False
| |
if __name__ == '__main__':
    # Make the GLib main loop the default one for D-Bus connections.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                             AGENT_MANAGER_INTERFACE)

    # Export the agent object, then tell the agent manager where it is.
    path = "/test/agent"
    agent = Agent(bus, path)

    mainloop = GObject.MainLoop()

    manager.RegisterAgent(path)
    print("Agent registered")

    # Ctrl+C while an authorization is pending only cancels that request
    # and re-enters the loop; otherwise it ends the program. The previous
    # handler also consulted a ``transfers`` list that was never defined,
    # so that path always failed with NameError; it has been removed.
    cont = True
    while cont:
        try:
            mainloop.run()
        except KeyboardInterrupt:
            if agent.pending_auth:
                agent.Cancel()
            else:
                cont = False

    # NOTE(review): explicit unregistration was already disabled here and
    # is left that way; presumably the service drops the agent when this
    # process leaves the bus -- confirm before re-enabling.
    # manager.UnregisterAgent(path)
    # print("Agent unregistered")