--- ZSQLExtend/importFMPXML.py 2007/12/11 20:30:40 1.18 +++ ZSQLExtend/importFMPXML.py 2012/02/15 08:41:01 1.35 @@ -6,12 +6,17 @@ import logging import sys import types import time +import re from xml import sax -from amara import saxtools +from xml.sax.handler import ContentHandler +#from amara import saxtools try: import psycopg2 as psycopg + import psycopg2.extensions + # switch to unicode + psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) psyco = 2 except: import psycopg @@ -19,7 +24,32 @@ except: fm_ns = 'http://www.filemaker.com/fmpxmlresult' -version_string = "V0.5 ROC 11.12.2007" +version_string = "V0.6.7 ROC 21.6.2011" + +def unicodify(text, withNone=False): + """decode str (utf-8 or latin-1 representation) into unicode object""" + if withNone and text is None: + return None + if not text: + return u"" + if isinstance(text, str): + try: + return text.decode('utf-8') + except: + return text.decode('latin-1') + else: + return text + +def utf8ify(text, withNone=False): + """encode unicode object or string into byte string in utf-8 representation""" + if withNone and text is None: + return None + if not text: + return "" + if isinstance(text, unicode): + return text.encode('utf-8') + else: + return text def getTextFromNode(nodename): """get the cdata content of a node""" @@ -40,18 +70,30 @@ def sql_quote(v): v=string.join(string.split(v,dkey),quote_dict[dkey]) return "'%s'"%v +def sqlName(s, lc=True, more=''): + """returns restricted ASCII-only version of string""" + if s is None: + return "" + + # remove ' + s = s.replace("'","") + # all else -> "_" + s = re.sub('[^A-Za-z0-9_'+more+']','_',s) + if lc: + return s.lower() + + return s + def SimpleSearch(curs,query, args=None, ascii=False): """execute sql query and return data""" #logger.debug("executing: "+query) if ascii: # encode all in UTF-8 - query = query.encode("UTF-8") + query = utf8ify(query) if args is not None: encargs = [] for a in args: - if a is not None and isinstance(a, str): - a = a.encode("UTF-8") - encargs.append(a) + encargs.append(utf8ify(a, withNone=True)) args = encargs @@ -84,7 +126,7 @@ class TableColumn: return self.name -class xml_handler: +class xml_handler(ContentHandler): def __init__(self,options): """SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table. @param options: dict of options @@ -110,16 +152,19 @@ class xml_handler: # set up parser + self.result={} self.event = None - self.top_dispatcher = { - (saxtools.START_ELEMENT, fm_ns, u'METADATA'): - self.handle_meta_fields, - (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): - self.handle_data_fields, - } + +# self.top_dispatcher = { +# (saxtools.START_ELEMENT, fm_ns, u'METADATA'): +# self.handle_meta_fields, +# (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): +# self.handle_data_fields, +# } # connect database self.dbCon = psycopg.connect(options.dsn) + logging.debug("DB encoding: %s"%getattr(self.dbCon, 'encoding', 'UNKNOWN')) self.db = self.dbCon.cursor() assert self.db, "AIIEE no db cursor for %s!!"%options.dsn @@ -132,6 +177,8 @@ class xml_handler: self.ascii_db = getattr(options,"ascii_db",None) self.replace_table = getattr(options,"replace_table",None) self.backup_table = getattr(options,"backup_table",None) + self.read_before_update = getattr(options,"read_before_update",None) + self.debug_data = getattr(options,"debug_data",None) self.logger.debug("dsn: "+repr(getattr(options,"dsn",None))) self.logger.debug("table: "+repr(self.table)) @@ -143,10 +190,14 @@ class xml_handler: self.logger.debug("ascii_db: "+repr(self.ascii_db)) self.logger.debug("replace_table: "+repr(self.replace_table)) self.logger.debug("backup_table: "+repr(self.backup_table)) + self.logger.debug("read_before_update: "+repr(self.read_before_update)) + self.logger.debug("debug_data: "+repr(self.debug_data)) self.dbIDs = {} self.rowcnt = 0 + self.currentName = None + if self.id_field is not None: # prepare a list of ids for sync mode qstr="select %s from %s"%(self.id_field,self.table) @@ -166,27 +217,71 @@ class xml_handler: return - def handle_meta_fields(self, end_condition): - dispatcher = { - (saxtools.START_ELEMENT, fm_ns, u'FIELD'): - self.handle_meta_field, - } + def startElement(self, name, attrs): + logging.debug(name) + if (name.lower() == "field") : + self.handle_meta_field(attrs) + if (name.lower() == "row") : + logging.debug("handleROW") + self.handle_row(attrs) + if (name.lower()=="resultset"): + self.handle_data_fields(attrs) + + if (name.lower()=="data"): + self.handle_data_tag(attrs); + + def endElement(self,name): + if (name.lower() == "resultset") : + self.currentTag="" + self.handle_end_data_fields() + if (name.lower() == "field") : + self.handle_end_meta_field() + if (name.lower() == "metadata"): + self.handle_end_meta_fields() + if (name.lower() == "row") : + logging.debug("handleROW") + self.handle_end_row() + + if (name.lower() == "col") : + self.handle_end_col() + def characters(self,content): + + try: + fn = self.xml_field_names[self.colIdx] + + contentTmp = self.xml_data.get(fn,'') #gibt es schon einen Inhalt, dann dieses hinzufuegen (in einem Tag kann u.U. der characters handler mehrfach aufgerufen werden.) + self.xml_data[fn] = contentTmp+content + except: + logging.debug(content) + pass + +# if self.currentName is not None: +# logging.debug(self.currentName+" "+content) +# self.currentRow[self.currentName]=content; +# + def handle_end_meta_fields(self): +# dispatcher = { +# (saxtools.START_ELEMENT, fm_ns, u'FIELD'): +# self.handle_meta_field, +# } #First round through the generator corresponds to the #start element event - self.logger.info("reading metadata...") - self.logger.debug("START METADATA") - yield None +# self.logger.info("reading metadata...") +# if self.debug_data: +# self.logger.debug("START METADATA") +# #yield None #delegate is a generator that handles all the events "within" #this element - delegate = None - while not self.event == end_condition: - delegate = saxtools.tenorsax.event_loop_body( - dispatcher, delegate, self.event) - yield None - - #Element closed. Wrap up - self.logger.debug("END METADATA") +# delegate = None +# while not self.event == end_condition: +# delegate = saxtools.tenorsax.event_loop_body( +# dispatcher, delegate, self.event) +# yield None +# +# #Element closed. Wrap up + if self.debug_data: + self.logger.debug("END METADATA") # rename table for backup if self.backup_table: @@ -232,10 +327,10 @@ class xml_handler: qstr="select attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) from pg_attribute, pg_class where attrelid = pg_class.oid and pg_attribute.attnum > 0 and relname = '%s'" self.sql_fields={} for f in SimpleSearch(self.db, qstr%self.table): - n = f[0] - t = f[1] + fn = f[0] + ft = f[1] #print "SQL fields: %s (%s)"%(n,t) - self.sql_fields[n] = TableColumn(n,t) + self.sql_fields[fn] = TableColumn(fn,ft) # translate id_field (SQL-name) to XML-name self.xml_id = self.sql_field_map.get(self.id_field, None) @@ -284,7 +379,9 @@ class xml_handler: self.logger.debug("field %s has different type (%s vs %s)"%(f,f.getType(),sf.getType())) elif uf is not None: # add field to table - qstr="alter table %s add %s %s"%(self.table,uf.getName(),uf.getType()) + fn = uf.getName() + ft = uf.getType() + qstr="alter table %s add \"%s\" %s"%(self.table,fn,ft) self.logger.info("db add field:"+qstr) if self.ascii_db and type(qstr)==types.UnicodeType: @@ -292,25 +389,34 @@ class xml_handler: self.db.execute(qstr) self.dbCon.commit() + # add field to field list + self.sql_fields[fn] = TableColumn(fn, ft) # prepare sql statements for update (do not update id_field) - setStr=string.join(["%s = %%s"%self.xml_field_map[f] for f in self.xml_update_list if f != self.xml_id], ', ') - self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field) + setStr=string.join(["\"%s\" = %%s"%self.xml_field_map[f] for f in self.xml_update_list if f != self.xml_id], ', ') + self.updQuery="UPDATE %s SET %s WHERE \"%s\" = %%s"%(self.table,setStr,self.id_field) + # and select (for update check) + selStr=string.join([self.xml_field_map[f].getName() for f in self.xml_update_list if f != self.xml_id], ', ') + self.selQuery="SELECT %s FROM %s WHERE \"%s\" = %%s"%(selStr,self.table,self.id_field) # and insert - fields=string.join([self.xml_field_map[x].getName() for x in self.xml_update_list], ',') + fields=string.join(["\"%s\""%self.xml_field_map[x].getName() for x in self.xml_update_list], ',') values=string.join(['%s' for f in self.xml_update_list], ',') self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values) self.logger.debug("update-query: "+self.updQuery) + self.logger.debug("sel-query: "+self.selQuery) self.logger.debug("add-query: "+self.addQuery) return - def handle_meta_field(self, end_condition): - name = self.params.get((None, u'NAME')) - yield None + def handle_meta_field(self, attrs): + self.currentName = attrs.get('NAME') + #yield None + return + def handle_end_meta_field(self): #Element closed. Wrap up + name = self.currentName if self.lc_names: # clean name - sqlname = name.replace(" ","_").lower() + sqlname = sqlName(name) else: sqlname = name self.xml_field_names.append(name) @@ -320,45 +426,44 @@ class xml_handler: self.logger.debug("FIELD name: "+name) return - def handle_data_fields(self, end_condition): - dispatcher = { - (saxtools.START_ELEMENT, fm_ns, u'ROW'): - self.handle_row, - } + def handle_data_fields(self, attrs): + #First round through the generator corresponds to the #start element event self.logger.info("reading data...") - self.logger.debug("START RESULTSET") + if self.debug_data: + self.logger.debug("START RESULTSET") self.rowcnt = 0 - yield None + return + def handle_end_data_fields(self): #delegate is a generator that handles all the events "within" #this element - delegate = None - while not self.event == end_condition: - delegate = saxtools.tenorsax.event_loop_body( - dispatcher, delegate, self.event) - yield None - + #Element closed. Wrap up - self.logger.debug("END RESULTSET") + if self.debug_data: + self.logger.debug("END RESULTSET") self.dbCon.commit() if self.sync_mode: # delete unmatched entries in db - self.logger.info("deleting unmatched rows from db") - delQuery = "DELETE FROM %s WHERE %s = %%s"%(self.table,self.id_field) - for id in self.dbIDs.keys(): - # find all not-updated fields - if self.dbIDs[id] == 0: - self.logger.info(" delete:"+id) - SimpleSearch(self.db, delQuery, [id], ascii=self.ascii_db) - sys.exit(1) - - elif self.dbIDs[id] > 1: - self.logger.info(" sync: ID %s used more than once?"%id) - - self.dbCon.commit() + if self.rowcnt > 0: + self.logger.info("deleting unmatched rows from db") + delQuery = "DELETE FROM %s WHERE \"%s\" = %%s"%(self.table,self.id_field) + for id in self.dbIDs.keys(): + # find all not-updated fields + if self.dbIDs[id] == 0: + self.logger.info(" delete: %s"%id) + SimpleSearch(self.db, delQuery, [id], ascii=self.ascii_db) + + elif self.dbIDs[id] > 1: + self.logger.info(" sync: ID %s used more than once?"%id) + + self.dbCon.commit() + + else: + # safety in case we had an empty file + self.logger.warning("no rows read! not deleting unmatched rows!") # reinstate backup tables if self.backup_table and not self.id_field: @@ -371,28 +476,25 @@ class xml_handler: self.db.execute(qstr) self.dbCon.commit() + self.logger.info("Done (%s rows)"%self.rowcnt) return def handle_row(self, end_condition): - dispatcher = { - (saxtools.START_ELEMENT, fm_ns, u'COL'): - self.handle_col, - } - self.logger.debug("START ROW") + + if self.debug_data: + self.logger.debug("START ROW") self.xml_data = {} self.colIdx = 0 - yield None + return + + def handle_end_row(self): #delegate is a generator that handles all the events "within" #this element - delegate = None - while not self.event == end_condition: - delegate = saxtools.tenorsax.event_loop_body( - dispatcher, delegate, self.event) - yield None - + #Element closed. Wrap up - self.logger.debug("END ROW") + if self.debug_data: + self.logger.debug("END ROW") self.rowcnt += 1 # process collected row data update=False @@ -400,9 +502,17 @@ class xml_handler: # synchronize by id_field if self.id_field: if self.id_type == 'integer': - id_val = int(self.xml_data[self.xml_id]) + try: + id_val = int(self.xml_data[self.xml_id]) + except: + pass else: id_val = self.xml_data[self.xml_id] + + if not id_val: + # abort update + self.logger.error("ERROR: unable to sync! emtpy id in row %s"%self.rowcnt) + return if id_val in self.dbIDs: self.dbIDs[id_val] += 1 @@ -412,13 +522,13 @@ class xml_handler: args = [] for fn in self.xml_update_list: # do not update id_field - if self.id_field and fn == self.xml_id: + if update and fn == self.xml_id: continue f = self.xml_field_map[fn] - val = self.xml_data[fn] + val = self.xml_data.get(fn,None) type = self.sql_fields[f.getName()].getType() - if type == "date" and len(val) == 0: + if type == "date" and len(val.strip()) == 0: # empty date field val = None @@ -430,14 +540,33 @@ class xml_handler: if update: # update existing row (by id_field) - # last argument is ID match - args.append(id_val) - self.logger.debug("update: %s = %s"%(id_val, args)) - SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db) + if self.read_before_update: + # read data + if self.debug_data: + self.logger.debug("update check: %s = %s"%(id_val, args)) + oldrow = SimpleSearch(self.db, self.selQuery, [id_val], ascii=self.ascii_db) + #i = 0 + #for v in oldrow[0]: + # logging.debug("v: %s = %s (%s)"%(v,args[i],v==args[i])) + # i += 1 + if tuple(oldrow[0]) != tuple(args): + # data has changed -- update + if self.debug_data: + self.logger.debug("really update: %s = %s"%(id_val, args)) + args.append(id_val) # last arg is id + SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db) + + else: + # always update + if self.debug_data: + self.logger.debug("update: %s = %s"%(id_val, args)) + args.append(id_val) # last arg is id + SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db) else: # create new row - self.logger.debug("insert: %s"%args) + if self.debug_data: + self.logger.debug("insert: %s"%args) SimpleSearch(self.db, self.addQuery, args, ascii=self.ascii_db) #self.logger.info(" row:"+"%d (%s)"%(self.rowcnt,id_val)) @@ -447,37 +576,25 @@ class xml_handler: return - def handle_col(self, end_condition): - dispatcher = { - (saxtools.START_ELEMENT, fm_ns, u'DATA'): - self.handle_data_tag, - } - #print "START COL" - yield None - #delegate is a generator that handles all the events "within" - #this element - delegate = None - while not self.event == end_condition: - delegate = saxtools.tenorsax.event_loop_body( - dispatcher, delegate, self.event) - yield None - #Element closed. Wrap up - #print "END COL" + def handle_end_col(self): + + self.colIdx += 1 return - def handle_data_tag(self, end_condition): + + def handle_data_tag(self, attrs): #print "START DATA" - content = u'' - yield None - # gather child elements - while not self.event == end_condition: - if self.event[0] == saxtools.CHARACTER_DATA: - content += self.params - yield None - #Element closed. Wrap up - fn = self.xml_field_names[self.colIdx] - self.xml_data[fn] = content + self.content = u'' +# yield None +# # gather child elements +# while not self.event == end_condition: +# if self.event[0] == saxtools.CHARACTER_DATA: +# content += self.params +# yield None +# #Element closed. Wrap up +# fn = self.xml_field_names[self.colIdx] +# self.xml_data[fn] = content return @@ -517,10 +634,10 @@ def importFMPXML(options): #The "consumer" is our own handler consumer = xml_handler(options) #Initialize Tenorsax with handler - handler = saxtools.tenorsax(consumer) + #handler = saxtools.tenorsax(consumer) #Resulting tenorsax instance is the SAX handler - parser.setContentHandler(handler) - parser.setFeature(sax.handler.feature_namespaces, 1) + parser.setContentHandler(consumer) + #parser.setFeature(sax.handler.feature_namespaces, 1) parser.parse(options.filename) @@ -561,9 +678,15 @@ if __name__ == "__main__": opars.add_option("--backup", default=False, action="store_true", dest="backup_table", help="create backup of old table") + opars.add_option("--read-before-update", default=False, action="store_true", + dest="read_before_update", + help="read all data to check if it really changed") opars.add_option("-d", "--debug", default=False, action="store_true", dest="debug", help="debug mode (more output)") + opars.add_option("--debug-data", default=False, action="store_true", + dest="debug_data", + help="debug mode for data (even more output)") (options, args) = opars.parse_args()