File:  [Repository] / ZSQLExtend / importFMPXML.py
Revision 1.18: download - view: text, annotated - select for diffs - revision graph
Tue Dec 11 20:30:40 2007 UTC (16 years, 5 months ago) by casties
Branches: MAIN
CVS tags: HEAD
fixed really silly bug

    1: #!/usr/local/bin/python
    2: #
    3: 
    4: import string
    5: import logging
    6: import sys
    7: import types
    8: import time
    9: 
   10: from xml import sax
   11: from amara import saxtools
   12: 
   13: try:
   14:     import psycopg2 as psycopg
   15:     psyco = 2
   16: except:
   17:     import psycopg
   18:     psyco = 1
   19: 
   20: fm_ns = 'http://www.filemaker.com/fmpxmlresult'
   21: 
   22: version_string = "V0.5 ROC 11.12.2007"
   23: 
   24: def getTextFromNode(nodename):
   25:     """get the cdata content of a node"""
   26:     if nodename is None:
   27:         return ""
   28:     nodelist=nodename.childNodes
   29:     rc = ""
   30:     for node in nodelist:
   31:         if node.nodeType == node.TEXT_NODE:
   32:            rc = rc + node.data
   33:     return rc
   34: 
   35: def sql_quote(v):
   36:     # quote dictionary
   37:     quote_dict = {"\'": "''", "\\": "\\\\"}
   38:     for dkey in quote_dict.keys():
   39:         if string.find(v, dkey) >= 0:
   40:             v=string.join(string.split(v,dkey),quote_dict[dkey])
   41:     return "'%s'"%v
   42: 
   43: def SimpleSearch(curs,query, args=None, ascii=False):
   44:     """execute sql query and return data"""
   45:     #logger.debug("executing: "+query)
   46:     if ascii:
   47:         # encode all in UTF-8
   48:         query = query.encode("UTF-8")
   49:         if args is not None:
   50:             encargs = []
   51:             for a in args:
   52:                 if a is not None and isinstance(a, str):
   53:                     a = a.encode("UTF-8")
   54:                 encargs.append(a)
   55:             
   56:             args = encargs
   57: 
   58:     curs.execute(query, args)
   59:     #logger.debug("sql done")
   60:     try:
   61:         return curs.fetchall()
   62:     except:
   63:         return None
   64: 
   65: 
   66: class TableColumn:
   67:     """simple type for storing sql column name and type"""
   68:     
   69:     def __init__(self, name, type=None):
   70:         #print "new tablecolumn(%s,%s)"%(name, type)
   71:         self.name = name
   72:         self.type = type
   73:         
   74:     def getName(self):
   75:         return self.name
   76:     
   77:     def getType(self):
   78:         if self.type is not None:
   79:             return self.type
   80:         else:
   81:             return "text"
   82: 
   83:     def __str__(self):
   84:         return self.name
   85:     
   86:     
   87: class xml_handler:
   88:     def __init__(self,options):
   89:         """SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table.
   90:         @param options: dict of options
   91:         @param options.dsn: database connection string
   92:         @param options.table: name of the table the xml shall be imported into
   93:         @param options.filename: xmlfile filename
   94:         @param options.update_fields: (optional) list of fields to update; default is to create all fields
   95:         @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
   96:         @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
   97:         @param options.lc_names: (optional) lower case and clean up field names from XML
   98:         @param options.keep_fields: (optional) don't add fields to SQL database
   99:         @param options.ascii_db: (optional) assume ascii encoding in db
  100:         @param options.replace_table: (optional) delete and re-insert data
  101:         @param options.backup_table: (optional) create backup of old table (breaks indices)
  102:         @param options.use_logger_instance: (optional) use this instance of a logger
  103:         """
  104:         
  105:         # set up logger
  106:         if hasattr(options, 'use_logger_instance'):
  107:             self.logger = options.use_logger_instance
  108:         else:
  109:             self.logger = logging.getLogger('db.import.fmpxml')
  110: 
  111:         
  112:         # set up parser
  113:         self.event = None
  114:         self.top_dispatcher = { 
  115:             (saxtools.START_ELEMENT, fm_ns, u'METADATA'): 
  116:             self.handle_meta_fields,
  117:             (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): 
  118:             self.handle_data_fields,
  119:             }
  120:         
  121:         # connect database
  122:         self.dbCon = psycopg.connect(options.dsn)
  123:         self.db = self.dbCon.cursor()
  124:         assert self.db, "AIIEE no db cursor for %s!!"%options.dsn
  125:     
  126:         self.table = getattr(options,"table",None)
  127:         self.update_fields = getattr(options,"update_fields",None)
  128:         self.id_field = getattr(options,"id_field",None)
  129:         self.sync_mode = getattr(options,"sync_mode",None)
  130:         self.lc_names = getattr(options,"lc_names",None)
  131:         self.keep_fields = getattr(options,"keep_fields",None)
  132:         self.ascii_db = getattr(options,"ascii_db",None)
  133:         self.replace_table = getattr(options,"replace_table",None)
  134:         self.backup_table = getattr(options,"backup_table",None)
  135: 
  136:         self.logger.debug("dsn: "+repr(getattr(options,"dsn",None)))
  137:         self.logger.debug("table: "+repr(self.table))
  138:         self.logger.debug("update_fields: "+repr(self.update_fields))
  139:         self.logger.debug("id_field: "+repr(self.id_field))
  140:         self.logger.debug("sync_mode: "+repr(self.sync_mode))
  141:         self.logger.debug("lc_names: "+repr(self.lc_names))
  142:         self.logger.debug("keep_fields: "+repr(self.keep_fields))
  143:         self.logger.debug("ascii_db: "+repr(self.ascii_db))
  144:         self.logger.debug("replace_table: "+repr(self.replace_table))
  145:         self.logger.debug("backup_table: "+repr(self.backup_table))
  146:         
  147:         self.dbIDs = {}
  148:         self.rowcnt = 0
  149:         
  150:         if self.id_field is not None:
  151:             # prepare a list of ids for sync mode
  152:             qstr="select %s from %s"%(self.id_field,self.table)
  153:             for id in SimpleSearch(self.db, qstr):
  154:                 # value 0: not updated
  155:                 self.dbIDs[id[0]] = 0;
  156:                 self.rowcnt += 1
  157:                 
  158:             self.logger.info("%d entries in DB to sync"%self.rowcnt)
  159:         
  160:         # names of fields in XML file
  161:         self.xml_field_names = []
  162:         # map XML field names to SQL field names
  163:         self.xml_field_map = {}
  164:         # and vice versa
  165:         self.sql_field_map = {}
  166:         
  167:         return
  168: 
  169:     def handle_meta_fields(self, end_condition):
  170:         dispatcher = {
  171:             (saxtools.START_ELEMENT, fm_ns, u'FIELD'):
  172:             self.handle_meta_field,
  173:             }
  174:         #First round through the generator corresponds to the
  175:         #start element event
  176:         self.logger.info("reading metadata...")
  177:         self.logger.debug("START METADATA")
  178:         yield None
  179:     
  180:         #delegate is a generator that handles all the events "within"
  181:         #this element
  182:         delegate = None
  183:         while not self.event == end_condition:
  184:             delegate = saxtools.tenorsax.event_loop_body(
  185:                 dispatcher, delegate, self.event)
  186:             yield None
  187:         
  188:         #Element closed. Wrap up
  189:         self.logger.debug("END METADATA")
  190:         
  191:         # rename table for backup
  192:         if self.backup_table:
  193:             self.orig_table = self.table
  194:             self.tmp_table = self.table + "_tmp"
  195:             backup_name = "%s_%s"%(self.table,time.strftime('%Y_%m_%d_%H_%M_%S'))
  196:             
  197:             # remove old temp table
  198:             qstr = "DROP TABLE %s"%(self.tmp_table)
  199:             try:
  200:                 self.db.execute(qstr)
  201:             except:
  202:                 pass
  203:             
  204:             self.dbCon.commit()
  205:            
  206:             if self.id_field:
  207:                 # sync mode -- copy backup table, update current table 
  208:                 self.logger.info("copy table %s to %s"%(self.table,backup_name))
  209:                 qstr = "CREATE TABLE %s AS (SELECT * FROM %s)"%(backup_name,self.table)
  210: 
  211:             else:
  212:                 # replace mode -- create empty tmp table, insert into tmp table
  213:                 self.table = self.tmp_table
  214:                 self.logger.info("create empty table %s"%(self.table))
  215:                 qstr = "CREATE TABLE %s AS (SELECT * FROM %s WHERE 1=0)"%(self.table,self.orig_table)
  216:             
  217:             self.db.execute(qstr)
  218:             self.dbCon.commit()
  219:         
  220:         # delete data from table for replace
  221:         if self.replace_table:
  222:             self.logger.info("delete data from table %s"%(self.table))
  223:             qstr = "TRUNCATE TABLE %s"%(self.table)
  224:             self.db.execute(qstr)
  225:             self.dbCon.commit()
  226:            
  227:         # try to match date style with XML
  228:         self.db.execute("set datestyle to 'german'")
  229:         
  230:         #self.logger.debug("xml-fieldnames:"+repr(self.xml_field_names))
  231:         # get list of fields and types of db table
  232:         qstr="select attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) from pg_attribute, pg_class where attrelid = pg_class.oid and pg_attribute.attnum > 0 and relname = '%s'"
  233:         self.sql_fields={}
  234:         for f in SimpleSearch(self.db, qstr%self.table):
  235:             n = f[0]
  236:             t = f[1]
  237:             #print "SQL fields: %s (%s)"%(n,t)
  238:             self.sql_fields[n] = TableColumn(n,t)
  239:         
  240:         # translate id_field (SQL-name) to XML-name
  241:         self.xml_id = self.sql_field_map.get(self.id_field, None)
  242:         # get type of id_field
  243:         if self.id_field:
  244:             self.id_type = self.sql_fields[self.id_field].getType()
  245:         else:
  246:             self.id_type = None
  247:         
  248:         # check fields to update
  249:         if self.update_fields is None:
  250:             if self.keep_fields:
  251:                 # update all existing fields from sql (when they are in the xml file)
  252:                 self.update_fields = {}
  253:                 for f in self.sql_fields.keys():
  254:                     if self.sql_field_map.has_key(f):
  255:                         xf = self.sql_field_map[f]
  256:                         self.update_fields[f] = self.xml_field_map[xf]
  257: 
  258:             else:
  259:                 # update all fields
  260:                 if self.lc_names:
  261:                     # create dict with sql names
  262:                     self.update_fields = {}
  263:                     for f in self.xml_field_map.values():
  264:                         self.update_fields[f.getName()] = f
  265:                         
  266:                 else:
  267:                     self.update_fields = self.xml_field_map
  268:                                 
  269:         # and translate to list of xml fields
  270:         if self.lc_names:
  271:             self.xml_update_list = [self.sql_field_map[x] for x in self.update_fields]
  272:         else:
  273:             self.xml_update_list = self.update_fields.keys()
  274: 
  275:         if not self.keep_fields:
  276:             # adjust db table to fields in XML and update_fields
  277:             for f in self.xml_field_map.values():
  278:                 self.logger.debug("sync-fieldname: %s"%f.getName())
  279:                 sf = self.sql_fields.get(f.getName(), None)
  280:                 uf = self.update_fields.get(f.getName(), None)
  281:                 if sf is not None:
  282:                     # name in db -- check type
  283:                     if f.getType() != sf.getType():
  284:                         self.logger.debug("field %s has different type (%s vs %s)"%(f,f.getType(),sf.getType()))
  285:                 elif uf is not None:
  286:                     # add field to table
  287:                     qstr="alter table %s add %s %s"%(self.table,uf.getName(),uf.getType())
  288:                     self.logger.info("db add field:"+qstr)
  289:                     
  290:                     if self.ascii_db and type(qstr)==types.UnicodeType:
  291:                         qstr=qstr.encode('utf-8')
  292:                         
  293:                     self.db.execute(qstr)
  294:                     self.dbCon.commit()
  295:                 
  296:         # prepare sql statements for update (do not update id_field)
  297:         setStr=string.join(["%s = %%s"%self.xml_field_map[f] for f in self.xml_update_list if f != self.xml_id], ', ')
  298:         self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field)
  299:         # and insert
  300:         fields=string.join([self.xml_field_map[x].getName() for x in self.xml_update_list], ',')
  301:         values=string.join(['%s' for f in self.xml_update_list], ',')
  302:         self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values)
  303:         self.logger.debug("update-query: "+self.updQuery)
  304:         self.logger.debug("add-query: "+self.addQuery)
  305:         return
  306: 
  307:     def handle_meta_field(self, end_condition):
  308:         name = self.params.get((None, u'NAME'))
  309:         yield None
  310:         #Element closed.  Wrap up
  311:         if self.lc_names:
  312:             # clean name
  313:             sqlname = name.replace(" ","_").lower() 
  314:         else:
  315:             sqlname = name
  316:         self.xml_field_names.append(name)
  317:         # map to sql name and default text type
  318:         self.xml_field_map[name] = TableColumn(sqlname, 'text')
  319:         self.sql_field_map[sqlname] = name
  320:         self.logger.debug("FIELD name: "+name)
  321:         return
  322: 
  323:     def handle_data_fields(self, end_condition):
  324:         dispatcher = {
  325:             (saxtools.START_ELEMENT, fm_ns, u'ROW'):
  326:             self.handle_row,
  327:             }
  328:         #First round through the generator corresponds to the
  329:         #start element event
  330:         self.logger.info("reading data...")
  331:         self.logger.debug("START RESULTSET")
  332:         self.rowcnt = 0
  333:         yield None
  334:     
  335:         #delegate is a generator that handles all the events "within"
  336:         #this element
  337:         delegate = None
  338:         while not self.event == end_condition:
  339:             delegate = saxtools.tenorsax.event_loop_body(
  340:                 dispatcher, delegate, self.event)
  341:             yield None
  342:         
  343:         #Element closed.  Wrap up
  344:         self.logger.debug("END RESULTSET")
  345:         self.dbCon.commit()
  346:         
  347:         if self.sync_mode:
  348:             # delete unmatched entries in db
  349:             self.logger.info("deleting unmatched rows from db")
  350:             delQuery = "DELETE FROM %s WHERE %s = %%s"%(self.table,self.id_field)
  351:             for id in self.dbIDs.keys():
  352:                 # find all not-updated fields
  353:                 if self.dbIDs[id] == 0:
  354:                     self.logger.info(" delete:"+id)
  355:                     SimpleSearch(self.db, delQuery, [id], ascii=self.ascii_db)
  356:                     sys.exit(1)
  357:                     
  358:                 elif self.dbIDs[id] > 1:
  359:                     self.logger.info(" sync: ID %s used more than once?"%id)
  360:             
  361:             self.dbCon.commit()
  362:             
  363:         # reinstate backup tables
  364:         if self.backup_table and not self.id_field:
  365:             backup_name = "%s_%s"%(self.orig_table,time.strftime('%Y_%m_%d_%H_%M_%S'))
  366:             self.logger.info("rename backup table %s to %s"%(self.orig_table,backup_name))
  367:             qstr = "ALTER TABLE %s RENAME TO %s"%(self.orig_table,backup_name)
  368:             self.db.execute(qstr)
  369:             self.logger.info("rename working table %s to %s"%(self.table,self.orig_table))
  370:             qstr = "ALTER TABLE %s RENAME TO %s"%(self.table,self.orig_table)
  371:             self.db.execute(qstr)
  372:             self.dbCon.commit()
  373:         
  374:         return
  375: 
  376:     def handle_row(self, end_condition):
  377:         dispatcher = {
  378:             (saxtools.START_ELEMENT, fm_ns, u'COL'):
  379:             self.handle_col,
  380:             }
  381:         self.logger.debug("START ROW")
  382:         self.xml_data = {}
  383:         self.colIdx = 0
  384:         yield None
  385:     
  386:         #delegate is a generator that handles all the events "within"
  387:         #this element
  388:         delegate = None
  389:         while not self.event == end_condition:
  390:             delegate = saxtools.tenorsax.event_loop_body(
  391:                 dispatcher, delegate, self.event)
  392:             yield None
  393:         
  394:         #Element closed.  Wrap up
  395:         self.logger.debug("END ROW")
  396:         self.rowcnt += 1
  397:         # process collected row data
  398:         update=False
  399:         id_val=''
  400:         # synchronize by id_field
  401:         if self.id_field:
  402:             if self.id_type == 'integer':
  403:                 id_val = int(self.xml_data[self.xml_id])
  404:             else:
  405:                 id_val = self.xml_data[self.xml_id]
  406:                 
  407:             if id_val in self.dbIDs:
  408:                 self.dbIDs[id_val] += 1
  409:                 update=True
  410: 
  411:         # collect all values
  412:         args = []
  413:         for fn in self.xml_update_list:
  414:             # do not update id_field
  415:             if self.id_field and fn == self.xml_id:
  416:                 continue
  417:             
  418:             f = self.xml_field_map[fn]
  419:             val = self.xml_data[fn]
  420:             type = self.sql_fields[f.getName()].getType()
  421:             if type == "date" and len(val) == 0: 
  422:                 # empty date field
  423:                 val = None
  424:                 
  425:             elif type == "integer" and len(val) == 0: 
  426:                 # empty int field
  427:                 val = None
  428:                 
  429:             args.append(val)
  430:                     
  431:         if update:
  432:             # update existing row (by id_field)
  433:             # last argument is ID match
  434:             args.append(id_val)
  435:             self.logger.debug("update: %s = %s"%(id_val, args))
  436:             SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db)
  437: 
  438:         else:
  439:             # create new row
  440:             self.logger.debug("insert: %s"%args)
  441:             SimpleSearch(self.db, self.addQuery, args, ascii=self.ascii_db)
  442: 
  443:         #self.logger.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
  444:         if (self.rowcnt % 100) == 0:
  445:             self.logger.info(" row:"+"%d (id:%s)"%(self.rowcnt,id_val))
  446:             self.dbCon.commit()
  447:             
  448:         return
  449: 
  450:     def handle_col(self, end_condition):
  451:         dispatcher = {
  452:             (saxtools.START_ELEMENT, fm_ns, u'DATA'):
  453:             self.handle_data_tag,
  454:             }
  455:         #print "START COL"
  456:         yield None
  457:         #delegate is a generator that handles all the events "within"
  458:         #this element
  459:         delegate = None
  460:         while not self.event == end_condition:
  461:             delegate = saxtools.tenorsax.event_loop_body(
  462:                 dispatcher, delegate, self.event)
  463:             yield None
  464:         #Element closed.  Wrap up
  465:         #print "END COL"
  466:         self.colIdx += 1
  467:         return
  468: 
  469:     def handle_data_tag(self, end_condition):
  470:         #print "START DATA"
  471:         content = u''
  472:         yield None
  473:         # gather child elements
  474:         while not self.event == end_condition:
  475:             if self.event[0] == saxtools.CHARACTER_DATA:
  476:                 content += self.params
  477:             yield None
  478:         #Element closed.  Wrap up
  479:         fn = self.xml_field_names[self.colIdx]
  480:         self.xml_data[fn] = content
  481:         return
  482: 
  483: 
  484: def importFMPXML(options):
  485:     """import FileMaker XML file (FMPXMLRESULT format) into the table.     
  486:         @param options: dict of options
  487:         @param options.dsn: database connection string
  488:         @param options.table: name of the table the xml shall be imported into
  489:         @param options.filename: xmlfile filename
  490:         @param options.update_fields: (optional) list of fields to update; default is to create all fields
  491:         @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
  492:         @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
  493:         @param options.lc_names: (optional) lower case and clean up field names from XML
  494:         @param options.keep_fields: (optional) don't add fields to SQL database
  495:         @param options.ascii_db: (optional) assume ascii encoding in db
  496:         @param options.replace_table: (optional) delete and re-insert data
  497:         @param options.backup_table: (optional) create backup of old table
  498:         """
  499:         
  500:     if getattr(options,'update_fields',None):
  501:         uf = {}
  502:         for f in options.update_fields.split(','):
  503:             if f.find(':') > 0:
  504:                 (n,t) = f.split(':')
  505:             else:
  506:                 n = f
  507:                 t = None
  508:             uf[n] = TableColumn(n,t)
  509:             
  510:         options.update_fields = uf
  511:     
  512:     if getattr(options,'id_field',None) and getattr(options,'replace_table',None):
  513:         logging.error("ABORT: sorry, you can't do both sync (id_field) and replace")
  514:         return
  515:         
  516:     parser = sax.make_parser()
  517:     #The "consumer" is our own handler
  518:     consumer = xml_handler(options)
  519:     #Initialize Tenorsax with handler
  520:     handler = saxtools.tenorsax(consumer)
  521:     #Resulting tenorsax instance is the SAX handler 
  522:     parser.setContentHandler(handler)
  523:     parser.setFeature(sax.handler.feature_namespaces, 1)
  524:     parser.parse(options.filename)  
  525:     
  526: 
  527: if __name__ == "__main__":
  528:     from optparse import OptionParser
  529: 
  530:     opars = OptionParser()
  531:     opars.add_option("-f", "--file", 
  532:                      dest="filename",
  533:                      help="FMPXML file name", metavar="FILE")
  534:     opars.add_option("-c", "--dsn", 
  535:                      dest="dsn", 
  536:                      help="database connection string")
  537:     opars.add_option("-t", "--table", 
  538:                      dest="table", 
  539:                      help="database table name")
  540:     opars.add_option("--fields", default=None, 
  541:                      dest="update_fields", 
  542:                      help="list of fields to update (comma separated, sql-names)", metavar="LIST")
  543:     opars.add_option("--id-field", default=None, 
  544:                      dest="id_field", 
  545:                      help="name of id field for synchronisation (only appends data otherwise, sql-name)", metavar="NAME")
  546:     opars.add_option("--sync", "--sync-mode", default=False, action="store_true", 
  547:                      dest="sync_mode", 
  548:                      help="do full sync based on id field (remove unmatched fields from db)")
  549:     opars.add_option("--lc-names", default=False, action="store_true", 
  550:                      dest="lc_names", 
  551:                      help="clean and lower case field names from XML")
  552:     opars.add_option("--keep-fields", default=False, action="store_true", 
  553:                      dest="keep_fields", 
  554:                      help="don't add fields from XML to SQL table")
  555:     opars.add_option("--ascii-db", default=False, action="store_true", 
  556:                      dest="ascii_db", 
  557:                      help="the SQL database stores ASCII instead of unicode")
  558:     opars.add_option("--replace", default=False, action="store_true", 
  559:                      dest="replace_table", 
  560:                      help="replace table i.e. delete and re-insert data")
  561:     opars.add_option("--backup", default=False, action="store_true", 
  562:                      dest="backup_table", 
  563:                      help="create backup of old table")
  564:     opars.add_option("-d", "--debug", default=False, action="store_true", 
  565:                      dest="debug", 
  566:                      help="debug mode (more output)")
  567:     
  568:     (options, args) = opars.parse_args()
  569:     
  570:     if len(sys.argv) < 2 or options.filename is None or options.dsn is None:
  571:         print "importFMPXML "+version_string
  572:         opars.print_help()
  573:         sys.exit(1)
  574:     
  575:     if options.debug:
  576:         loglevel = logging.DEBUG
  577:     else:
  578:         loglevel = logging.INFO
  579:     
  580:     logging.basicConfig(level=loglevel, 
  581:                         format='%(asctime)s %(levelname)s %(message)s',
  582:                         datefmt='%H:%M:%S')
  583: 
  584:     importFMPXML(options)
  585: 
  586: 
  587:     
  588: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>