Annotation of ExtFile/tests/testProxyTM.py, revision 1.1
1.1 ! dwinter 1: #
! 2: # Tests TMRegistry and ProxyTM
! 3: #
! 4:
! 5: import os, sys
if __name__ == '__main__':
    # When run directly as a script, execute framework.py from sys.path[0]
    # (the directory of the script being run) in this module's namespace.
    # NOTE(review): presumably this prepares the Zope test environment and
    # defines the framework() function called at the bottom of this file --
    # confirm against framework.py.
    execfile(os.path.join(sys.path[0], 'framework.py'))
! 8:
! 9: from Testing import ZopeTestCase
! 10:
! 11: from OFS.SimpleItem import SimpleItem
! 12: from Products.ExtFile import transaction
! 13: from Products.ExtFile.TM import TMRegistry, registry
! 14: from Products.ExtFile.TM import ProxyTM
! 15:
! 16:
class Target(SimpleItem):
    """Stub target that records which transaction hooks were invoked.

    The three ``*_called`` flags default to 0 on the class; calling
    ``_begin``, ``_finish`` or ``_abort`` sets the matching flag to 1
    on the instance.
    """

    # All flags start out cleared.
    begin_called = finish_called = abort_called = 0

    def _begin(self):
        # Remember that _begin was invoked.
        self.begin_called = 1

    def _finish(self):
        # Remember that _finish was invoked.
        self.finish_called = 1

    def _abort(self):
        # Remember that _abort was invoked.
        self.abort_called = 1
! 27:
def tr():
    """Return the list of objects registered with the current transaction.

    The list lives in a private attribute of the transaction whose name
    depends on the ZODB version: ``_objects`` (ZODB <= 3.2) is tried
    first, then ``_resources`` (ZODB >= 3.4).  Returns None when the
    transaction has neither attribute.
    """
    txn = transaction.get()
    # Order matters: '_objects' (ZODB <= 3.2) wins over '_resources'
    # (ZODB >= 3.4) if both should exist.
    for attr_name in ('_objects', '_resources'):
        if hasattr(txn, attr_name):
            return getattr(txn, attr_name)
    return None
! 34:
! 35:
class TestTMRegistry(ZopeTestCase.TestCase):
    """Tests for register/count/contains/remove/get on a TMRegistry."""

    def afterSetUp(self):
        # Every test works on its own private registry instance.
        self.reg = TMRegistry()

    def testRegister(self):
        # len() starts at 0 and grows by one per registered target.
        reg = self.reg
        self.assertEqual(len(reg), 0)
        for expected_size in (1, 2):
            reg.register(Target())
            self.assertEqual(len(reg), expected_size)

    def testCount(self):
        # count() starts at 0 and grows by one per registered target.
        reg = self.reg
        self.assertEqual(reg.count(), 0)
        for expected_count in (1, 2):
            reg.register(Target())
            self.assertEqual(reg.count(), expected_count)

    def testContains(self):
        # Both registered targets must be reported by contains().
        targets = (Target(), Target())
        for target in targets:
            self.reg.register(target)
        for target in targets:
            self.failUnless(self.reg.contains(target))

    def testRemove(self):
        # After removing both targets, contains() is false for each.
        targets = (Target(), Target())
        for target in targets:
            self.reg.register(target)
        for target in targets:
            self.reg.remove(target)
        for target in targets:
            self.failIf(self.reg.contains(target))

    def testGet(self):
        # get() hands back a ProxyTM whose _target is the registered object.
        target = Target()
        self.reg.register(target)
        proxy = self.reg.get(target)
        self.failUnless(isinstance(proxy, ProxyTM))
        self.assertEqual(target, proxy._target)
! 77:
! 78:
class TestProxyTM(ZopeTestCase.Sandboxed, ZopeTestCase.TestCase):
    """Tests for ProxyTM: transaction registration and hook forwarding.

    The forwarding tests use the module-level ``registry`` imported from
    Products.ExtFile.TM rather than a private TMRegistry instance.
    """

    def _registeredTarget(self):
        # Helper (not collected as a test): register a fresh Target with
        # the global registry and return it together with its ProxyTM.
        target = Target()
        registry.register(target)
        return target, registry.get(target)

    def testRegister(self):
        proxy = ProxyTM(Target())
        self.assertEqual(len(tr()), 0)
        proxy._register()
        self.assertEqual(len(tr()), 1)
        try:
            self.assertEqual(tr()[0].manager, proxy)
        except AttributeError:
            pass # ZODB <= 3.2

    def testLastRegisteredComesFirst(self):
        first, second = ProxyTM(Target()), ProxyTM(Target())
        for proxy in (first, second):
            proxy._register()
        self.assertEqual(len(tr()), 2)
        # The proxy registered last (second) is expected at the head of
        # the transaction's _resources list, the earlier one behind it.
        try:
            for position, expected in ((0, second), (1, first)):
                self.assertEqual(tr()[position].manager, expected)
        except AttributeError:
            pass # ZODB <= 3.2

    def testBeginIsForwarded(self):
        target, proxy = self._registeredTarget()
        proxy._begin()
        self.failUnless(target.begin_called)

    def testFinishIsForwarded(self):
        target, proxy = self._registeredTarget()
        proxy._finish()
        self.failUnless(target.finish_called)
        # After _finish the registry must no longer contain the target
        self.failIf(registry.contains(target))

    def testAbortIsForwarded(self):
        target, proxy = self._registeredTarget()
        proxy._abort()
        self.failUnless(target.abort_called)
        # After _abort the registry must no longer contain the target
        self.failIf(registry.contains(target))

    def testCommitCallsFinish(self):
        # Only register here; the commit itself has to trigger _finish.
        target = Target()
        registry.register(target)
        transaction.commit()
        self.failUnless(target.finish_called)
        # After _finish the registry must no longer contain the target
        self.failIf(registry.contains(target))

    def testAbortCallsAbort(self):
        # Only register here; the abort itself has to trigger _abort.
        target = Target()
        registry.register(target)
        transaction.abort()
        self.failUnless(target.abort_called)
        # After _abort the registry must no longer contain the target
        self.failIf(registry.contains(target))
! 144:
! 145:
def test_suite():
    """Build and return a TestSuite holding both test cases of this module.

    TestTMRegistry is added first, then TestProxyTM.
    """
    from unittest import TestSuite, makeSuite
    collected = TestSuite()
    for case_class in (TestTMRegistry, TestProxyTM):
        collected.addTest(makeSuite(case_class))
    return collected
! 152:
if __name__ == '__main__':
    # Script entry point. framework is not defined or imported by name in
    # this file.
    # NOTE(review): presumably it is injected by the execfile() of
    # framework.py at the top of the file and runs the tests -- confirm.
    framework()
! 155:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>