Annotation of ZSQLExtend/importASCII.py, revision 1.2
1.1 casties 1: #!/usr/local/bin/python
2: #
3:
4: import string
5: import logging
6: import sys
7: import types
8: import time
9:
10: try:
11: import psycopg2 as psycopg
12: psyco = 2
13: except:
14: import psycopg
15: psyco = 1
16:
# Version banner; printed together with the usage help when required
# command-line options are missing (see the __main__ section).
version_string = "V0.1 ROC 4.12.2007"
18:
19:
def sql_quote(v):
    """Return v as a single-quoted SQL string literal.

    Single quotes are doubled and backslashes are doubled, e.g.
    "it's" -> "'it''s'" and "a\\b" -> "'a\\\\b'".

    @param v: string to quote
    @return: quoted string including the surrounding single quotes
    """
    # The two replacements are independent: doubling backslashes adds no
    # quotes and doubling quotes adds no backslashes, so the order is irrelevant.
    v = v.replace("\\", "\\\\").replace("'", "''")
    return "'%s'" % v
27:
def SimpleSearch(curs, query, args=None, ascii=False, result=True):
    """Execute an SQL statement on a DB-API cursor and optionally return its rows.

    @param curs: DB-API cursor to execute on
    @param query: SQL statement, using %s placeholders for args
    @param args: (optional) sequence of parameter values
    @param ascii: if true, the query and all text arguments are UTF-8 encoded
        before execution (None and non-text values are passed unchanged)
    @param result: if true, fetch and return the rows of the statement
    @return: list of rows; None when result is false or when the statement
        produced no result set (e.g. UPDATE/INSERT/DELETE)
    """
    if ascii:
        # encode all in UTF-8
        query = query.encode("UTF-8")
        if args is not None:
            encargs = []
            for a in args:
                # only text values can be encoded; None, numbers etc. stay as-is
                if hasattr(a, "encode"):
                    a = a.encode("UTF-8")
                encargs.append(a)
            args = encargs

    curs.execute(query, args)
    if not result:
        return None
    # DB-API 2.0: description is None for statements that return no rows;
    # fetchall() would raise in that case, so report "no result" instead of
    # swallowing every possible exception.
    if curs.description is None:
        return None
    return curs.fetchall()
1.1 casties 50:
51:
class TableColumn:
    """Name and optional SQL type of a single table column.

    A column created without a type reports "text" as its type.
    """

    def __init__(self, name, type=None):
        self.name, self.type = name, type

    def getName(self):
        """Return the column name."""
        return self.name

    def getType(self):
        """Return the SQL type, or "text" when no type was given."""
        if self.type is None:
            return "text"
        return self.type

    def __str__(self):
        return self.name
71:
72:
class ASCII_handler:
    """Import a whitespace-separated text file into a PostgreSQL table.

    Usage: construct with an options object, call setup(), then parse(filename).
    When an id_field is given, rows are matched against the table by that
    column and updated; unmatched rows are inserted (unless update_mode).
    """

    def __init__(self, options):
        """Handler to import text format file (separated values format) into the table.

        @param options: object carrying the options as attributes
        @param options.dsn: database connection string
        @param options.table: name of the table the data shall be imported into
        @param options.update_fields: dict mapping the column number in the text
            file to a TableColumn (as built by importASCII)
        @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
        @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in the text file
        @param options.update_mode: (optional) only update existing rows, never insert
        @param options.ascii_db: (optional) passed as ``ascii`` to SimpleSearch for
            writes, i.e. query and text arguments are sent UTF-8 encoded
        @param options.replace_table: (optional) delete and re-insert data
        @param options.backup_table: (optional) create backup of old table (breaks indices)
        @param options.use_logger_instance: (optional) use this instance of a logger
        """
        # set up logger
        if hasattr(options, 'use_logger_instance'):
            self.logger = options.use_logger_instance
        else:
            self.logger = logging.getLogger('db.import.ascii')

        # connect database
        self.dbCon = psycopg.connect(options.dsn)
        self.db = self.dbCon.cursor()
        assert self.db, "AIIEE no db cursor for %s!!" % options.dsn

        self.table = getattr(options, "table", None)
        self.update_fields = getattr(options, "update_fields", None)
        self.id_field = getattr(options, "id_field", None)
        self.sync_mode = getattr(options, "sync_mode", None)
        self.update_mode = getattr(options, "update_mode", None)
        self.ascii_db = getattr(options, "ascii_db", None)
        self.replace_table = getattr(options, "replace_table", None)
        self.backup_table = getattr(options, "backup_table", None)

        self.logger.debug("dsn: " + repr(getattr(options, "dsn", None)))
        self.logger.debug("table: " + repr(self.table))
        self.logger.debug("update_fields: " + repr(self.update_fields))
        self.logger.debug("id_field: " + repr(self.id_field))
        self.logger.debug("sync_mode: " + repr(self.sync_mode))
        self.logger.debug("update_mode: " + repr(self.update_mode))
        self.logger.debug("ascii_db: " + repr(self.ascii_db))
        self.logger.debug("replace_table: " + repr(self.replace_table))
        self.logger.debug("backup_table: " + repr(self.backup_table))

        # id value in the db -> number of times it was seen in the text file
        self.dbIDs = {}
        self.rowcnt = 0

        if self.id_field is not None:
            # prepare a list of ids for sync mode
            qstr = "select %s from %s" % (self.id_field, self.table)
            for row in SimpleSearch(self.db, qstr):
                # value 0: not updated
                self.dbIDs[row[0]] = 0
                self.rowcnt += 1

            self.logger.info("%d entries in DB to sync" % self.rowcnt)

        # map text-file column numbers to TableColumn objects
        self.xml_field_map = {}
        # and SQL column names back to text-file column numbers
        self.sql_field_map = {}

    def setup(self):
        """Prepare the target table and build the UPDATE/INSERT statements.

        With backup_table the import works on a copy named <table>_tmp (full
        copy when id_field is set, empty copy otherwise); parse() later swaps
        it in. With replace_table the working table is truncated first.
        """
        if self.backup_table:
            self.orig_table = self.table
            self.table = self.table + "_tmp"
            # remove old temp table (it may well not exist)
            qstr = "DROP TABLE %s" % (self.table)
            try:
                self.db.execute(qstr)
            except Exception:
                # a failed statement aborts the transaction; reset it explicitly
                self.dbCon.rollback()

            self.dbCon.commit()

            if self.id_field:
                # sync mode -- copy table
                self.logger.info("copy table %s to %s" % (self.orig_table, self.table))
                qstr = "CREATE TABLE %s AS (SELECT * FROM %s)" % (self.table, self.orig_table)
            else:
                # rename table and create empty new one
                self.logger.info("create empty table %s" % (self.table))
                qstr = "CREATE TABLE %s AS (SELECT * FROM %s WHERE 1=0)" % (self.table, self.orig_table)

            self.db.execute(qstr)
            self.dbCon.commit()

        # delete data from table for replace
        if self.replace_table:
            self.logger.info("delete data from table %s" % (self.table))
            qstr = "TRUNCATE TABLE %s" % (self.table)
            self.db.execute(qstr)
            self.dbCon.commit()

        # try to match date style with the input data
        self.db.execute("set datestyle to 'german'")

        # get list of fields and types of db table (table name passed as a
        # query parameter; dropped columns are ignored)
        qstr = ("select attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod)"
                " from pg_attribute, pg_class"
                " where attrelid = pg_class.oid and pg_attribute.attnum > 0"
                " and not pg_attribute.attisdropped and relname = %s")
        self.sql_fields = {}
        for f in SimpleSearch(self.db, qstr, [self.table]):
            self.sql_fields[f[0]] = TableColumn(f[0], f[1])

        self._prepare_queries()

    def _prepare_queries(self):
        """Fill the field maps from update_fields and build updQuery/addQuery.

        Both statements take the non-id values (in text-file column order)
        followed, when id_field is set, by the id value as last argument.
        """
        self.xml_update_list = []

        # map fields in text file
        for (k, v) in self.update_fields.items():
            self.xml_field_map[k] = v
            self.sql_field_map[v.getName()] = k
            # add to list of updateable fields (without id_field)
            if v.getName() != self.id_field:
                self.xml_update_list.append(k)

        # keys are text-file column numbers: keep them in file order
        self.xml_update_list.sort()

        if self.id_field:
            if self.id_field not in self.sql_field_map:
                raise KeyError("id field %s is not among the imported fields" % self.id_field)
            self.xml_id = self.sql_field_map[self.id_field]

        # prepare sql statements for update (id value matches the WHERE clause)
        setStr = ", ".join(["%s = %%s" % self.xml_field_map[f].getName() for f in self.xml_update_list])
        self.updQuery = "UPDATE %s SET %s WHERE %s = %%s" % (self.table, setStr, self.id_field)

        # and insert -- new rows must get their id too, as the last column
        insert_list = list(self.xml_update_list)
        if self.id_field:
            insert_list.append(self.xml_id)
        fields = ",".join([self.xml_field_map[x].getName() for x in insert_list])
        values = ",".join(["%s" for x in insert_list])
        self.addQuery = "INSERT INTO %s (%s) VALUES (%s)" % (self.table, fields, values)
        self.logger.debug("update-query: " + self.updQuery)
        self.logger.debug("add-query: " + self.addQuery)

    def parse(self, filename):
        """Import every line of the text file, then wrap up.

        In sync_mode, db rows whose id did not occur in the file are deleted.
        With backup_table, the original table is renamed to a timestamped
        backup and the working table takes its name.

        @param filename: path of the text file to import
        """
        self.logger.info("reading data...")
        self.rowcnt = 0

        self.logger.debug("BEGIN RESULTSET")
        # parse line-wise; make sure the file is closed even on errors
        fh = open(filename, "r")
        try:
            for line in fh:
                self.handle_line(line)
        finally:
            fh.close()

        # done. Wrap up
        self.logger.debug("END RESULTSET")
        self.dbCon.commit()

        if self.sync_mode:
            # delete unmatched entries in db
            self.logger.info("deleting unmatched rows from db")
            delQuery = "DELETE FROM %s WHERE %s = %%s" % (self.table, self.id_field)
            for (id_val, seen) in self.dbIDs.items():
                # find all not-updated rows
                if seen == 0:
                    # %s: ids from the db need not be strings
                    self.logger.info(" delete:%s" % (id_val,))
                    SimpleSearch(self.db, delQuery, [id_val], ascii=self.ascii_db, result=False)
                elif seen > 1:
                    self.logger.info(" sync: ID %s used more than once?" % (id_val,))

            self.dbCon.commit()

        # reinstate backup tables
        if self.backup_table:
            backup_name = "%s_%s" % (self.orig_table, time.strftime('%Y_%m_%d_%H_%M_%S'))
            self.logger.info("rename backup table %s to %s" % (self.orig_table, backup_name))
            qstr = "ALTER TABLE %s RENAME TO %s" % (self.orig_table, backup_name)
            self.db.execute(qstr)
            self.logger.info("rename working table %s to %s" % (self.table, self.orig_table))
            qstr = "ALTER TABLE %s RENAME TO %s" % (self.table, self.orig_table)
            self.db.execute(qstr)
            self.dbCon.commit()

    def _collect_values(self, field_keys):
        """Return the values of the current row for the given text-file columns.

        Empty values of date or integer columns become None (SQL NULL). Note
        that line.split() never yields empty fields, so this only matters if
        xml_data is produced differently.
        """
        args = []
        for fn in field_keys:
            col = self.xml_field_map[fn]
            val = self.xml_data[fn]
            sql_type = self.sql_fields[col.getName()].getType()
            if sql_type in ("date", "integer") and len(val) == 0:
                # empty date/int field
                val = None
            args.append(val)
        return args

    def handle_line(self, line):
        """Process a single line of text data (whitespace-separated columns).

        Updates the matching row when the id is known, otherwise inserts a
        new row unless update_mode is set. Blank lines are ignored. Commits
        every 100 rows.
        """
        content = line.split()
        if not content:
            # blank line: nothing to import
            return

        self.logger.debug("START ROW")
        self.xml_data = content
        self.rowcnt += 1
        update = False
        id_val = ''
        # synchronize by id_field
        if self.id_field:
            id_val = self.xml_data[self.xml_id]
            if id_val in self.dbIDs:
                self.dbIDs[id_val] += 1
                update = True

        # collect all values
        args = self._collect_values(self.xml_update_list)
        if self.id_field:
            # last argument: WHERE match for update, id column for insert
            args.append(id_val)

        if update:
            # update existing row (by id_field)
            self.logger.debug("update: %s = %s" % (id_val, args))
            SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db, result=False)

        elif not self.update_mode:
            # create new row
            self.logger.debug("insert: %s" % args)
            SimpleSearch(self.db, self.addQuery, args, ascii=self.ascii_db, result=False)

        if (self.rowcnt % 100) == 0:
            self.logger.info(" row:" + "%d (id:%s)" % (self.rowcnt, id_val))
            self.dbCon.commit()

        self.logger.debug("END ROW")
313:
314:
315:
316: def importASCII(options):
317: """import simple text file (separated values) into the table.
318: @param options: dict of options
319: @param options.dsn: database connection string
320: @param options.table: name of the table the xml shall be imported into
321: @param options.filename: textfile filename
322: @param options.update_fields: list of fields to update
323: @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
324: @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
325: @param options.replace_table: (optional) delete and re-insert data
326: @param options.backup_table: (optional) create backup of old table (breaks indices)
327: """
328:
329: # process list of fields into hash indexed by column number in text file
330: uf = {}
331: i = 0
332: for f in options.update_fields.split(','):
333: if f.find(':') > 0:
334: (n,t) = f.split(':')
335: else:
336: n = f
337: t = None
338:
339: if n:
340: uf[i] = TableColumn(n,t)
341:
342: i += 1
343:
344: options.update_fields = uf
345:
346: if getattr(options,'id_field',None) and getattr(options,'replace_table',None):
347: logging.error("ABORT: sorry, you can't do both sync (id_field) and replace")
348: return
349:
350: if getattr(options,'sync_mode',None) and getattr(options,'update_mode',None):
351: logging.error("ABORT: sorry, you can't do both sync-mode and update-mode")
352: return
353:
354: if not getattr(options,'id_field',None) and getattr(options,'update_mode',None):
355: logging.error("ABORT: sorry, you can't do update-mode without id-field")
356: return
357:
358: # The "parser" is our own handler
359: parser = ASCII_handler(options)
360: # Initialize handler
361: parser.setup()
362: # run the file
363: parser.parse(options.filename)
364:
365:
if __name__ == "__main__":
    # command-line entry point: parse options, configure logging, run the import
    from optparse import OptionParser

    opars = OptionParser()
    opars.add_option("-f", "--file",
                     dest="filename",
                     help="text file name", metavar="FILE")
    opars.add_option("-c", "--dsn",
                     dest="dsn",
                     help="database connection string")
    opars.add_option("-t", "--table",
                     dest="table",
                     help="database table name")
    opars.add_option("--fields",
                     dest="update_fields",
                     help="list of fields in the text file (comma separated, empty fields are not updated, sql-names)", metavar="LIST")
    opars.add_option("--id-field", default=None,
                     dest="id_field",
                     help="name of id field for synchronisation (only appends data otherwise, sql-name)", metavar="NAME")
    opars.add_option("--sync", "--sync-mode", default=False, action="store_true",
                     dest="sync_mode",
                     help="do full sync based on id field (remove unmatched fields from db)")
    opars.add_option("--update-only", "--update-mode", default=False, action="store_true",
                     dest="update_mode",
                     help="only update existing rows based on id field")
    opars.add_option("--ascii-db", default=False, action="store_true",
                     dest="ascii_db",
                     help="the SQL database stores ASCII instead of unicode")
    opars.add_option("--replace", default=False, action="store_true",
                     dest="replace_table",
                     help="replace table i.e. delete and re-insert data")
    opars.add_option("--backup", default=False, action="store_true",
                     dest="backup_table",
                     help="create backup of old table (breaks indices)")
    opars.add_option("-d", "--debug", default=False, action="store_true",
                     dest="debug",
                     help="debug mode (more output)")

    (options, args) = opars.parse_args()

    if (options.filename is None
            or options.dsn is None
            or options.update_fields is None
            or options.table is None):
        # not enough parameters
        # print(...) with a single argument behaves the same on Python 2 and 3
        print("importASCII " + version_string)
        opars.print_help()
        sys.exit(1)

    if options.debug:
        loglevel = logging.DEBUG
    else:
        loglevel = logging.INFO

    logging.basicConfig(level=loglevel,
                        format='%(asctime)s %(levelname)s %(message)s',
                        datefmt='%H:%M:%S')

    importASCII(options)
425:
426:
427:
428:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>