File:  [Repository] / ExtFile / tests / testProxyTM.py
Revision 1.1: download - view: text, annotated - select for diffs - revision graph
Wed Jan 24 16:53:50 2007 UTC (17 years, 5 months ago) by dwinter
CVS tags: MAIN, HEAD
Initial revision

    1: #
    2: # Tests TMRegistry and ProxyTM
    3: #
    4: 
    5: import os, sys
# When this file is run directly as a script, execute 'framework.py' found in
# sys.path[0] inside this module's namespace, before the Zope imports below.
# NOTE(review): presumably this is what defines the framework() callable used
# at the bottom of the file -- confirm against framework.py.
if __name__ == '__main__':
    execfile(os.path.join(sys.path[0], 'framework.py'))
    8: 
    9: from Testing import ZopeTestCase
   10: 
   11: from OFS.SimpleItem import SimpleItem
   12: from Products.ExtFile import transaction
   13: from Products.ExtFile.TM import TMRegistry, registry
   14: from Products.ExtFile.TM import ProxyTM
   15: 
   16: 
   17: class Target(SimpleItem):
   18:     begin_called = 0
   19:     finish_called = 0
   20:     abort_called = 0
   21:     def _begin(self):
   22:         self.begin_called = 1
   23:     def _finish(self):
   24:         self.finish_called = 1
   25:     def _abort(self):
   26:         self.abort_called = 1
   27: 
   28: def tr():
   29:     t = transaction.get()
   30:     if hasattr(t, '_objects'):
   31:         return t._objects   # ZODB <= 3.2
   32:     elif hasattr(t, '_resources'):
   33:         return t._resources # ZODB >= 3.4
   34: 
   35: 
   36: class TestTMRegistry(ZopeTestCase.TestCase):
   37: 
   38:     def afterSetUp(self):
   39:         self.reg = TMRegistry()
   40: 
   41:     def testRegister(self):
   42:         self.assertEqual(len(self.reg), 0)
   43:         self.reg.register(Target())
   44:         self.assertEqual(len(self.reg), 1)
   45:         self.reg.register(Target())
   46:         self.assertEqual(len(self.reg), 2)
   47: 
   48:     def testCount(self):
   49:         self.assertEqual(self.reg.count(), 0)
   50:         self.reg.register(Target())
   51:         self.assertEqual(self.reg.count(), 1)
   52:         self.reg.register(Target())
   53:         self.assertEqual(self.reg.count(), 2)
   54: 
   55:     def testContains(self):
   56:         target1, target2 = Target(), Target()
   57:         self.reg.register(target1)
   58:         self.reg.register(target2)
   59:         self.failUnless(self.reg.contains(target1))
   60:         self.failUnless(self.reg.contains(target2))
   61: 
   62:     def testRemove(self):
   63:         target1, target2 = Target(), Target()
   64:         self.reg.register(target1)
   65:         self.reg.register(target2)
   66:         self.reg.remove(target1)
   67:         self.reg.remove(target2)
   68:         self.failIf(self.reg.contains(target1))
   69:         self.failIf(self.reg.contains(target2))
   70: 
   71:     def testGet(self):
   72:         target1 = Target()
   73:         self.reg.register(target1)
   74:         tm = self.reg.get(target1)
   75:         self.failUnless(isinstance(tm, ProxyTM))
   76:         self.assertEqual(target1, tm._target)
   77: 
   78: 
   79: class TestProxyTM(ZopeTestCase.Sandboxed, ZopeTestCase.TestCase):
   80: 
   81:     def testRegister(self):
   82:         tm = ProxyTM(Target())
   83:         self.assertEqual(len(tr()), 0)
   84:         tm._register()
   85:         self.assertEqual(len(tr()), 1)
   86:         try:
   87:             self.assertEqual(tr()[0].manager, tm)
   88:         except AttributeError:
   89:             pass # ZODB <= 3.2
   90: 
   91:     def testLastRegisteredComesFirst(self):
   92:         tm1, tm2 = ProxyTM(Target()), ProxyTM(Target())
   93:         tm1._register()
   94:         tm2._register()
   95:         self.assertEqual(len(tr()), 2)
   96:         # Now make sure that tm2 comes first in the
   97:         # transaction's _resources list
   98:         try:
   99:             self.assertEqual(tr()[0].manager, tm2)
  100:             self.assertEqual(tr()[1].manager, tm1)
  101:         except AttributeError:
  102:             pass # ZODB <= 3.2
  103: 
  104:     def testBeginIsForwarded(self):
  105:         target = Target()
  106:         registry.register(target)
  107:         tm = registry.get(target)
  108:         tm._begin()
  109:         self.failUnless(target.begin_called)
  110: 
  111:     def testFinishIsForwarded(self):
  112:         target = Target()
  113:         registry.register(target)
  114:         tm = registry.get(target)
  115:         tm._finish()
  116:         self.failUnless(target.finish_called)
  117:         # _finish removes the TM from the registry
  118:         self.failIf(registry.contains(target))
  119: 
  120:     def testAbortIsForwarded(self):
  121:         target = Target()
  122:         registry.register(target)
  123:         tm = registry.get(target)
  124:         tm._abort()
  125:         self.failUnless(target.abort_called)
  126:         # _abort removes the TM from the registry
  127:         self.failIf(registry.contains(target))
  128: 
  129:     def testCommitCallsFinish(self):
  130:         target = Target()
  131:         registry.register(target)
  132:         transaction.commit()
  133:         self.failUnless(target.finish_called)
  134:         # _finish removes the TM from the registry
  135:         self.failIf(registry.contains(target))
  136: 
  137:     def testAbortCallsAbort(self):
  138:         target = Target()
  139:         registry.register(target)
  140:         transaction.abort()
  141:         self.failUnless(target.abort_called)
  142:         # _abort removes the TM from the registry
  143:         self.failIf(registry.contains(target))
  144: 
  145: 
  146: def test_suite():
  147:     from unittest import TestSuite, makeSuite
  148:     suite = TestSuite()
  149:     suite.addTest(makeSuite(TestTMRegistry))
  150:     suite.addTest(makeSuite(TestProxyTM))
  151:     return suite
  152: 
# Script entry point.  framework is not imported or defined by name anywhere
# in this file.
# NOTE(review): presumably it is injected into the module namespace by the
# execfile of framework.py at the top of the file -- confirm.
if __name__ == '__main__':
    framework()
  155: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>