| #!/usr/bin/python |
| # Copyright 2015 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Tests for WifiblasterController.""" |
| |
| import glob |
| import os |
| import random |
| import shutil |
| import sys |
| import tempfile |
| import time |
| import waveguide |
| from wvtest import wvtest |
| |
| |
| @wvtest.wvtest |
| def ReadParameterTest(): |
| d = tempfile.mkdtemp() |
| try: |
| wc = waveguide.WifiblasterController([], d) |
| testfile = os.path.join(d, 'wifiblaster.name') |
| |
| # Result should be equal in value and type (unless None). |
| testcases = [('1\n', int, 1), |
| ('1.5\n', float, 1.5), |
| ('True\n', str, 'True\n'), |
| ('x\n', int, None)] |
| for (read_data, typecast, expected) in testcases: |
| open(testfile, 'w').write(read_data) |
| result = wc._ReadParameter('name', typecast) |
| wvtest.WVPASSEQ(result, expected) |
| if result: |
| wvtest.WVPASS(isinstance(result, typecast)) |
| |
| # Make sure it works if the file isn't readable |
| os.chmod(testfile, 0) |
| result = wc._ReadParameter('name', int) |
| wvtest.WVPASSEQ(result, None) |
| |
| # and if it generally doesn't exist |
| os.unlink(testfile) |
| result = wc._ReadParameter('name', str) |
| wvtest.WVPASSEQ(result, None) |
| finally: |
| os.rmdir(d) |
| |
| |
| @wvtest.wvtest |
| def LogWifiblasterResultsTest(): |
| old_wifiblaster_dir = waveguide.WIFIBLASTER_DIR |
| waveguide.WIFIBLASTER_DIR = tempfile.mkdtemp() |
| try: |
| wc = waveguide.WifiblasterController([], '') |
| |
| # Set the consensus key to a known value. |
| waveguide.consensus_key = 16 * 'x' |
| |
| stdout = ('version=1 mac=11:11:11:11:11:11 throughput=10000000 ' |
| 'samples=5000000,15000000\n' |
| 'malformed 11:11:11:11:11:11 but has macs 11:11:11:11:11:11\n') |
| expected = [('version=1 mac=11:11:11:11:11:11 throughput=10000000 ' |
| 'samples=5000000,15000000'), |
| 'malformed 11:11:11:11:11:11 but has macs 11:11:11:11:11:11'] |
| filename = os.path.join(waveguide.WIFIBLASTER_DIR, '11:11:11:11:11:11') |
| # Should save only the second line, with leading timestamp |
| wc._HandleResults(2, stdout, 'error') |
| wvtest.WVPASSEQ(open(filename).read().split(' ', 1)[1], expected[1]) |
| # Should save the first line, with leading timestamp |
| wc._HandleResults(0, stdout.split('\n')[0], 'error 2') |
| wvtest.WVPASSEQ(open(filename).read().split(' ', 1)[1], expected[0]) |
| |
| finally: |
| shutil.rmtree(waveguide.WIFIBLASTER_DIR) |
| waveguide.WIFIBLASTER_DIR = old_wifiblaster_dir |
| |
| |
| class Empty(object): |
| pass |
| |
| |
| @wvtest.wvtest |
| def PollTest(): |
| d = tempfile.mkdtemp() |
| old_wifiblaster_dir = waveguide.WIFIBLASTER_DIR |
| waveguide.WIFIBLASTER_DIR = tempfile.mkdtemp() |
| oldexpovariate = random.expovariate |
| oldpath = os.environ['PATH'] |
| oldtime = time.time |
| |
| def FakeTime(): |
| faketime[0] += 1 |
| return faketime[0] |
| |
| try: |
| random.expovariate = lambda lambd: random.uniform(0, 2 * 1.0 / lambd) |
| time.time = FakeTime |
| os.environ['PATH'] = 'fake:' + os.environ['PATH'] |
| sys.path.insert(0, 'fake') |
| waveguide.opt = Empty() |
| waveguide.opt.status_dir = d |
| |
| for flags in [{'phyname': 'phy-22:22', 'vdevname': 'wlan-22:22', |
| 'high_power': True, 'tv_box': False}, |
| {'phyname': 'phy-22:22', 'vdevname': 'wlan-22:22', |
| 'high_power': False, 'tv_box': True}]: |
| # Reset time and config directory before every test run. |
| faketime = [-1] |
| for f in glob.glob(os.path.join(d, '*')): |
| os.remove(f) |
| |
| managers = [] |
| wc = waveguide.WifiblasterController(managers, d) |
| manager = waveguide.WlanManager(wifiblaster_controller=wc, **flags) |
| managers.append(manager) |
| manager.UpdateStationInfo() |
| |
| def WriteConfig(k, v): |
| open(os.path.join(d, 'wifiblaster.%s' % k), 'w').write(v) |
| |
| WriteConfig('duration', '.1') |
| WriteConfig('enable', 'False') |
| WriteConfig('fraction', '10') |
| WriteConfig('interval', '10') |
| WriteConfig('measureall', '0') |
| WriteConfig('size', '1470') |
| |
| def CountRuns(): |
| try: |
| v = open('fake/wifiblaster.out').readlines() |
| except IOError: |
| return 0 |
| else: |
| os.unlink('fake/wifiblaster.out') |
| return len(v) |
| |
| # Get rid of any leftovers. |
| CountRuns() |
| |
| # Disabled. |
| # No measurements should be run. |
| print manager.GetState() |
| for t in xrange(0, 100): |
| wc.Poll(t) |
| wvtest.WVPASSEQ(CountRuns(), 0) |
| |
| # Enabled. |
| # The first measurement should be one cycle later than the start time. |
| # This is not an implementation detail: it prevents multiple APs from |
| # running simultaneous measurements if measurements are enabled at the |
| # same time. |
| WriteConfig('enable', 'True') |
| wc.Poll(100) |
| wvtest.WVPASSGT(wc.NextMeasurement(), 100) |
| for t in xrange(101, 200): |
| wc.Poll(t) |
| wvtest.WVPASSGE(CountRuns(), 1) |
| |
| # Invalid parameter. |
| # Disabled. No measurements should be run. |
| WriteConfig('duration', '-1') |
| for t in xrange(200, 300): |
| wc.Poll(t) |
| wvtest.WVPASSEQ(CountRuns(), 0) |
| |
| # Fix invalid parameter. |
| # Enabled again with 10 second average interval. |
| WriteConfig('duration', '.1') |
| for t in xrange(300, 400): |
| wc.Poll(t) |
| wvtest.WVPASSGE(CountRuns(), 1) |
| |
| # Next poll should be in at most one second regardless of interval. |
| wvtest.WVPASSLE(wc.NextTimeout(), 400) |
| |
| # Enabled with shorter average interval. The change in interval should |
| # trigger a change in next poll timeout. |
| WriteConfig('interval', '0.5') |
| old_to = wc.NextMeasurement() |
| wc.Poll(400) |
| wvtest.WVPASSNE(old_to, wc.NextMeasurement()) |
| for t in xrange(401, 500): |
| wc.Poll(t) |
| wvtest.WVPASSGE(CountRuns(), 1) |
| |
| # Request all clients to be measured and make sure it only happens once. |
| # Disable automated measurement so they are not counted. |
| WriteConfig('interval', '0') |
| WriteConfig('measureall', str(faketime[0])) |
| wc.Poll(500) |
| wvtest.WVPASSEQ(CountRuns(), 1) |
| wc.Poll(501) |
| wvtest.WVPASSEQ(CountRuns(), 0) |
| |
| # Measure on association only if enabled. |
| wc.MeasureOnAssociation(manager.vdevname, |
| manager.GetState().assoc[0].mac) |
| wvtest.WVPASSEQ(CountRuns(), 0) |
| WriteConfig('onassociation', 'True') |
| wc.MeasureOnAssociation(manager.vdevname, |
| manager.GetState().assoc[0].mac) |
| wvtest.WVPASSEQ(CountRuns(), 1) |
| finally: |
| random.expovariate = oldexpovariate |
| time.time = oldtime |
| shutil.rmtree(d) |
| os.environ['PATH'] = oldpath |
| shutil.rmtree(waveguide.WIFIBLASTER_DIR) |
| waveguide.WIFIBLASTER_DIR = old_wifiblaster_dir |
| |
| |
| if __name__ == '__main__': |
| wvtest.wvtest_main() |