1: #!/usr/local/bin/python
2: #
3:
4: import string
5: import logging
6: import sys
7: import types
8: import time
9: import re
10:
11: from xml import sax
12: from amara import saxtools
13:
try:
    # prefer the psycopg2 driver; register the UNICODE typecaster so
    # text columns come back as unicode objects
    import psycopg2 as psycopg
    import psycopg2.extensions
    # switch to unicode
    psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
    psyco = 2
except:
    # fall back to the legacy psycopg 1 driver (returns byte strings)
    import psycopg
    psyco = 1
23:
# XML namespace URI of FileMaker's FMPXMLRESULT export format
fm_ns = 'http://www.filemaker.com/fmpxmlresult'

# version string printed with the command line help
version_string = "V0.6.7 ROC 21.6.2011"
27:
def unicodify(text, withNone=False):
    """decode str (utf-8 or latin-1 representation) into unicode object

    @param text: byte string to decode (unicode objects pass through)
    @param withNone: if True, None is returned as None instead of u""
    """
    if withNone and text is None:
        return None
    if not text:
        # covers "", 0 and None (when withNone is False)
        return u""
    if isinstance(text, str):
        try:
            return text.decode('utf-8')
        except UnicodeDecodeError:
            # not valid utf-8 -- fall back to latin-1, which cannot fail
            # (every byte value is a valid latin-1 character)
            return text.decode('latin-1')
    else:
        return text
41:
def utf8ify(text, withNone=False):
    """encode unicode object or string into byte string in utf-8 representation

    @param text: unicode object to encode (byte strings pass through)
    @param withNone: if True, None is returned as None instead of ""
    """
    if text is None and withNone:
        return None
    if not text:
        return ""
    # only unicode objects need encoding; byte strings pass through unchanged
    return text.encode('utf-8') if isinstance(text, unicode) else text
52:
def getTextFromNode(nodename):
    """get the cdata content of a node

    Concatenates the data of all direct TEXT_NODE children of the given
    DOM node.  Returns "" for None.

    @param nodename: DOM node (or None)
    @return: concatenated text content as string
    """
    if nodename is None:
        return ""
    # join once instead of repeated string concatenation (O(n) vs O(n^2))
    return "".join(node.data for node in nodename.childNodes
                   if node.nodeType == node.TEXT_NODE)
63:
def sql_quote(v):
    """quote a string value for SQL

    Doubles single quotes and backslashes and wraps the result in single
    quotes.  Uses str.replace instead of the obsolete string-module
    functions (string.find/split/join), with identical behavior.

    @param v: string to quote
    @return: quoted string including the surrounding single quotes
    """
    # replacement order is irrelevant: the two substitutions cannot
    # interact ("''" contains no backslash, "\\\\" contains no quote)
    for dkey, dval in (("\'", "''"), ("\\", "\\\\")):
        v = v.replace(dkey, dval)
    return "'%s'"%v
71:
def sqlName(s, lc=True, more=''):
    """returns restricted ASCII-only version of string

    Strips single quotes, replaces every character outside
    [A-Za-z0-9_] (plus any extra characters in `more`) with "_", and
    optionally lower-cases the result.

    @param s: input string (None yields "")
    @param lc: lower-case the result (default True)
    @param more: extra characters to allow unchanged
    """
    if s is None:
        return ""
    # drop apostrophes entirely, then map all other disallowed chars to "_"
    cleaned = re.sub('[^A-Za-z0-9_'+more+']', '_', s.replace("'", ""))
    return cleaned.lower() if lc else cleaned
85:
def SimpleSearch(curs, query, args=None, ascii=False):
    """execute sql query and return data

    @param curs: database cursor
    @param query: SQL query string (may contain %s placeholders)
    @param args: (optional) sequence of query parameters
    @param ascii: if True, encode query and all args as UTF-8 byte strings
    @return: all result rows, or None if the statement produced no result set
    """
    if ascii:
        # DB works on byte strings -- encode query and every argument
        query = utf8ify(query)
        if args is not None:
            args = [utf8ify(a, withNone=True) for a in args]

    curs.execute(query, args)
    try:
        return curs.fetchall()
    except:
        # statement without result set (e.g. INSERT/UPDATE/DELETE)
        return None
105:
106:
class TableColumn:
    """simple type for storing sql column name and type"""

    def __init__(self, name, type=None):
        # column name and (optional) SQL type; type may stay None
        self.name = name
        self.type = type

    def getName(self):
        """return the column name"""
        return self.name

    def getType(self):
        """return the column type; defaults to "text" when no type was given"""
        return self.type if self.type is not None else "text"

    def __str__(self):
        return self.name
126:
127:
class xml_handler:
    """Generator-based SAX handler (driven by amara.saxtools.tenorsax)
    that imports a FileMaker FMPXMLRESULT XML export into an SQL table."""

    def __init__(self,options):
        """SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table.
        @param options: dict of options
        @param options.dsn: database connection string
        @param options.table: name of the table the xml shall be imported into
        @param options.filename: xmlfile filename
        @param options.update_fields: (optional) list of fields to update; default is to create all fields
        @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
        @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
        @param options.lc_names: (optional) lower case and clean up field names from XML
        @param options.keep_fields: (optional) don't add fields to SQL database
        @param options.ascii_db: (optional) assume ascii encoding in db
        @param options.replace_table: (optional) delete and re-insert data
        @param options.backup_table: (optional) create backup of old table (breaks indices)
        @param options.use_logger_instance: (optional) use this instance of a logger
        """

        # set up logger
        if hasattr(options, 'use_logger_instance'):
            self.logger = options.use_logger_instance
        else:
            self.logger = logging.getLogger('db.import.fmpxml')


        # set up parser
        self.event = None
        # dispatch table: maps (event, namespace, element) to the generator
        # method that consumes the corresponding subtree
        self.top_dispatcher = {
            (saxtools.START_ELEMENT, fm_ns, u'METADATA'):
            self.handle_meta_fields,
            (saxtools.START_ELEMENT, fm_ns, u'RESULTSET'):
            self.handle_data_fields,
            }

        # connect database
        self.dbCon = psycopg.connect(options.dsn)
        # NOTE(review): uses the root logger here instead of self.logger
        logging.debug("DB encoding: %s"%getattr(self.dbCon, 'encoding', 'UNKNOWN'))
        self.db = self.dbCon.cursor()
        assert self.db, "AIIEE no db cursor for %s!!"%options.dsn

        # copy options into attributes (all optional ones default to None)
        self.table = getattr(options,"table",None)
        self.update_fields = getattr(options,"update_fields",None)
        self.id_field = getattr(options,"id_field",None)
        self.sync_mode = getattr(options,"sync_mode",None)
        self.lc_names = getattr(options,"lc_names",None)
        self.keep_fields = getattr(options,"keep_fields",None)
        self.ascii_db = getattr(options,"ascii_db",None)
        self.replace_table = getattr(options,"replace_table",None)
        self.backup_table = getattr(options,"backup_table",None)
        self.read_before_update = getattr(options,"read_before_update",None)
        self.debug_data = getattr(options,"debug_data",None)

        self.logger.debug("dsn: "+repr(getattr(options,"dsn",None)))
        self.logger.debug("table: "+repr(self.table))
        self.logger.debug("update_fields: "+repr(self.update_fields))
        self.logger.debug("id_field: "+repr(self.id_field))
        self.logger.debug("sync_mode: "+repr(self.sync_mode))
        self.logger.debug("lc_names: "+repr(self.lc_names))
        self.logger.debug("keep_fields: "+repr(self.keep_fields))
        self.logger.debug("ascii_db: "+repr(self.ascii_db))
        self.logger.debug("replace_table: "+repr(self.replace_table))
        self.logger.debug("backup_table: "+repr(self.backup_table))
        self.logger.debug("read_before_update: "+repr(self.read_before_update))
        self.logger.debug("debug_data: "+repr(self.debug_data))

        # id -> update counter, used by sync mode to find unmatched rows
        self.dbIDs = {}
        self.rowcnt = 0

        if self.id_field is not None:
            # prepare a list of ids for sync mode
            qstr="select %s from %s"%(self.id_field,self.table)
            for id in SimpleSearch(self.db, qstr):
                # value 0: not updated
                self.dbIDs[id[0]] = 0;
                self.rowcnt += 1

            self.logger.info("%d entries in DB to sync"%self.rowcnt)

        # names of fields in XML file
        self.xml_field_names = []
        # map XML field names to SQL field names
        self.xml_field_map = {}
        # and vice versa
        self.sql_field_map = {}

        return

    def handle_meta_fields(self, end_condition):
        """Generator consuming the METADATA element: collects FIELD
        definitions, prepares/backs up the target table and builds the
        UPDATE/SELECT/INSERT statements used for the data rows."""
        dispatcher = {
            (saxtools.START_ELEMENT, fm_ns, u'FIELD'):
            self.handle_meta_field,
            }
        #First round through the generator corresponds to the
        #start element event
        self.logger.info("reading metadata...")
        if self.debug_data:
            self.logger.debug("START METADATA")
        yield None

        #delegate is a generator that handles all the events "within"
        #this element
        delegate = None
        while not self.event == end_condition:
            delegate = saxtools.tenorsax.event_loop_body(
                dispatcher, delegate, self.event)
            yield None

        #Element closed. Wrap up
        if self.debug_data:
            self.logger.debug("END METADATA")

        # rename table for backup
        if self.backup_table:
            self.orig_table = self.table
            self.tmp_table = self.table + "_tmp"
            backup_name = "%s_%s"%(self.table,time.strftime('%Y_%m_%d_%H_%M_%S'))

            # remove old temp table
            qstr = "DROP TABLE %s"%(self.tmp_table)
            try:
                self.db.execute(qstr)
            except:
                # temp table may not exist yet
                # NOTE(review): a failed statement aborts the postgres
                # transaction; presumably the commit below resets that
                # state -- verify against the psycopg driver in use
                pass

            self.dbCon.commit()

            if self.id_field:
                # sync mode -- copy backup table, update current table
                self.logger.info("copy table %s to %s"%(self.table,backup_name))
                qstr = "CREATE TABLE %s AS (SELECT * FROM %s)"%(backup_name,self.table)

            else:
                # replace mode -- create empty tmp table, insert into tmp table
                self.table = self.tmp_table
                self.logger.info("create empty table %s"%(self.table))
                qstr = "CREATE TABLE %s AS (SELECT * FROM %s WHERE 1=0)"%(self.table,self.orig_table)

            self.db.execute(qstr)
            self.dbCon.commit()

        # delete data from table for replace
        if self.replace_table:
            self.logger.info("delete data from table %s"%(self.table))
            qstr = "TRUNCATE TABLE %s"%(self.table)
            self.db.execute(qstr)
            self.dbCon.commit()

        # try to match date style with XML
        self.db.execute("set datestyle to 'german'")

        #self.logger.debug("xml-fieldnames:"+repr(self.xml_field_names))
        # get list of fields and types of db table
        qstr="select attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) from pg_attribute, pg_class where attrelid = pg_class.oid and pg_attribute.attnum > 0 and relname = '%s'"
        self.sql_fields={}
        for f in SimpleSearch(self.db, qstr%self.table):
            fn = f[0]
            ft = f[1]
            #print "SQL fields: %s (%s)"%(n,t)
            self.sql_fields[fn] = TableColumn(fn,ft)

        # translate id_field (SQL-name) to XML-name
        self.xml_id = self.sql_field_map.get(self.id_field, None)
        # get type of id_field
        if self.id_field:
            self.id_type = self.sql_fields[self.id_field].getType()
        else:
            self.id_type = None

        # check fields to update
        if self.update_fields is None:
            if self.keep_fields:
                # update all existing fields from sql (when they are in the xml file)
                self.update_fields = {}
                for f in self.sql_fields.keys():
                    if self.sql_field_map.has_key(f):
                        xf = self.sql_field_map[f]
                        self.update_fields[f] = self.xml_field_map[xf]

            else:
                # update all fields
                if self.lc_names:
                    # create dict with sql names
                    self.update_fields = {}
                    for f in self.xml_field_map.values():
                        self.update_fields[f.getName()] = f

                else:
                    self.update_fields = self.xml_field_map

        # and translate to list of xml fields
        if self.lc_names:
            self.xml_update_list = [self.sql_field_map[x] for x in self.update_fields]
        else:
            self.xml_update_list = self.update_fields.keys()

        if not self.keep_fields:
            # adjust db table to fields in XML and update_fields
            for f in self.xml_field_map.values():
                self.logger.debug("sync-fieldname: %s"%f.getName())
                sf = self.sql_fields.get(f.getName(), None)
                uf = self.update_fields.get(f.getName(), None)
                if sf is not None:
                    # name in db -- check type
                    if f.getType() != sf.getType():
                        # only warns; does not alter the column type
                        self.logger.debug("field %s has different type (%s vs %s)"%(f,f.getType(),sf.getType()))
                elif uf is not None:
                    # add field to table
                    fn = uf.getName()
                    ft = uf.getType()
                    qstr="alter table %s add \"%s\" %s"%(self.table,fn,ft)
                    self.logger.info("db add field:"+qstr)

                    if self.ascii_db and type(qstr)==types.UnicodeType:
                        qstr=qstr.encode('utf-8')

                    self.db.execute(qstr)
                    self.dbCon.commit()
                    # add field to field list
                    self.sql_fields[fn] = TableColumn(fn, ft)

        # prepare sql statements for update (do not update id_field)
        setStr=string.join(["\"%s\" = %%s"%self.xml_field_map[f] for f in self.xml_update_list if f != self.xml_id], ', ')
        self.updQuery="UPDATE %s SET %s WHERE \"%s\" = %%s"%(self.table,setStr,self.id_field)
        # and select (for update check)
        selStr=string.join([self.xml_field_map[f].getName() for f in self.xml_update_list if f != self.xml_id], ', ')
        self.selQuery="SELECT %s FROM %s WHERE \"%s\" = %%s"%(selStr,self.table,self.id_field)
        # and insert
        fields=string.join(["\"%s\""%self.xml_field_map[x].getName() for x in self.xml_update_list], ',')
        values=string.join(['%s' for f in self.xml_update_list], ',')
        self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values)
        self.logger.debug("update-query: "+self.updQuery)
        self.logger.debug("sel-query: "+self.selQuery)
        self.logger.debug("add-query: "+self.addQuery)
        return

    def handle_meta_field(self, end_condition):
        """Generator consuming one FIELD element: registers the XML field
        name and its (possibly cleaned) SQL name mapping."""
        name = self.params.get((None, u'NAME'))
        yield None
        #Element closed. Wrap up
        if self.lc_names:
            # clean name
            sqlname = sqlName(name)
        else:
            sqlname = name
        self.xml_field_names.append(name)
        # map to sql name and default text type
        self.xml_field_map[name] = TableColumn(sqlname, 'text')
        self.sql_field_map[sqlname] = name
        self.logger.debug("FIELD name: "+name)
        return

    def handle_data_fields(self, end_condition):
        """Generator consuming the RESULTSET element: processes all ROW
        children, then finishes sync deletions and backup-table renames."""
        dispatcher = {
            (saxtools.START_ELEMENT, fm_ns, u'ROW'):
            self.handle_row,
            }
        #First round through the generator corresponds to the
        #start element event
        self.logger.info("reading data...")
        if self.debug_data:
            self.logger.debug("START RESULTSET")
        self.rowcnt = 0
        yield None

        #delegate is a generator that handles all the events "within"
        #this element
        delegate = None
        while not self.event == end_condition:
            delegate = saxtools.tenorsax.event_loop_body(
                dispatcher, delegate, self.event)
            yield None

        #Element closed. Wrap up
        if self.debug_data:
            self.logger.debug("END RESULTSET")
        self.dbCon.commit()

        if self.sync_mode:
            # delete unmatched entries in db
            if self.rowcnt > 0:
                self.logger.info("deleting unmatched rows from db")
                delQuery = "DELETE FROM %s WHERE \"%s\" = %%s"%(self.table,self.id_field)
                for id in self.dbIDs.keys():
                    # find all not-updated fields
                    if self.dbIDs[id] == 0:
                        self.logger.info(" delete: %s"%id)
                        SimpleSearch(self.db, delQuery, [id], ascii=self.ascii_db)

                    elif self.dbIDs[id] > 1:
                        # counter > 1 means the XML contained duplicate ids
                        self.logger.info(" sync: ID %s used more than once?"%id)

                self.dbCon.commit()

            else:
                # safety in case we had an empty file
                self.logger.warning("no rows read! not deleting unmatched rows!")

        # reinstate backup tables
        if self.backup_table and not self.id_field:
            backup_name = "%s_%s"%(self.orig_table,time.strftime('%Y_%m_%d_%H_%M_%S'))
            self.logger.info("rename backup table %s to %s"%(self.orig_table,backup_name))
            qstr = "ALTER TABLE %s RENAME TO %s"%(self.orig_table,backup_name)
            self.db.execute(qstr)
            self.logger.info("rename working table %s to %s"%(self.table,self.orig_table))
            qstr = "ALTER TABLE %s RENAME TO %s"%(self.table,self.orig_table)
            self.db.execute(qstr)
            self.dbCon.commit()

        self.logger.info("Done (%s rows)"%self.rowcnt)
        return

    def handle_row(self, end_condition):
        """Generator consuming one ROW element: collects the column values
        and INSERTs or UPDATEs the corresponding database row."""
        dispatcher = {
            (saxtools.START_ELEMENT, fm_ns, u'COL'):
            self.handle_col,
            }
        if self.debug_data:
            self.logger.debug("START ROW")
        self.xml_data = {}
        self.colIdx = 0
        yield None

        #delegate is a generator that handles all the events "within"
        #this element
        delegate = None
        while not self.event == end_condition:
            delegate = saxtools.tenorsax.event_loop_body(
                dispatcher, delegate, self.event)
            yield None

        #Element closed. Wrap up
        if self.debug_data:
            self.logger.debug("END ROW")
        self.rowcnt += 1
        # process collected row data
        update=False
        id_val=''
        # synchronize by id_field
        if self.id_field:
            if self.id_type == 'integer':
                try:
                    id_val = int(self.xml_data[self.xml_id])
                except:
                    # empty or non-numeric id -- caught as falsy below
                    pass
            else:
                id_val = self.xml_data[self.xml_id]

            if not id_val:
                # abort update
                self.logger.error("ERROR: unable to sync! emtpy id in row %s"%self.rowcnt)
                return

            if id_val in self.dbIDs:
                # id exists in DB -- mark as seen and UPDATE instead of INSERT
                self.dbIDs[id_val] += 1
                update=True

        # collect all values
        args = []
        for fn in self.xml_update_list:
            # do not update id_field
            if update and fn == self.xml_id:
                continue

            f = self.xml_field_map[fn]
            val = self.xml_data[fn]
            type = self.sql_fields[f.getName()].getType()
            if type == "date" and len(val.strip()) == 0:
                # empty date field
                val = None

            elif type == "integer" and len(val) == 0:
                # empty int field
                val = None

            args.append(val)

        if update:
            # update existing row (by id_field)
            if self.read_before_update:
                # read data
                if self.debug_data:
                    self.logger.debug("update check: %s = %s"%(id_val, args))
                oldrow = SimpleSearch(self.db, self.selQuery, [id_val], ascii=self.ascii_db)
                #i = 0
                #for v in oldrow[0]:
                #    logging.debug("v: %s = %s (%s)"%(v,args[i],v==args[i]))
                #    i += 1
                if tuple(oldrow[0]) != tuple(args):
                    # data has changed -- update
                    if self.debug_data:
                        self.logger.debug("really update: %s = %s"%(id_val, args))
                    args.append(id_val) # last arg is id
                    SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db)

            else:
                # always update
                if self.debug_data:
                    self.logger.debug("update: %s = %s"%(id_val, args))
                args.append(id_val) # last arg is id
                SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db)

        else:
            # create new row
            if self.debug_data:
                self.logger.debug("insert: %s"%args)
            SimpleSearch(self.db, self.addQuery, args, ascii=self.ascii_db)

        #self.logger.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
        # commit (and log progress) every 100 rows
        if (self.rowcnt % 100) == 0:
            self.logger.info(" row:"+"%d (id:%s)"%(self.rowcnt,id_val))
            self.dbCon.commit()

        return

    def handle_col(self, end_condition):
        """Generator consuming one COL element: delegates to the DATA
        handler and advances the column index."""
        dispatcher = {
            (saxtools.START_ELEMENT, fm_ns, u'DATA'):
            self.handle_data_tag,
            }
        #print "START COL"
        yield None
        #delegate is a generator that handles all the events "within"
        #this element
        delegate = None
        while not self.event == end_condition:
            delegate = saxtools.tenorsax.event_loop_body(
                dispatcher, delegate, self.event)
            yield None
        #Element closed. Wrap up
        #print "END COL"
        self.colIdx += 1
        return

    def handle_data_tag(self, end_condition):
        """Generator consuming one DATA element: accumulates its character
        data and stores it under the current column's XML field name."""
        #print "START DATA"
        content = u''
        yield None
        # gather child elements
        while not self.event == end_condition:
            if self.event[0] == saxtools.CHARACTER_DATA:
                content += self.params
            yield None
        #Element closed. Wrap up
        fn = self.xml_field_names[self.colIdx]
        self.xml_data[fn] = content
        return
574:
575:
def importFMPXML(options):
    """import FileMaker XML file (FMPXMLRESULT format) into the table.
    @param options: dict of options
    @param options.dsn: database connection string
    @param options.table: name of the table the xml shall be imported into
    @param options.filename: xmlfile filename
    @param options.update_fields: (optional) list of fields to update; default is to create all fields
    @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
    @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
    @param options.lc_names: (optional) lower case and clean up field names from XML
    @param options.keep_fields: (optional) don't add fields to SQL database
    @param options.ascii_db: (optional) assume ascii encoding in db
    @param options.replace_table: (optional) delete and re-insert data
    @param options.backup_table: (optional) create backup of old table
    """

    if getattr(options,'update_fields',None):
        # parse comma-separated "name" or "name:type" specs into TableColumns
        uf = {}
        for f in options.update_fields.split(','):
            if f.find(':') > 0:
                # split only on the first colon so a spec with additional
                # colons does not crash the tuple unpacking
                (n,t) = f.split(':',1)
            else:
                n = f
                t = None
            uf[n] = TableColumn(n,t)

        options.update_fields = uf

    # sync (update by id) and replace (truncate + insert) are exclusive
    if getattr(options,'id_field',None) and getattr(options,'replace_table',None):
        logging.error("ABORT: sorry, you can't do both sync (id_field) and replace")
        return

    parser = sax.make_parser()
    #The "consumer" is our own handler
    consumer = xml_handler(options)
    #Initialize Tenorsax with handler
    handler = saxtools.tenorsax(consumer)
    #Resulting tenorsax instance is the SAX handler
    parser.setContentHandler(handler)
    # enable namespace processing (handler dispatches on fm_ns)
    parser.setFeature(sax.handler.feature_namespaces, 1)
    parser.parse(options.filename)
617:
618:
if __name__ == "__main__":
    from optparse import OptionParser

    # command line interface; option semantics match the importFMPXML docstring
    opars = OptionParser()
    opars.add_option("-f", "--file",
                     dest="filename",
                     help="FMPXML file name", metavar="FILE")
    opars.add_option("-c", "--dsn",
                     dest="dsn",
                     help="database connection string")
    opars.add_option("-t", "--table",
                     dest="table",
                     help="database table name")
    opars.add_option("--fields", default=None,
                     dest="update_fields",
                     help="list of fields to update (comma separated, sql-names)", metavar="LIST")
    opars.add_option("--id-field", default=None,
                     dest="id_field",
                     help="name of id field for synchronisation (only appends data otherwise, sql-name)", metavar="NAME")
    opars.add_option("--sync", "--sync-mode", default=False, action="store_true",
                     dest="sync_mode",
                     help="do full sync based on id field (remove unmatched fields from db)")
    opars.add_option("--lc-names", default=False, action="store_true",
                     dest="lc_names",
                     help="clean and lower case field names from XML")
    opars.add_option("--keep-fields", default=False, action="store_true",
                     dest="keep_fields",
                     help="don't add fields from XML to SQL table")
    opars.add_option("--ascii-db", default=False, action="store_true",
                     dest="ascii_db",
                     help="the SQL database stores ASCII instead of unicode")
    opars.add_option("--replace", default=False, action="store_true",
                     dest="replace_table",
                     help="replace table i.e. delete and re-insert data")
    opars.add_option("--backup", default=False, action="store_true",
                     dest="backup_table",
                     help="create backup of old table")
    opars.add_option("--read-before-update", default=False, action="store_true",
                     dest="read_before_update",
                     help="read all data to check if it really changed")
    opars.add_option("-d", "--debug", default=False, action="store_true",
                     dest="debug",
                     help="debug mode (more output)")
    opars.add_option("--debug-data", default=False, action="store_true",
                     dest="debug_data",
                     help="debug mode for data (even more output)")

    (options, args) = opars.parse_args()

    # filename and dsn are mandatory; print version + usage and exit otherwise
    if len(sys.argv) < 2 or options.filename is None or options.dsn is None:
        print "importFMPXML "+version_string
        opars.print_help()
        sys.exit(1)

    if options.debug:
        loglevel = logging.DEBUG
    else:
        loglevel = logging.INFO

    # log to console with short timestamps
    logging.basicConfig(level=loglevel,
                        format='%(asctime)s %(levelname)s %(message)s',
                        datefmt='%H:%M:%S')

    importFMPXML(options)
683:
684:
685:
686:
# FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>