Better handlers exceptions handling

darcs-hash:20070321072742-86b55-56b0ae25d0a3e8835100a6b570aeb271ad912fe7.gz
This commit is contained in:
David Rousselie
2007-03-21 08:27:42 +01:00
parent 244a82f38a
commit 2791e3cd61
3 changed files with 33 additions and 43 deletions

View File

@@ -115,8 +115,8 @@ class MockStreamNoConnect(MockStream):
self.eof = True
class MockStreamRaiseException(MockStream):
def connect(self):
raise Error("Test error")
def loop_iter(self, timeout):
raise Exception("in loop error")
class JCLComponent_TestCase(unittest.TestCase):
###########################################################################
@@ -191,38 +191,35 @@ class JCLComponent_TestCase(unittest.TestCase):
def test_run_unhandled_error(self):
"""Test main loop unhandled error from a component handler"""
# TODO
def do_nothing():
return
self.comp.time_unit = 1
self.comp.stream = MockStreamNoConnect()
self.comp.stream_class = MockStreamNoConnect
self.comp.run()
self.assertTrue(self.comp.stream.connection_started)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertTrue(self.comp.stream.connection_stopped)
if self.comp.queue.qsize():
raise self.comp.queue.get(0)
self.comp.stream = MockStreamRaiseException()
self.comp.stream_class = MockStreamRaiseException
self.comp.handle_tick = do_nothing
try:
self.comp.run()
except Exception, e:
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertTrue(self.comp.stream.connection_stopped)
return
self.fail("No exception caught")
def test_run_ni_handle_tick(self):
"""Test JCLComponent 'NotImplemented' error from handle_tick method"""
self.comp.time_unit = 1
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
self.saved_time_handler = self.comp.time_handler
self.comp.time_handler = self.__comp_time_handler
run_thread = threading.Thread(target = self.__comp_run, \
name = "run_thread")
run_thread.start()
time.sleep(1)
self.comp.running = False
self.assertTrue(self.comp.stream.connection_started)
time.sleep(1)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertTrue(self.comp.stream.connection_stopped)
self.assertEquals(self.comp.queue.qsize(), 1)
self.assertTrue(isinstance(self.comp.queue.get(0), \
NotImplementedError))
try:
self.comp.run()
except NotImplementedError, e:
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertTrue(self.comp.stream.connection_stopped)
return
self.fail("No exception caught")
def test_run_go_offline(self):
"""Test main loop send offline presence when exiting"""