blob: ca9929f1d3c3118b228102a7feb608860a67731d [file] [log] [blame]
#!/usr/bin/python -u
from __future__ import absolute_import, print_function, unicode_literals
import dbus
import dbus.exceptions
import dbus.service
import dbus.mainloop.glib
import os
import subprocess
import sys
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
# Bus name and root object path used to reach the BlueZ daemon.
BLUEZ_BUS = 'org.bluez'
BLUEZ_ROOT_OBJ = '/org/bluez'
# D-Bus interface names used by the functions in this file.
AGENT_INTF = 'org.bluez.Agent1'
AGENT_MGR_INTF = 'org.bluez.AgentManager1'
DEVICE_INTF = 'org.bluez.Device1'
OBJECT_MGR_INTF = 'org.freedesktop.DBus.ObjectManager'
PROPERTY_INTF = 'org.freedesktop.DBus.Properties'
# Object path the Agent is exported at, and the capability string passed to
# RegisterAgent() in register_agent().
AGENT_PATH = '/com/google/gfiber/agent'
AGENT_CAPABILITY = 'NoInputNoOutput'
# Device names that is_gfiber_remote() accepts (exact match).
DEV_NAME_GFRM100 = 'GFRM100'
DEV_NAME_GFRM200 = 'GFRM200'
DEV_NAME_GFRM210 = 'GFRM210'
# Early GFRM210s identified themselves as GFRM201
DEV_NAME_GFRM201 = 'GFRM201'
# Additional device that interfaces_added() pairs with, matched either by
# exact name or by address prefix.
DEV_NAME_TIARC = 'HID AdvRemote'
DEV_OUI_TIARC = '90:59:AF'
# Shared runtime state filled in by main(): the system bus, the exported
# Agent instance, and the adapter directory taken from sys.argv[1].
g = {'bus': None, 'agent': None, 'adaptdir': None}
def dev_trusted(path):
    """Mark the BlueZ device at object path *path* as trusted.

    Sets the ``Trusted`` property of the device's ``org.bluez.Device1``
    interface to True via the D-Bus Properties interface, using the bus
    stored in ``g['bus']``.
    """
    device_obj = g['bus'].get_object(BLUEZ_BUS, path)
    device_props = dbus.Interface(device_obj, PROPERTY_INTF)
    device_props.Set(DEVICE_INTF, "Trusted", True)
class Rejected(dbus.DBusException):
    """Exception the agent raises to refuse a request.

    Carries the D-Bus error name ``org.bluez.Error.Rejected``.
    """

    _dbus_error_name = 'org.bluez.Error.Rejected'
class Agent(dbus.service.Object):
    """Pairing agent exported over D-Bus on the ``org.bluez.Agent1`` interface.

    PIN-code and passkey requests mark the requesting device as trusted and
    answer with a fixed all-zero code.  Service authorization, confirmation
    and authorization requests are refused by raising ``Rejected``.  Every
    call is logged to stdout.
    """

    # When true, Release() quits the main loop.
    exit_on_release = True

    def set_exit_on_release(self, exit_on_release):
        """Enable or disable quitting the main loop when Release() is called."""
        self.exit_on_release = exit_on_release

    @dbus.service.method(AGENT_INTF, in_signature="", out_signature="")
    def Release(self):
        """Log the release and, if ``exit_on_release`` is set, quit the loop.

        ``mainloop`` is looked up as a module-level name when this method
        runs, so it must be bound globally for the quit to succeed.
        """
        print("Release")
        if not self.exit_on_release:
            return
        mainloop.quit()

    @dbus.service.method(AGENT_INTF, in_signature="os", out_signature="")
    def AuthorizeService(self, device, uuid):
        """Log the request and refuse it with ``Rejected``."""
        request = (device, uuid)
        print("AuthorizeService (%s, %s)" % request)
        raise Rejected("Not implemented")

    @dbus.service.method(AGENT_INTF, in_signature="o", out_signature="s")
    def RequestPinCode(self, device):
        """Trust *device* and answer with the fixed PIN code ``'0000'``."""
        print("RequestPinCode (%s)" % device)
        dev_trusted(device)
        pin_code = '0000'
        return pin_code

    @dbus.service.method(AGENT_INTF, in_signature="o", out_signature="u")
    def RequestPasskey(self, device):
        """Trust *device* and answer with a UInt32 built from ``'0000'``."""
        print("RequestPasskey (%s)" % device)
        dev_trusted(device)
        passkey = dbus.UInt32('0000')
        return passkey

    @dbus.service.method(AGENT_INTF, in_signature="ouq", out_signature="")
    def DisplayPasskey(self, device, passkey, entered):
        """Log the passkey and the ``entered`` value; nothing else is done."""
        details = (device, passkey, entered)
        print("DisplayPasskey (%s, %06u entered %u)" % details)

    @dbus.service.method(AGENT_INTF, in_signature="os", out_signature="")
    def DisplayPinCode(self, device, pincode):
        """Log the PIN code; nothing else is done."""
        details = (device, pincode)
        print("DisplayPinCode (%s, %s)" % details)

    @dbus.service.method(AGENT_INTF, in_signature="ou", out_signature="")
    def RequestConfirmation(self, device, passkey):
        """Log the request and refuse it with ``Rejected``."""
        request = (device, passkey)
        print("RequestConfirmation (%s, %06d)" % request)
        raise Rejected("Not implemented")

    @dbus.service.method(AGENT_INTF, in_signature="o", out_signature="")
    def RequestAuthorization(self, device):
        """Log the request and refuse it with ``Rejected``."""
        print("RequestAuthorization (%s)" % device)
        raise Rejected("Not implemented")

    @dbus.service.method(AGENT_INTF, in_signature="", out_signature="")
    def Cancel(self):
        """Log the cancellation; no state needs to be cleaned up."""
        print("Cancel")
def register_agent():
    """Register the agent at ``AGENT_PATH`` with BlueZ and make it default.

    Calls ``RegisterAgent`` (with ``AGENT_CAPABILITY``) and then
    ``RequestDefaultAgent`` on the BlueZ agent manager, using the bus stored
    in ``g['bus']``.
    """
    bluez_root = g['bus'].get_object(BLUEZ_BUS, BLUEZ_ROOT_OBJ)
    agent_manager = dbus.Interface(bluez_root, AGENT_MGR_INTF)
    agent_manager.RegisterAgent(AGENT_PATH, AGENT_CAPABILITY)
    agent_manager.RequestDefaultAgent(AGENT_PATH)
    print("Agent registered")
def unregister_agent():
    """Unregister the agent at ``AGENT_PATH`` from the BlueZ agent manager.

    Uses the bus stored in ``g['bus']``.
    """
    bluez_root = g['bus'].get_object(BLUEZ_BUS, BLUEZ_ROOT_OBJ)
    agent_manager = dbus.Interface(bluez_root, AGENT_MGR_INTF)
    agent_manager.UnregisterAgent(AGENT_PATH)
    print("Agent unregistered")
def set_le_adv_ind_filter(adap_path, dev_addr):
    """Write *dev_addr* to the adapter's ``le_adv_ind_filter`` debugfs file.

    Args:
        adap_path: D-Bus object path of the adapter.  Only its last path
            component is used, as the directory name under
            ``/sys/kernel/debug/bluetooth``.
        dev_addr: Address string to write.  Callers in this file pass the
            device address before pairing and ``00:00:00:00:00:00`` once
            pairing has finished.

    The write is best-effort: an ``IOError`` from opening, writing or
    closing the file is printed and not re-raised.
    """
    name = adap_path.split('/')[-1]
    path = "/sys/kernel/debug/bluetooth/%s/le_adv_ind_filter" % name
    try:
        # The context manager closes the file even when write() fails.
        with open(path, 'w') as f:
            f.write(dev_addr)
    except IOError as e:
        print("%s" % e)
def dev_pair_and_connect(path):
    """Start pairing with the device at *path*; trust and connect on success.

    Does nothing except log when the device already reports ``Paired``.
    Otherwise the device address is written to the adapter's advertising
    filter and ``Pair`` is issued asynchronously with a 15 second timeout.
    Both completion handlers reset the filter to ``00:00:00:00:00:00``.

    Args:
        path: D-Bus object path of the ``org.bluez.Device1`` object.

    The D-Bus calls made before ``Pair`` is issued are synchronous and may
    raise ``dbus.exceptions.DBusException`` to the caller.  Failures inside
    the completion handlers are logged instead of raised, since there is no
    caller to receive them.
    """
    bus = g['bus']
    obj = bus.get_object(BLUEZ_BUS, path)
    dev = dbus.Interface(obj, DEVICE_INTF)
    props = dbus.Interface(obj, PROPERTY_INTF)

    adap_path = props.Get(DEVICE_INTF, "Adapter")
    if props.Get(DEVICE_INTF, "Paired"):
        print("%s is already paired" % (path))
        return

    def dev_pair_reply():
        print("Device pairing completed for %s" % path)
        set_le_adv_ind_filter(adap_path, '00:00:00:00:00:00')
        try:
            props.Set(DEVICE_INTF, "Trusted", True)
            dev.Connect()
        except dbus.exceptions.DBusException as e:
            # Runs inside a main-loop callback: log rather than propagate.
            print("Device connect error for %s: %s" %
                  (path, e.get_dbus_name()))

    def dev_pair_error(error):
        print("Device pairing failed for %s" % path)
        set_le_adv_ind_filter(adap_path, '00:00:00:00:00:00')
        err_name = error.get_dbus_name()
        print("Device pairing error: %s" % err_name)
        try:
            dev.CancelPairing()
        except dbus.exceptions.DBusException as e:
            # The pairing attempt may already be gone once Pair has failed.
            print("Device cancel pairing error: %s" % e.get_dbus_name())

    dev_addr = props.Get(DEVICE_INTF, "Address")
    set_le_adv_ind_filter(adap_path, dev_addr)
    dev.Pair(reply_handler=dev_pair_reply, error_handler=dev_pair_error,
             timeout=15.0)
def get_name_and_addr(path):
    """Return ``(name, addr)`` for the BlueZ device at object path *path*.

    ``Address`` is read first and any error from it propagates.  If reading
    ``Name`` raises ``DBusException`` the name is returned as ``""``.
    """
    device_obj = g['bus'].get_object(BLUEZ_BUS, path)
    device_props = dbus.Interface(device_obj, PROPERTY_INTF)
    addr = device_props.Get(DEVICE_INTF, "Address")
    try:
        return (device_props.Get(DEVICE_INTF, "Name"), addr)
    except dbus.exceptions.DBusException:
        return ("", addr)
def is_gfiber_remote(name):
    """Return True when *name* equals one of the known GFRM device names."""
    known_names = (
        DEV_NAME_GFRM100,
        DEV_NAME_GFRM200,
        DEV_NAME_GFRM210,
        DEV_NAME_GFRM201,
    )
    return any(name == known for known in known_names)
def interfaces_added(path, interfaces):
    """Handle ``InterfacesAdded``: pair with newly seen supported devices.

    Objects that do not carry the ``org.bluez.Device1`` interface are
    ignored.  A device is paired with when its name is a GFRM remote name,
    its name equals ``DEV_NAME_TIARC``, or its address starts with
    ``DEV_OUI_TIARC``.
    """
    if DEVICE_INTF not in interfaces:
        return

    name, addr = get_name_and_addr(path)
    print("Discovered %s [%s] [%s]" % (path, addr, name))

    wanted = (is_gfiber_remote(name)
              or name == DEV_NAME_TIARC
              or addr.startswith(DEV_OUI_TIARC))
    if not wanted:
        return

    print("Pair with %s [%s] [%s]" % (path, addr, name))
    dev_pair_and_connect(path)
def properties_changed(interface, changed, invalidated, path):
    """Handle ``PropertiesChanged``: run the inventory tool on connect.

    Acts only when the change is on ``org.bluez.Device1``, ``Connected``
    became true, and the device name is a GFRM remote name.  In that case
    ``gfrm-inventory`` is run with the device address and the path
    ``<adaptdir>/<addr>/gfiber-inventory``; its exit status is ignored.
    """
    if interface != DEVICE_INTF or not changed.get('Connected', False):
        return

    name, addr = get_name_and_addr(path)
    if not is_gfiber_remote(name):
        return

    inventory_file = os.path.join(g['adaptdir'], addr, 'gfiber-inventory')
    subprocess.call(["gfrm-inventory", addr, inventory_file])
def main():
    """Run the pairing agent until the main loop is quit.

    Expects the adapter directory as ``sys.argv[1]``; with no argument the
    usage text is printed and the process exits with status 1 before any
    D-Bus connection is made.

    Otherwise the function connects to the system bus, exports and registers
    the agent, subscribes to ``InterfacesAdded`` and ``PropertiesChanged``
    and runs the GLib main loop.  When the loop ends, whether normally or by
    an exception, the signal receivers are removed and the agent is
    unregistered.
    """
    # Agent.Release() stops the loop through the module-level name
    # `mainloop`, so the loop created below must be bound globally.
    global mainloop

    if len(sys.argv) == 1:
        print("usage: %s adaptpath" % sys.argv[0])
        print(" where adaptpath is the adaptor directory like "
              "/user/bluez/lib/bluetooth/F4:F5:E8:80:XX:XX")
        sys.exit(1)
    g['adaptdir'] = sys.argv[1]

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    g['bus'] = bus
    g['agent'] = Agent(bus, AGENT_PATH)

    register_agent()

    # The same match arguments are needed to add and to remove a receiver.
    added_match = dict(bus_name=BLUEZ_BUS,
                       dbus_interface=OBJECT_MGR_INTF,
                       signal_name="InterfacesAdded")
    changed_match = dict(dbus_interface=PROPERTY_INTF,
                         signal_name="PropertiesChanged",
                         arg0=DEVICE_INTF, path_keyword="path")
    bus.add_signal_receiver(interfaces_added, **added_match)
    bus.add_signal_receiver(properties_changed, **changed_match)

    mainloop = GObject.MainLoop()
    try:
        mainloop.run()
    finally:
        bus.remove_signal_receiver(interfaces_added, **added_match)
        bus.remove_signal_receiver(properties_changed, **changed_match)
        unregister_agent()
# Start the agent only when this file is executed as a script.
if __name__ == "__main__":
    main()