File:  [Repository] / ZSQLExtend / importFMPXML.py
Revision 1.8: download - view: text, annotated - select for diffs - revision graph
Thu Mar 29 18:31:32 2007 UTC (17 years, 2 months ago) by casties
Branches: MAIN
CVS tags: HEAD
new version 0.4
- backup option
- replace mode
- checks type of fields

    1: #!/usr/local/bin/python
    2: #
    3: 
    4: import string
    5: import logging
    6: import sys
    7: import types
    8: import time
    9: 
   10: from xml import sax
   11: from amara import saxtools
   12: 
   13: try:
   14:     import psycopg2 as psycopg
   15:     psyco = 2
   16: except:
   17:     import psycopg
   18:     psyco = 1
   19: 
   20: fm_ns = 'http://www.filemaker.com/fmpxmlresult'
   21: 
   22: version_string = "V0.4 ROC 29.3.2007"
   23: 
   24: def getTextFromNode(nodename):
   25:     """get the cdata content of a node"""
   26:     if nodename is None:
   27:         return ""
   28:     nodelist=nodename.childNodes
   29:     rc = ""
   30:     for node in nodelist:
   31:         if node.nodeType == node.TEXT_NODE:
   32:            rc = rc + node.data
   33:     return rc
   34: 
   35: def sql_quote(v):
   36:     # quote dictionary
   37:     quote_dict = {"\'": "''", "\\": "\\\\"}
   38:     for dkey in quote_dict.keys():
   39:         if string.find(v, dkey) >= 0:
   40:             v=string.join(string.split(v,dkey),quote_dict[dkey])
   41:     return "'%s'"%v
   42: 
   43: def SimpleSearch(curs,query, args=None, ascii=False):
   44:     """execute sql query and return data"""
   45:     #logging.debug("executing: "+query)
   46:     if ascii:
   47:         # encode all in UTF-8
   48:         query = query.encode("UTF-8")
   49:         if args is not None:
   50:             encargs = []
   51:             for a in args:
   52:                 if a is not None:
   53:                     a = a.encode("UTF-8")
   54:                 encargs.append(a)
   55:             
   56:             args = encargs
   57: 
   58:     curs.execute(query, args)
   59:     #logging.debug("sql done")
   60:     try:
   61:         return curs.fetchall()
   62:     except:
   63:         return None
   64: 
   65: 
   66: class TableColumn:
   67:     """simple type for storing sql column name and type"""
   68:     
   69:     def __init__(self, name, type=None):
   70:         #print "new tablecolumn(%s,%s)"%(name, type)
   71:         self.name = name
   72:         self.type = type
   73:         
   74:     def getName(self):
   75:         return self.name
   76:     
   77:     def getType(self):
   78:         if self.type is not None:
   79:             return self.type
   80:         else:
   81:             return "text"
   82: 
   83:     def __str__(self):
   84:         return self.name
   85:     
   86:     
   87: class xml_handler:
   88:     
   89:     def __init__(self,options):
   90:         '''
   91:         SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table.
   92:         @param options: dict of options
   93:         @param options.dsn: database connection string
   94:         @param options.table: name of the table the xml shall be imported into
   95:         @param options.filename: xmlfile filename
   96:         @param options.update_fields: (optional) list of fields to update; default is to create all fields
   97:         @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
   98:         @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
   99:         @param options.lc_names: (optional) lower case and clean up field names from XML
  100:         @param options.keep_fields: (optional) don't add fields to SQL database
  101:         @param options.ascii_db: (optional) assume ascii encoding in db
  102:         @param options.replace_table: (optional) delete and re-insert data
  103:         '''
  104:         # set up parser
  105:         self.event = None
  106:         self.top_dispatcher = { 
  107:             (saxtools.START_ELEMENT, fm_ns, u'METADATA'): 
  108:             self.handle_meta_fields,
  109:             (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): 
  110:             self.handle_data_fields,
  111:             }
  112:         
  113:         # connect database
  114:         self.dbCon = psycopg.connect(options.dsn)
  115:         self.db = self.dbCon.cursor()
  116:         assert self.db, "AIIEE no db cursor for %s!!"%options.dsn
  117:     
  118:         self.table = options.table
  119:         self.update_fields = options.update_fields
  120:         self.id_field = options.id_field
  121:         self.sync_mode = options.sync_mode
  122:         self.lc_names = options.lc_names
  123:         self.keep_fields = options.keep_fields
  124:         self.ascii_db = options.ascii_db
  125:         self.replace_table = options.replace_table
  126:         self.backup_table = options.backup_table
  127: 
  128:         logging.debug("dsn: "+repr(options.dsn))
  129:         logging.debug("table: "+repr(self.table))
  130:         logging.debug("update_fields: "+repr(self.update_fields))
  131:         logging.debug("id_field: "+repr(self.id_field))
  132:         logging.debug("sync_mode: "+repr(self.sync_mode))
  133:         logging.debug("lc_names: "+repr(self.lc_names))
  134:         logging.debug("keep_fields: "+repr(self.keep_fields))
  135:         logging.debug("ascii_db: "+repr(self.ascii_db))
  136:         logging.debug("replace_table: "+repr(self.replace_table))
  137:         
  138:         self.dbIDs = {}
  139:         self.rowcnt = 0
  140:         
  141:         if self.id_field is not None:
  142:             # prepare a list of ids for sync mode
  143:             qstr="select %s from %s"%(self.id_field,self.table)
  144:             for id in SimpleSearch(self.db, qstr):
  145:                 # value 0: not updated
  146:                 self.dbIDs[id[0]] = 0;
  147:                 self.rowcnt += 1
  148:                 
  149:             logging.info("%d entries in DB to sync"%self.rowcnt)
  150:         
  151:         # names of fields in XML file
  152:         self.xml_field_names = []
  153:         # map XML field names to SQL field names
  154:         self.xml_field_map = {}
  155:         # and vice versa
  156:         self.sql_field_map = {}
  157:         
  158:         return
  159: 
  160:     def handle_meta_fields(self, end_condition):
  161:         dispatcher = {
  162:             (saxtools.START_ELEMENT, fm_ns, u'FIELD'):
  163:             self.handle_meta_field,
  164:             }
  165:         #First round through the generator corresponds to the
  166:         #start element event
  167:         logging.debug("START METADATA")
  168:         yield None
  169:     
  170:         #delegate is a generator that handles all the events "within"
  171:         #this element
  172:         delegate = None
  173:         while not self.event == end_condition:
  174:             delegate = saxtools.tenorsax.event_loop_body(
  175:                 dispatcher, delegate, self.event)
  176:             yield None
  177:         
  178:         #Element closed. Wrap up
  179:         logging.debug("END METADATA")
  180:         
  181:         # rename table for backup
  182:         if self.backup_table:
  183:             self.orig_table = self.table
  184:             self.table = self.table + "_tmp"
  185:             # remove old temp table
  186:             qstr = "DROP TABLE %s"%(self.table)
  187:             try:
  188:                 self.db.execute(qstr)
  189:             except:
  190:                 pass
  191:             
  192:             self.dbCon.commit()
  193:            
  194:             if self.id_field:
  195:                 # sync mode -- copy table
  196:                 logging.info("copy table %s to %s"%(self.orig_table,self.table))
  197:                 qstr = "CREATE TABLE %s AS (SELECT * FROM %s)"%(self.table,self.orig_table)
  198: 
  199:             else:
  200:                 # rename table and create empty new one
  201:                 logging.info("create empty table %s"%(self.table))
  202:                 qstr = "CREATE TABLE %s AS (SELECT * FROM %s WHERE 1=0)"%(self.table,self.orig_table)
  203:             
  204:             self.db.execute(qstr)
  205:             self.dbCon.commit()
  206:         
  207:         # delete data from table for replace
  208:         if self.replace_table:
  209:             logging.info("delete data from table %s"%(self.table))
  210:             qstr = "TRUNCATE TABLE %s"%(self.table)
  211:             self.db.execute(qstr)
  212:             self.dbCon.commit()
  213:            
  214:         # try to match date style with XML
  215:         self.db.execute("set datestyle to 'german'")
  216:         
  217:         # translate id_field (SQL-name) to XML-name
  218:         self.xml_id = self.sql_field_map.get(self.id_field, None)
  219:         
  220:         #logging.debug("xml-fieldnames:"+repr(self.xml_field_names))
  221:         # get list of fields and types of db table
  222:         qstr="select attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) from pg_attribute, pg_class where attrelid = pg_class.oid and pg_attribute.attnum > 0 and relname = '%s'"
  223:         self.sql_fields={}
  224:         for f in SimpleSearch(self.db, qstr%self.table):
  225:             n = f[0]
  226:             t = f[1]
  227:             #print "SQL fields: %s (%s)"%(n,t)
  228:             self.sql_fields[n] = TableColumn(n,t)
  229:         
  230:         # check fields to update
  231:         if self.update_fields is None:
  232:             if self.keep_fields:
  233:                 # update existing fields
  234:                 self.update_fields = self.sql_fields
  235:                 
  236:             else:
  237:                 # update all fields
  238:                 if self.lc_names:
  239:                     # create dict with sql names
  240:                     self.update_fields = {}
  241:                     for f in self.xml_field_map.values():
  242:                         self.update_fields[f.getName()] = f
  243:                         
  244:                 else:
  245:                     self.update_fields = self.xml_field_map
  246:             
  247:         # and translate to list of xml fields
  248:         if self.lc_names:
  249:             self.xml_update_list = [self.sql_field_map[x] for x in self.update_fields]
  250:         else:
  251:             self.xml_update_list = self.update_fields.keys()
  252:         
  253:         if not self.keep_fields:
  254:             # adjust db table to fields in XML and update_fields
  255:             for f in self.xml_field_map.values():
  256:                 logging.debug("sync-fieldname: %s"%f.getName())
  257:                 sf = self.sql_fields.get(f.getName(), None)
  258:                 uf = self.update_fields.get(f.getName(), None)
  259:                 if sf is not None:
  260:                     # name in db -- check type
  261:                     if f.getType() != sf.getType():
  262:                         logging.debug("field %s has different type (%s vs %s)"%(f,f.getType(),sf.getType()))
  263:                 elif uf is not None:
  264:                     # add field to table
  265:                     qstr="alter table %s add %s %s"%(self.table,uf.getName(),uf.getType())
  266:                     logging.info("db add field:"+qstr)
  267:                     
  268:                     if self.ascii_db and type(qstr)==types.UnicodeType:
  269:                         qstr=qstr.encode('utf-8')
  270:                         
  271:                     self.db.execute(qstr)
  272:                     self.dbCon.commit()
  273:                 
  274:         # prepare sql statements for update
  275:         setStr=string.join(["%s = %%s"%self.xml_field_map[f] for f in self.xml_update_list], ', ')
  276:         self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field)
  277:         # and insert
  278:         fields=string.join([self.xml_field_map[x].getName() for x in self.xml_update_list], ',')
  279:         values=string.join(['%s' for f in self.xml_update_list], ',')
  280:         self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values)
  281:         logging.debug("update-query: "+self.updQuery)
  282:         logging.debug("add-query: "+self.addQuery)
  283:         return
  284: 
  285:     def handle_meta_field(self, end_condition):
  286:         name = self.params.get((None, u'NAME'))
  287:         yield None
  288:         #Element closed.  Wrap up
  289:         if self.lc_names:
  290:             # clean name
  291:             sqlname = name.replace(" ","_").lower() 
  292:         else:
  293:             sqlname = name
  294:         self.xml_field_names.append(name)
  295:         # map to sql name and default text type
  296:         self.xml_field_map[name] = TableColumn(sqlname, 'text')
  297:         self.sql_field_map[sqlname] = name
  298:         logging.debug("FIELD name: "+name)
  299:         return
  300: 
  301:     def handle_data_fields(self, end_condition):
  302:         dispatcher = {
  303:             (saxtools.START_ELEMENT, fm_ns, u'ROW'):
  304:             self.handle_row,
  305:             }
  306:         #First round through the generator corresponds to the
  307:         #start element event
  308:         logging.debug("START RESULTSET")
  309:         self.rowcnt = 0
  310:         yield None
  311:     
  312:         #delegate is a generator that handles all the events "within"
  313:         #this element
  314:         delegate = None
  315:         while not self.event == end_condition:
  316:             delegate = saxtools.tenorsax.event_loop_body(
  317:                 dispatcher, delegate, self.event)
  318:             yield None
  319:         
  320:         #Element closed.  Wrap up
  321:         logging.debug("END RESULTSET")
  322:         self.dbCon.commit()
  323:         
  324:         if self.sync_mode:
  325:             # delete unmatched entries in db
  326:             logging.info("deleting unmatched rows from db")
  327:             delQuery = "DELETE FROM %s WHERE %s = %%s"%(self.table,self.id_field)
  328:             for id in self.dbIDs.keys():
  329:                 # find all not-updated fields
  330:                 if self.dbIDs[id] == 0:
  331:                     logging.info(" delete:"+id)
  332:                     SimpleSearch(self.db, delQuery, [id], ascii=self.ascii_db)
  333:                     sys.exit(1)
  334:                     
  335:                 elif self.dbIDs[id] > 1:
  336:                     logging.info(" sync: ID %s used more than once?"%id)
  337:             
  338:             self.dbCon.commit()
  339:             
  340:         # reinstate backup tables
  341:         if self.backup_table:
  342:             backup_name = "%s_%s"%(self.orig_table,time.strftime('%Y_%m_%d_%H_%M_%S'))
  343:             logging.info("rename backup table %s to %s"%(self.orig_table,backup_name))
  344:             qstr = "ALTER TABLE %s RENAME TO %s"%(self.orig_table,backup_name)
  345:             self.db.execute(qstr)
  346:             logging.info("rename working table %s to %s"%(self.table,self.orig_table))
  347:             qstr = "ALTER TABLE %s RENAME TO %s"%(self.table,self.orig_table)
  348:             self.db.execute(qstr)
  349:             self.dbCon.commit()
  350:         
  351:         return
  352: 
  353:     def handle_row(self, end_condition):
  354:         dispatcher = {
  355:             (saxtools.START_ELEMENT, fm_ns, u'COL'):
  356:             self.handle_col,
  357:             }
  358:         logging.debug("START ROW")
  359:         self.xml_data = {}
  360:         self.colIdx = 0
  361:         yield None
  362:     
  363:         #delegate is a generator that handles all the events "within"
  364:         #this element
  365:         delegate = None
  366:         while not self.event == end_condition:
  367:             delegate = saxtools.tenorsax.event_loop_body(
  368:                 dispatcher, delegate, self.event)
  369:             yield None
  370:         
  371:         #Element closed.  Wrap up
  372:         logging.debug("END ROW")
  373:         self.rowcnt += 1
  374:         # process collected row data
  375:         update=False
  376:         id_val=''
  377:         # synchronize by id_field
  378:         if self.id_field:
  379:             id_val = self.xml_data[self.xml_id]
  380:             if id_val in self.dbIDs:
  381:                 self.dbIDs[id_val] += 1
  382:                 update=True
  383: 
  384:         # collect all values
  385:         args = []
  386:         for fn in self.xml_update_list:
  387:             f = self.xml_field_map[fn]
  388:             val = self.xml_data[fn]
  389:             type = self.sql_fields[f.getName()].getType()
  390:             if type == "date" and len(val) == 0: 
  391:                 # empty date field
  392:                 val = None
  393:                 
  394:             elif type == "integer" and len(val) == 0: 
  395:                 # empty int field
  396:                 val = None
  397:                 
  398:             args.append(val)
  399:                     
  400:         if update:
  401:             # update existing row (by id_field)
  402:             # last argument is ID match
  403:             args.append(id_val)
  404:             logging.debug("update: %s = %s"%(id_val, args))
  405:             SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db)
  406: 
  407:         else:
  408:             # create new row
  409:             logging.debug("insert: %s"%args)
  410:             SimpleSearch(self.db, self.addQuery, args, ascii=self.ascii_db)
  411: 
  412:         #logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
  413:         if (self.rowcnt % 10) == 0:
  414:             logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
  415:             self.dbCon.commit()
  416:             
  417:         return
  418: 
  419:     def handle_col(self, end_condition):
  420:         dispatcher = {
  421:             (saxtools.START_ELEMENT, fm_ns, u'DATA'):
  422:             self.handle_data_tag,
  423:             }
  424:         #print "START COL"
  425:         yield None
  426:         #delegate is a generator that handles all the events "within"
  427:         #this element
  428:         delegate = None
  429:         while not self.event == end_condition:
  430:             delegate = saxtools.tenorsax.event_loop_body(
  431:                 dispatcher, delegate, self.event)
  432:             yield None
  433:         #Element closed.  Wrap up
  434:         #print "END COL"
  435:         self.colIdx += 1
  436:         return
  437: 
  438:     def handle_data_tag(self, end_condition):
  439:         #print "START DATA"
  440:         content = u''
  441:         yield None
  442:         # gather child elements
  443:         while not self.event == end_condition:
  444:             if self.event[0] == saxtools.CHARACTER_DATA:
  445:                 content += self.params
  446:             yield None
  447:         #Element closed.  Wrap up
  448:         fn = self.xml_field_names[self.colIdx]
  449:         self.xml_data[fn] = content
  450:         return
  451: 
  452: 
  453: 
  454: 
  455: 
  456: ##
  457: ## public static int main()
  458: ##
  459: 
  460: from optparse import OptionParser
  461: 
  462: opars = OptionParser()
  463: opars.add_option("-f", "--file", 
  464:                  dest="filename",
  465:                  help="FMPXML file name", metavar="FILE")
  466: opars.add_option("-c", "--dsn", 
  467:                  dest="dsn", 
  468:                  help="database connection string")
  469: opars.add_option("-t", "--table", 
  470:                  dest="table", 
  471:                  help="database table name")
  472: opars.add_option("--fields", default=None, 
  473:                  dest="update_fields", 
  474:                  help="list of fields to update (comma separated, sql-names)", metavar="LIST")
  475: opars.add_option("--id-field", default=None, 
  476:                  dest="id_field", 
  477:                  help="name of id field for synchronisation (only appends data otherwise, sql-name)", metavar="NAME")
  478: opars.add_option("--sync", "--sync-mode", default=False, action="store_true", 
  479:                  dest="sync_mode", 
  480:                  help="do full sync based on id field (remove unmatched fields from db)")
  481: opars.add_option("--lc-names", default=False, action="store_true", 
  482:                  dest="lc_names", 
  483:                  help="clean and lower case field names from XML")
  484: opars.add_option("--keep-fields", default=False, action="store_true", 
  485:                  dest="keep_fields", 
  486:                  help="don't add fields from XML to SQL table")
  487: opars.add_option("--ascii-db", default=False, action="store_true", 
  488:                  dest="ascii_db", 
  489:                  help="the SQL database stores ASCII instead of unicode")
  490: opars.add_option("--replace", default=False, action="store_true", 
  491:                  dest="replace_table", 
  492:                  help="replace table i.e. delete and re-insert data")
  493: opars.add_option("--backup", default=False, action="store_true", 
  494:                  dest="backup_table", 
  495:                  help="create backup of old table (breaks indices)")
  496: opars.add_option("-d", "--debug", default=False, action="store_true", 
  497:                  dest="debug", 
  498:                  help="debug mode (more output)")
  499: 
  500: (options, args) = opars.parse_args()
  501: 
  502: if len(sys.argv) < 2 or options.filename is None or options.dsn is None:
  503:     print "importFMPXML "+version_string
  504:     opars.print_help()
  505:     sys.exit(1)
  506: 
  507: if options.debug:
  508:     loglevel = logging.DEBUG
  509: else:
  510:     loglevel = logging.INFO
  511: 
  512: logging.basicConfig(level=loglevel, 
  513:                     format='%(asctime)s %(levelname)s %(message)s',
  514:                     datefmt='%H:%M:%S')
  515: 
  516: update_fields = None
  517: 
  518: if options.update_fields:
  519:     uf = {}
  520:     for f in options.update_fields.split(','):
  521:         (n,t) = f.split(':')
  522:         uf[n] = TableColumn(n,t)
  523:         
  524:     options.update_fields = uf
  525: 
  526: if options.id_field and options.replace_table:
  527:     logging.error("ABORT: sorry, you can't do both sync (id_field) and replace")
  528:     sys.exit(1)
  529:     
  530: parser = sax.make_parser()
  531: #The "consumer" is our own handler
  532: consumer = xml_handler(options)
  533: #Initialize Tenorsax with handler
  534: handler = saxtools.tenorsax(consumer)
  535: #Resulting tenorsax instance is the SAX handler 
  536: parser.setContentHandler(handler)
  537: parser.setFeature(sax.handler.feature_namespaces, 1)
  538: parser.parse(options.filename)  
  539: 
  540: 
  541: print "DONE!"

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>