| #!/usr/bin/python |
| from __future__ import absolute_import, print_function, unicode_literals |
| import dbus |
| import dbus.service |
| import dbus.mainloop.glib |
| import gobject |
| import optparse |
| import sys |
| import os |
| |
# Object paths: the remote object the Alert interface is requested on, and
# the path under which this script exports its own AlertAgent object.
BLUEZ_OBJECT_PATH = '/org/bluez'
TEST_OBJECT_PATH = '/org/bluez/test'

# Bus name owning the remote object, and the D-Bus interface names used for
# the remote Alert proxy and for the locally exported agent methods.
BUS_NAME = 'org.bluez'
ALERT_INTERFACE = 'org.bluez.Alert'
ALERT_AGENT_INTERFACE = 'org.bluez.AlertAgent'
| |
class AlertAgent(dbus.service.Object):
    """D-Bus service object exposing the ALERT_AGENT_INTERFACE methods.

    Each exported method logs its invocation to stdout. ``MuteOnce`` and
    ``SetRinger`` answer by sending a 'ringer' alert through the alert
    proxy; ``Release`` stops the main loop.
    """

    def __init__(self, bus, object_path, alert, mainloop):
        """Export the agent on *bus* at *object_path*.

        *alert* is the proxy whose ``NewAlert`` method is called by the
        ringer handlers; *mainloop* is the loop stopped by ``Release``.
        """
        super(AlertAgent, self).__init__(bus, object_path)
        self.mainloop = mainloop
        self.alert = alert

    def _send_ringer_alert(self, state):
        """Send a 'ringer' alert with count 1 and *state* as its text."""
        self.alert.NewAlert('ringer', 1, state)

    @dbus.service.method(ALERT_AGENT_INTERFACE, in_signature='',
                         out_signature='')
    def MuteOnce(self):
        """Log the call and report the ringer as 'not active'."""
        print('method MuteOnce() was called')
        self._send_ringer_alert('not active')

    @dbus.service.method(ALERT_AGENT_INTERFACE, in_signature='s',
                         out_signature='')
    def SetRinger(self, mode):
        """Log the call and report *mode* as the new ringer state."""
        print('method SetRinger(%s) was called' % mode)
        self._send_ringer_alert(mode)

    @dbus.service.method(ALERT_AGENT_INTERFACE, in_signature='',
                         out_signature='')
    def Release(self):
        """Log the call and quit the main loop."""
        print('method Release() was called')
        self.mainloop.quit()
| |
def print_command_line(options):
    """Echo the parsed options to stdout when verbose mode is enabled.

    Returns False without printing anything if ``options.verbose`` is
    falsy; otherwise prints one section per option (-w, -t, -r, -n, -u)
    followed by an empty line, and returns True.
    """
    if not options.verbose:
        return False

    print('-w: ' + str(options.wait))

    # A falsy repeat count (e.g. 0) is not echoed at all.
    if options.times:
        print('-t: ' + str(options.times))

    category = options.register if options.register else str(None)
    print('-r: ' + category)

    new_entries = options.new_alert
    if not new_entries:
        print('-n: ' + str(None))
    else:
        print('-n:')
        for entry in new_entries:
            print(' ' + entry[0] + ', ' + entry[1] + ', ' + entry[2])

    unread_entries = options.unread_alert
    if not unread_entries:
        print('-u: ' + str(None))
    else:
        print('-u:')
        for entry in unread_entries:
            print(' ' + entry[0] + ', ' + entry[1])

    print()

    return True
| |
def read_count(param):
    """Convert a command-line <count> argument to an integer.

    Returns ``int(param)``. If *param* is not a valid integer literal, a
    diagnostic is written to stderr (so it does not mix with the normal
    output on stdout) and the process exits with status 1 via
    ``SystemExit``.
    """
    try:
        return int(param)
    except ValueError:
        print('<count> must be integer, not "%s"' % param, file=sys.stderr)
        sys.exit(1)
| |
def new_alert(alert, params):
    """Send one NewAlert call per entry of *params*.

    Each entry is indexed as (category, count, description); the count is
    converted with ``read_count`` before the call. Returns False when
    *params* is empty or None; otherwise returns None after the loop.
    """
    if not params:
        return False

    for entry in params:
        kind = entry[0]
        amount = read_count(entry[1])
        text = entry[2]
        alert.NewAlert(kind, amount, text)
| |
def unread_alert(alert, params):
    """Send one UnreadAlert call per entry of *params*.

    Each entry is indexed as (category, count); the count is converted
    with ``read_count`` before the call. Returns False when *params* is
    empty or None; otherwise returns None after the loop.
    """
    if not params:
        return False

    for entry in params:
        kind = entry[0]
        amount = read_count(entry[1])
        alert.UnreadAlert(kind, amount)
| |
# Command-line options accepted by the script, handed to OptionParser.
option_list = [
    # Flags.
    optparse.make_option('-v',
                         dest='verbose',
                         action='store_true',
                         default=False,
                         help='verbose'),

    optparse.make_option('-w',
                         dest='wait',
                         action='store_true',
                         default=False,
                         help='wait for dbus events'),

    # Single-value options.
    optparse.make_option('-t',
                         dest='times',
                         action='store',
                         type='int',
                         default=1,
                         metavar='<times>',
                         help='repeat UnreadAlert/NewAlert <times> times'),

    optparse.make_option('-r',
                         dest='register',
                         action='store',
                         type='string',
                         metavar='<category>',
                         help='register alert'),

    # Repeatable options; each occurrence appends one tuple of strings.
    optparse.make_option('-n',
                         dest='new_alert',
                         action='append',
                         type='string',
                         nargs=3,
                         metavar='<category> <count> <description>',
                         help='send new alert'),

    optparse.make_option('-u',
                         dest='unread_alert',
                         action='append',
                         type='string',
                         nargs=2,
                         metavar='<category> <count>',
                         help='send unread alert'),
]
| |
# Parse the command line. Interspersed arguments are disabled, so option
# parsing stops at the first positional argument.
parser = optparse.OptionParser(option_list=option_list)
parser.disable_interspersed_args()
(options, args) = parser.parse_args()

print_command_line(options)

# Reject an invocation that requests no action before touching D-Bus, so
# that a usage error is reported even when the system bus is unavailable.
if not (options.register or options.new_alert or options.unread_alert or
        options.wait):
    parser.print_usage()
    sys.exit(1)

# Install the GLib main loop as the default for D-Bus, then create the
# Alert proxy and export the agent object on the system bus.
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
mainloop = gobject.MainLoop()
alert = dbus.Interface(bus.get_object(BUS_NAME, BLUEZ_OBJECT_PATH),
                       ALERT_INTERFACE)
alert_agent = AlertAgent(bus, TEST_OBJECT_PATH, alert, mainloop)

if options.register:
    alert.RegisterAlert(options.register, TEST_OBJECT_PATH)

# Send the requested NewAlert/UnreadAlert calls <times> times.
for _ in range(options.times):
    new_alert(alert, options.new_alert)
    unread_alert(alert, options.unread_alert)

if not options.wait:
    sys.exit(0)

# With -w, keep running until the agent's Release() quits the loop or the
# user interrupts it. Only the interrupt is swallowed; any other exception
# propagates instead of being silently discarded.
try:
    mainloop.run()
except KeyboardInterrupt:
    pass