| #!/usr/bin/python |
| |
| from __future__ import absolute_import, print_function, unicode_literals |
| |
| from optparse import OptionParser, make_option |
| import os |
| from socket import SOCK_SEQPACKET, socket |
| import sys |
| import dbus |
| import dbus.service |
| import dbus.mainloop.glib |
| import glib |
| try: |
| from gi.repository import GObject |
| except ImportError: |
| import gobject as GObject |
| |
| mainloop = None |
| audio_supported = True |
| |
| try: |
| from socket import AF_BLUETOOTH, BTPROTO_SCO |
| except: |
| print("WARNING: python compiled without Bluetooth support" |
| " - audio will not be available") |
| audio_supported = False |
| |
| BUF_SIZE = 1024 |
| |
| BDADDR_ANY = '00:00:00:00:00:00' |
| |
| HF_NREC = 0x0001 |
| HF_3WAY = 0x0002 |
| HF_CLI = 0x0004 |
| HF_VOICE_RECOGNITION = 0x0008 |
| HF_REMOTE_VOL = 0x0010 |
| HF_ENHANCED_STATUS = 0x0020 |
| HF_ENHANCED_CONTROL = 0x0040 |
| HF_CODEC_NEGOTIATION = 0x0080 |
| |
| AG_3WAY = 0x0001 |
| AG_NREC = 0x0002 |
| AG_VOICE_RECOGNITION = 0x0004 |
| AG_INBAND_RING = 0x0008 |
| AG_VOICE_TAG = 0x0010 |
| AG_REJECT_CALL = 0x0020 |
| AG_ENHANCED_STATUS = 0x0040 |
| AG_ENHANCED_CONTROL = 0x0080 |
| AG_EXTENDED_RESULT = 0x0100 |
| AG_CODEC_NEGOTIATION = 0x0200 |
| |
| HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION | |
| HF_REMOTE_VOL | HF_ENHANCED_STATUS | |
| HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION) |
| |
| AVAIL_CODECS = "1,2" |
| |
| class HfpConnection: |
| slc_complete = False |
| fd = None |
| io_id = 0 |
| version = 0 |
| features = 0 |
| pending = None |
| |
| def disconnect(self): |
| if (self.fd >= 0): |
| os.close(self.fd) |
| self.fd = -1 |
| glib.source_remove(self.io_id) |
| self.io_id = 0 |
| |
| def slc_completed(self): |
| print("SLC establisment complete") |
| self.slc_complete = True |
| |
| def slc_next_cmd(self, cmd): |
| if not cmd: |
| self.send_cmd("AT+BRSF=%u" % (HF_FEATURES)) |
| elif (cmd.startswith("AT+BRSF")): |
| if (self.features & AG_CODEC_NEGOTIATION and |
| HF_FEATURES & HF_CODEC_NEGOTIATION): |
| self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS)) |
| else: |
| self.send_cmd("AT+CIND=?") |
| elif (cmd.startswith("AT+BAC")): |
| self.send_cmd("AT+CIND=?") |
| elif (cmd.startswith("AT+CIND=?")): |
| self.send_cmd("AT+CIND?") |
| elif (cmd.startswith("AT+CIND?")): |
| self.send_cmd("AT+CMER=3,0,0,1") |
| elif (cmd.startswith("AT+CMER=")): |
| if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY): |
| self.send_cmd("AT+CHLD=?") |
| else: |
| self.slc_completed() |
| elif (cmd.startswith("AT+CHLD=?")): |
| self.slc_completed() |
| else: |
| print("Unknown SLC command completed: %s" % (cmd)) |
| |
| def io_cb(self, fd, cond): |
| buf = os.read(fd, BUF_SIZE) |
| buf = buf.strip() |
| |
| print("Received: %s" % (buf)) |
| |
| if (buf == "OK" or buf == "ERROR"): |
| cmd = self.pending |
| self.pending = None |
| |
| if (not self.slc_complete): |
| self.slc_next_cmd(cmd) |
| |
| return True |
| |
| parts = buf.split(':') |
| |
| if (parts[0] == "+BRSF"): |
| self.features = int(parts[1]) |
| |
| return True |
| |
| def send_cmd(self, cmd): |
| if (self.pending): |
| print("ERROR: Another command is pending") |
| return |
| |
| print("Sending: %s" % (cmd)) |
| |
| os.write(self.fd, cmd + "\r\n") |
| self.pending = cmd |
| |
| def __init__(self, fd, version, features): |
| self.fd = fd |
| self.version = version |
| self.features = features |
| |
| print("Version 0x%04x Features 0x%04x" % (version, features)) |
| |
| self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb) |
| |
| self.slc_next_cmd(None) |
| |
| class HfpProfile(dbus.service.Object): |
| sco_socket = None |
| io_id = 0 |
| conns = {} |
| |
| def sco_cb(self, sock, cond): |
| (sco, peer) = sock.accept() |
| print("New SCO connection from %s" % (peer)) |
| |
| def init_sco(self, sock): |
| self.sco_socket = sock |
| self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb) |
| |
| def __init__(self, bus, path, sco): |
| dbus.service.Object.__init__(self, bus, path) |
| |
| if sco: |
| self.init_sco(sco) |
| |
| @dbus.service.method("org.bluez.Profile1", |
| in_signature="", out_signature="") |
| def Release(self): |
| print("Release") |
| mainloop.quit() |
| |
| @dbus.service.method("org.bluez.Profile1", |
| in_signature="", out_signature="") |
| def Cancel(self): |
| print("Cancel") |
| |
| @dbus.service.method("org.bluez.Profile1", |
| in_signature="o", out_signature="") |
| def RequestDisconnection(self, path): |
| conn = self.conns.pop(path) |
| conn.disconnect() |
| |
| @dbus.service.method("org.bluez.Profile1", |
| in_signature="oha{sv}", out_signature="") |
| def NewConnection(self, path, fd, properties): |
| fd = fd.take() |
| version = 0x0105 |
| features = 0 |
| print("NewConnection(%s, %d)" % (path, fd)) |
| for key in properties.keys(): |
| if key == "Version": |
| version = properties[key] |
| elif key == "Features": |
| features = properties[key] |
| |
| conn = HfpConnection(fd, version, features) |
| |
| self.conns[path] = conn |
| |
| if __name__ == '__main__': |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| bus = dbus.SystemBus() |
| |
| manager = dbus.Interface(bus.get_object("org.bluez", |
| "/org/bluez"), "org.bluez.ProfileManager1") |
| |
| option_list = [ |
| make_option("-p", "--path", action="store", |
| type="string", dest="path", |
| default="/bluez/test/hfp"), |
| make_option("-n", "--name", action="store", |
| type="string", dest="name", |
| default=None), |
| make_option("-C", "--channel", action="store", |
| type="int", dest="channel", |
| default=None), |
| ] |
| |
| parser = OptionParser(option_list=option_list) |
| |
| (options, args) = parser.parse_args() |
| |
| mainloop = GObject.MainLoop() |
| |
| opts = { |
| "Version" : dbus.UInt16(0x0106), |
| "Features" : dbus.UInt16(HF_FEATURES), |
| } |
| |
| if (options.name): |
| opts["Name"] = options.name |
| |
| if (options.channel is not None): |
| opts["Channel"] = dbus.UInt16(options.channel) |
| |
| if audio_supported: |
| sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO) |
| sco.bind(BDADDR_ANY) |
| sco.listen(1) |
| else: |
| sco = None |
| |
| profile = HfpProfile(bus, options.path, sco) |
| |
| manager.RegisterProfile(options.path, "hfp-hf", opts) |
| |
| print("Profile registered - waiting for connections") |
| |
| mainloop.run() |