# importFMPXML
# version 1.5, 2007/01/09 14:00:59 (previous: version 1.4, 2006/12/21 12:17:32)
import logging
import string
import sys

from xml import sax
from xml.dom.pulldom import parse

from amara import saxtools
|
try: |
try: |
import psycopg2 as psycopg |
import psycopg2 as psycopg |
psyco = 2 |
psyco = 2 |
Line 13 except:
|
Line 15 except:
|
import psycopg |
import psycopg |
psyco = 1 |
psyco = 1 |
|
|
|
fm_ns = 'http://www.filemaker.com/fmpxmlresult' |
|
|
def getTextFromNode(nodename): |
def getTextFromNode(nodename): |
"""get the cdata content of a node""" |
"""get the cdata content of a node""" |
Line 46 def SimpleSearch(curs,query, args=None):
|
Line 49 def SimpleSearch(curs,query, args=None):
|
return None |
return None |
|
|
|
|
def importXMLFileFMP(dsn,table,filename,update_fields=None,id_field=None,sync_mode=False): |
|
|
class xml_handler: |
|
|
|
def __init__(self,dsn,table,update_fields=None,id_field=None,sync_mode=False): |
''' |
''' |
Import FileMaker XML file (FMPXMLRESULT format) into the table. |
SAX handler to import FileMaker XML file (FMPXMLRESULT format) into the table. |
|
@param dsn: database connection string |
@param table: name of the table the xml shall be imported into |
@param table: name of the table the xml shall be imported into |
@param filename: xmlfile filename |
@param filename: xmlfile filename |
@param update_fields: (optional) list of fields to update; default is to create all fields |
@param update_fields: (optional) list of fields to update; default is to create all fields |
@param id_field: (optional) field which uniquely identifies an entry for updating purposes. |
@param id_field: (optional) field which uniquely identifies an entry for updating purposes. |
@param sync_mode: (optional) really synchronise, i.e. delete entries not in XML file |
@param sync_mode: (optional) really synchronise, i.e. delete entries not in XML file |
''' |
''' |
|
# set up parser |
|
self.event = None |
|
self.top_dispatcher = { |
|
(saxtools.START_ELEMENT, fm_ns, u'METADATA'): |
|
self.handle_meta_fields, |
|
(saxtools.START_ELEMENT, fm_ns, u'RESULTSET'): |
|
self.handle_data, |
|
} |
|
|
# connect database |
# connect database |
dbCon = psycopg.connect(dsn) |
self.dbCon = psycopg.connect(dsn) |
db = dbCon.cursor() |
self.db = self.dbCon.cursor() |
assert db, "AIIEE no db cursor for %s!!"%dsn |
assert self.db, "AIIEE no db cursor for %s!!"%dsn |
|
|
# read XML file |
|
fh=file(filename) |
|
logging.info("reading file "+filename) |
|
doc=parse(fh) |
|
logging.info("file read") |
|
|
|
dbIDs = {} |
|
rowcnt = 0 |
|
ret = "" |
|
|
|
logging.debug("dsn: "+dsn) |
logging.debug("dsn: "+repr(dsn)) |
logging.debug("table: "+table) |
logging.debug("table: "+repr(table)) |
logging.debug("update_fields: "+repr(update_fields)) |
logging.debug("update_fields: "+repr(update_fields)) |
logging.debug("id_field: "+id_field) |
logging.debug("id_field: "+repr(id_field)) |
logging.debug("sync_mode: "+repr(sync_mode)) |
logging.debug("sync_mode: "+repr(sync_mode)) |
|
|
|
self.table = table |
|
self.update_fields = update_fields |
|
self.id_field = id_field |
|
self.sync_mode = sync_mode |
|
|
|
self.dbIDs = {} |
|
self.rowcnt = 0 |
|
|
if id_field is not None: |
if id_field is not None: |
# prepare a list of ids for sync mode |
# prepare a list of ids for sync mode |
qstr="select %s from %s"%(id_field,table) |
qstr="select %s from %s"%(id_field,table) |
for id in SimpleSearch(db, qstr): |
for id in SimpleSearch(self.db, qstr): |
# value 0: not updated |
# value 0: not updated |
dbIDs[id[0]] = 0; |
self.dbIDs[id[0]] = 0; |
rowcnt += 1 |
self.rowcnt += 1 |
|
|
logging.info("%d entries in DB to sync"%rowcnt) |
|
|
|
fieldNames = [] |
logging.info("%d entries in DB to sync"%self.rowcnt) |
rowcnt = 0 |
|
id_val = '' |
|
|
|
while 1: |
self.fieldNames = [] |
node=doc.getEvent() |
|
|
|
if node is None: |
return |
break; |
|
|
|
# METADATA tag defines number and names of fields in FMPXMLRESULT |
def handle_meta_fields(self, end_condition): |
if node[1].nodeName == 'METADATA': |
dispatcher = { |
doc.expandNode(node[1]) |
(saxtools.START_ELEMENT, fm_ns, u'FIELD'): |
|
self.handle_meta_field, |
names=node[1].getElementsByTagName('FIELD') |
} |
|
#First round through the generator corresponds to the |
for name in names: |
#start element event |
fn = name.getAttribute('NAME') |
logging.debug("START METADATA") |
fieldNames.append(fn) |
yield None |
|
|
if update_fields is None: |
#delegate is a generator that handles all the events "within" |
|
#this element |
|
delegate = None |
|
while not self.event == end_condition: |
|
delegate = saxtools.tenorsax.event_loop_body( |
|
dispatcher, delegate, self.event) |
|
yield None |
|
|
|
#Element closed. Wrap up |
|
logging.debug("END METADATA") |
|
if self.update_fields is None: |
# update all fields |
# update all fields |
update_fields = fieldNames |
self.update_fields = self.fieldNames |
|
|
logging.debug("fieldnames:"+repr(fieldNames)) |
logging.debug("xml-fieldnames:"+repr(self.fieldNames)) |
# get list of fields in db table |
# get list of fields in db table |
qstr="""select attname from pg_attribute, pg_class where attrelid = pg_class.oid and relname = '%s'""" |
qstr="""select attname from pg_attribute, pg_class where attrelid = pg_class.oid and relname = '%s'""" |
columns=[x[0] for x in SimpleSearch(db, qstr%table)] |
columns=[x[0] for x in SimpleSearch(self.db, qstr%self.table)] |
|
|
# adjust db table to fields in XML and fieldlist |
# adjust db table to fields in XML and fieldlist |
for fieldName in fieldNames: |
for fieldName in self.fieldNames: |
logging.debug("db-fieldname:"+repr(fieldName)) |
logging.debug("db-fieldname:"+repr(fieldName)) |
if (fieldName not in columns) and (fieldName in update_fields): |
if (fieldName not in columns) and (fieldName in self.update_fields): |
qstr="alter table %s add %%s %%s"%table |
qstr="alter table %s add %s %s"%(self.table,fieldName,'text') |
logging.info("db add field:"+qstr%(fieldName,'text')) |
logging.info("db add field:"+qstr) |
db.execute(qstr, (fieldName,'text')) |
self.db.execute(qstr) |
db.commit() |
self.dbCon.commit() |
|
|
|
# prepare sql statements for update |
# ROW tags (in RESULTSET tag) hold data |
setStr=string.join(["%s = %%s"%f for f in self.update_fields], ', ') |
elif node[1].nodeName == 'ROW': |
self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field) |
rowcnt += 1 |
# and insert |
|
fields=string.join(self.update_fields, ',') |
doc.expandNode(node[1]) |
values=string.join(['%s' for f in self.update_fields], ',') |
cols=node[1].getElementsByTagName('COL') |
self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values) |
dataSet={} |
#print "upQ: ", self.updQuery |
i = 0 |
#print "adQ: ", self.addQuery |
# populate with data |
|
for col in cols: |
return |
data=col.getElementsByTagName('DATA') |
|
dataSet[fieldNames[i]] = getTextFromNode(data[0]) |
def handle_meta_field(self, end_condition): |
i += 1 |
name = self.params.get((None, u'NAME')) |
|
yield None |
|
#Element closed. Wrap up |
|
self.fieldNames.append(name) |
|
logging.debug("FIELD name: "+name) |
|
return |
|
|
|
def handle_data(self, end_condition): |
|
dispatcher = { |
|
(saxtools.START_ELEMENT, fm_ns, u'ROW'): |
|
self.handle_row, |
|
} |
|
#First round through the generator corresponds to the |
|
#start element event |
|
logging.debug("START RESULTSET") |
|
self.rowcnt = 0 |
|
yield None |
|
|
|
#delegate is a generator that handles all the events "within" |
|
#this element |
|
delegate = None |
|
while not self.event == end_condition: |
|
delegate = saxtools.tenorsax.event_loop_body( |
|
dispatcher, delegate, self.event) |
|
yield None |
|
|
|
#Element closed. Wrap up |
|
logging.debug("END RESULTSET") |
|
self.dbCon.commit() |
|
|
update=False |
if self.sync_mode: |
|
# delete unmatched entries in db |
|
for id in self.dbIDs.keys(): |
|
# find all not-updated fields |
|
if self.dbIDs[id] == 0: |
|
logging.info(" delete:"+id) |
|
qstr = "DELETE FROM %s WHERE %%s = '%%s'"%self.table |
|
SimpleSearch(self.db, qstr, (self.id_field,id)) |
|
|
|
elif self.dbIDs[id] > 1: |
|
logging.info(" sync:"+"id used more than once?"+id) |
|
|
|
self.dbCon.commit() |
|
|
|
return |
|
|
|
def handle_row(self, end_condition): |
|
dispatcher = { |
|
(saxtools.START_ELEMENT, fm_ns, u'COL'): |
|
self.handle_col, |
|
} |
|
logging.debug("START ROW") |
|
self.dataSet = {} |
|
self.colIdx = 0 |
|
yield None |
|
|
|
#delegate is a generator that handles all the events "within" |
|
#this element |
|
delegate = None |
|
while not self.event == end_condition: |
|
delegate = saxtools.tenorsax.event_loop_body( |
|
dispatcher, delegate, self.event) |
|
yield None |
|
|
|
#Element closed. Wrap up |
|
logging.debug("END ROW") |
|
self.rowcnt += 1 |
|
# process collected row data |
|
update=False |
|
id_val='' |
# synchronize by id_field |
# synchronize by id_field |
if id_field: |
if self.id_field: |
id_val=dataSet[id_field] |
id_val=self.dataSet[self.id_field] |
if id_val in dbIDs: |
if id_val in self.dbIDs: |
dbIDs[id_val] += 1 |
self.dbIDs[id_val] += 1 |
update=True |
update=True |
|
|
if update: |
if update: |
# update existing row (by id_field) |
# update existing row (by id_field) |
setvals=[] |
#setvals=[] |
for fieldName in update_fields: |
#for fieldName in self.update_fields: |
setvals.append("%s = %s"%(fieldName,sql_quote(dataSet[fieldName]))) |
# setvals.append("%s = %s"%(fieldName,sql_quote(self.dataSet[fieldName]))) |
setStr=string.join(setvals, ',') |
#setStr=string.join(setvals, ',') |
id_val=dataSet[id_field] |
id_val=self.dataSet[self.id_field] |
qstr="UPDATE %s SET %s WHERE %s = '%s' "%(table,setStr,id_field,id_val) |
#qstr="UPDATE %s SET %s WHERE %s = '%s' "%(self.table,setStr,self.id_field,id_val) |
SimpleSearch(db, qstr) |
args = [self.dataSet[f] for f in self.update_fields] |
ret += "up: %s"%id_val |
args.append(id_val) |
|
SimpleSearch(self.db, self.updQuery, args) |
|
logging.debug("update: %s"%id_val) |
else: |
else: |
# create new row |
# create new row |
fields=string.join(update_fields, ',') |
#fields=string.join(update_fields, ',') |
values=string.join([" %s "%sql_quote(dataSet[x]) for x in update_fields], ',') |
#values=string.join([" %s "%sql_quote(self.dataSet[x]) for x in self.update_fields], ',') |
qstr="INSERT INTO %s (%s) VALUES (%s)"%(table,fields,values) |
#qstr="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,self.values) |
SimpleSearch(db, qstr) |
args = [self.dataSet[f] for f in self.update_fields] |
ret += "ad: %s"%dataSet.get(id_field, rowcnt) |
SimpleSearch(self.db, self.addQuery, args) |
|
logging.debug("add: %s"%self.dataSet.get(self.id_field, rowcnt)) |
#logging.info(" row:"+"%d (%s)"%(rowcnt,id_val)) |
|
if (rowcnt % 10) == 0: |
#logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val)) |
logging.info(" row:"+"%d (%s)"%(rowcnt,id_val)) |
if (self.rowcnt % 10) == 0: |
dbCon.commit() |
logging.info(" row:"+"%d (%s)"%(self.rowcnt,id_val)) |
|
self.dbCon.commit() |
|
|
|
return |
|
|
|
def handle_col(self, end_condition): |
|
dispatcher = { |
|
(saxtools.START_ELEMENT, fm_ns, u'DATA'): |
|
self.handle_data_tag, |
|
} |
|
#print "START COL" |
|
yield None |
|
#delegate is a generator that handles all the events "within" |
|
#this element |
|
delegate = None |
|
while not self.event == end_condition: |
|
delegate = saxtools.tenorsax.event_loop_body( |
|
dispatcher, delegate, self.event) |
|
yield None |
|
#Element closed. Wrap up |
|
#print "END COL" |
|
self.colIdx += 1 |
|
return |
|
|
|
def handle_data_tag(self, end_condition): |
|
#print "START DATA" |
|
content = u'' |
|
yield None |
|
# gather child elements |
|
while not self.event == end_condition: |
|
if self.event[0] == saxtools.CHARACTER_DATA: |
|
content += self.params |
|
yield None |
|
#Element closed. Wrap up |
|
field = self.fieldNames[self.colIdx] |
|
self.dataSet[field] = content |
|
#print " DATA(", field, ") ", repr(content) |
|
return |
|
|
dbCon.commit() |
|
if sync_mode: |
|
# delete unmatched entries in db |
|
for id in dbIDs.keys(): |
|
# find all not-updated fields |
|
if dbIDs[id] == 0: |
|
logging.info(" delete:"+id) |
|
qstr = "DELETE FROM %s WHERE %%s = '%%s'"%table |
|
SimpleSearch(db, qstr, (id_field,id)) |
|
|
|
elif dbIDs[id] > 1: |
|
logging.info(" sync:"+"id used more than once?"+id) |
|
|
|
dbCon.commit() |
|
|
|
return ret |
|
|
|
## |
## |
## public static int main() |
## public static int main() |
Line 210 opars.add_option("--fields", default=Non
|
Line 319 opars.add_option("--fields", default=Non
|
help="list of fields to update (comma separated)", metavar="LIST") |
help="list of fields to update (comma separated)", metavar="LIST") |
opars.add_option("--id-field", default=None, |
opars.add_option("--id-field", default=None, |
dest="id_field", |
dest="id_field", |
help="name of id field for synchronisation", metavar="NAME") |
help="name of id field for synchronisation (only appends data otherwise)", metavar="NAME") |
opars.add_option("--sync-mode", default=False, action="store_true", |
opars.add_option("--sync-mode", default=False, action="store_true", |
dest="sync_mode", |
dest="sync_mode", |
help="do full sync (remove unmatched fields from db)") |
help="do full sync based on id field (remove unmatched fields from db)") |
opars.add_option("-d", "--debug", default=False, action="store_true", |
opars.add_option("-d", "--debug", default=False, action="store_true", |
dest="debug", |
dest="debug", |
help="debug mode (more output)") |
help="debug mode (more output)") |
Line 238 update_fields = None
|
Line 347 update_fields = None
|
if options.update_fields: |
if options.update_fields: |
update_fields = [string.strip(s) for s in options.update_fields.split(',')] |
update_fields = [string.strip(s) for s in options.update_fields.split(',')] |
|
|
importXMLFileFMP(dsn=options.dsn,table=options.table,filename=options.filename, |
parser = sax.make_parser() |
|
#The "consumer" is our own handler |
|
consumer = xml_handler(dsn=options.dsn,table=options.table, |
update_fields=update_fields,id_field=options.id_field, |
update_fields=update_fields,id_field=options.id_field, |
sync_mode=options.sync_mode) |
sync_mode=options.sync_mode) |
|
#Initialize Tenorsax with handler |
|
handler = saxtools.tenorsax(consumer) |
|
#Resulting tenorsax instance is the SAX handler |
|
parser.setContentHandler(handler) |
|
parser.setFeature(sax.handler.feature_namespaces, 1) |
|
parser.parse(options.filename) |
|
|
|
|
print "DONE!" |
print "DONE!" |