1: #
2: # Tests TMRegistry and ProxyTM
3: #
4:
5: import os, sys
# When this file is run directly as a script, execute the framework.py that
# sits in sys.path[0] (the script's own directory) inside this module's
# namespace before anything else is imported.
if __name__ == '__main__':
    execfile(os.path.join(sys.path[0], 'framework.py'))
8:
9: from Testing import ZopeTestCase
10:
11: from OFS.SimpleItem import SimpleItem
12: from Products.ExtFile import transaction
13: from Products.ExtFile.TM import TMRegistry, registry
14: from Products.ExtFile.TM import ProxyTM
15:
16:
class Target(SimpleItem):
    # Minimal object to hang a ProxyTM on: each transaction hook only
    # records that it was invoked, so the tests can check forwarding.

    # All flags start cleared at class level; a hook sets its flag to 1 on
    # the instance it is called on.
    begin_called = finish_called = abort_called = 0

    def _begin(self):
        # Record that the begin hook ran.
        self.begin_called = 1

    def _finish(self):
        # Record that the finish hook ran.
        self.finish_called = 1

    def _abort(self):
        # Record that the abort hook ran.
        self.abort_called = 1
27:
def tr():
    """Return the sequence of participants joined to the current transaction.

    The attribute holding that sequence depends on the ZODB version:
    ``_objects`` (ZODB <= 3.2) or ``_resources`` (ZODB >= 3.4).

    Raises:
        RuntimeError: if the transaction exposes neither attribute. This
            replaces an implicit ``None`` return that only surfaced later
            as an unrelated ``TypeError`` in the caller. AttributeError is
            deliberately not used because the tests swallow it.
    """
    t = transaction.get()
    # Probe the older layout first, then the newer one.
    for name in ('_objects', '_resources'):
        if hasattr(t, name):
            return getattr(t, name)
    raise RuntimeError(
        'transaction %r has neither _objects nor _resources' % (t,))
34:
35:
class TestTMRegistry(ZopeTestCase.TestCase):
    # Exercises a TMRegistry instance created by the test itself:
    # registration, counting, membership, removal and lookup.
    #
    # Explanations are kept in comments rather than method docstrings so
    # the test runner keeps reporting tests by method name.

    def afterSetUp(self):
        # Work on a private registry instead of the module-level one.
        self.reg = TMRegistry()

    def testRegister(self):
        # len() grows by one with every registered target.
        self.assertEqual(len(self.reg), 0)
        for expected in (1, 2):
            self.reg.register(Target())
            self.assertEqual(len(self.reg), expected)

    def testCount(self):
        # count() grows by one with every registered target.
        self.assertEqual(self.reg.count(), 0)
        for expected in (1, 2):
            self.reg.register(Target())
            self.assertEqual(self.reg.count(), expected)

    def testContains(self):
        # Every registered target is reported by contains().
        targets = (Target(), Target())
        for target in targets:
            self.reg.register(target)
        for target in targets:
            self.failUnless(self.reg.contains(target))

    def testRemove(self):
        # Removed targets are no longer reported by contains().
        targets = (Target(), Target())
        for target in targets:
            self.reg.register(target)
        for target in targets:
            self.reg.remove(target)
        for target in targets:
            self.failIf(self.reg.contains(target))

    def testGet(self):
        # get() hands back a ProxyTM whose _target is the registered object.
        target = Target()
        self.reg.register(target)
        proxy = self.reg.get(target)
        self.failUnless(isinstance(proxy, ProxyTM))
        self.assertEqual(target, proxy._target)
77:
78:
class TestProxyTM(ZopeTestCase.Sandboxed, ZopeTestCase.TestCase):
    # Checks that a ProxyTM joins the current transaction and forwards the
    # _begin/_finish/_abort hooks to its target.
    #
    # Explanations are kept in comments rather than method docstrings so
    # the test runner keeps reporting tests by method name.

    def _registeredTarget(self):
        # Helper (not a test): create a Target and register it with the
        # module-level registry.
        target = Target()
        registry.register(target)
        return target

    def testRegister(self):
        proxy = ProxyTM(Target())
        self.assertEqual(len(tr()), 0)
        proxy._register()
        self.assertEqual(len(tr()), 1)
        try:
            self.assertEqual(tr()[0].manager, proxy)
        except AttributeError:
            # ZODB <= 3.2 (presumably its entries have no .manager)
            pass

    def testLastRegisteredComesFirst(self):
        first, second = ProxyTM(Target()), ProxyTM(Target())
        first._register()
        second._register()
        self.assertEqual(len(tr()), 2)
        # The manager registered last must sit at the front of the
        # transaction's _resources list.
        try:
            self.assertEqual(tr()[0].manager, second)
            self.assertEqual(tr()[1].manager, first)
        except AttributeError:
            # ZODB <= 3.2 (presumably its entries have no .manager)
            pass

    def testBeginIsForwarded(self):
        target = self._registeredTarget()
        proxy = registry.get(target)
        proxy._begin()
        self.failUnless(target.begin_called)

    def testFinishIsForwarded(self):
        target = self._registeredTarget()
        proxy = registry.get(target)
        proxy._finish()
        self.failUnless(target.finish_called)
        # _finish also drops the TM from the registry.
        self.failIf(registry.contains(target))

    def testAbortIsForwarded(self):
        target = self._registeredTarget()
        proxy = registry.get(target)
        proxy._abort()
        self.failUnless(target.abort_called)
        # _abort also drops the TM from the registry.
        self.failIf(registry.contains(target))

    def testCommitCallsFinish(self):
        target = self._registeredTarget()
        transaction.commit()
        self.failUnless(target.finish_called)
        # _finish also drops the TM from the registry.
        self.failIf(registry.contains(target))

    def testAbortCallsAbort(self):
        target = self._registeredTarget()
        transaction.abort()
        self.failUnless(target.abort_called)
        # _abort also drops the TM from the registry.
        self.failIf(registry.contains(target))
144:
145:
def test_suite():
    """Return a suite holding both test cases defined in this module."""
    from unittest import TestSuite, makeSuite
    suite = TestSuite()
    # Same order as before: registry tests first, then the proxy tests.
    for case in (TestTMRegistry, TestProxyTM):
        suite.addTest(makeSuite(case))
    return suite
152:
# Script entry point. framework() is not defined in this file; presumably it
# is provided by the framework.py executed at the top when run as a script.
if __name__ == '__main__':
    framework()
155:
# FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>