--- ZSQLExtend/importFMPXML.py 2006/12/21 12:17:32 1.4 +++ ZSQLExtend/importFMPXML.py 2007/01/09 14:00:59 1.5 @@ -2,10 +2,12 @@ # import string -from xml.dom.pulldom import parse import logging import sys +from xml import sax +from amara import saxtools + try: import psycopg2 as psycopg psyco = 2 @@ -13,6 +15,7 @@ except: import psycopg psyco = 1 +fm_ns = 'http://www.filemaker.com/fmpxmlresult' def getTextFromNode(nodename): """get the cdata content of a node""" @@ -31,7 +34,7 @@ def sql_quote(v): for dkey in quote_dict.keys(): if string.find(v, dkey) >= 0: v=string.join(string.split(v,dkey),quote_dict[dkey]) - return "'%s'" % v + return "'%s'"%v def SimpleSearch(curs,query, args=None): """execute sql query and return data""" @@ -46,148 +49,254 @@ def SimpleSearch(curs,query, args=None): return None -def importXMLFileFMP(dsn,table,filename,update_fields=None,id_field=None,sync_mode=False): - ''' - Import FileMaker XML file (FMPXMLRESULT format) into the table. - @param table: name of the table the xml shall be imported into - @param filename: xmlfile filename - @param update_fields: (optional) list of fields to update; default is to create all fields - @param id_field: (optional) field which uniquely identifies an entry for updating purposes. - @param sync_mode: (optional) really synchronise, i.e. delete entries not in XML file - ''' - - # connect database - dbCon = psycopg.connect(dsn) - db = dbCon.cursor() - assert db, "AIIEE no db cursor for %s!!"%dsn - - # read XML file - fh=file(filename) - logging.info("reading file "+filename) - doc=parse(fh) - logging.info("file read") - - dbIDs = {} - rowcnt = 0 - ret = "" - - logging.debug("dsn: "+dsn) - logging.debug("table: "+table) - logging.debug("update_fields: "+repr(update_fields)) - logging.debug("id_field: "+id_field) - logging.debug("sync_mode: "+repr(sync_mode)) + +class xml_handler: - if id_field is not None: - # prepare a list of ids for sync mode - qstr="select %s from %s"%(id_field,table) - for id in SimpleSearch(db, qstr): - # value 0: not updated - dbIDs[id[0]] = 0; - rowcnt += 1 - - logging.info("%d entries in DB to sync"%rowcnt) + def __init__(self,dsn,table,update_fields=None,id_field=None,sync_mode=False): + ''' + SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table. + @param dsn: database connection string + @param table: name of the table the xml shall be imported into + @param filename: xmlfile filename + @param update_fields: (optional) list of fields to update; default is to create all fields + @param id_field: (optional) field which uniquely identifies an entry for updating purposes. + @param sync_mode: (optional) really synchronise, i.e. delete entries not in XML file + ''' + # set up parser + self.event = None + self.top_dispatcher = { + (saxtools.START_ELEMENT, fm_ns, u'METADATA'): + self.handle_meta_fields, + (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): + self.handle_data, + } + + # connect database + self.dbCon = psycopg.connect(dsn) + self.db = self.dbCon.cursor() + assert self.db, "AIIEE no db cursor for %s!!"%dsn - fieldNames = [] - rowcnt = 0 - id_val = '' + logging.debug("dsn: "+repr(dsn)) + logging.debug("table: "+repr(table)) + logging.debug("update_fields: "+repr(update_fields)) + logging.debug("id_field: "+repr(id_field)) + logging.debug("sync_mode: "+repr(sync_mode)) + + self.table = table + self.update_fields = update_fields + self.id_field = id_field + self.sync_mode = sync_mode + + self.dbIDs = {} + self.rowcnt = 0 + + if id_field is not None: + # prepare a list of ids for sync mode + qstr="select %s from %s"%(id_field,table) + for id in SimpleSearch(self.db, qstr): + # value 0: not updated + self.dbIDs[id[0]] = 0; + self.rowcnt += 1 + + logging.info("%d entries in DB to sync"%self.rowcnt) + + self.fieldNames = [] + + return + + def handle_meta_fields(self, end_condition): + dispatcher = { + (saxtools.START_ELEMENT, fm_ns, u'FIELD'): + self.handle_meta_field, + } + #First round through the generator corresponds to the + #start element event + logging.debug("START METADATA") + yield None - while 1: - node=doc.getEvent() + #delegate is a generator that handles all the events "within" + #this element + delegate = None + while not self.event == end_condition: + delegate = saxtools.tenorsax.event_loop_body( + dispatcher, delegate, self.event) + yield None + + #Element closed. Wrap up + logging.debug("END METADATA") + if self.update_fields is None: + # update all fields + self.update_fields = self.fieldNames + + logging.debug("xml-fieldnames:"+repr(self.fieldNames)) + # get list of fields in db table + qstr="""select attname from pg_attribute, pg_class where attrelid = pg_class.oid and relname = '%s'""" + columns=[x[0] for x in SimpleSearch(self.db, qstr%self.table)] + + # adjust db table to fields in XML and fieldlist + for fieldName in self.fieldNames: + logging.debug("db-fieldname:"+repr(fieldName)) + if (fieldName not in columns) and (fieldName in self.update_fields): + qstr="alter table %s add %s %s"%(self.table,fieldName,'text') + logging.info("db add field:"+qstr) + self.db.execute(qstr) + self.dbCon.commit() + + # prepare sql statements for update + setStr=string.join(["%s = %%s"%f for f in self.update_fields], ', ') + self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field) + # and insert + fields=string.join(self.update_fields, ',') + values=string.join(['%s' for f in self.update_fields], ',') + self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values) + #print "upQ: ", self.updQuery + #print "adQ: ", self.addQuery + + return + + def handle_meta_field(self, end_condition): + name = self.params.get((None, u'NAME')) + yield None + #Element closed. Wrap up + self.fieldNames.append(name) + logging.debug("FIELD name: "+name) + return + + def handle_data(self, end_condition): + dispatcher = { + (saxtools.START_ELEMENT, fm_ns, u'ROW'): + self.handle_row, + } + #First round through the generator corresponds to the + #start element event + logging.debug("START RESULTSET") + self.rowcnt = 0 + yield None - if node is None: - break; + #delegate is a generator that handles all the events "within" + #this element + delegate = None + while not self.event == end_condition: + delegate = saxtools.tenorsax.event_loop_body( + dispatcher, delegate, self.event) + yield None - # METADATA tag defines number and names of fields in FMPXMLRESULT - if node[1].nodeName == 'METADATA': - doc.expandNode(node[1]) - - names=node[1].getElementsByTagName('FIELD') - - for name in names: - fn = name.getAttribute('NAME') - fieldNames.append(fn) - - if update_fields is None: - # update all fields - update_fields = fieldNames - - logging.debug("fieldnames:"+repr(fieldNames)) - # get list of fields in db table - qstr="""select attname from pg_attribute, pg_class where attrelid = pg_class.oid and relname = '%s'""" - columns=[x[0] for x in SimpleSearch(db, qstr%table)] - - # adjust db table to fields in XML and fieldlist - for fieldName in fieldNames: - logging.debug("db-fieldname:"+repr(fieldName)) - if (fieldName not in columns) and (fieldName in update_fields): - qstr="alter table %s add %%s %%s"%table - logging.info("db add field:"+qstr%(fieldName,'text')) - db.execute(qstr, (fieldName,'text')) - db.commit() - + #Element closed. Wrap up + logging.debug("END RESULTSET") + self.dbCon.commit() - # ROW tags (in RESULTSET tag) hold data - elif node[1].nodeName == 'ROW': - rowcnt += 1 - - doc.expandNode(node[1]) - cols=node[1].getElementsByTagName('COL') - dataSet={} - i = 0 - # populate with data - for col in cols: - data=col.getElementsByTagName('DATA') - dataSet[fieldNames[i]] = getTextFromNode(data[0]) - i += 1 - - update=False - - # synchronize by id_field - if id_field: - id_val=dataSet[id_field] - if id_val in dbIDs: - dbIDs[id_val] += 1 - update=True + if self.sync_mode: + # delete unmatched entries in db + for id in self.dbIDs.keys(): + # find all not-updated fields + if self.dbIDs[id] == 0: + logging.info(" delete:"+id) + qstr = "DELETE FROM %s WHERE %%s = '%%s'"%self.table + SimpleSearch(self.db, qstr, (self.id_field,id)) + + elif self.dbIDs[id] > 1: + logging.info(" sync:"+"id used more than once?"+id) - if update: - # update existing row (by id_field) - setvals=[] - for fieldName in update_fields: - setvals.append("%s = %s"%(fieldName,sql_quote(dataSet[fieldName]))) - setStr=string.join(setvals, ',') - id_val=dataSet[id_field] - qstr="UPDATE %s SET %s WHERE %s = '%s' "%(table,setStr,id_field,id_val) - SimpleSearch(db, qstr) - ret += "up: %s"%id_val - else: - # create new row - fields=string.join(update_fields, ',') - values=string.join([" %s "%sql_quote(dataSet[x]) for x in update_fields], ',') - qstr="INSERT INTO %s (%s) VALUES (%s)"%(table,fields,values) - SimpleSearch(db, qstr) - ret += "ad: %s"%dataSet.get(id_field, rowcnt) - - #logging.info(" row:"+"%d (%s)"%(rowcnt,id_val)) - if (rowcnt % 10) == 0: - logging.info(" row:"+"%d (%s)"%(rowcnt,id_val)) - dbCon.commit() - - dbCon.commit() - if sync_mode: - # delete unmatched entries in db - for id in dbIDs.keys(): - # find all not-updated fields - if dbIDs[id] == 0: - logging.info(" delete:"+id) - qstr = "DELETE FROM %s WHERE %%s = '%%s'"%table - SimpleSearch(db, qstr, (id_field,id)) - - elif dbIDs[id] > 1: - logging.info(" sync:"+"id used more than once?"+id) + self.dbCon.commit() - dbCon.commit() + return + + def handle_row(self, end_condition): + dispatcher = { + (saxtools.START_ELEMENT, fm_ns, u'COL'): + self.handle_col, + } + logging.debug("START ROW") + self.dataSet = {} + self.colIdx = 0 + yield None - return ret + #delegate is a generator that handles all the events "within" + #this element + delegate = None + while not self.event == end_condition: + delegate = saxtools.tenorsax.event_loop_body( + dispatcher, delegate, self.event) + yield None + + #Element closed. Wrap up + logging.debug("END ROW") + self.rowcnt += 1 + # process collected row data + update=False + id_val='' + # synchronize by id_field + if self.id_field: + id_val=self.dataSet[self.id_field] + if id_val in self.dbIDs: + self.dbIDs[id_val] += 1 + update=True + + if update: + # update existing row (by id_field) + #setvals=[] + #for fieldName in self.update_fields: + # setvals.append("%s = %s"%(fieldName,sql_quote(self.dataSet[fieldName]))) + #setStr=string.join(setvals, ',') + id_val=self.dataSet[self.id_field] + #qstr="UPDATE %s SET %s WHERE %s = '%s' "%(self.table,setStr,self.id_field,id_val) + args = [self.dataSet[f] for f in self.update_fields] + args.append(id_val) + SimpleSearch(self.db, self.updQuery, args) + logging.debug("update: %s"%id_val) + else: + # create new row + #fields=string.join(update_fields, ',') + #values=string.join([" %s "%sql_quote(self.dataSet[x]) for x in self.update_fields], ',') + #qstr="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,self.values) + args = [self.dataSet[f] for f in self.update_fields] + SimpleSearch(self.db, self.addQuery, args) + logging.debug("add: %s"%self.dataSet.get(self.id_field, rowcnt)) + + #logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val)) + if (self.rowcnt % 10) == 0: + logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val)) + self.dbCon.commit() + + return + + def handle_col(self, end_condition): + dispatcher = { + (saxtools.START_ELEMENT, fm_ns, u'DATA'): + self.handle_data_tag, + } + #print "START COL" + yield None + #delegate is a generator that handles all the events "within" + #this element + delegate = None + while not self.event == end_condition: + delegate = saxtools.tenorsax.event_loop_body( + dispatcher, delegate, self.event) + yield None + #Element closed. Wrap up + #print "END COL" + self.colIdx += 1 + return + + def handle_data_tag(self, end_condition): + #print "START DATA" + content = u'' + yield None + # gather child elements + while not self.event == end_condition: + if self.event[0] == saxtools.CHARACTER_DATA: + content += self.params + yield None + #Element closed. Wrap up + field = self.fieldNames[self.colIdx] + self.dataSet[field] = content + #print " DATA(", field, ") ", repr(content) + return + + + + ## ## public static int main() @@ -210,10 +319,10 @@ opars.add_option("--fields", default=Non help="list of fields to update (comma separated)", metavar="LIST") opars.add_option("--id-field", default=None, dest="id_field", - help="name of id field for synchronisation", metavar="NAME") + help="name of id field for synchronisation (only appends data otherwise)", metavar="NAME") opars.add_option("--sync-mode", default=False, action="store_true", dest="sync_mode", - help="do full sync (remove unmatched fields from db)") + help="do full sync based on id field (remove unmatched fields from db)") opars.add_option("-d", "--debug", default=False, action="store_true", dest="debug", help="debug mode (more output)") @@ -237,9 +346,18 @@ update_fields = None if options.update_fields: update_fields = [string.strip(s) for s in options.update_fields.split(',')] - -importXMLFileFMP(dsn=options.dsn,table=options.table,filename=options.filename, - update_fields=update_fields,id_field=options.id_field, - sync_mode=options.sync_mode) + +parser = sax.make_parser() +#The "consumer" is our own handler +consumer = xml_handler(dsn=options.dsn,table=options.table, + update_fields=update_fields,id_field=options.id_field, + sync_mode=options.sync_mode) +#Initialize Tenorsax with handler +handler = saxtools.tenorsax(consumer) +#Resulting tenorsax instance is the SAX handler +parser.setContentHandler(handler) +parser.setFeature(sax.handler.feature_namespaces, 1) +parser.parse(options.filename) + print "DONE!"