Replace DataForm implementation by Form implementation from PyXMPP

darcs-hash:20070213174122-86b55-54dc9d8ae9c4f354af0b46e597779744fbd97900.gz
This commit is contained in:
David Rousselie
2007-02-13 18:41:22 +01:00
parent b4ae16acc6
commit 826d045b41
8 changed files with 128 additions and 507 deletions

View File

@@ -38,12 +38,12 @@ from pyxmpp.iq import Iq
from pyxmpp.stanza import Stanza
from pyxmpp.presence import Presence
from pyxmpp.message import Message
from pyxmpp.jabber.dataforms import Form, Field, Option
from jcl.jabber.component import JCLComponent
from jcl.model import account
from jcl.model.account import Account
from jcl.lang import Lang
from jcl.jabber.x import DataForm
from tests.jcl.model.account import ExampleAccount, Example2Account
@@ -114,6 +114,10 @@ class MockStreamNoConnect(MockStream):
self.connection_started = True
self.eof = True
class MockStreamRaiseException(MockStream):
def connect(self):
raise Error("Test error")
class JCLComponent_TestCase(unittest.TestCase):
###########################################################################
# Utility methods
@@ -171,6 +175,7 @@ class JCLComponent_TestCase(unittest.TestCase):
pass
def test_run(self):
"""Test basic main loop execution"""
self.comp.time_unit = 1
# Do not loop, handle_tick is virtual
# Tests in subclasses might be more precise
@@ -184,7 +189,22 @@ class JCLComponent_TestCase(unittest.TestCase):
if self.comp.queue.qsize():
raise self.comp.queue.get(0)
def test_run_unhandled_error(self):
"""Test main loop unhandled error from a component handler"""
# TODO
self.comp.time_unit = 1
self.comp.stream = MockStreamNoConnect()
self.comp.stream_class = MockStreamNoConnect
self.comp.run()
self.assertTrue(self.comp.stream.connection_started)
threads = threading.enumerate()
self.assertEquals(len(threads), 1)
self.assertTrue(self.comp.stream.connection_stopped)
if self.comp.queue.qsize():
raise self.comp.queue.get(0)
def test_run_ni_handle_tick(self):
"""Test JCLComponent 'NotImplemented' error from handle_tick method"""
self.comp.time_unit = 1
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
@@ -205,6 +225,7 @@ class JCLComponent_TestCase(unittest.TestCase):
NotImplementedError))
def test_run_go_offline(self):
"""Test main loop send offline presence when exiting"""
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
self.comp.time_unit = 1
@@ -677,22 +698,22 @@ class JCLComponent_TestCase(unittest.TestCase):
password = "mypassword", \
store_password = False, \
test_enum = "choice3", \
test_int = 21)
test_int = 1)
account11 = ExampleAccount(user_jid = "user1@test.com", \
name = "account11", \
jid = "account11@jcl.test.com", \
login = "mylogin", \
password = "mypassword", \
login = "mylogin11", \
password = "mypassword11", \
store_password = False, \
test_enum = "choice3", \
test_int = 21)
test_enum = "choice2", \
test_int = 11)
account21 = ExampleAccount(user_jid = "user2@test.com", \
name = "account21", \
jid = "account21@jcl.test.com", \
login = "mylogin", \
password = "mypassword", \
login = "mylogin21", \
password = "mypassword21", \
store_password = False, \
test_enum = "choice3", \
test_enum = "choice1", \
test_int = 21)
del account.hub.threadConnection
self.comp.handle_get_register(Iq(stanza_type = "get", \
@@ -742,7 +763,7 @@ class JCLComponent_TestCase(unittest.TestCase):
self.assertEquals(field.prop("var"), "store_password")
self.assertEquals(field.prop("label"), "store_password")
self.assertEquals(field.children.name, "value")
self.assertEquals(field.children.content, "False")
self.assertEquals(field.children.content, "0")
field = fields[4]
self.assertEquals(field.prop("type"), "list-single")
self.assertEquals(field.prop("var"), "test_enum")
@@ -769,7 +790,7 @@ class JCLComponent_TestCase(unittest.TestCase):
self.assertEquals(field.prop("var"), "test_int")
self.assertEquals(field.prop("label"), "test_int")
self.assertEquals(field.children.name, "value")
self.assertEquals(field.children.content, "21")
self.assertEquals(field.children.content, "1")
def test_handle_get_register_new_type1(self):
self.comp.stream = MockStream()
@@ -780,7 +801,6 @@ class JCLComponent_TestCase(unittest.TestCase):
to_jid = "jcl.test.com/example"))
self.__check_get_register_new_type()
# TODO
def test_handle_get_register_new_type2(self):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
@@ -822,17 +842,15 @@ class JCLComponent_TestCase(unittest.TestCase):
def test_handle_set_register_new(self):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
x_data = DataForm()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
x_data.add_field(field_type = "text-single", \
var = "name", \
value = "account1")
x_data = Form("submit")
x_data.add_field(name = "name", \
value = "account1", \
field_type = "text-single")
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
x_data.as_xml(query, None)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
@@ -881,32 +899,30 @@ class JCLComponent_TestCase(unittest.TestCase):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
self.comp.account_classes = [ExampleAccount]
x_data = DataForm()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
x_data.add_field(field_type = "text-single", \
var = "name", \
value = "account1")
x_data.add_field(field_type = "text-single", \
var = "login", \
value = "mylogin")
x_data.add_field(field_type = "text-private", \
var = "password", \
value = "mypassword")
x_data.add_field(field_type = "boolean", \
var = "store_password", \
value = "false")
x_data.add_field(field_type = "list-single", \
var = "test_enum", \
value = "choice3")
x_data.add_field(field_type = "text-single", \
var = "test_int", \
value = "43")
x_data = Form("submit")
x_data.add_field(name = "name", \
value = "account1", \
field_type = "text-single")
x_data.add_field(name = "login", \
value = "mylogin", \
field_type = "text-single")
x_data.add_field(name = "password", \
value = "mypassword", \
field_type = "text-private")
x_data.add_field(name = "store_password", \
value = False, \
field_type = "boolean")
x_data.add_field(name = "test_enum", \
value = "choice3", \
field_type = "list-single")
x_data.add_field(name = "test_int", \
value = 43, \
field_type = "text-single")
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
x_data.as_xml(query, None)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
@@ -960,20 +976,18 @@ class JCLComponent_TestCase(unittest.TestCase):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
self.comp.account_classes = [ExampleAccount]
x_data = DataForm()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
x_data.add_field(field_type = "text-single", \
var = "name", \
value = "account1")
x_data.add_field(field_type = "text-single", \
var = "login", \
value = "mylogin")
x_data = Form("submit")
x_data.add_field(name = "name", \
value = "account1", \
field_type = "text-single")
x_data.add_field(name = "login", \
value = "mylogin", \
field_type = "text-single")
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
x_data.as_xml(query)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
@@ -995,14 +1009,12 @@ class JCLComponent_TestCase(unittest.TestCase):
def test_handle_set_register_new_name_mandatory(self):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
x_data = DataForm()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
x_data = Form("submit")
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
x_data.as_xml(query)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
@@ -1026,17 +1038,15 @@ class JCLComponent_TestCase(unittest.TestCase):
self.comp.stream = MockStream()
self.comp.stream_class = MockStream
self.comp.account_classes = [ExampleAccount]
x_data = DataForm()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
x_data.add_field(field_type = "text-single", \
var = "name", \
value = "account1")
x_data = Form("submit")
x_data.add_field(name = "name", \
value = "account1", \
field_type = "text-single")
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
x_data.as_xml(query)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
@@ -1077,32 +1087,30 @@ class JCLComponent_TestCase(unittest.TestCase):
test_enum = "choice1", \
test_int = 21)
del account.hub.threadConnection
x_data = DataForm()
x_data.xmlns = "jabber:x:data"
x_data.type = "submit"
x_data.add_field(field_type = "text-single", \
var = "name", \
value = "account1")
x_data.add_field(field_type = "text-single", \
var = "login", \
value = "mylogin2")
x_data.add_field(field_type = "text-private", \
var = "password", \
value = "mypassword2")
x_data.add_field(field_type = "boolean", \
var = "store_password", \
value = "false")
x_data.add_field(field_type = "list-single", \
var = "test_enum", \
value = "choice3")
x_data.add_field(field_type = "text-single", \
var = "test_int", \
value = "43")
x_data = Form("submit")
x_data.add_field(name = "name", \
value = "account1", \
field_type = "text-single")
x_data.add_field(name = "login", \
value = "mylogin2", \
field_type = "text-single")
x_data.add_field(name = "password", \
value = "mypassword2", \
field_type = "text-private")
x_data.add_field(name = "store_password", \
value = False, \
field_type = "boolean")
x_data.add_field(name = "test_enum", \
value = "choice3", \
field_type = "list-single")
x_data.add_field(name = "test_int", \
value = 43, \
field_type = "text-single")
iq_set = Iq(stanza_type = "set", \
from_jid = "user1@test.com", \
to_jid = "jcl.test.com")
query = iq_set.new_query("jabber:iq:register")
x_data.attach_xml(query)
x_data.as_xml(query)
self.comp.handle_set_register(iq_set)
account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)

View File

@@ -1,175 +0,0 @@
##
## test_x.py
## Login : David Rousselie <dax@happycoders.org>
## Started on Wed Nov 22 19:24:19 2006 David Rousselie
## $Id$
##
## Copyright (C) 2006 David Rousselie
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import unittest
from jcl.jabber.x import *
from pyxmpp.stanza import common_doc
import libxml2
class DataForm_TestCase(unittest.TestCase):
def setUp(self):
self.data_form = DataForm()
def tearDown(self):
self.data_form = None
def test_add_field(self):
self.data_form = DataForm()
field = self.data_form.add_field(field_type = "single-text", \
label = "Name", \
var = "name", \
value = "jcl")
self.assertEquals(self.data_form.fields["name"], field)
self.assertTrue(field in self.data_form.fields_tab)
self.assertEquals(field.type, "single-text")
self.assertEquals(field.label, "Name")
self.assertEquals(field.var, "name")
self.assertEquals(field.value, "jcl")
self.assertEquals(field.required, False)
def test_add_required_field(self):
self.data_form = DataForm()
field = self.data_form.add_field(field_type = "single-text", \
label = "Name", \
var = "name", \
value = "jcl", \
required = True)
self.assertEquals(self.data_form.fields["name"], field)
self.assertTrue(field in self.data_form.fields_tab)
self.assertEquals(field.type, "single-text")
self.assertEquals(field.label, "Name")
self.assertEquals(field.var, "name")
self.assertEquals(field.value, "jcl")
self.assertEquals(field.required, True)
def test_get_field_value(self):
self.data_form.add_field(field_type = "single-text", \
var = "name", \
value = "jcl")
self.assertEquals(self.data_form.get_field_value("name"), \
"jcl")
def test_get_field_value_not_exist(self):
self.assertEquals(self.data_form.get_field_value("name"), \
None)
def test_get_field_value_post_func(self):
self.data_form.add_field(field_type = "single-text", \
var = "name", \
value = "jcl")
self.assertEquals(self.data_form.get_field_value("name", \
post_func = (lambda value: "_" + value + "_")), \
"_jcl_")
def test_get_field_value_post_func(self):
self.assertEquals(self.data_form.get_field_value("name", \
default_func = (lambda field_name: "no '" + field_name + "' field")), \
"no 'name' field")
def test_attach_xml(self):
parent_node = common_doc.newChild(None, "iq", None)
self.data_form.title = "JCL Form"
self.data_form.instructions = "Fill the form"
self.data_form.add_field(label = "label1", \
var = "var1")
self.data_form.add_field(label = "label2", \
var = "var2")
self.data_form.xmlns = "jabber:x:data"
data_form_node = self.data_form.attach_xml(parent_node)
context = common_doc.xpathNewContext()
context.setContextNode(parent_node)
context.xpathRegisterNs("jxd", "jabber:x:data")
self.assertEquals(context.xpathEval("jxd:x/jxd:title")[0].content, "JCL Form")
self.assertEquals(context.xpathEval("jxd:x/jxd:instructions")[0].content, "Fill the form")
self.assertEquals(len(context.xpathEval("jxd:x/jxd:field")), 2)
self.assertEquals(context.xpathEval("jxd:x/jxd:field")[0].prop("type"), "fixed")
self.assertEquals(context.xpathEval("jxd:x/jxd:field")[0].prop("label"), "label1")
self.assertEquals(context.xpathEval("jxd:x/jxd:field")[0].prop("var"), "var1")
self.assertEquals(context.xpathEval("jxd:x/jxd:field")[1].prop("type"), "fixed")
self.assertEquals(context.xpathEval("jxd:x/jxd:field")[1].prop("label"), "label2")
self.assertEquals(context.xpathEval("jxd:x/jxd:field")[1].prop("var"), "var2")
context.xpathFreeContext()
def test_from_xml(self):
xml_buffer = "<x xmlns='jabber:x:data'>" \
"<field type='text-single' var='var1'><value>value1</value></field>" \
"<field type='text-single' var='var2'><value>value2</value></field>" \
"</x>"
xml_node = libxml2.parseMemory(xml_buffer, len(xml_buffer))
self.data_form.from_xml(xml_node.children)
self.assertEquals(len(self.data_form.fields_tab), 2)
field1 = self.data_form.fields["var1"]
self.assertEquals(field1.type, "text-single")
self.assertEquals(field1.var, "var1")
self.assertEquals(field1.value, "value1")
field2 = self.data_form.fields["var2"]
self.assertEquals(field2.type, "text-single")
self.assertEquals(field2.var, "var2")
self.assertEquals(field2.value, "value2")
class Field_TestCase(unittest.TestCase):
def test_get_xml_no_option_required(self):
field = Field(field_type = "text-single", \
label = "Name", \
var = "name", \
value = "myaccount", \
required = True)
parent_node = common_doc.newChild(None, "x", None)
xml_field = field.get_xml(parent_node)
self.assertEquals(xml_field.prop("type"), "text-single")
self.assertEquals(xml_field.prop("label"), "Name")
self.assertEquals(xml_field.prop("var"), "name")
self.assertEquals(xml_field.xpathEval("value")[0].content, "myaccount")
self.assertEquals(len(xml_field.xpathEval("required")), 1)
def test_get_xml_with_option(self):
field = Field(field_type = "text-single", \
label = "Name", \
var = "name", \
value = "myaccount")
field.add_option("test_option", "option_value")
parent_node = common_doc.newChild(None, "x", None)
xml_field = field.get_xml(parent_node)
self.assertEquals(xml_field.prop("type"), "text-single")
self.assertEquals(xml_field.prop("label"), "Name")
self.assertEquals(xml_field.prop("var"), "name")
self.assertEquals(xml_field.xpathEval("value")[0].content, "myaccount")
self.assertEquals(\
xml_field.xpathEval("option['label=test_option']/value")[0].content, \
"option_value")
def test_get_xml_no_parent(self):
field = Field(field_type = "text-single", \
label = "Name", \
var = "name", \
value = "myaccount")
self.assertRaises(Exception, field.get_xml, None)
class Option_TestCase(unittest.TestCase):
def test_get_xml(self):
option = Option("test_option", "option_value")
xml_option = option.get_xml(None)
self.assertEquals(xml_option.prop("label"), "test_option")
self.assertEquals(xml_option.xpathEval("value")[0].content, \
"option_value")

View File

@@ -48,7 +48,7 @@ class ExampleAccount(Account):
account.mandatory_field), \
("password", "text-private", None, password_post_func, \
(lambda field_name: None)), \
("store_password", "boolean", None, account.boolean_post_func, \
("store_password", "boolean", None, account.default_post_func, \
lambda field_name: True), \
("test_enum", "list-single", ["choice1", "choice2", "choice3"], \
account.string_not_null_post_func,\

View File

@@ -41,30 +41,6 @@ class AccountModule_TestCase(unittest.TestCase):
result = account.default_post_func("test")
self.assertEquals(result, "test")
def test_boolean_post_func_1(self):
result = account.boolean_post_func("1")
self.assertTrue(result)
def test_boolean_post_func_0(self):
result = account.boolean_post_func("0")
self.assertFalse(result)
def test_boolean_post_func_True(self):
result = account.boolean_post_func("True")
self.assertTrue(result)
def test_boolean_post_func_true(self):
result = account.boolean_post_func("true")
self.assertTrue(result)
def test_boolean_post_func_False(self):
result = account.boolean_post_func("False")
self.assertFalse(result)
def test_boolean_post_func_false(self):
result = account.boolean_post_func("false")
self.assertFalse(result)
def test_int_post_func(self):
result = account.int_post_func("42")
self.assertEquals(result, 42)