File:  [Repository] / ZSQLExtend / importFMPXML.py
Revision 1.7: download - view: text, annotated - select for diffs - revision graph
Tue Feb 20 16:00:03 2007 UTC (17 years, 3 months ago) by dwinter
Branches: MAIN
CVS tags: HEAD
minor

    1: #!/usr/local/bin/python
    2: #
    3: 
    4: import string
    5: import logging
    6: import sys
    7: import types
    8: 
    9: from xml import sax
   10: from amara import saxtools
   11: 
   12: try:
   13:     import psycopg2 as psycopg
   14:     psyco = 2
   15: except:
   16:     import psycopg
   17:     psyco = 1
   18: 
   19: fm_ns = 'http://www.filemaker.com/fmpxmlresult'
   20: 
   21: def getTextFromNode(nodename):
   22:     """get the cdata content of a node"""
   23:     if nodename is None:
   24:         return ""
   25:     nodelist=nodename.childNodes
   26:     rc = ""
   27:     for node in nodelist:
   28:         if node.nodeType == node.TEXT_NODE:
   29:            rc = rc + node.data
   30:     return rc
   31: 
   32: def sql_quote(v):
   33:     # quote dictionary
   34:     quote_dict = {"\'": "''", "\\": "\\\\"}
   35:     for dkey in quote_dict.keys():
   36:         if string.find(v, dkey) >= 0:
   37:             v=string.join(string.split(v,dkey),quote_dict[dkey])
   38:     return "'%s'"%v
   39: 
   40: def SimpleSearch(curs,query, args=None):
   41:     """execute sql query and return data"""
   42:     logging.debug("executing: "+query)
   43:     if psyco == 1:
   44:         query = query.encode("UTF-8")
   45:         #if args is not None:
   46:         #    args = [ sql_quote(a) for a in args ]
   47:     #logging.debug(query)
   48:     #logging.debug(args)
   49: 
   50:     curs.execute(query, args)
   51:     logging.debug("sql done")
   52:     try:
   53:         return curs.fetchall()
   54:     except:
   55:         return None
   56: 
   57: 
   58: 
   59: class xml_handler:
   60:     
   61:     def __init__(self,dsn,table,update_fields=None,id_field=None,sync_mode=False):
   62:         '''
   63:         SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table.
   64:         @param dsn: database connection string
   65:         @param table: name of the table the xml shall be imported into
   66:         @param filename: xmlfile filename
   67:         @param update_fields: (optional) list of fields to update; default is to create all fields
   68:         @param id_field: (optional) field which uniquely identifies an entry for updating purposes.
   69:         @param sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
   70:         '''
   71:         # set up parser
   72:         self.event = None
   73:         self.top_dispatcher = { 
   74:             (saxtools.START_ELEMENT, fm_ns, u'METADATA'): 
   75:             self.handle_meta_fields,
   76:             (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): 
   77:             self.handle_data,
   78:             }
   79:         
   80:         # connect database
   81:         self.dbCon = psycopg.connect(dsn)
   82:         self.db = self.dbCon.cursor()
   83:         assert self.db, "AIIEE no db cursor for %s!!"%dsn
   84:     
   85:         logging.debug("dsn: "+repr(dsn))
   86:         logging.debug("table: "+repr(table))
   87:         logging.debug("update_fields: "+repr(update_fields))
   88:         logging.debug("id_field: "+repr(id_field))
   89:         logging.debug("sync_mode: "+repr(sync_mode))
   90: 
   91:         self.table = table
   92:         self.update_fields = update_fields
   93:         self.id_field = id_field
   94:         self.sync_mode = sync_mode
   95:         
   96:         self.dbIDs = {}
   97:         self.rowcnt = 0
   98:              
   99:         self.db.execute("set datestyle to 'german'")
  100:         if id_field is not None:
  101:             # prepare a list of ids for sync mode
  102:             qstr="select %s from %s"%(id_field,table)
  103:             for id in SimpleSearch(self.db, qstr):
  104:                 # value 0: not updated
  105:                 self.dbIDs[id[0]] = 0;
  106:                 self.rowcnt += 1
  107:                 
  108:             logging.info("%d entries in DB to sync"%self.rowcnt)
  109:         
  110:         self.fieldNames = []
  111:         
  112:         return
  113: 
  114:     def handle_meta_fields(self, end_condition):
  115:         dispatcher = {
  116:             (saxtools.START_ELEMENT, fm_ns, u'FIELD'):
  117:             self.handle_meta_field,
  118:             }
  119:         #First round through the generator corresponds to the
  120:         #start element event
  121:         logging.debug("START METADATA")
  122:         yield None
  123:     
  124:         #delegate is a generator that handles all the events "within"
  125:         #this element
  126:         delegate = None
  127:         while not self.event == end_condition:
  128:             delegate = saxtools.tenorsax.event_loop_body(
  129:                 dispatcher, delegate, self.event)
  130:             yield None
  131:         
  132:         #Element closed. Wrap up
  133:         logging.debug("END METADATA")
  134:         if self.update_fields is None:
  135:             # update all fields
  136:             self.update_fields = self.fieldNames
  137:         
  138:         logging.debug("xml-fieldnames:"+repr(self.fieldNames))
  139:         # get list of fields in db table
  140:         qstr="""select attname from pg_attribute, pg_class where attrelid = pg_class.oid and relname = '%s'"""
  141:         columns=[x[0] for x in SimpleSearch(self.db, qstr%self.table)]
  142:         
  143:         # adjust db table to fields in XML and fieldlist
  144:         for fieldName in self.fieldNames:
  145:             logging.debug("db-fieldname:"+repr(fieldName))
  146:             fieldName=fieldName.replace(" ","_")   # repair _                  
  147:             if (fieldName.lower() not in columns) and (fieldName in self.update_fields):
  148:                 qstr="alter table %s add %s %s"%(self.table,fieldName,'text')
  149:                 logging.info("db add field:"+qstr)
  150:                 
  151:                 if type(qstr)==types.UnicodeType:
  152:                     qstr=qstr.encode('utf-8')
  153:                 self.db.execute(qstr)
  154:                 self.dbCon.commit()
  155: 
  156:         # prepare sql statements for update
  157:         setStr=string.join(["%s = %%s"%f for f in self.update_fields], ', ')
  158:         self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field)
  159:         # and insert
  160:         fields=string.join(self.update_fields, ',')
  161:         values=string.join(['%s' for f in self.update_fields], ',')
  162:         self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values)
  163:         #print "upQ: ", self.updQuery
  164:         #print "adQ: ", self.addQuery
  165:                         
  166:         return
  167: 
  168:     def handle_meta_field(self, end_condition):
  169:         name = self.params.get((None, u'NAME'))
  170:         yield None
  171:         #Element closed.  Wrap up
  172:         name=name.replace(" ","_")# make sure no spaces
  173:         self.fieldNames.append(name)
  174:         logging.debug("FIELD name: "+name)
  175:         return
  176: 
  177:     def handle_data(self, end_condition):
  178:         dispatcher = {
  179:             (saxtools.START_ELEMENT, fm_ns, u'ROW'):
  180:             self.handle_row,
  181:             }
  182:         #First round through the generator corresponds to the
  183:         #start element event
  184:         logging.debug("START RESULTSET")
  185:         self.rowcnt = 0
  186:         yield None
  187:     
  188:         #delegate is a generator that handles all the events "within"
  189:         #this element
  190:         delegate = None
  191:         while not self.event == end_condition:
  192:             delegate = saxtools.tenorsax.event_loop_body(
  193:                 dispatcher, delegate, self.event)
  194:             yield None
  195:         
  196:         #Element closed.  Wrap up
  197:         logging.debug("END RESULTSET")
  198:         self.dbCon.commit()
  199:         
  200:         if self.sync_mode:
  201:             # delete unmatched entries in db
  202:             delQuery = "DELETE FROM %s WHERE %s = %%s"%(self.table,self.id_field)
  203:             for id in self.dbIDs.keys():
  204:                 # find all not-updated fields
  205:                 if self.dbIDs[id] == 0:
  206:                     logging.info(" delete:"+id)
  207:                     SimpleSearch(self.db, delQuery, [id])
  208:                     sys.exit(1)
  209:                     
  210:                 elif self.dbIDs[id] > 1:
  211:                     logging.info(" sync:"+"id used more than once?"+id)
  212:             
  213:             self.dbCon.commit()
  214:         
  215:         return
  216: 
  217:     def handle_row(self, end_condition):
  218:         dispatcher = {
  219:             (saxtools.START_ELEMENT, fm_ns, u'COL'):
  220:             self.handle_col,
  221:             }
  222:         logging.debug("START ROW")
  223:         self.dataSet = {}
  224:         self.colIdx = 0
  225:         yield None
  226:     
  227:         #delegate is a generator that handles all the events "within"
  228:         #this element
  229:         delegate = None
  230:         while not self.event == end_condition:
  231:             delegate = saxtools.tenorsax.event_loop_body(
  232:                 dispatcher, delegate, self.event)
  233:             yield None
  234:         
  235:         #Element closed.  Wrap up
  236:         logging.debug("END ROW")
  237:         self.rowcnt += 1
  238:         # process collected row data
  239:         update=False
  240:         id_val=''
  241:         # synchronize by id_field
  242:         if self.id_field:
  243:             id_val=self.dataSet[self.id_field.lower()]
  244:             if id_val in self.dbIDs:
  245:                 self.dbIDs[id_val] += 1
  246:                 update=True
  247:         
  248:         if update:
  249:             # update existing row (by id_field)
  250:             #setvals=[]
  251:             #for fieldName in self.update_fields:
  252:             #    setvals.append("%s = %s"%(fieldName,sql_quote(self.dataSet[fieldName])))
  253:             #setStr=string.join(setvals, ',')
  254:             id_val=self.dataSet[self.id_field.lower()]
  255:             #qstr="UPDATE %s SET %s WHERE %s = '%s' "%(self.table,setStr,self.id_field,id_val)
  256:             args = [self.dataSet[f.lower()] for f in self.update_fields]
  257:             args.append(id_val)
  258:             SimpleSearch(self.db, self.updQuery, args)
  259:             logging.debug("update: %s"%id_val)
  260:         else:
  261:             # create new row
  262:             #fields=string.join(update_fields, ',')
  263:             #values=string.join([" %s "%sql_quote(self.dataSet[x]) for x in self.update_fields], ',')
  264:             #qstr="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,self.values)
  265:             args=[]
  266:             for f in self.update_fields:
  267:                 value=self.dataSet[f.lower()].encode('utf-8') 
  268:                 if value=="": #hack DW
  269:                     value=None
  270:                     
  271:                 args.append(value)
  272:                 
  273:             #args = [self.dataSet[f.lower()].encode('utf-8') for f in self.update_fields]
  274:             logging.debug(args)
  275:             SimpleSearch(self.db, self.addQuery, args)
  276:             logging.debug("add: %s"%self.dataSet.get(self.id_field, self.rowcnt))
  277: 
  278:         #logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
  279:         if (self.rowcnt % 10) == 0:
  280:             logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
  281:             self.dbCon.commit()
  282:             
  283:         return
  284: 
  285:     def handle_col(self, end_condition):
  286:         dispatcher = {
  287:             (saxtools.START_ELEMENT, fm_ns, u'DATA'):
  288:             self.handle_data_tag,
  289:             }
  290:         #print "START COL"
  291:         yield None
  292:         #delegate is a generator that handles all the events "within"
  293:         #this element
  294:         delegate = None
  295:         while not self.event == end_condition:
  296:             delegate = saxtools.tenorsax.event_loop_body(
  297:                 dispatcher, delegate, self.event)
  298:             yield None
  299:         #Element closed.  Wrap up
  300:         #print "END COL"
  301:         self.colIdx += 1
  302:         return
  303: 
  304:     def handle_data_tag(self, end_condition):
  305:         #print "START DATA"
  306:         content = u''
  307:         yield None
  308:         # gather child elements
  309:         while not self.event == end_condition:
  310:             if self.event[0] == saxtools.CHARACTER_DATA:
  311:                 content += self.params
  312:             yield None
  313:         #Element closed.  Wrap up
  314:         field = self.fieldNames[self.colIdx]
  315:         self.dataSet[field.lower()] = content
  316:         #print "  DATA(", field, ") ", repr(content)
  317:         return
  318: 
  319: 
  320: 
  321: 
  322: 
  323: ##
  324: ## public static int main()
  325: ##
  326: 
  327: from optparse import OptionParser
  328: 
  329: opars = OptionParser()
  330: opars.add_option("-f", "--file", 
  331:                  dest="filename",
  332:                  help="FMPXML file name", metavar="FILE")
  333: opars.add_option("-c", "--dsn", 
  334:                  dest="dsn", 
  335:                  help="database connection string")
  336: opars.add_option("-t", "--table", 
  337:                  dest="table", 
  338:                  help="database table name")
  339: opars.add_option("--fields", default=None, 
  340:                  dest="update_fields", 
  341:                  help="list of fields to update (comma separated)", metavar="LIST")
  342: opars.add_option("--id-field", default=None, 
  343:                  dest="id_field", 
  344:                  help="name of id field for synchronisation (only appends data otherwise)", metavar="NAME")
  345: opars.add_option("--sync-mode", default=False, action="store_true", 
  346:                  dest="sync_mode", 
  347:                  help="do full sync based on id field (remove unmatched fields from db)")
  348: opars.add_option("-d", "--debug", default=False, action="store_true", 
  349:                  dest="debug", 
  350:                  help="debug mode (more output)")
  351: 
  352: (options, args) = opars.parse_args()
  353: 
  354: if len(sys.argv) < 2 or options.filename is None or options.dsn is None:
  355:     opars.print_help()
  356:     sys.exit(1)
  357: 
  358: if options.debug:
  359:     loglevel = logging.DEBUG
  360: else:
  361:     loglevel = logging.INFO
  362: 
  363: logging.basicConfig(level=loglevel, 
  364:                     format='%(asctime)s %(levelname)s %(message)s',
  365:                     datefmt='%H:%M:%S')
  366: 
  367: update_fields = None
  368: 
  369: if options.update_fields:
  370:     update_fields = [string.strip(s) for s in options.update_fields.split(',')]
  371: 
  372: parser = sax.make_parser()
  373: #The "consumer" is our own handler
  374: consumer = xml_handler(dsn=options.dsn,table=options.table,
  375:              update_fields=update_fields,id_field=options.id_field,
  376:              sync_mode=options.sync_mode)
  377: #Initialize Tenorsax with handler
  378: handler = saxtools.tenorsax(consumer)
  379: #Resulting tenorsax instance is the SAX handler 
  380: parser.setContentHandler(handler)
  381: parser.setFeature(sax.handler.feature_namespaces, 1)
  382: parser.parse(options.filename)  
  383: 
  384: 
  385: print "DONE!"

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>