File:  [Repository] / ZSQLExtend / importFMPXML.py
Revision 1.35, Wed Feb 15 08:41:01 2012 UTC, by dwinter
Branches: MAIN; CVS tags: HEAD
Log message: bug in character handling

#!/usr/local/bin/python
#

import string
import logging
import sys
import types
import time
import re

from xml import sax
from xml.sax.handler import ContentHandler
#from amara import saxtools

try:
    import psycopg2 as psycopg
    import psycopg2.extensions
    # return database strings as unicode
    psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
    psyco = 2
except ImportError:
    import psycopg
    psyco = 1

fm_ns = 'http://www.filemaker.com/fmpxmlresult'

version_string = "V0.6.7 ROC 21.6.2011"
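
# For orientation: this importer expects FileMaker's FMPXMLRESULT export
# grammar. The sketch below is illustrative only (field names and values are
# made up); a real export also carries an ERRORCODE/PRODUCT/DATABASE preamble
# and additional attributes that this handler ignores.
#
#   <FMPXMLRESULT xmlns="http://www.filemaker.com/fmpxmlresult">
#     <METADATA>
#       <FIELD NAME="id" TYPE="NUMBER"/>
#       <FIELD NAME="title" TYPE="TEXT"/>
#     </METADATA>
#     <RESULTSET FOUND="1">
#       <ROW MODID="1" RECORDID="1">
#         <COL><DATA>1</DATA></COL>
#         <COL><DATA>some title</DATA></COL>
#       </ROW>
#     </RESULTSET>
#   </FMPXMLRESULT>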

def unicodify(text, withNone=False):
    """decode str (utf-8 or latin-1 representation) into unicode object"""
    if withNone and text is None:
        return None
    if not text:
        return u""
    if isinstance(text, str):
        try:
            return text.decode('utf-8')
        except UnicodeDecodeError:
            # not valid UTF-8 -- fall back to latin-1
            return text.decode('latin-1')
    else:
        return text

def utf8ify(text, withNone=False):
    """encode unicode object or string into byte string in utf-8 representation"""
    if withNone and text is None:
        return None
    if not text:
        return ""
    if isinstance(text, unicode):
        return text.encode('utf-8')
    else:
        return text
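
# A minimal round-trip sketch of the two helpers above (Python 2 semantics,
# input values made up for illustration):
#
#   unicodify("K\xc3\xb6nig")       == u"K\xf6nig"     # valid UTF-8 input
#   unicodify("K\xf6nig")           == u"K\xf6nig"     # latin-1 fallback
#   utf8ify(u"K\xf6nig")            == "K\xc3\xb6nig"
#   utf8ify(None, withNone=True)    is None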

def getTextFromNode(nodename):
    """get the cdata content of a node"""
    if nodename is None:
        return ""
    nodelist=nodename.childNodes
    rc = ""
    for node in nodelist:
        if node.nodeType == node.TEXT_NODE:
            rc = rc + node.data
    return rc

def sql_quote(v):
    """quote and escape string v as an SQL string literal"""
    # replacement dictionary: double single quotes and backslashes
    quote_dict = {"\'": "''", "\\": "\\\\"}
    for dkey in quote_dict.keys():
        if string.find(v, dkey) >= 0:
            v = string.join(string.split(v, dkey), quote_dict[dkey])
    return "'%s'"%v

def sqlName(s, lc=True, more=''):
    """returns restricted ASCII-only version of string"""
    if s is None:
        return ""
    
    # remove '
    s = s.replace("'","")
    # all else -> "_"
    s = re.sub('[^A-Za-z0-9_'+more+']','_',s)
    if lc:
        return s.lower()
    
    return s
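
# Illustration of the two string helpers above (made-up inputs):
#
#   sql_quote("it's ok")               -> "'it''s ok'"   (quotes doubled, result wrapped)
#   sqlName("Falle: Ort/Land (ggf.)")  -> "falle__ort_land__ggf__"
#   sqlName("Jahr", lc=False)          -> "Jahr"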

def SimpleSearch(curs,query, args=None, ascii=False):
    """execute sql query and return data"""
    #logger.debug("executing: "+query)
    if ascii:
        # encode all in UTF-8
        query = utf8ify(query)
        if args is not None:
            encargs = []
            for a in args:
                encargs.append(utf8ify(a, withNone=True))
            
            args = encargs

    curs.execute(query, args)
    #logger.debug("sql done")
    try:
        return curs.fetchall()
    except:
        # query returned no result set (e.g. INSERT/UPDATE/DELETE)
        return None
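
# Sketch of how SimpleSearch is used throughout this module (connection
# parameters, table and values are placeholders, not part of the original code):
#
#   con = psycopg.connect("dbname=mydb user=me")
#   cur = con.cursor()
#   rows = SimpleSearch(cur, "SELECT * FROM mytable WHERE id = %s", ["42"])
#   SimpleSearch(cur, "UPDATE mytable SET title = %s WHERE id = %s",
#                [u"K\xf6nig", "42"], ascii=True)   # returns None (no result set)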


class TableColumn:
    """simple type for storing sql column name and type"""
    
    def __init__(self, name, type=None):
        #print "new tablecolumn(%s,%s)"%(name, type)
        self.name = name
        self.type = type
        
    def getName(self):
        return self.name
    
    def getType(self):
        if self.type is not None:
            return self.type
        else:
            return "text"

    def __str__(self):
        return self.name
    
    
class xml_handler(ContentHandler):
    def __init__(self,options):
        """SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table.
        @param options: object with the following option attributes
        @param options.dsn: database connection string
        @param options.table: name of the table the xml shall be imported into
        @param options.filename: xmlfile filename
        @param options.update_fields: (optional) list of fields to update; default is to create all fields
        @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
        @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
        @param options.lc_names: (optional) lower case and clean up field names from XML
        @param options.keep_fields: (optional) don't add fields to SQL database
        @param options.ascii_db: (optional) assume ascii encoding in db
        @param options.replace_table: (optional) delete and re-insert data
        @param options.backup_table: (optional) create backup of old table (breaks indices)
        @param options.use_logger_instance: (optional) use this instance of a logger
        """
        
        # set up logger
        if hasattr(options, 'use_logger_instance'):
            self.logger = options.use_logger_instance
        else:
            self.logger = logging.getLogger('db.import.fmpxml')

        
        # set up parser
        self.result={}
        self.event = None
        
#        self.top_dispatcher = { 
#            (saxtools.START_ELEMENT, fm_ns, u'METADATA'): 
#            self.handle_meta_fields,
#            (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): 
#            self.handle_data_fields,
#            }
        
        # connect database
        self.dbCon = psycopg.connect(options.dsn)
        self.logger.debug("DB encoding: %s"%getattr(self.dbCon, 'encoding', 'UNKNOWN'))
        self.db = self.dbCon.cursor()
        assert self.db, "AIIEE no db cursor for %s!!"%options.dsn
    
        self.table = getattr(options,"table",None)
        self.update_fields = getattr(options,"update_fields",None)
        self.id_field = getattr(options,"id_field",None)
        self.sync_mode = getattr(options,"sync_mode",None)
        self.lc_names = getattr(options,"lc_names",None)
        self.keep_fields = getattr(options,"keep_fields",None)
        self.ascii_db = getattr(options,"ascii_db",None)
        self.replace_table = getattr(options,"replace_table",None)
        self.backup_table = getattr(options,"backup_table",None)
        self.read_before_update = getattr(options,"read_before_update",None)
        self.debug_data = getattr(options,"debug_data",None)

        self.logger.debug("dsn: "+repr(getattr(options,"dsn",None)))
        self.logger.debug("table: "+repr(self.table))
        self.logger.debug("update_fields: "+repr(self.update_fields))
        self.logger.debug("id_field: "+repr(self.id_field))
        self.logger.debug("sync_mode: "+repr(self.sync_mode))
        self.logger.debug("lc_names: "+repr(self.lc_names))
        self.logger.debug("keep_fields: "+repr(self.keep_fields))
        self.logger.debug("ascii_db: "+repr(self.ascii_db))
        self.logger.debug("replace_table: "+repr(self.replace_table))
        self.logger.debug("backup_table: "+repr(self.backup_table))
        self.logger.debug("read_before_update: "+repr(self.read_before_update))
        self.logger.debug("debug_data: "+repr(self.debug_data))
        
        self.dbIDs = {}
        self.rowcnt = 0
        
        self.currentName = None
        
        if self.id_field is not None:
            # prepare a list of ids for sync mode
            qstr="select %s from %s"%(self.id_field,self.table)
            for id in SimpleSearch(self.db, qstr):
                # value 0: not updated
                self.dbIDs[id[0]] = 0
                self.rowcnt += 1
                
            self.logger.info("%d entries in DB to sync"%self.rowcnt)
        
        # names of fields in XML file
        self.xml_field_names = []
        # map XML field names to SQL field names
        self.xml_field_map = {}
        # and vice versa
        self.sql_field_map = {}
        
        return

    def startElement(self, name, attrs):
        self.logger.debug(name)
        if name.lower() == "field":
            self.handle_meta_field(attrs)
        elif name.lower() == "row":
            self.logger.debug("handleROW")
            self.handle_row(attrs)
        elif name.lower() == "resultset":
            self.handle_data_fields(attrs)
        elif name.lower() == "data":
            self.handle_data_tag(attrs)

    def endElement(self, name):
        if name.lower() == "resultset":
            self.currentTag = ""
            self.handle_end_data_fields()
        elif name.lower() == "field":
            self.handle_end_meta_field()
        elif name.lower() == "metadata":
            self.handle_end_meta_fields()
        elif name.lower() == "row":
            self.logger.debug("handleROW")
            self.handle_end_row()
        elif name.lower() == "col":
            self.handle_end_col()

    def characters(self, content):
        try:
            fn = self.xml_field_names[self.colIdx]
            # if there is already content, append to it (the characters
            # handler may be called more than once within a single tag)
            contentTmp = self.xml_data.get(fn, '')
            self.xml_data[fn] = contentTmp + content
        except:
            # character data outside of a row/column context -- ignore
            self.logger.debug(content)
        
#        if self.currentName is not None:
#            logging.debug(self.currentName+"    "+content)
#            self.currentRow[self.currentName]=content;
#    
    def handle_end_meta_fields(self):
#        dispatcher = {
#            (saxtools.START_ELEMENT, fm_ns, u'FIELD'):
#            self.handle_meta_field,
#            }
        #First round through the generator corresponds to the
        #start element event
#        self.logger.info("reading metadata...")
#        if self.debug_data:
#            self.logger.debug("START METADATA")
#        #yield None
    
        #delegate is a generator that handles all the events "within"
        #this element
#        delegate = None
#        while not self.event == end_condition:
#            delegate = saxtools.tenorsax.event_loop_body(
#                dispatcher, delegate, self.event)
#            yield None
#        
#        #Element closed. Wrap up
        if self.debug_data:
            self.logger.debug("END METADATA")
        
        # rename table for backup
        if self.backup_table:
            self.orig_table = self.table
            self.tmp_table = self.table + "_tmp"
            backup_name = "%s_%s"%(self.table,time.strftime('%Y_%m_%d_%H_%M_%S'))
            
            # remove old temp table
            qstr = "DROP TABLE %s"%(self.tmp_table)
            try:
                self.db.execute(qstr)
            except:
                # temp table may not exist -- roll back the failed statement
                self.dbCon.rollback()
            
            self.dbCon.commit()
           
            if self.id_field:
                # sync mode -- copy backup table, update current table 
                self.logger.info("copy table %s to %s"%(self.table,backup_name))
                qstr = "CREATE TABLE %s AS (SELECT * FROM %s)"%(backup_name,self.table)

            else:
                # replace mode -- create empty tmp table, insert into tmp table
                self.table = self.tmp_table
                self.logger.info("create empty table %s"%(self.table))
                qstr = "CREATE TABLE %s AS (SELECT * FROM %s WHERE 1=0)"%(self.table,self.orig_table)
            
            self.db.execute(qstr)
            self.dbCon.commit()
        
        # delete data from table for replace
        if self.replace_table:
            self.logger.info("delete data from table %s"%(self.table))
            qstr = "TRUNCATE TABLE %s"%(self.table)
            self.db.execute(qstr)
            self.dbCon.commit()
           
        # try to match date style with XML
        self.db.execute("set datestyle to 'german'")
        
        #self.logger.debug("xml-fieldnames:"+repr(self.xml_field_names))
        # get list of fields and types of db table
        qstr="select attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) from pg_attribute, pg_class where attrelid = pg_class.oid and pg_attribute.attnum > 0 and relname = '%s'"
        self.sql_fields={}
        for f in SimpleSearch(self.db, qstr%self.table):
            fn = f[0]
            ft = f[1]
            #print "SQL fields: %s (%s)"%(n,t)
            self.sql_fields[fn] = TableColumn(fn,ft)
        
        # translate id_field (SQL-name) to XML-name
        self.xml_id = self.sql_field_map.get(self.id_field, None)
        # get type of id_field
        if self.id_field:
            self.id_type = self.sql_fields[self.id_field].getType()
        else:
            self.id_type = None
        
        # check fields to update
        if self.update_fields is None:
            if self.keep_fields:
                # update all existing fields from sql (when they are in the xml file)
                self.update_fields = {}
                for f in self.sql_fields.keys():
                    if self.sql_field_map.has_key(f):
                        xf = self.sql_field_map[f]
                        self.update_fields[f] = self.xml_field_map[xf]

            else:
                # update all fields
                if self.lc_names:
                    # create dict with sql names
                    self.update_fields = {}
                    for f in self.xml_field_map.values():
                        self.update_fields[f.getName()] = f
                        
                else:
                    self.update_fields = self.xml_field_map
                                
        # and translate to list of xml fields
        if self.lc_names:
            self.xml_update_list = [self.sql_field_map[x] for x in self.update_fields]
        else:
            self.xml_update_list = self.update_fields.keys()

        if not self.keep_fields:
            # adjust db table to fields in XML and update_fields
            for f in self.xml_field_map.values():
                self.logger.debug("sync-fieldname: %s"%f.getName())
                sf = self.sql_fields.get(f.getName(), None)
                uf = self.update_fields.get(f.getName(), None)
                if sf is not None:
                    # name in db -- check type
                    if f.getType() != sf.getType():
                        self.logger.debug("field %s has different type (%s vs %s)"%(f,f.getType(),sf.getType()))
                elif uf is not None:
                    # add field to table
                    fn = uf.getName()
                    ft = uf.getType()
                    qstr="alter table %s add \"%s\" %s"%(self.table,fn,ft)
                    self.logger.info("db add field:"+qstr)
                    
                    if self.ascii_db and type(qstr)==types.UnicodeType:
                        qstr=qstr.encode('utf-8')
                        
                    self.db.execute(qstr)
                    self.dbCon.commit()
                    # add field to field list
                    self.sql_fields[fn] = TableColumn(fn, ft)
                
        # prepare sql statements for update (do not update id_field)
        setStr=string.join(["\"%s\" = %%s"%self.xml_field_map[f] for f in self.xml_update_list if f != self.xml_id], ', ')
        self.updQuery="UPDATE %s SET %s WHERE \"%s\" = %%s"%(self.table,setStr,self.id_field)
        # and select (for update check)
        selStr=string.join([self.xml_field_map[f].getName() for f in self.xml_update_list if f != self.xml_id], ', ')
        self.selQuery="SELECT %s FROM %s WHERE \"%s\" = %%s"%(selStr,self.table,self.id_field)
        # and insert
        fields=string.join(["\"%s\""%self.xml_field_map[x].getName() for x in self.xml_update_list], ',')
        values=string.join(['%s' for f in self.xml_update_list], ',')
        self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values)
        self.logger.debug("update-query: "+self.updQuery)
        self.logger.debug("sel-query: "+self.selQuery)
        self.logger.debug("add-query: "+self.addQuery)
        return

    def handle_meta_field(self, attrs):
        self.currentName =  attrs.get('NAME')
        #yield None
        return

    def handle_end_meta_field(self):
        #Element closed.  Wrap up
        name = self.currentName
        if self.lc_names:
            # clean name
            sqlname = sqlName(name)
        else:
            sqlname = name
        self.xml_field_names.append(name)
        # map to sql name and default text type
        self.xml_field_map[name] = TableColumn(sqlname, 'text')
        self.sql_field_map[sqlname] = name
        self.logger.debug("FIELD name: "+name)
        return

    def handle_data_fields(self, attrs):
       
        #First round through the generator corresponds to the
        #start element event
        self.logger.info("reading data...")
        if self.debug_data:
            self.logger.debug("START RESULTSET")
        self.rowcnt = 0
        return
    
    def handle_end_data_fields(self):
        #delegate is a generator that handles all the events "within"
        #this element
     
        #Element closed.  Wrap up
        if self.debug_data:
            self.logger.debug("END RESULTSET")
        self.dbCon.commit()
        
        if self.sync_mode:
            # delete unmatched entries in db
            if self.rowcnt > 0:
                self.logger.info("deleting unmatched rows from db")
                delQuery = "DELETE FROM %s WHERE \"%s\" = %%s"%(self.table,self.id_field)
                for id in self.dbIDs.keys():
                    # find all not-updated fields
                    if self.dbIDs[id] == 0:
                        self.logger.info(" delete: %s"%id)
                        SimpleSearch(self.db, delQuery, [id], ascii=self.ascii_db)
                        
                    elif self.dbIDs[id] > 1:
                        self.logger.info(" sync: ID %s used more than once?"%id)
                
                self.dbCon.commit()
                
            else:
                # safety in case we had an empty file
                self.logger.warning("no rows read! not deleting unmatched rows!")
            
        # reinstate backup tables
        if self.backup_table and not self.id_field:
            backup_name = "%s_%s"%(self.orig_table,time.strftime('%Y_%m_%d_%H_%M_%S'))
            self.logger.info("rename backup table %s to %s"%(self.orig_table,backup_name))
            qstr = "ALTER TABLE %s RENAME TO %s"%(self.orig_table,backup_name)
            self.db.execute(qstr)
            self.logger.info("rename working table %s to %s"%(self.table,self.orig_table))
            qstr = "ALTER TABLE %s RENAME TO %s"%(self.table,self.orig_table)
            self.db.execute(qstr)
            self.dbCon.commit()
        
        self.logger.info("Done (%s rows)"%self.rowcnt)
        return

    def handle_row(self, attrs):
       
        if self.debug_data:
            self.logger.debug("START ROW")
        self.xml_data = {}
        self.colIdx = 0
    
        return
    
    def handle_end_row(self):
        #delegate is a generator that handles all the events "within"
        #this element
     
        #Element closed.  Wrap up
        if self.debug_data:
            self.logger.debug("END ROW")
        self.rowcnt += 1
        # process collected row data
        update=False
        id_val=''
        # synchronize by id_field
        if self.id_field:
            if self.id_type == 'integer':
                try:
                    id_val = int(self.xml_data[self.xml_id])
                except:
                    pass
            else:
                id_val = self.xml_data[self.xml_id]

            if not id_val:
                # abort update
                self.logger.error("ERROR: unable to sync! emtpy id in row %s"%self.rowcnt)
                return
                
            if id_val in self.dbIDs:
                self.dbIDs[id_val] += 1
                update=True

        # collect all values
        args = []
        for fn in self.xml_update_list:
            # do not update id_field
            if update and fn == self.xml_id:
                continue
            
            f = self.xml_field_map[fn]
            val = self.xml_data.get(fn, None)
            ftype = self.sql_fields[f.getName()].getType()
            # guard against a column with no DATA element (val is None then)
            if val is not None:
                if ftype == "date" and len(val.strip()) == 0:
                    # empty date field
                    val = None

                elif ftype == "integer" and len(val) == 0:
                    # empty int field
                    val = None

            args.append(val)
                    
        if update:
            # update existing row (by id_field)
            if self.read_before_update:
                # read data
                if self.debug_data:
                    self.logger.debug("update check: %s = %s"%(id_val, args))
                oldrow = SimpleSearch(self.db, self.selQuery, [id_val], ascii=self.ascii_db)
                #i = 0
                #for v in oldrow[0]:
                #    logging.debug("v: %s = %s (%s)"%(v,args[i],v==args[i]))
                #    i += 1
                if tuple(oldrow[0]) != tuple(args):
                    # data has changed -- update
                    if self.debug_data:
                        self.logger.debug("really update: %s = %s"%(id_val, args))
                    args.append(id_val) # last arg is id
                    SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db)
                    
            else:
                # always update
                if self.debug_data:
                    self.logger.debug("update: %s = %s"%(id_val, args))
                args.append(id_val) # last arg is id
                SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db)

        else:
            # create new row
            if self.debug_data:
                self.logger.debug("insert: %s"%args)
            SimpleSearch(self.db, self.addQuery, args, ascii=self.ascii_db)

        #self.logger.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
        if (self.rowcnt % 100) == 0:
            self.logger.info(" row:"+"%d (id:%s)"%(self.rowcnt,id_val))
            self.dbCon.commit()
            
        return

    def handle_end_col(self):
        # advance to the next column of the current row
        self.colIdx += 1
        return

    
    def handle_data_tag(self, attrs):
        #print "START DATA"
        self.content = u''
#        yield None
#        # gather child elements
#        while not self.event == end_condition:
#            if self.event[0] == saxtools.CHARACTER_DATA:
#                content += self.params
#            yield None
#        #Element closed.  Wrap up
#        fn = self.xml_field_names[self.colIdx]
#        self.xml_data[fn] = content
        return


def importFMPXML(options):
    """import FileMaker XML file (FMPXMLRESULT format) into the table.     
        @param options: dict of options
        @param options.dsn: database connection string
        @param options.table: name of the table the xml shall be imported into
        @param options.filename: xmlfile filename
        @param options.update_fields: (optional) list of fields to update; default is to create all fields
        @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
        @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
        @param options.lc_names: (optional) lower case and clean up field names from XML
        @param options.keep_fields: (optional) don't add fields to SQL database
        @param options.ascii_db: (optional) assume ascii encoding in db
        @param options.replace_table: (optional) delete and re-insert data
        @param options.backup_table: (optional) create backup of old table
        """
        
    if getattr(options,'update_fields',None):
        uf = {}
        for f in options.update_fields.split(','):
            if f.find(':') > 0:
                (n,t) = f.split(':')
            else:
                n = f
                t = None
            uf[n] = TableColumn(n,t)
            
        options.update_fields = uf
    
    if getattr(options,'id_field',None) and getattr(options,'replace_table',None):
        logging.error("ABORT: sorry, you can't do both sync (id_field) and replace")
        return
        
    parser = sax.make_parser()
    #The "consumer" is our own handler
    consumer = xml_handler(options)
    #Initialize Tenorsax with handler
    #handler = saxtools.tenorsax(consumer)
    #Resulting tenorsax instance is the SAX handler 
    parser.setContentHandler(consumer)
    #parser.setFeature(sax.handler.feature_namespaces, 1)
    parser.parse(options.filename)  
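
# The following helper is a usage sketch only and is never called from this
# module: it shows how importFMPXML() could be driven programmatically.
# The dsn, table and file names are placeholders, not part of the original code.
def _example_import():
    """usage sketch: run a sync import against a hypothetical table"""
    class _Options(object):
        pass
    options = _Options()
    options.dsn = "dbname=mydb user=me"        # placeholder connection string
    options.table = "mytable"                  # placeholder target table
    options.filename = "export_fmpxml.xml"     # placeholder FMPXMLRESULT export
    options.id_field = "id"                    # synchronise on this column
    options.lc_names = True                    # clean up field names from XML
    options.read_before_update = True          # only write rows that changed
    importFMPXML(options)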
    

if __name__ == "__main__":
    from optparse import OptionParser

    opars = OptionParser()
    opars.add_option("-f", "--file", 
                     dest="filename",
                     help="FMPXML file name", metavar="FILE")
    opars.add_option("-c", "--dsn", 
                     dest="dsn", 
                     help="database connection string")
    opars.add_option("-t", "--table", 
                     dest="table", 
                     help="database table name")
    opars.add_option("--fields", default=None, 
                     dest="update_fields", 
                     help="list of fields to update (comma separated, sql-names)", metavar="LIST")
    opars.add_option("--id-field", default=None, 
                     dest="id_field", 
                     help="name of id field for synchronisation (only appends data otherwise, sql-name)", metavar="NAME")
    opars.add_option("--sync", "--sync-mode", default=False, action="store_true", 
                     dest="sync_mode", 
                     help="do full sync based on id field (remove unmatched fields from db)")
    opars.add_option("--lc-names", default=False, action="store_true", 
                     dest="lc_names", 
                     help="clean and lower case field names from XML")
    opars.add_option("--keep-fields", default=False, action="store_true", 
                     dest="keep_fields", 
                     help="don't add fields from XML to SQL table")
    opars.add_option("--ascii-db", default=False, action="store_true", 
                     dest="ascii_db", 
                     help="the SQL database stores ASCII instead of unicode")
    opars.add_option("--replace", default=False, action="store_true", 
                     dest="replace_table", 
                     help="replace table i.e. delete and re-insert data")
    opars.add_option("--backup", default=False, action="store_true", 
                     dest="backup_table", 
                     help="create backup of old table")
    opars.add_option("--read-before-update", default=False, action="store_true", 
                     dest="read_before_update", 
                     help="read all data to check if it really changed")
    opars.add_option("-d", "--debug", default=False, action="store_true", 
                     dest="debug", 
                     help="debug mode (more output)")
    opars.add_option("--debug-data", default=False, action="store_true", 
                     dest="debug_data", 
                     help="debug mode for data (even more output)")
    
    (options, args) = opars.parse_args()
    
    if len(sys.argv) < 2 or options.filename is None or options.dsn is None:
        print "importFMPXML "+version_string
        opars.print_help()
        sys.exit(1)
    
    if options.debug:
        loglevel = logging.DEBUG
    else:
        loglevel = logging.INFO
    
    logging.basicConfig(level=loglevel, 
                        format='%(asctime)s %(levelname)s %(message)s',
                        datefmt='%H:%M:%S')

    importFMPXML(options)
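
# Example invocation (file name, dsn and table are placeholders):
#   python importFMPXML.py -f export_fmpxml.xml \
#       -c "dbname=mydb user=me" -t mytable \
#       --lc-names --id-field=id --sync --read-before-update --backup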


    

