| version 1.1, 2006/12/20 18:45:58 | version 1.7, 2007/02/20 16:00:03 |
|---|---|
| Line 2 | Line 2 |
| # | # |
| import string | import string |
| from xml.dom.pulldom import parse | |
| import psycopg2 as psycopg | |
| import logging | import logging |
| import sys | import sys |
| | import types |
| | from xml import sax |
| | from amara import saxtools |
| | try: |
| | import psycopg2 as psycopg |
| | psyco = 2 |
| | except: |
| | import psycopg |
| | psyco = 1 |
| | fm_ns = 'http://www.filemaker.com/fmpxmlresult' |
| def getTextFromNode(nodename): | def getTextFromNode(nodename): |
| """get the cdata content of a node""" | """get the cdata content of a node""" |
| Line 30 | Line 40 |
| def sql_quote(v): | def sql_quote(v): |
| def SimpleSearch(curs,query, args=None): | def SimpleSearch(curs,query, args=None): |
| """execute sql query and return data""" | """execute sql query and return data""" |
| logging.debug("executing: "+query) | logging.debug("executing: "+query) |
| | if psyco == 1: |
| | query = query.encode("UTF-8") |
| | #if args is not None: |
| | # args = [ sql_quote(a) for a in args ] |
| | #logging.debug(query) |
| | #logging.debug(args) |
| curs.execute(query, args) | curs.execute(query, args) |
| logging.debug("sql done") | logging.debug("sql done") |
| | try: |
| return curs.fetchall() | return curs.fetchall() |
| | except: |
| | return None |
| def importXMLFileFMP(dsn,table,filename,update_fields=None,id_field=None,sync_mode=False): | class xml_handler: |
| | def __init__(self,dsn,table,update_fields=None,id_field=None,sync_mode=False): |
| ''' | ''' |
| Import FileMaker XML file (FMPXMLRESULT format) into the table. | SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table. |
| @param dsn: database connection string | |
| @param table: name of the table the xml shall be imported into | @param table: name of the table the xml shall be imported into |
| @param filename: xmlfile filename | @param filename: xmlfile filename |
| @param update_fields: (optional) list of fields to update; default is to create all fields | @param update_fields: (optional) list of fields to update; default is to create all fields |
| @param id_field: (optional) field which uniquely identifies an entry for updating purposes. | @param id_field: (optional) field which uniquely identifies an entry for updating purposes. |
| @param sync_mode: (optional) really synchronise, i.e. delete entries not in XML file | @param sync_mode: (optional) really synchronise, i.e. delete entries not in XML file |
| ''' | ''' |
| | # set up parser |
| | self.event = None |
| | self.top_dispatcher = { |
| | (saxtools.START_ELEMENT, fm_ns, u'METADATA'): |
| | self.handle_meta_fields, |
| | (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): |
| | self.handle_data, |
| | } |
| # connect database | # connect database |
| dbCon = psycopg.connect(dsn) | self.dbCon = psycopg.connect(dsn) |
| db = dbCon.cursor() | self.db = self.dbCon.cursor() |
| assert db, "AIIEE no db cursor for %s!!"%dsn | assert self.db, "AIIEE no db cursor for %s!!"%dsn |
| # read XML file | |
| fh=file(filename) | |
| logging.info("reading file "+filename) | |
| doc=parse(fh) | |
| logging.info("file read") | |
| dbIDs = {} | |
| rowcnt = 0 | |
| ret = "" | |
| logging.debug("dsn: "+dsn) | logging.debug("dsn: "+repr(dsn)) |
| logging.debug("table: "+table) | logging.debug("table: "+repr(table)) |
| logging.debug("update_fields: "+repr(update_fields)) | logging.debug("update_fields: "+repr(update_fields)) |
| logging.debug("id_field: "+id_field) | logging.debug("id_field: "+repr(id_field)) |
| logging.debug("sync_mode: "+repr(sync_mode)) | logging.debug("sync_mode: "+repr(sync_mode)) |
| | self.table = table |
| | self.update_fields = update_fields |
| | self.id_field = id_field |
| | self.sync_mode = sync_mode |
| | self.dbIDs = {} |
| | self.rowcnt = 0 |
| | self.db.execute("set datestyle to 'german'") |
| if id_field is not None: | if id_field is not None: |
| # prepare a list of ids for sync mode | # prepare a list of ids for sync mode |
| qstr="select %s from %s"%(id_field,table) | qstr="select %s from %s"%(id_field,table) |
| for id in SimpleSearch(db, qstr): | for id in SimpleSearch(self.db, qstr): |
| # value 0: not updated | # value 0: not updated |
| dbIDs[id[0]] = 0; | self.dbIDs[id[0]] = 0; |
| rowcnt += 1 | self.rowcnt += 1 |
| logging.info("%d entries in DB to sync"%rowcnt) | |
| fieldNames = [] | |
| rowcnt = 0 | |
| id_val = '' | |
| while 1: | logging.info("%d entries in DB to sync"%self.rowcnt) |
| node=doc.getEvent() | |
| if node is None: | self.fieldNames = [] |
| break; | |
| # METADATA tag defines number and names of fields in FMPXMLRESULT | return |
| if node[1].nodeName == 'METADATA': | |
| doc.expandNode(node[1]) | |
| names=node[1].getElementsByTagName('FIELD') | def handle_meta_fields(self, end_condition): |
| | dispatcher = { |
| for name in names: | (saxtools.START_ELEMENT, fm_ns, u'FIELD'): |
| fn = name.getAttribute('NAME') | self.handle_meta_field, |
| fieldNames.append(fn) | } |
| | #First round through the generator corresponds to the |
| if update_fields is None: | #start element event |
| logging.debug("START METADATA") | |
| yield None | |
| #delegate is a generator that handles all the events "within" | |
| #this element | |
| delegate = None | |
| while not self.event == end_condition: | |
| delegate = saxtools.tenorsax.event_loop_body( | |
| dispatcher, delegate, self.event) | |
| yield None | |
| #Element closed. Wrap up | |
| logging.debug("END METADATA") | |
| if self.update_fields is None: | |
| # update all fields | # update all fields |
| update_fields = fieldNames | self.update_fields = self.fieldNames |
| logging.debug("fieldnames:"+repr(fieldNames)) | logging.debug("xml-fieldnames:"+repr(self.fieldNames)) |
| # get list of fields in db table | # get list of fields in db table |
| qstr="""select attname from pg_attribute, pg_class where attrelid = pg_class.oid and relname = '%s'""" | qstr="""select attname from pg_attribute, pg_class where attrelid = pg_class.oid and relname = '%s'""" |
| columns=[x[0] for x in SimpleSearch(db, qstr%table)] | columns=[x[0] for x in SimpleSearch(self.db, qstr%self.table)] |
| # adjust db table to fields in XML and fieldlist | # adjust db table to fields in XML and fieldlist |
| for fieldName in fieldNames: | for fieldName in self.fieldNames: |
| logging.debug("db-fieldname:"+repr(fieldName)) | logging.debug("db-fieldname:"+repr(fieldName)) |
| if (fieldName not in columns) and (fieldName in update_fields): | fieldName=fieldName.replace(" ","_") # repair _ |
| qstr="alter table %s add %%s %%s"%table | if (fieldName.lower() not in columns) and (fieldName in self.update_fields): |
| logging.info("db add field:"+qstr%(fieldName,'text')) | qstr="alter table %s add %s %s"%(self.table,fieldName,'text') |
| db.execute(qstr, (fieldName,'text')) | logging.info("db add field:"+qstr) |
| db.commit() | |
| | if type(qstr)==types.UnicodeType: |
| | qstr=qstr.encode('utf-8') |
| # ROW tags (in RESULTSET tag) hold data | self.db.execute(qstr) |
| elif node[1].nodeName == 'ROW': | self.dbCon.commit() |
| rowcnt += 1 | |
| | # prepare sql statements for update |
| doc.expandNode(node[1]) | setStr=string.join(["%s = %%s"%f for f in self.update_fields], ', ') |
| cols=node[1].getElementsByTagName('COL') | self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field) |
| dataSet={} | # and insert |
| i = 0 | fields=string.join(self.update_fields, ',') |
| # populate with data | values=string.join(['%s' for f in self.update_fields], ',') |
| for col in cols: | self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values) |
| data=col.getElementsByTagName('DATA') | #print "upQ: ", self.updQuery |
| dataSet[fieldNames[i]] = getTextFromNode(data[0]) | #print "adQ: ", self.addQuery |
| i += 1 | |
| | return |
| | def handle_meta_field(self, end_condition): |
| | name = self.params.get((None, u'NAME')) |
| | yield None |
| | #Element closed. Wrap up |
| | name=name.replace(" ","_")# make sure no spaces |
| | self.fieldNames.append(name) |
| | logging.debug("FIELD name: "+name) |
| | return |
| | def handle_data(self, end_condition): |
| | dispatcher = { |
| | (saxtools.START_ELEMENT, fm_ns, u'ROW'): |
| | self.handle_row, |
| | } |
| | #First round through the generator corresponds to the |
| | #start element event |
| | logging.debug("START RESULTSET") |
| | self.rowcnt = 0 |
| | yield None |
| | #delegate is a generator that handles all the events "within" |
| | #this element |
| | delegate = None |
| | while not self.event == end_condition: |
| | delegate = saxtools.tenorsax.event_loop_body( |
| | dispatcher, delegate, self.event) |
| | yield None |
| | #Element closed. Wrap up |
| | logging.debug("END RESULTSET") |
| | self.dbCon.commit() |
| update=False | if self.sync_mode: |
| | # delete unmatched entries in db |
| | delQuery = "DELETE FROM %s WHERE %s = %%s"%(self.table,self.id_field) |
| | for id in self.dbIDs.keys(): |
| | # find all not-updated fields |
| | if self.dbIDs[id] == 0: |
| | logging.info(" delete:"+id) |
| | SimpleSearch(self.db, delQuery, [id]) |
| | sys.exit(1) |
| | elif self.dbIDs[id] > 1: |
| | logging.info(" sync:"+"id used more than once?"+id) |
| | self.dbCon.commit() |
| | return |
| | def handle_row(self, end_condition): |
| | dispatcher = { |
| | (saxtools.START_ELEMENT, fm_ns, u'COL'): |
| | self.handle_col, |
| | } |
| | logging.debug("START ROW") |
| | self.dataSet = {} |
| | self.colIdx = 0 |
| | yield None |
| | #delegate is a generator that handles all the events "within" |
| | #this element |
| | delegate = None |
| | while not self.event == end_condition: |
| | delegate = saxtools.tenorsax.event_loop_body( |
| | dispatcher, delegate, self.event) |
| | yield None |
| | #Element closed. Wrap up |
| | logging.debug("END ROW") |
| | self.rowcnt += 1 |
| | # process collected row data |
| | update=False |
| | id_val='' |
| # synchronize by id_field | # synchronize by id_field |
| if id_field: | if self.id_field: |
| id_val=dataSet[id_field] | id_val=self.dataSet[self.id_field.lower()] |
| if id_val in dbIDs: | if id_val in self.dbIDs: |
| dbIDs[id_val] += 1 | self.dbIDs[id_val] += 1 |
| update=True | update=True |
| if update: | if update: |
| # update existing row (by id_field) | # update existing row (by id_field) |
| setvals=[] | #setvals=[] |
| for fieldName in update_fields: | #for fieldName in self.update_fields: |
| setvals.append("%s = %s"%(fieldName,sql_quote(dataSet[fieldName]))) | # setvals.append("%s = %s"%(fieldName,sql_quote(self.dataSet[fieldName]))) |
| setStr=string.join(setvals, ',') | #setStr=string.join(setvals, ',') |
| id_val=dataSet[id_field] | id_val=self.dataSet[self.id_field.lower()] |
| qstr="UPDATE %s SET %s WHERE %s = '%s' "%(table,setStr,id_field,id_val) | #qstr="UPDATE %s SET %s WHERE %s = '%s' "%(self.table,setStr,self.id_field,id_val) |
| db.execute(qstr) | args = [self.dataSet[f.lower()] for f in self.update_fields] |
| ret += "up: %s"%id_val | args.append(id_val) |
| | SimpleSearch(self.db, self.updQuery, args) |
| | logging.debug("update: %s"%id_val) |
| else: | else: |
| # create new row | # create new row |
| fields=string.join(update_fields, ',') | #fields=string.join(update_fields, ',') |
| values=string.join([" %s "%sql_quote(dataSet[x]) for x in update_fields], ',') | #values=string.join([" %s "%sql_quote(self.dataSet[x]) for x in self.update_fields], ',') |
| qstr="INSERT INTO %s (%s) VALUES (%s)"%(table,fields,values) | #qstr="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,self.values) |
| db.execute(qstr) | args=[] |
| ret += "ad: %s"%dataSet.get(id_field, rowcnt) | for f in self.update_fields: |
| | value=self.dataSet[f.lower()].encode('utf-8') |
| #logging.info(" row:"+"%d (%s)"%(rowcnt,id_val)) | if value=="": #hack DW |
| if (rowcnt % 10) == 0: | value=None |
| logging.info(" row:"+"%d (%s)"%(rowcnt,id_val)) | |
| dbCon.commit() | args.append(value) |
| | #args = [self.dataSet[f.lower()].encode('utf-8') for f in self.update_fields] |
| | logging.debug(args) |
| | SimpleSearch(self.db, self.addQuery, args) |
| | logging.debug("add: %s"%self.dataSet.get(self.id_field, self.rowcnt)) |
| | #logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val)) |
| | if (self.rowcnt % 10) == 0: |
| | logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val)) |
| | self.dbCon.commit() |
| | return |
| | def handle_col(self, end_condition): |
| | dispatcher = { |
| | (saxtools.START_ELEMENT, fm_ns, u'DATA'): |
| | self.handle_data_tag, |
| | } |
| | #print "START COL" |
| | yield None |
| | #delegate is a generator that handles all the events "within" |
| | #this element |
| | delegate = None |
| | while not self.event == end_condition: |
| | delegate = saxtools.tenorsax.event_loop_body( |
| | dispatcher, delegate, self.event) |
| | yield None |
| | #Element closed. Wrap up |
| | #print "END COL" |
| | self.colIdx += 1 |
| | return |
| | def handle_data_tag(self, end_condition): |
| | #print "START DATA" |
| | content = u'' |
| | yield None |
| | # gather child elements |
| | while not self.event == end_condition: |
| | if self.event[0] == saxtools.CHARACTER_DATA: |
| | content += self.params |
| | yield None |
| | #Element closed. Wrap up |
| | field = self.fieldNames[self.colIdx] |
| | self.dataSet[field.lower()] = content |
| | #print " DATA(", field, ") ", repr(content) |
| | return |
| dbCon.commit() | |
| if sync_mode: | |
| # delete unmatched entries in db | |
| for id in dbIDs.keys(): | |
| # find all not-updated fields | |
| if dbIDs[id] == 0: | |
| logging.info(" delete:"+id) | |
| qstr = "DELETE FROM %s WHERE %%s = '%%s'"%table | |
| db.execute(qstr, (id_field,id)) | |
| elif dbIDs[id] > 1: | |
| logging.info(" sync:"+"id used more than once?"+id) | |
| dbCon.commit() | |
| return ret | |
| ## | ## |
| ## public static int main() | ## public static int main() |
| Line 199 | Line 341 |
| opars.add_option("--fields", default=None, | opars.add_option("--fields", default=None, |
| help="list of fields to update (comma separated)", metavar="LIST") | help="list of fields to update (comma separated)", metavar="LIST") |
| opars.add_option("--id-field", default=None, | opars.add_option("--id-field", default=None, |
| dest="id_field", | dest="id_field", |
| help="name of id field for synchronisation", metavar="NAME") | help="name of id field for synchronisation (only appends data otherwise)", metavar="NAME") |
| opars.add_option("--sync-mode", default=False, action="store_true", | opars.add_option("--sync-mode", default=False, action="store_true", |
| dest="sync_mode", | dest="sync_mode", |
| help="do full sync (remove unmatched fields from db)") | help="do full sync based on id field (remove unmatched fields from db)") |
| opars.add_option("-d", "--debug", default=False, action="store_true", | opars.add_option("-d", "--debug", default=False, action="store_true", |
| dest="debug", | dest="debug", |
| help="debug mode (more output)") | help="debug mode (more output)") |
| Line 227 | Line 369 |
| update_fields = None | update_fields = None |
| if options.update_fields: | if options.update_fields: |
| update_fields = [string.strip(s) for s in options.update_fields.split(',')] | update_fields = [string.strip(s) for s in options.update_fields.split(',')] |
| importXMLFileFMP(dsn=options.dsn,table=options.table,filename=options.filename, | parser = sax.make_parser() |
| #The "consumer" is our own handler | |
| consumer = xml_handler(dsn=options.dsn,table=options.table, | |
| update_fields=update_fields,id_field=options.id_field, | update_fields=update_fields,id_field=options.id_field, |
| sync_mode=options.sync_mode) | sync_mode=options.sync_mode) |
| | #Initialize Tenorsax with handler |
| | handler = saxtools.tenorsax(consumer) |
| | #Resulting tenorsax instance is the SAX handler |
| | parser.setContentHandler(handler) |
| | parser.setFeature(sax.handler.feature_namespaces, 1) |
| | parser.parse(options.filename) |
| print "DONE!" | print "DONE!" |