Mercurial > hg > ZDBInterface
view DBInterface.py @ 7:917e28a08c58
search case insensitive. max and skip options.
author | casties |
---|---|
date | Tue, 15 Feb 2011 21:14:03 +0100 |
parents | 1b25a85a2165 |
children | 17b19345d011 |
line wrap: on
line source
'''
Created on 14.2.2011

@author: casties
'''

import logging
import re

import psycopg2
# make psycopg use unicode objects
import psycopg2.extensions
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)

from Products.ZSQLMethods.SQL import SQLConnectionIDs
from Shared.DC.ZRDB.Results import Results


def unicodify(s, alternate='latin-1'):
    """decode str (utf-8 or latin-1 representation) into unicode object.

    Falsy input yields u"". Byte strings are decoded as utf-8 first and,
    if that fails, with the `alternate` encoding. Anything that is not a
    str is returned unchanged.
    """
    if not s:
        return u""
    if isinstance(s, str):
        try:
            return s.decode('utf-8')
        except UnicodeDecodeError:
            # not valid utf-8: fall back to the alternate encoding
            return s.decode(alternate)
    else:
        return s


def utf8ify(s):
    """encode unicode object or string into byte string in utf-8 representation.

    Assumes string objects to be utf-8 already and returns them unchanged.
    Falsy input yields "".
    """
    if not s:
        return ""
    if isinstance(s, str):
        return s
    else:
        return s.encode('utf-8')


def getTextFromNode(node):
    """get the cdata content of a XML node.

    `node` is either a node (its childNodes are scanned) or a list of
    nodes. Returns the concatenated data of all TEXT_NODE entries, or ""
    if node is None.
    """
    if node is None:
        return ""

    if isinstance(node, list):
        nodelist = node
    else:
        nodelist = node.childNodes

    return "".join(child.data for child in nodelist
                   if child.nodeType == child.TEXT_NODE)


def sqlName(s, lc=True, more=''):
    """returns restricted ASCII-only version of string.

    Every character that is not in A-Za-z0-9_ (or in `more`) is replaced
    by "_". The result is lower-cased unless lc is false. Returns "" if s
    is None.

    NOTE: `more` is inserted verbatim into a regular expression character
    class, so characters that are special there (e.g. "]", "^", "-", "\\")
    have to be escaped by the caller.
    """
    if s is None:
        return ""

    # all else -> "_"
    s = re.sub('[^A-Za-z0-9_' + more + ']', '_', s)
    if lc:
        return s.lower()

    return s


class DBInterface:
    """Object for database queries"""

    def __init__(self, connection_id=None):
        """init"""
        # database connection id
        self.connection_id = connection_id

    def getConnectionIDs(self):
        """return list of available connection ids"""
        return SQLConnectionIDs(self)

    def _resolveConnectionID(self):
        """returns self.connection_id, defaulting to the first existing ID.

        If no connection_id is set, the second element of the first entry
        of getConnectionIDs() is stored in self.connection_id. Returns
        None if there is still no connection id afterwards.
        """
        if self.connection_id is None:
            # try to take the first existing ID
            connids = self.getConnectionIDs()
            if len(connids) > 0:
                self.connection_id = connids[0][1]
                logging.debug("connection_id: %r", self.connection_id)

        return self.connection_id

    def getDB(self):
        """returns DB object.

        Raises IOError if no connection id is set and none can be found.
        """
        # TODO: can we cache and reuse a DB object?
        if self._resolveConnectionID() is None:
            # fail with a clear message instead of a TypeError from getattr
            raise IOError("No database connection! (no connection id available)")

        # get Connection instance
        con = getattr(self, self.connection_id)
        # call to get db object
        db = con()
        return db

    def executeZSQL(self, query, args=None, max_rows=None):
        """execute query with args on the database and return all results as Result object."""
        logging.debug("executeZSQL query=%s args=%s", query, args)
        dbc = self.getDB()
        res = dbc.query(query, max_rows=max_rows, query_data=args)
        # return result set as Result object with Brains
        return Results(res)

    #
    # Old way using cursor from DA
    #
    def getCursor(self, autocommit=True):
        """returns fresh DB cursor.

        The connection object of the database adapter is copied into
        self._v_database_connection on first use and reused afterwards.
        If autocommit is true the cursor's connection is switched to
        ISOLATION_LEVEL_AUTOCOMMIT.

        Raises IOError if the connection can not be set up.
        """
        conn = getattr(self, "_v_database_connection", None)
        if conn is None:
            # create a new connection object
            try:
                self._resolveConnectionID()
                da = getattr(self, self.connection_id)
                logging.debug('da=%s', da)
                da.connect('')
                # we copy the DAs database connection
                conn = da._v_database_connection
                #conn._register() # register with the Zope transaction system(?)
                self._v_database_connection = conn
            except Exception as e:
                raise IOError("No database connection! (%s)" % str(e))

        cursor = conn.getcursor()
        if autocommit:
            # TODO: is there a better version to get to the connection?
            cursor.connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

        return cursor

    def getFieldNameMap(self, fields):
        """returns a dict mapping field names to row indexes.

        The name of a field is taken from its first element (f[0]).
        """
        return dict((f[0], i) for i, f in enumerate(fields))

    def executeSQL(self, query, args=None, hasResult=True, autocommit=True):
        """execute query with args on database and return all results.

        result format: {"fields":fields, "rows":data}
        Returns None if hasResult is false. The cursor is always closed,
        also when the query fails.
        """
        logging.debug("executeSQL query=%s args=%s", query, args)
        cur = self.getCursor(autocommit=autocommit)
        try:
            # make sure args is a list
            if isinstance(args, basestring):
                args = (args,)

            cur.execute(query, args)
            # description of returned fields
            fields = cur.description
            if not hasResult:
                return None

            # get all data in an array
            data = cur.fetchall()
            return {"fields": fields, "rows": data}
        finally:
            # release the cursor even if execute/fetchall raised
            cur.close()