diff --git a/src/jcl/model/tests/__init__.py b/src/jcl/model/tests/__init__.py index f5bf7a5..460df5a 100644 --- a/src/jcl/model/tests/__init__.py +++ b/src/jcl/model/tests/__init__.py @@ -4,6 +4,7 @@ __revision__ = "" import unittest import threading import os +import tempfile import sys from sqlobject import SQLObject @@ -13,51 +14,63 @@ from sqlobject.dbconnection import TheURIOpener import jcl.model as model if sys.platform == "win32": - DB_PATH = "/c|/temp/jcl_test.db" + DB_DIR = "/c|/temp/" else: - DB_PATH = "/tmp/jcl_test.db" -DB_URL = DB_PATH# + "?debug=1&debugThreading=1" + DB_DIR = "/tmp/" -class MockSQLObject(SQLObject): +class MyMockSQLObject(SQLObject): _connection = model.hub string_attr = StringCol() class ModelModule_TestCase(unittest.TestCase): def setUp(self): - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) - model.db_connection_str = 'sqlite://' + DB_URL + self.db_path = tempfile.mktemp("db", "jcltest", DB_DIR) + if os.path.exists(self.db_path): + os.unlink(self.db_path) + self.db_url = "sqlite://" + self.db_path + model.db_connection_str = self.db_url model.db_connect() - MockSQLObject.createTable(ifNotExists=True) + MyMockSQLObject.createTable(ifNotExists=True) model.db_disconnect() def tearDown(self): model.db_connect() - MockSQLObject.dropTable(ifExists=True) - del TheURIOpener.cachedURIs['sqlite://' + DB_URL] + MyMockSQLObject.dropTable(ifExists=True) + del TheURIOpener.cachedURIs[self.db_url] model.hub.threadConnection.close() model.db_disconnect() - if os.path.exists(DB_PATH): - os.unlink(DB_PATH) + if os.path.exists(self.db_path): + os.unlink(self.db_path) def test_multiple_db_connection(self): def create_account_thread(): - model.db_connect() - obj21 = MockSQLObject(string_attr="obj21") - obj22 = MockSQLObject(string_attr="obj22") - model.db_disconnect() - - model.db_connect() + for i in xrange(100): + string_attr = "obj2" + str(i) + model.db_connect() + obj = MyMockSQLObject(string_attr=string_attr) + model.db_disconnect() + model.db_connect() + obj2 = MyMockSQLObject.select(MyMockSQLObject.q.string_attr == string_attr) + model.db_disconnect() + self.assertEquals(obj, obj2[0]) timer_thread = threading.Thread(target=create_account_thread, name="CreateAccountThread") timer_thread.start() - obj11 = MockSQLObject(string_attr="obj11") - obj12 = MockSQLObject(string_attr="obj12") + for i in xrange(100): + string_attr = "obj1" + str(i) + model.db_connect() + obj = MyMockSQLObject(string_attr=string_attr) + model.db_disconnect() + model.db_connect() + obj2 = MyMockSQLObject.select(MyMockSQLObject.q.string_attr == string_attr) + model.db_disconnect() + self.assertEquals(obj, obj2[0]) timer_thread.join(2) threads = threading.enumerate() self.assertEquals(len(threads), 1) - objs = MockSQLObject.select() - self.assertEquals(objs.count(), 4) + model.db_connect() + objs = MyMockSQLObject.select() + self.assertEquals(objs.count(), 200) model.db_disconnect() def suite():