more db concurrency test

darcs-hash:20070807055128-86b55-615b33f531773d8eea4a6cd289ffc252aeb228fa.gz
This commit is contained in:
David Rousselie
2007-08-07 07:51:28 +02:00
parent 903d2a98e7
commit 947688056b

View File

@@ -4,6 +4,7 @@ __revision__ = ""
import unittest
import threading
import os
import tempfile
import sys
from sqlobject import SQLObject
@@ -13,51 +14,63 @@ from sqlobject.dbconnection import TheURIOpener
import jcl.model as model
if sys.platform == "win32":
DB_PATH = "/c|/temp/jcl_test.db"
DB_DIR = "/c|/temp/"
else:
DB_PATH = "/tmp/jcl_test.db"
DB_URL = DB_PATH# + "?debug=1&debugThreading=1"
DB_DIR = "/tmp/"
class MockSQLObject(SQLObject):
class MyMockSQLObject(SQLObject):
_connection = model.hub
string_attr = StringCol()
class ModelModule_TestCase(unittest.TestCase):
def setUp(self):
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
model.db_connection_str = 'sqlite://' + DB_URL
self.db_path = tempfile.mktemp("db", "jcltest", DB_DIR)
if os.path.exists(self.db_path):
os.unlink(self.db_path)
self.db_url = "sqlite://" + self.db_path
model.db_connection_str = self.db_url
model.db_connect()
MockSQLObject.createTable(ifNotExists=True)
MyMockSQLObject.createTable(ifNotExists=True)
model.db_disconnect()
def tearDown(self):
model.db_connect()
MockSQLObject.dropTable(ifExists=True)
del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
MyMockSQLObject.dropTable(ifExists=True)
del TheURIOpener.cachedURIs[self.db_url]
model.hub.threadConnection.close()
model.db_disconnect()
if os.path.exists(DB_PATH):
os.unlink(DB_PATH)
if os.path.exists(self.db_path):
os.unlink(self.db_path)
def test_multiple_db_connection(self):
def create_account_thread():
model.db_connect()
obj21 = MockSQLObject(string_attr="obj21")
obj22 = MockSQLObject(string_attr="obj22")
model.db_disconnect()
model.db_connect()
for i in xrange(100):
string_attr = "obj2" + str(i)
model.db_connect()
obj = MyMockSQLObject(string_attr=string_attr)
model.db_disconnect()
model.db_connect()
obj2 = MyMockSQLObject.select(MyMockSQLObject.q.string_attr == string_attr)
model.db_disconnect()
self.assertEquals(obj, obj2[0])
timer_thread = threading.Thread(target=create_account_thread,
name="CreateAccountThread")
timer_thread.start()
obj11 = MockSQLObject(string_attr="obj11")
obj12 = MockSQLObject(string_attr="obj12")
for i in xrange(100):
string_attr = "obj1" + str(i)
model.db_connect()
obj = MyMockSQLObject(string_attr=string_attr)
model.db_disconnect()
model.db_connect()
obj2 = MyMockSQLObject.select(MyMockSQLObject.q.string_attr == string_attr)
model.db_disconnect()
self.assertEquals(obj, obj2[0])
timer_thread.join(2)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
objs = MockSQLObject.select()
self.assertEquals(objs.count(), 4)
model.db_connect()
objs = MyMockSQLObject.select()
self.assertEquals(objs.count(), 200)
model.db_disconnect()
def suite():