Improve threading handling in test

Use threading.Event() to interupt sleeping threads

darcs-hash:20080825172648-86b55-821f616c9b12dabaee9d74d7c93966b620832789.gz
This commit is contained in:
David Rousselie
2008-08-25 19:26:48 +02:00
parent d95702a940
commit 13e831ec35
5 changed files with 78 additions and 51 deletions

View File

@@ -355,7 +355,8 @@ class JCLCommandManager(CommandManager):
self.commands["http://jabber.org/protocol/admin#shutdown"] = \
(True, root_node_re)
self.commands["jcl#get-last-error"] = (False, account_node_re)
self.restart_thread = None
self.shutdown_thread = None
#self.commands["http://jabber.org/protocol/admin#get-user-password"] = True
#self.commands["http://jabber.org/protocol/admin#change-user-password"] = True
@@ -1140,6 +1141,10 @@ class JCLCommandManager(CommandManager):
###########################################################################
# restart command
###########################################################################
def sleep(self, delay):
"""Sleep for the number of second specified in delay"""
threading.Event().wait(delay)
def execute_restart_1(self, info_query, session_context,
command_node, lang_class):
self.add_actions(command_node, [ACTION_COMPLETE])
@@ -1191,11 +1196,12 @@ class JCLCommandManager(CommandManager):
body=announcement))
command_node.setProp("status", STATUS_COMPLETED)
def delayed_restart(self, delay):
threading.Event().wait(delay)
self.sleep(delay)
self.component.restart = True
restart_thread = threading.Thread(target=lambda : delayed_restart(self, delay),
name="TimerThread")
restart_thread.start()
self.restart_thread = threading.Thread(\
target=lambda : delayed_restart(self, delay),
name="TimerThread")
self.restart_thread.start()
return (None, result)
###########################################################################
@@ -1252,12 +1258,12 @@ class JCLCommandManager(CommandManager):
body=announcement))
command_node.setProp("status", STATUS_COMPLETED)
def delayed_shutdown(self, delay):
threading.Event().wait(delay)
self.sleep(delay)
self.component.running = False
shutdown_thread = threading.Thread(\
self.shutdown_thread = threading.Thread(\
target=lambda : delayed_shutdown(self, delay),
name="TimerThread")
shutdown_thread.start()
self.shutdown_thread.start()
return (None, result)
def execute_get_last_error_1(self, info_query, session_context,

View File

@@ -666,17 +666,18 @@ class JCLComponent(Component, object):
Call Component main loop
Clean up when shutting down JCLcomponent
"""
self.connect()
self.spool_dir += "/" + unicode(self.jid)
self.running = True
self.last_activity = int(time.time())
timer_thread = threading.Thread(target=self.time_handler,
name="TimerThread")
timer_thread.start()
wait_before_restart = 5
self._restart = True
timer_thread = None
self.running = True
try:
try:
self.connect()
self.spool_dir += "/" + unicode(self.jid)
self.last_activity = int(time.time())
timer_thread = threading.Thread(target=self.time_handler,
name="TimerThread")
timer_thread.start()
while (self.running and self.stream
and not self.stream.eof
and self.stream.socket is not None):
@@ -692,8 +693,9 @@ class JCLComponent(Component, object):
wait_before_restart = 0
else:
self.running = False
timer_thread.join(JCLComponent.timeout)
self.wait_event.set()
if timer_thread is not None:
self.wait_event.set()
timer_thread.join(JCLComponent.timeout)
if self.stream and not self.stream.eof \
and self.stream.socket is not None:
presences = self.account_manager.get_presence_all("unavailable")

View File

@@ -537,6 +537,7 @@ class JCLCommandManagerTestCase(JCLTestCase):
"5347",
self.config,
self.config_file)
self.comp.time_unit = 0
self.comp.set_admins(["admin@test.com"])
self.command_manager = JCLCommandManager(self.comp,
self.comp.account_manager)
@@ -2954,6 +2955,8 @@ class JCLCommandManagerRestartCommand_TestCase(JCLCommandManagerTestCase):
self.account22.status = "xa"
self.command_node.setProp("node",
"http://jabber.org/protocol/admin#restart")
self.wait_event = threading.Event()
self.command_manager.sleep = lambda delay: self.wait_event.wait(2)
def _common_execute_restart(self):
result = self.command_manager.apply_command_action(\
@@ -3017,7 +3020,8 @@ class JCLCommandManagerRestartCommand_TestCase(JCLCommandManagerTestCase):
self.assertTrue(self.comp.running)
threads = threading.enumerate()
self.assertEquals(len(threads), 2)
threading.Event().wait(2)
self.wait_event.set()
self.command_manager.restart_thread.join(1)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertTrue(self.comp.restart)
@@ -3068,7 +3072,8 @@ class JCLCommandManagerRestartCommand_TestCase(JCLCommandManagerTestCase):
self.assertTrue(self.comp.running)
threads = threading.enumerate()
self.assertEquals(len(threads), 2)
threading.Event().wait(2)
self.wait_event.set()
self.command_manager.restart_thread.join(1)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertTrue(self.comp.restart)
@@ -3103,6 +3108,8 @@ class JCLCommandManagerShutdownCommand_TestCase(JCLCommandManagerTestCase):
self.account22.status = "xa"
self.command_node.setProp("node",
"http://jabber.org/protocol/admin#shutdown")
self.wait_event = threading.Event()
self.command_manager.sleep = lambda delay: self.wait_event.wait(2)
def _common_execute_shutdown(self):
result = self.command_manager.apply_command_action(\
@@ -3166,7 +3173,8 @@ class JCLCommandManagerShutdownCommand_TestCase(JCLCommandManagerTestCase):
self.assertTrue(self.comp.running)
threads = threading.enumerate()
self.assertEquals(len(threads), 2)
threading.Event().wait(2)
self.wait_event.set()
self.command_manager.shutdown_thread.join(1)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertFalse(self.comp.restart)
@@ -3217,7 +3225,8 @@ class JCLCommandManagerShutdownCommand_TestCase(JCLCommandManagerTestCase):
self.assertTrue(self.comp.running)
threads = threading.enumerate()
self.assertEquals(len(threads), 2)
threading.Event().wait(2)
self.wait_event.set()
self.command_manager.shutdown_thread.join(1)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertFalse(self.comp.restart)

View File

@@ -110,7 +110,7 @@ class MockStream(object):
self.connection_stopped = True
def loop_iter(self, timeout):
time.sleep(timeout)
return
def close(self):
pass
@@ -165,6 +165,11 @@ class JCLComponent_TestCase(JCLTestCase):
###########################################################################
# Utility methods
###########################################################################
def _handle_tick_test_time_handler(self):
self.max_tick_count -= 1
if self.max_tick_count == 0:
self.comp.running = False
def setUp(self):
JCLTestCase.setUp(self, tables=[Account, LegacyJID, ExampleAccount,
Example2Account, User])
@@ -174,8 +179,10 @@ class JCLComponent_TestCase(JCLTestCase):
"5347",
None)
self.max_tick_count = 1
self.comp.time_unit = 0
self.saved_time_handler = None
class JCLComponent_All_TestCase(JCLComponent_TestCase):
###########################################################################
# Constructor tests
###########################################################################
@@ -228,11 +235,6 @@ class JCLComponent_TestCase(JCLTestCase):
###########################################################################
# 'time_handler' tests
###########################################################################
def _handle_tick_test_time_handler(self):
self.max_tick_count -= 1
if self.max_tick_count == 0:
self.comp.running = False
def test_time_handler(self):
self.comp.time_unit = 1
self.max_tick_count = 1
@@ -2931,12 +2933,10 @@ class JCLComponent_run_TestCase(JCLComponent_TestCase):
def test_run(self):
"""Test basic main loop execution"""
def do_nothing():
def end_run():
self.comp.running = False
return
self.comp.handle_tick = do_nothing
self.comp.time_unit = 1
# Do not loop, handle_tick is virtual
self.comp.handle_tick = end_run
self.comp.stream = MockStreamNoConnect()
self.comp.stream_class = MockStreamNoConnect
(result, time_to_wait) = self.comp.run()
@@ -2951,12 +2951,10 @@ class JCLComponent_run_TestCase(JCLComponent_TestCase):
def test_run_restart(self):
"""Test main loop execution with restart"""
def do_nothing():
def end_stream():
self.comp.stream.eof = True
return
self.comp.handle_tick = do_nothing
self.comp.time_unit = 1
# Do not loop, handle_tick is virtual
self.comp.handle_tick = end_stream
self.comp.stream = MockStreamNoConnect()
self.comp.stream_class = MockStreamNoConnect
self.comp.restart = True
@@ -2972,12 +2970,9 @@ class JCLComponent_run_TestCase(JCLComponent_TestCase):
def test_run_connection_failed(self):
"""Test when connection to Jabber server fails"""
class MockStreamLoopFailed(MockStream):
def connect(self):
self.connection_started = True
def loop_iter(self, timeout):
self.socket = None
raise socket.error
self.comp.time_unit = 1
# Do not loop, handle_tick is virtual
self.comp.stream = MockStreamLoopFailed()
self.comp.stream_class = MockStreamLoopFailed
@@ -2991,14 +2986,29 @@ class JCLComponent_run_TestCase(JCLComponent_TestCase):
self.assertEquals(len(threads), 1)
self.assertFalse(self.comp.stream.connection_stopped)
def test_run_startconnection_socketerror(self):
"""Test when connection to Jabber server fails when starting"""
class MockStreamConnectFail(MockStream):
def connect(self):
self.socket = None
raise socket.error
# Do not loop, handle_tick is virtual
self.comp.stream = MockStreamConnectFail()
self.comp.stream_class = MockStreamConnectFail
self.comp.restart = False
(result, time_to_wait) = self.comp.run()
self.assertEquals(time_to_wait, 5)
self.assertTrue(result)
self.assertFalse(self.comp.running)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
def test_run_connection_closed(self):
"""Test when connection to Jabber server is closed"""
def do_nothing():
def end_stream():
self.comp.stream.eof = True
return
self.comp.handle_tick = do_nothing
self.comp.time_unit = 1
# Do not loop, handle_tick is virtual
self.comp.handle_tick = end_stream
self.comp.stream = MockStreamNoConnect()
self.comp.stream_class = MockStreamNoConnect
self.comp.restart = False
@@ -3011,11 +3021,10 @@ class JCLComponent_run_TestCase(JCLComponent_TestCase):
self.assertEquals(len(threads), 1)
self.assertFalse(self.comp.stream.connection_stopped)
def test_run_unhandled_error(self): # TODO : why it works ?
def test_run_unhandled_error(self):
"""Test main loop unhandled error from a component handler"""
def do_nothing():
return
self.comp.time_unit = 1
self.comp.stream = MockStreamRaiseException()
self.comp.stream_class = MockStreamRaiseException
self.comp.handle_tick = do_nothing
@@ -3030,7 +3039,6 @@ class JCLComponent_run_TestCase(JCLComponent_TestCase):
def test_run_ni_handle_tick(self):
"""Test JCLComponent 'NotImplemented' error from handle_tick method"""
self.comp.time_unit = 1
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
try:
@@ -3046,7 +3054,6 @@ class JCLComponent_run_TestCase(JCLComponent_TestCase):
"""Test main loop send offline presence when exiting"""
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
self.comp.time_unit = 1
self.max_tick_count = 1
self.comp.handle_tick = self._handle_tick_test_time_handler
model.db_connect()
@@ -3286,7 +3293,7 @@ class AccountManager_TestCase(JCLTestCase):
def suite():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(JCLComponent_TestCase, 'test'))
test_suite.addTest(unittest.makeSuite(JCLComponent_All_TestCase, 'test'))
test_suite.addTest(unittest.makeSuite(JCLComponent_run_TestCase, 'test'))
test_suite.addTest(unittest.makeSuite(Handler_TestCase, 'test'))
test_suite.addTest(unittest.makeSuite(AccountManager_TestCase, 'test'))

View File

@@ -59,18 +59,21 @@ class FeederComponent_TestCase(JCLComponent_TestCase):
"5347",
None,
None)
self.comp.time_unit = 0
def test_run(self):
self.comp.time_unit = 1
def end_run():
self.comp.running = False
return
self.comp.handle_tick = end_run
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
run_thread = threading.Thread(target=self.comp.run,
name="run_thread")
run_thread.start()
time.sleep(1)
self.comp.running = False
self.comp.wait_event.wait(JCLComponent.timeout)
run_thread.join(JCLComponent.timeout)
self.assertTrue(self.comp.stream.connection_started)
time.sleep(JCLComponent.timeout + 1)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertTrue(self.comp.stream.connection_stopped)