blob: e0b831ed637ad05fbf3d65739cd8aefa9a893b9c [file] [log] [blame]
#!/usr/bin/python
"""Tests for ratchet.py."""
import os
import shutil
import tempfile
import time
import ratchet
import status
import test_common
from wvtest import wvtest
@test_common.wvtest
def condition_test():
"""Test basic Condition functionality."""
x = y = 0
callback_sink = []
cx = ratchet.Condition('cx', lambda: x != 0, 0)
cy = ratchet.Condition('cx', lambda: y != 0, 0.1,
callback=lambda: callback_sink.append([0]))
wvtest.WVEXCEPT(ratchet.TimeoutException, cx.check)
wvtest.WVFAIL(cy.check())
time.sleep(0.1)
wvtest.WVEXCEPT(ratchet.TimeoutException, cy.check)
x = 1
wvtest.WVEXCEPT(ratchet.TimeoutException, cx.check)
cx.reset()
wvtest.WVPASS(cx.check())
y = 1
cy.reset()
wvtest.WVPASS(cy.check())
wvtest.WVPASSEQ(len(callback_sink), 1)
# Callback shouldn't fire again.
wvtest.WVPASS(cy.check())
wvtest.WVPASSEQ(len(callback_sink), 1)
cy.reset()
wvtest.WVPASS(cy.check())
wvtest.WVPASSEQ(len(callback_sink), 2)
@test_common.wvtest
def file_condition_test():
"""Test File*Condition functionality."""
_, filename = tempfile.mkstemp()
c_exists = ratchet.FileExistsCondition('c exists', filename, 0.1)
c_touched = ratchet.FileTouchedCondition('c touched', filename, 0.1)
wvtest.WVPASS(c_exists.check())
wvtest.WVFAIL(c_touched.check())
# File mtime resolution isn't fine enough to see the difference between this
# write and the previous one, so sleep for a short time before writing to
# ensure a different mtime.
time.sleep(0.01)
open(filename, 'w')
wvtest.WVPASS(c_touched.check())
# Test that pre-existing files don't count.
c_touched.reset()
wvtest.WVFAIL(c_touched.check())
time.sleep(0.1)
wvtest.WVEXCEPT(ratchet.TimeoutException, c_touched.check)
# Test that deleting files doesn't count.
c_touched.reset()
wvtest.WVFAIL(c_touched.check())
os.unlink(filename)
wvtest.WVFAIL(c_touched.check())
@test_common.wvtest
def ratchet_test():
"""Test Ratchet functionality."""
class P(object):
X = 'X'
Y = 'Y'
Z = 'Z'
status.P = P
status.IMPLICATIONS = {}
status_export_path = tempfile.mkdtemp()
try:
x = y = z = 0
r = ratchet.Ratchet('test ratchet', [
ratchet.Condition('x', lambda: x, 0.1),
ratchet.Condition('y', lambda: y, 0.1),
ratchet.Condition('z', lambda: z, 0.1),
], status.Status('test ratchet', status_export_path))
x = y = 1
# Test that timeouts are not just summed, but start whenever the previous
# step completed.
wvtest.WVPASSEQ(r._current_step, 0) # pylint: disable=protected-access
wvtest.WVFAIL(os.path.isfile(os.path.join(status_export_path, 'X')))
wvtest.WVFAIL(os.path.isfile(os.path.join(status_export_path, 'Y')))
wvtest.WVFAIL(os.path.isfile(os.path.join(status_export_path, 'Z')))
r.start()
wvtest.WVPASS(os.path.isfile(os.path.join(status_export_path, 'X')))
wvtest.WVFAIL(os.path.isfile(os.path.join(status_export_path, 'Y')))
wvtest.WVFAIL(os.path.isfile(os.path.join(status_export_path, 'Z')))
time.sleep(0.05)
wvtest.WVFAIL(r.check())
wvtest.WVPASSEQ(r._current_step, 2) # pylint: disable=protected-access
wvtest.WVPASS(os.path.isfile(os.path.join(status_export_path, 'X')))
wvtest.WVPASS(os.path.isfile(os.path.join(status_export_path, 'Y')))
wvtest.WVPASS(os.path.isfile(os.path.join(status_export_path, 'Z')))
wvtest.WVFAIL(r.check())
wvtest.WVPASSEQ(r._current_step, 2) # pylint: disable=protected-access
time.sleep(0.1)
wvtest.WVEXCEPT(ratchet.TimeoutException, r.check)
x = y = z = 1
r.start()
wvtest.WVPASS(r.check())
finally:
shutil.rmtree(status_export_path)
if __name__ == '__main__':
wvtest.wvtest_main()