Diff for /ZSQLExtend/importFMPXML.py between versions 1.1 and 1.8

version 1.1, 2006/12/20 18:45:58 version 1.8, 2007/03/29 18:31:32
Line 2 Line 2
 #  #
   
 import string  import string
 from xml.dom.pulldom import parse  
 import psycopg2 as psycopg  
 import logging  import logging
 import sys  import sys
   import types
   import time
   
   from xml import sax
   from amara import saxtools
   
   try:
       import psycopg2 as psycopg
       psyco = 2
   except:
       import psycopg
       psyco = 1
   
   fm_ns = 'http://www.filemaker.com/fmpxmlresult'
   
   version_string = "V0.4 ROC 29.3.2007"
   
 def getTextFromNode(nodename):  def getTextFromNode(nodename):
     """get the cdata content of a node"""      """get the cdata content of a node"""
Line 27  def sql_quote(v): Line 40  def sql_quote(v):
             v=string.join(string.split(v,dkey),quote_dict[dkey])              v=string.join(string.split(v,dkey),quote_dict[dkey])
     return "'%s'" % v      return "'%s'" % v
   
 def SimpleSearch(curs,query, args=None):  def SimpleSearch(curs,query, args=None, ascii=False):
     """execute sql query and return data"""      """execute sql query and return data"""
     logging.debug("executing: "+query)      #logging.debug("executing: "+query)
       if ascii:
           # encode all in UTF-8
           query = query.encode("UTF-8")
           if args is not None:
               encargs = []
               for a in args:
                   if a is not None:
                       a = a.encode("UTF-8")
                   encargs.append(a)
               
               args = encargs
   
     curs.execute(query, args)      curs.execute(query, args)
     logging.debug("sql done")      #logging.debug("sql done")
       try:
     return curs.fetchall()      return curs.fetchall()
       except:
           return None
   
   
   class TableColumn:
       """simple type for storing sql column name and type"""
   
       def __init__(self, name, type=None):
           #print "new tablecolumn(%s,%s)"%(name, type)
           self.name = name
           self.type = type
   
 def importXMLFileFMP(dsn,table,filename,update_fields=None,id_field=None,sync_mode=False):      def getName(self):
           return self.name
       
       def getType(self):
           if self.type is not None:
               return self.type
           else:
               return "text"
   
       def __str__(self):
           return self.name
       
       
   class xml_handler:
       
       def __init__(self,options):
     '''      '''
     Import FileMaker XML file (FMPXMLRESULT format) into the table.          SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table.
     @param table: name of the table the xml shall be imported into          @param options: dict of options
     @param filename: xmlfile filename          @param options.dsn: database connection string
     @param update_fields: (optional) list of fields to update; default is to create all fields          @param options.table: name of the table the xml shall be imported into
     @param id_field: (optional) field which uniquely identifies an entry for updating purposes.          @param options.filename: xmlfile filename
     @param sync_mode: (optional) really synchronise, i.e. delete entries not in XML file          @param options.update_fields: (optional) list of fields to update; default is to create all fields
           @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
           @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
           @param options.lc_names: (optional) lower case and clean up field names from XML
           @param options.keep_fields: (optional) don't add fields to SQL database
           @param options.ascii_db: (optional) assume ascii encoding in db
           @param options.replace_table: (optional) delete and re-insert data
     '''      '''
           # set up parser
           self.event = None
           self.top_dispatcher = { 
               (saxtools.START_ELEMENT, fm_ns, u'METADATA'): 
               self.handle_meta_fields,
               (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): 
               self.handle_data_fields,
               }
   
     # connect database      # connect database
     dbCon = psycopg.connect(dsn)          self.dbCon = psycopg.connect(options.dsn)
     db = dbCon.cursor()          self.db = self.dbCon.cursor()
     assert db, "AIIEE no db cursor for %s!!"%dsn          assert self.db, "AIIEE no db cursor for %s!!"%options.dsn
       
     # read XML file          self.table = options.table
     fh=file(filename)          self.update_fields = options.update_fields
     logging.info("reading file "+filename)          self.id_field = options.id_field
     doc=parse(fh)          self.sync_mode = options.sync_mode
     logging.info("file read")          self.lc_names = options.lc_names
           self.keep_fields = options.keep_fields
     dbIDs = {}          self.ascii_db = options.ascii_db
     rowcnt = 0          self.replace_table = options.replace_table
     ret = ""          self.backup_table = options.backup_table
       
     logging.debug("dsn: "+dsn)          logging.debug("dsn: "+repr(options.dsn))
     logging.debug("table: "+table)          logging.debug("table: "+repr(self.table))
     logging.debug("update_fields: "+repr(update_fields))          logging.debug("update_fields: "+repr(self.update_fields))
     logging.debug("id_field: "+id_field)          logging.debug("id_field: "+repr(self.id_field))
     logging.debug("sync_mode: "+repr(sync_mode))          logging.debug("sync_mode: "+repr(self.sync_mode))
           logging.debug("lc_names: "+repr(self.lc_names))
           logging.debug("keep_fields: "+repr(self.keep_fields))
           logging.debug("ascii_db: "+repr(self.ascii_db))
           logging.debug("replace_table: "+repr(self.replace_table))
           
           self.dbIDs = {}
           self.rowcnt = 0
           
     if id_field is not None:          if self.id_field is not None:
         # prepare a list of ids for sync mode          # prepare a list of ids for sync mode
         qstr="select %s from %s"%(id_field,table)              qstr="select %s from %s"%(self.id_field,self.table)
         for id in SimpleSearch(db, qstr):              for id in SimpleSearch(self.db, qstr):
             # value 0: not updated              # value 0: not updated
             dbIDs[id[0]] = 0;                  self.dbIDs[id[0]] = 0;
             rowcnt += 1                  self.rowcnt += 1
                           
         logging.info("%d entries in DB to sync"%rowcnt)              logging.info("%d entries in DB to sync"%self.rowcnt)
           
     fieldNames = []          # names of fields in XML file
     rowcnt = 0          self.xml_field_names = []
     id_val = ''          # map XML field names to SQL field names
           self.xml_field_map = {}
           # and vice versa
           self.sql_field_map = {}
           
           return
   
       def handle_meta_fields(self, end_condition):
           dispatcher = {
               (saxtools.START_ELEMENT, fm_ns, u'FIELD'):
               self.handle_meta_field,
               }
           #First round through the generator corresponds to the
           #start element event
           logging.debug("START METADATA")
           yield None
       
           #delegate is a generator that handles all the events "within"
           #this element
           delegate = None
           while not self.event == end_condition:
               delegate = saxtools.tenorsax.event_loop_body(
                   dispatcher, delegate, self.event)
               yield None
           
           #Element closed. Wrap up
           logging.debug("END METADATA")
           
           # rename table for backup
           if self.backup_table:
               self.orig_table = self.table
               self.table = self.table + "_tmp"
               # remove old temp table
               qstr = "DROP TABLE %s"%(self.table)
               try:
                   self.db.execute(qstr)
               except:
                   pass
               
               self.dbCon.commit()
              
               if self.id_field:
                   # sync mode -- copy table
                   logging.info("copy table %s to %s"%(self.orig_table,self.table))
                   qstr = "CREATE TABLE %s AS (SELECT * FROM %s)"%(self.table,self.orig_table)
   
               else:
                   # rename table and create empty new one
                   logging.info("create empty table %s"%(self.table))
                   qstr = "CREATE TABLE %s AS (SELECT * FROM %s WHERE 1=0)"%(self.table,self.orig_table)
               
               self.db.execute(qstr)
               self.dbCon.commit()
           
           # delete data from table for replace
           if self.replace_table:
               logging.info("delete data from table %s"%(self.table))
               qstr = "TRUNCATE TABLE %s"%(self.table)
               self.db.execute(qstr)
               self.dbCon.commit()
              
           # try to match date style with XML
           self.db.execute("set datestyle to 'german'")
           
           # translate id_field (SQL-name) to XML-name
           self.xml_id = self.sql_field_map.get(self.id_field, None)
           
           #logging.debug("xml-fieldnames:"+repr(self.xml_field_names))
           # get list of fields and types of db table
           qstr="select attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) from pg_attribute, pg_class where attrelid = pg_class.oid and pg_attribute.attnum > 0 and relname = '%s'"
           self.sql_fields={}
           for f in SimpleSearch(self.db, qstr%self.table):
               n = f[0]
               t = f[1]
               #print "SQL fields: %s (%s)"%(n,t)
               self.sql_fields[n] = TableColumn(n,t)
           
           # check fields to update
           if self.update_fields is None:
               if self.keep_fields:
                   # update existing fields
                   self.update_fields = self.sql_fields
           
     while 1:              else:
         node=doc.getEvent()                  # update all fields
                   if self.lc_names:
                       # create dict with sql names
                       self.update_fields = {}
                       for f in self.xml_field_map.values():
                           self.update_fields[f.getName()] = f
           
         if node is None:                  else:
             break;                      self.update_fields = self.xml_field_map
                   
         # METADATA tag defines number and names of fields in FMPXMLRESULT          # and translate to list of xml fields
         if node[1].nodeName == 'METADATA':          if self.lc_names:
             doc.expandNode(node[1])              self.xml_update_list = [self.sql_field_map[x] for x in self.update_fields]
           else:
               self.xml_update_list = self.update_fields.keys()
                   
             names=node[1].getElementsByTagName('FIELD')          if not self.keep_fields:
               # adjust db table to fields in XML and update_fields
               for f in self.xml_field_map.values():
                   logging.debug("sync-fieldname: %s"%f.getName())
                   sf = self.sql_fields.get(f.getName(), None)
                   uf = self.update_fields.get(f.getName(), None)
                   if sf is not None:
                       # name in db -- check type
                       if f.getType() != sf.getType():
                           logging.debug("field %s has different type (%s vs %s)"%(f,f.getType(),sf.getType()))
                   elif uf is not None:
                       # add field to table
                       qstr="alter table %s add %s %s"%(self.table,uf.getName(),uf.getType())
                       logging.info("db add field:"+qstr)
                       
                       if self.ascii_db and type(qstr)==types.UnicodeType:
                           qstr=qstr.encode('utf-8')
                           
                       self.db.execute(qstr)
                       self.dbCon.commit()
                   
           # prepare sql statements for update
           setStr=string.join(["%s = %%s"%self.xml_field_map[f] for f in self.xml_update_list], ', ')
           self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field)
           # and insert
           fields=string.join([self.xml_field_map[x].getName() for x in self.xml_update_list], ',')
           values=string.join(['%s' for f in self.xml_update_list], ',')
           self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values)
           logging.debug("update-query: "+self.updQuery)
           logging.debug("add-query: "+self.addQuery)
           return
   
       def handle_meta_field(self, end_condition):
           name = self.params.get((None, u'NAME'))
           yield None
           #Element closed.  Wrap up
           if self.lc_names:
               # clean name
               sqlname = name.replace(" ","_").lower() 
           else:
               sqlname = name
           self.xml_field_names.append(name)
           # map to sql name and default text type
           self.xml_field_map[name] = TableColumn(sqlname, 'text')
           self.sql_field_map[sqlname] = name
           logging.debug("FIELD name: "+name)
           return
   
       def handle_data_fields(self, end_condition):
           dispatcher = {
               (saxtools.START_ELEMENT, fm_ns, u'ROW'):
               self.handle_row,
               }
           #First round through the generator corresponds to the
           #start element event
           logging.debug("START RESULTSET")
           self.rowcnt = 0
           yield None
       
           #delegate is a generator that handles all the events "within"
           #this element
           delegate = None
           while not self.event == end_condition:
               delegate = saxtools.tenorsax.event_loop_body(
                   dispatcher, delegate, self.event)
               yield None
           
           #Element closed.  Wrap up
           logging.debug("END RESULTSET")
           self.dbCon.commit()
   
             for name in names:          if self.sync_mode:
                 fn = name.getAttribute('NAME')              # delete unmatched entries in db
                 fieldNames.append(fn)              logging.info("deleting unmatched rows from db")
               delQuery = "DELETE FROM %s WHERE %s = %%s"%(self.table,self.id_field)
               for id in self.dbIDs.keys():
                   # find all not-updated fields
                   if self.dbIDs[id] == 0:
                       logging.info(" delete:"+id)
                       SimpleSearch(self.db, delQuery, [id], ascii=self.ascii_db)
                       sys.exit(1)
                           
             if update_fields is None:                  elif self.dbIDs[id] > 1:
                 # update all fields                      logging.info(" sync: ID %s used more than once?"%id)
                 update_fields = fieldNames  
                           
             logging.debug("fieldnames:"+repr(fieldNames))              self.dbCon.commit()
             # get list of fields in db table  
             qstr="""select attname from pg_attribute, pg_class where attrelid = pg_class.oid and relname = '%s'"""  
             columns=[x[0] for x in SimpleSearch(db, qstr%table)]  
               
             # adjust db table to fields in XML and fieldlist  
             for fieldName in fieldNames:  
                 logging.debug("db-fieldname:"+repr(fieldName))                       
                 if (fieldName not in columns) and (fieldName in update_fields):  
                     qstr="alter table %s add %%s %%s"%table  
                     logging.info("db add field:"+qstr%(fieldName,'text'))  
                     db.execute(qstr, (fieldName,'text'))  
                     db.commit()  
                       
           
         # ROW tags (in RESULTSET tag) hold data  
         elif node[1].nodeName == 'ROW':  
             rowcnt += 1  
               
             doc.expandNode(node[1])  
             cols=node[1].getElementsByTagName('COL')  
             dataSet={}  
             i = 0  
             # populate with data  
             for col in cols:  
                 data=col.getElementsByTagName('DATA')  
                 dataSet[fieldNames[i]] = getTextFromNode(data[0])  
                 i += 1  
                                   
           # reinstate backup tables
           if self.backup_table:
               backup_name = "%s_%s"%(self.orig_table,time.strftime('%Y_%m_%d_%H_%M_%S'))
               logging.info("rename backup table %s to %s"%(self.orig_table,backup_name))
               qstr = "ALTER TABLE %s RENAME TO %s"%(self.orig_table,backup_name)
               self.db.execute(qstr)
               logging.info("rename working table %s to %s"%(self.table,self.orig_table))
               qstr = "ALTER TABLE %s RENAME TO %s"%(self.table,self.orig_table)
               self.db.execute(qstr)
               self.dbCon.commit()
           
           return
   
       def handle_row(self, end_condition):
           dispatcher = {
               (saxtools.START_ELEMENT, fm_ns, u'COL'):
               self.handle_col,
               }
           logging.debug("START ROW")
           self.xml_data = {}
           self.colIdx = 0
           yield None
       
           #delegate is a generator that handles all the events "within"
           #this element
           delegate = None
           while not self.event == end_condition:
               delegate = saxtools.tenorsax.event_loop_body(
                   dispatcher, delegate, self.event)
               yield None
           
           #Element closed.  Wrap up
           logging.debug("END ROW")
           self.rowcnt += 1
           # process collected row data
             update=False              update=False
                       id_val=''
             # synchronize by id_field              # synchronize by id_field
             if id_field:          if self.id_field:
                 id_val=dataSet[id_field]              id_val = self.xml_data[self.xml_id]
                 if id_val in dbIDs:              if id_val in self.dbIDs:
                     dbIDs[id_val] += 1                  self.dbIDs[id_val] += 1
                     update=True                      update=True
                           
           # collect all values
           args = []
           for fn in self.xml_update_list:
               f = self.xml_field_map[fn]
               val = self.xml_data[fn]
               type = self.sql_fields[f.getName()].getType()
               if type == "date" and len(val) == 0: 
                   # empty date field
                   val = None
                   
               elif type == "integer" and len(val) == 0: 
                   # empty int field
                   val = None
                   
               args.append(val)
                       
             if update:              if update:
                 # update existing row (by id_field)                  # update existing row (by id_field)
                 setvals=[]              # last argument is ID match
                 for fieldName in update_fields:              args.append(id_val)
                     setvals.append("%s = %s"%(fieldName,sql_quote(dataSet[fieldName])))              logging.debug("update: %s = %s"%(id_val, args))
                 setStr=string.join(setvals, ',')              SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db)
                 id_val=dataSet[id_field]  
                 qstr="UPDATE %s SET %s WHERE %s = '%s' "%(table,setStr,id_field,id_val)  
                 db.execute(qstr)  
                 ret += "up: %s"%id_val  
             else:              else:
                 # create new row                  # create new row
                 fields=string.join(update_fields, ',')              logging.debug("insert: %s"%args)
                 values=string.join([" %s "%sql_quote(dataSet[x]) for x in update_fields], ',')              SimpleSearch(self.db, self.addQuery, args, ascii=self.ascii_db)
                 qstr="INSERT INTO %s (%s) VALUES (%s)"%(table,fields,values)  
                 db.execute(qstr)          #logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
                 ret += "ad: %s"%dataSet.get(id_field, rowcnt)          if (self.rowcnt % 10) == 0:
               logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
             #logging.info(" row:"+"%d (%s)"%(rowcnt,id_val))              self.dbCon.commit()
             if (rowcnt % 10) == 0:              
                 logging.info(" row:"+"%d (%s)"%(rowcnt,id_val))          return
                 dbCon.commit()  
       def handle_col(self, end_condition):
           dispatcher = {
               (saxtools.START_ELEMENT, fm_ns, u'DATA'):
               self.handle_data_tag,
               }
           #print "START COL"
           yield None
           #delegate is a generator that handles all the events "within"
           #this element
           delegate = None
           while not self.event == end_condition:
               delegate = saxtools.tenorsax.event_loop_body(
                   dispatcher, delegate, self.event)
               yield None
           #Element closed.  Wrap up
           #print "END COL"
           self.colIdx += 1
           return
   
       def handle_data_tag(self, end_condition):
           #print "START DATA"
           content = u''
           yield None
           # gather child elements
           while not self.event == end_condition:
               if self.event[0] == saxtools.CHARACTER_DATA:
                   content += self.params
               yield None
           #Element closed.  Wrap up
           fn = self.xml_field_names[self.colIdx]
           self.xml_data[fn] = content
           return
   
     dbCon.commit()  
     if sync_mode:  
         # delete unmatched entries in db  
         for id in dbIDs.keys():  
             # find all not-updated fields  
             if dbIDs[id] == 0:  
                 logging.info(" delete:"+id)  
                 qstr = "DELETE FROM %s WHERE %%s = '%%s'"%table  
                 db.execute(qstr, (id_field,id))  
                                   
             elif dbIDs[id] > 1:  
                 logging.info(" sync:"+"id used more than once?"+id)  
                   
         dbCon.commit()  
           
     return ret  
   
 ##  ##
 ## public static int main()  ## public static int main()
Line 196  opars.add_option("-t", "--table", Line 471  opars.add_option("-t", "--table",
                  help="database table name")                   help="database table name")
 opars.add_option("--fields", default=None,   opars.add_option("--fields", default=None, 
                  dest="update_fields",                    dest="update_fields", 
                  help="list of fields to update (comma separated)", metavar="LIST")                   help="list of fields to update (comma separated, sql-names)", metavar="LIST")
 opars.add_option("--id-field", default=None,   opars.add_option("--id-field", default=None, 
                  dest="id_field",                    dest="id_field", 
                  help="name of id field for synchronisation", metavar="NAME")                   help="name of id field for synchronisation (only appends data otherwise, sql-name)", metavar="NAME")
 opars.add_option("--sync-mode", default=False, action="store_true",   opars.add_option("--sync", "--sync-mode", default=False, action="store_true", 
                  dest="sync_mode",                    dest="sync_mode", 
                  help="do full sync (remove unmatched fields from db)")                   help="do full sync based on id field (remove unmatched fields from db)")
   opars.add_option("--lc-names", default=False, action="store_true", 
                    dest="lc_names", 
                    help="clean and lower case field names from XML")
   opars.add_option("--keep-fields", default=False, action="store_true", 
                    dest="keep_fields", 
                    help="don't add fields from XML to SQL table")
   opars.add_option("--ascii-db", default=False, action="store_true", 
                    dest="ascii_db", 
                    help="the SQL database stores ASCII instead of unicode")
   opars.add_option("--replace", default=False, action="store_true", 
                    dest="replace_table", 
                    help="replace table i.e. delete and re-insert data")
   opars.add_option("--backup", default=False, action="store_true", 
                    dest="backup_table", 
                    help="create backup of old table (breaks indices)")
 opars.add_option("-d", "--debug", default=False, action="store_true",   opars.add_option("-d", "--debug", default=False, action="store_true", 
                  dest="debug",                    dest="debug", 
                  help="debug mode (more output)")                   help="debug mode (more output)")
Line 210  opars.add_option("-d", "--debug", defaul Line 500  opars.add_option("-d", "--debug", defaul
 (options, args) = opars.parse_args()  (options, args) = opars.parse_args()
   
 if len(sys.argv) < 2 or options.filename is None or options.dsn is None:  if len(sys.argv) < 2 or options.filename is None or options.dsn is None:
       print "importFMPXML "+version_string
     opars.print_help()      opars.print_help()
     sys.exit(1)      sys.exit(1)
   
Line 225  logging.basicConfig(level=loglevel, Line 516  logging.basicConfig(level=loglevel,
 update_fields = None  update_fields = None
   
 if options.update_fields:  if options.update_fields:
     update_fields = [string.strip(s) for s in options.update_fields.split(',')]      uf = {}
       for f in options.update_fields.split(','):
           (n,t) = f.split(':')
           uf[n] = TableColumn(n,t)
           
       options.update_fields = uf
   
   if options.id_field and options.replace_table:
       logging.error("ABORT: sorry, you can't do both sync (id_field) and replace")
       sys.exit(1)
       
   parser = sax.make_parser()
   #The "consumer" is our own handler
   consumer = xml_handler(options)
   #Initialize Tenorsax with handler
   handler = saxtools.tenorsax(consumer)
   #Resulting tenorsax instance is the SAX handler 
   parser.setContentHandler(handler)
   parser.setFeature(sax.handler.feature_namespaces, 1)
   parser.parse(options.filename)  
             
 importXMLFileFMP(dsn=options.dsn,table=options.table,filename=options.filename,  
                  update_fields=update_fields,id_field=options.id_field,  
                  sync_mode=options.sync_mode)  
   
 print "DONE!"  print "DONE!"

Removed from v.1.1  
changed lines
  Added in v.1.8


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>