| #!/usr/bin/python |
| |
| from __future__ import absolute_import, print_function, unicode_literals |
| |
| import os |
| import sys |
| import dbus |
| import dbus.service |
| import dbus.mainloop.glib |
| try: |
| from gi.repository import GObject |
| except ImportError: |
| import gobject as GObject |
| |
# Bus name and root object path used to reach the org.bluez.obex service.
BUS_NAME = 'org.bluez.obex'
PATH = '/org/bluez/obex'

# D-Bus interface names; every one of them lives under the BUS_NAME prefix.
CLIENT_INTERFACE = BUS_NAME + '.Client1'
SESSION_INTERFACE = BUS_NAME + '.Session1'
PHONEBOOK_ACCESS_INTERFACE = BUS_NAME + '.PhonebookAccess1'
TRANSFER_INTERFACE = BUS_NAME + '.Transfer1'
| |
class Transfer:
    """Record describing one requested pull.

    Attributes:
        callback_func: callable supplied by the requester; stored as given.
        path: transfer object path, None until it is assigned.
        filename: file name associated with the transfer, None until assigned.
    """

    def __init__(self, callback_func):
        # Neither value is known at construction time; both start unset.
        self.path = self.filename = None
        self.callback_func = callback_func
| |
class PbapClient:
    """Issue asynchronous phonebook pulls on one OBEX session and track them.

    Each pull is registered under its transfer object path once the reply
    arrives.  PropertiesChanged signals for registered paths drive
    completion handling.  The error paths call ``mainloop.quit()`` on the
    module-level ``mainloop`` object, so that name must exist before any
    callback fires.
    """

    def __init__(self, session_path):
        """Bind to the session object at *session_path* on the session bus."""
        # Pulls requested and not yet completed (see pull/pull_all and
        # transfer_complete).
        self.transfers = 0
        # Transfer object path -> request record (see register()).
        self.props = dict()
        # Callable to run once all outstanding transfers have finished.
        self.flush_func = None
        bus = dbus.SessionBus()
        obj = bus.get_object(BUS_NAME, session_path)
        self.session = dbus.Interface(obj, SESSION_INTERFACE)
        self.pbap = dbus.Interface(obj, PHONEBOOK_ACCESS_INTERFACE)
        # No sender/path filter is given here, so properties_changed() is
        # responsible for ignoring objects this client does not track.
        bus.add_signal_receiver(self.properties_changed,
                                dbus_interface="org.freedesktop.DBus.Properties",
                                signal_name="PropertiesChanged",
                                path_keyword="path")

    def register(self, path, properties, transfer):
        """Reply handler for a pull: start tracking *transfer* under *path*.

        *properties* must contain a "Filename" entry.
        """
        transfer.path = path
        transfer.filename = properties["Filename"]
        self.props[path] = transfer
        print("Transfer created: %s (file %s)" % (path,
                                                  transfer.filename))

    def error(self, err):
        """Error handler for a pull request: report *err* and stop the loop."""
        print(err)
        mainloop.quit()

    def transfer_complete(self, path):
        """Hand the finished transfer's file contents to its callback.

        The transfer is always dropped from the tracking table, even when
        its file cannot be read, so a single failure cannot prevent the
        flush callback from ever running.  Failures while reading the file
        or running the callback are reported and otherwise ignored.
        """
        req = self.props.pop(path, None)
        if req is None:
            return
        self.transfers -= 1
        print("Transfer %s complete" % path)
        try:
            # Read everything and close the file before removing it.
            with open(req.filename, "r") as f:
                lines = f.readlines()
            os.remove(req.filename)
            req.callback_func(lines)
        except Exception as err:
            # Best effort: keep processing the remaining transfers, but
            # make the failure visible instead of silently dropping it.
            print("Transfer %s: failed to process %s: %s"
                  % (path, req.filename, err))

        if not self.props and self.transfers == 0:
            if self.flush_func is not None:
                # Clear the slot first so the callback may install a new one.
                flush = self.flush_func
                self.flush_func = None
                flush()

    def transfer_error(self, path):
        """Report a failed transfer and stop the main loop."""
        print("Transfer %s error" % path)
        mainloop.quit()

    def properties_changed(self, interface, properties, invalidated, path):
        """Signal handler: dispatch on the Status of a tracked transfer.

        Signals for untracked paths, and signals that carry no "Status"
        entry, are ignored.
        """
        if path not in self.props:
            return

        # Not every change notification includes Status; .get() avoids a
        # KeyError for those.
        status = properties.get('Status')
        if status == 'complete':
            self.transfer_complete(path)
        elif status == 'error':
            self.transfer_error(path)

    def pull(self, vcard, params, func):
        """Asynchronously pull one *vcard*; *func* later receives its lines."""
        req = Transfer(func)
        self.pbap.Pull(vcard, "", params,
                       reply_handler=lambda o, p: self.register(o, p, req),
                       error_handler=self.error)
        self.transfers += 1

    def pull_all(self, params, func):
        """Asynchronously pull the whole phonebook; *func* receives its lines."""
        req = Transfer(func)
        self.pbap.PullAll("", params,
                          reply_handler=lambda o, p: self.register(o, p, req),
                          error_handler=self.error)
        self.transfers += 1

    def flush_transfers(self, func):
        """Arrange for *func* to run once all outstanding transfers finish.

        If nothing is outstanding, this returns without storing or calling
        *func*.
        """
        if not self.props and self.transfers == 0:
            return
        self.flush_func = func

    def interface(self):
        """Return the PhonebookAccess1 interface proxy for direct calls."""
        return self.pbap
| |
if __name__ == '__main__':

    # Check the command line first so the usage message is shown even when
    # the bus or the OBEX service is unavailable.
    if len(sys.argv) < 2:
        print("Usage: %s <device>" % (sys.argv[0]))
        sys.exit(1)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    # Kept at module level on purpose: PbapClient's error handlers call
    # mainloop.quit() on this global.
    mainloop = GObject.MainLoop()

    client = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                            CLIENT_INTERFACE)

    print("Creating Session")
    session_path = client.CreateSession(sys.argv[1], {"Target": "PBAP"})

    pbap_client = PbapClient(session_path)

    def process_result(lines, header):
        """Print *header* (when given), then *lines*, then a blank line."""
        if header is not None:
            print(header)
        for line in lines:
            # Lines come from readlines() and keep their own newline.
            print(line, end="")
        print()

    def test_paths(paths):
        """Exercise the first phonebook in *paths*, then chain to the rest.

        The remaining paths are processed from the flush callback, i.e.
        after every transfer started for the current path has completed.
        An empty list ends the run and stops the main loop.
        """
        if not paths:
            print()
            print("FINISHED")
            mainloop.quit()
            return

        path = paths[0]

        print("\n--- Select Phonebook %s ---\n" % (path))
        pbap_client.interface().Select("int", path)

        print("\n--- GetSize ---\n")
        ret = pbap_client.interface().GetSize()
        print("Size = %d\n" % (ret))

        print("\n--- List vCard ---\n")
        try:
            ret = pbap_client.interface().List(dbus.Dictionary())
        except dbus.exceptions.DBusException as err:
            # A failed listing is not fatal: report it and still run the
            # PullAll step for this phonebook.
            print("List failed: %s" % (err))
            ret = []

        params = dbus.Dictionary({"Format": "vcard30",
                                  "Fields": ["PHOTO"]})
        for item in ret:
            print("%s : %s" % (item[0], item[1]))
            pbap_client.pull(item[0], params,
                             lambda x: process_result(x, None))

        pbap_client.pull_all(params, lambda x: process_result(x,
                             "\n--- PullAll ---\n"))

        pbap_client.flush_transfers(lambda: test_paths(paths[1:]))

    test_paths(["PB", "ICH", "OCH", "MCH", "CCH", "SPD", "FAV"])

    mainloop.run()