Annotation of ZSQLExtend/importASCII.py, revision 1.1
1.1 ! casties 1: #!/usr/local/bin/python
! 2: #
! 3:
! 4: import string
! 5: import logging
! 6: import sys
! 7: import types
! 8: import time
! 9:
! 10: try:
! 11: import psycopg2 as psycopg
! 12: psyco = 2
! 13: except:
! 14: import psycopg
! 15: psyco = 1
! 16:
# Version banner; printed together with the usage help when required
# command line options are missing (see the __main__ section).
version_string = "V0.1 ROC 4.12.2007"
! 18:
! 19:
def sql_quote(v):
    """Return v as a single-quoted SQL string literal.

    Every single quote in v is doubled and every backslash is doubled,
    then the result is wrapped in single quotes.

    @param v: string to quote
    @return: the escaped string enclosed in single quotes
    """
    # str.replace behaves the same on Python 2 and 3; the string-module
    # helpers (string.find/split/join) are gone in Python 3.
    # The two replacements are independent: neither introduces the
    # character the other one looks for, so their order does not matter.
    escaped = v.replace("\\", "\\\\").replace("'", "''")
    return "'%s'" % escaped
! 27:
def SimpleSearch(curs,query, args=None, ascii=False):
    """Execute an SQL query on a cursor and return all result rows.

    @param curs: database cursor providing execute() and fetchall()
    @param query: SQL statement, optionally with parameter placeholders
    @param args: (optional) sequence of parameter values for the query
    @param ascii: if true, the query and all encodable arguments are
        encoded as UTF-8 before the statement is executed
    @return: the result of curs.fetchall(), or None if fetching failed
        (presumably because the statement produced no result set)
    """
    if ascii:
        # encode all in UTF-8
        query = query.encode("UTF-8")
        if args is not None:
            encargs = []
            for a in args:
                # None is passed through unchanged; values without an
                # encode() method (e.g. integer ids read from the db)
                # are passed through instead of raising AttributeError.
                if a is not None and hasattr(a, "encode"):
                    a = a.encode("UTF-8")
                encargs.append(a)

            args = encargs

    curs.execute(query, args)
    try:
        return curs.fetchall()
    except Exception:
        # Best effort: callers also use this function for statements
        # like UPDATE/INSERT/DELETE and ignore the result.  Only
        # ordinary errors are swallowed; KeyboardInterrupt and
        # SystemExit still propagate.
        return None
! 49:
! 50:
class TableColumn:
    """Small record holding the name and the type of an SQL column."""

    def __init__(self, name, type=None):
        # type may stay None; getType() then reports the default
        self.type = type
        self.name = name

    def getName(self):
        """Return the column name."""
        return self.name

    def getType(self):
        """Return the column type, or "text" when no type was given."""
        if self.type is None:
            return "text"
        return self.type

    def __str__(self):
        """The string form of a column is just its name."""
        return self.name
! 70:
! 71:
class ASCII_handler:
    """Imports a text file (one row per line) into a database table.

    Usage as done by importASCII(): create the handler, call setup()
    once and then parse(filename).
    """

    def __init__(self,options):
        """Handler to import text format file (separated values format) into the table.
        @param options: object carrying the options as attributes
        @param options.dsn: database connection string
        @param options.table: name of the table the text file shall be imported into
        @param options.update_fields: dict mapping the column index in the text file to a TableColumn
        @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
        @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in the text file
        @param options.update_mode: (optional) only update existing rows, never insert new ones
        @param options.ascii_db: (optional) assume ascii encoding in db
        @param options.replace_table: (optional) delete and re-insert data
        @param options.backup_table: (optional) create backup of old table (breaks indices)
        @param options.use_logger_instance: (optional) use this instance of a logger
        """
        # set up logger
        if hasattr(options, 'use_logger_instance'):
            self.logger = options.use_logger_instance
        else:
            self.logger = logging.getLogger('db.import.ascii')

        # connect database
        self.dbCon = psycopg.connect(options.dsn)
        self.db = self.dbCon.cursor()
        assert self.db, "AIIEE no db cursor for %s!!"%options.dsn

        self.table = getattr(options,"table",None)
        self.update_fields = getattr(options,"update_fields",None)
        self.id_field = getattr(options,"id_field",None)
        self.sync_mode = getattr(options,"sync_mode",None)
        self.update_mode = getattr(options,"update_mode",None)
        self.ascii_db = getattr(options,"ascii_db",None)
        self.replace_table = getattr(options,"replace_table",None)
        self.backup_table = getattr(options,"backup_table",None)

        self.logger.debug("dsn: "+repr(getattr(options,"dsn",None)))
        self.logger.debug("table: "+repr(self.table))
        self.logger.debug("update_fields: "+repr(self.update_fields))
        self.logger.debug("id_field: "+repr(self.id_field))
        self.logger.debug("sync_mode: "+repr(self.sync_mode))
        self.logger.debug("update_mode: "+repr(self.update_mode))
        self.logger.debug("ascii_db: "+repr(self.ascii_db))
        self.logger.debug("replace_table: "+repr(self.replace_table))
        self.logger.debug("backup_table: "+repr(self.backup_table))

        # id value -> number of times the id was seen in the text file
        self.dbIDs = {}
        self.rowcnt = 0

        if self.id_field is not None:
            # prepare a list of ids for sync mode
            qstr="select %s from %s"%(self.id_field,self.table)
            for row in SimpleSearch(self.db, qstr) or []:
                # value 0: not updated
                self.dbIDs[row[0]] = 0
                self.rowcnt += 1

            self.logger.info("%d entries in DB to sync"%self.rowcnt)

        # map text file column index to TableColumn
        self.xml_field_map = {}
        # and SQL field name to text file column index
        self.sql_field_map = {}

        return


    def setup(self):
        """Prepare tables, field maps and SQL statements for the import.

        Depending on the options this creates the "<table>_tmp" working
        table (backup mode), truncates the table (replace mode), reads
        the column types of the table and builds self.updQuery and
        self.addQuery.
        """
        # rename table for backup
        if self.backup_table:
            self.orig_table = self.table
            self.table = self.table + "_tmp"
            # remove old temp table
            qstr = "DROP TABLE %s"%(self.table)
            try:
                self.db.execute(qstr)
            except Exception:
                # deliberate best effort: the temp table usually does
                # not exist yet
                self.logger.debug("could not drop table %s (ignored)"%(self.table))

            self.dbCon.commit()

            if self.id_field:
                # sync mode -- copy table
                self.logger.info("copy table %s to %s"%(self.orig_table,self.table))
                qstr = "CREATE TABLE %s AS (SELECT * FROM %s)"%(self.table,self.orig_table)

            else:
                # create empty new table with the same columns
                self.logger.info("create empty table %s"%(self.table))
                qstr = "CREATE TABLE %s AS (SELECT * FROM %s WHERE 1=0)"%(self.table,self.orig_table)

            self.db.execute(qstr)
            self.dbCon.commit()

        # delete data from table for replace
        if self.replace_table:
            self.logger.info("delete data from table %s"%(self.table))
            qstr = "TRUNCATE TABLE %s"%(self.table)
            self.db.execute(qstr)
            self.dbCon.commit()

        # try to match date style with the input data
        self.db.execute("set datestyle to 'german'")

        # get list of fields and types of db table
        qstr="select attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) from pg_attribute, pg_class where attrelid = pg_class.oid and pg_attribute.attnum > 0 and relname = '%s'"
        self.sql_fields={}
        for (col_name, col_type) in SimpleSearch(self.db, qstr%self.table):
            self.sql_fields[col_name] = TableColumn(col_name,col_type)

        self.xml_update_list = []

        # map fields in text file
        for (k,v) in self.update_fields.items():
            # k: column index in the text file, v: TableColumn
            self.xml_field_map[k] = v
            self.sql_field_map[v.getName()] = k
            # add to list of updateable fields (without id_field)
            if v.getName() != self.id_field:
                self.xml_update_list.append(k)

        if self.id_field:
            # column index of the id in the text file; raises KeyError
            # when id_field is not part of the field list
            self.xml_id = self.sql_field_map[self.id_field]

        # fixed column order so values line up with the placeholders
        self.xml_update_list.sort()

        # prepare sql statements for update
        col_names = [self.xml_field_map[f].getName() for f in self.xml_update_list]
        setStr = ', '.join(["%s = %%s"%n for n in col_names])
        self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field)
        # and insert
        fields = ','.join(col_names)
        values = ','.join(['%s' for n in col_names])
        self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values)
        self.logger.debug("update-query: "+self.updQuery)
        self.logger.debug("add-query: "+self.addQuery)
        return


    def parse(self, filename):
        """Read the text file line by line and import every row.

        After the last row the transaction is committed.  In sync mode
        all rows whose id did not occur in the file are deleted, and in
        backup mode the original table is renamed to a timestamped name
        and the working table takes its place.

        @param filename: name of the text file to read
        """
        self.logger.info("reading data...")
        self.rowcnt = 0

        fh = open(filename,"r")
        try:
            self.logger.debug("BEGIN RESULTSET")
            # parse line-wise
            for line in fh:
                self.handle_line(line)
        finally:
            # always release the file handle, also when a row fails
            fh.close()

        # done. Wrap up
        self.logger.debug("END RESULTSET")
        self.dbCon.commit()

        if self.sync_mode:
            # delete unmatched entries in db
            self.logger.info("deleting unmatched rows from db")
            delQuery = "DELETE FROM %s WHERE %s = %%s"%(self.table,self.id_field)
            for row_id in self.dbIDs.keys():
                seen = self.dbIDs[row_id]
                if seen == 0:
                    # id was never seen in the file.
                    # NOTE(review): an earlier sys.exit(1) right after
                    # this DELETE ended the program before the commit,
                    # so unmatched rows were never actually removed.
                    # %s formatting also copes with non-string ids.
                    self.logger.info(" delete:%s"%row_id)
                    SimpleSearch(self.db, delQuery, [row_id], ascii=self.ascii_db)

                elif seen > 1:
                    self.logger.info(" sync: ID %s used more than once?"%row_id)

            self.dbCon.commit()

        # reinstate backup tables
        if self.backup_table:
            backup_name = "%s_%s"%(self.orig_table,time.strftime('%Y_%m_%d_%H_%M_%S'))
            self.logger.info("rename backup table %s to %s"%(self.orig_table,backup_name))
            qstr = "ALTER TABLE %s RENAME TO %s"%(self.orig_table,backup_name)
            self.db.execute(qstr)
            self.logger.info("rename working table %s to %s"%(self.table,self.orig_table))
            qstr = "ALTER TABLE %s RENAME TO %s"%(self.table,self.orig_table)
            self.db.execute(qstr)
            self.dbCon.commit()

        return

    def handle_line(self, line):
        """Process a single line of text data.

        The line is split on whitespace; the resulting values are used
        to update the row with the matching id or to insert a new row
        (unless update_mode is set).  Every 100 rows the transaction is
        committed.  Blank lines are skipped.

        @param line: one line of the input file
        """
        content = line.split()
        if not content:
            # nothing to import; indexing into an empty row would only
            # fail with an IndexError
            self.logger.debug("skipping empty line")
            return

        self.logger.debug("START ROW")
        self.xml_data = content
        self.rowcnt += 1
        # process collected row data
        update=False
        id_val=''
        # synchronize by id_field
        if self.id_field:
            id_val = self.xml_data[self.xml_id]
            # NOTE(review): id_val is a string from the file while the
            # keys of dbIDs are whatever the db driver returned -- for
            # non-text id columns this lookup presumably never matches.
            if id_val in self.dbIDs:
                self.dbIDs[id_val] += 1
                update=True

        # collect all values
        args = []
        for fn in self.xml_update_list:
            f = self.xml_field_map[fn]
            val = self.xml_data[fn]
            sql_type = self.sql_fields[f.getName()].getType()
            if sql_type in ("date", "integer") and len(val) == 0:
                # empty date or int field is stored as NULL
                val = None

            args.append(val)

        if update:
            # update existing row (by id_field)
            # last argument is ID match
            args.append(id_val)
            self.logger.debug("update: %s = %s"%(id_val, args))
            SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db)

        elif not self.update_mode:
            # create new row
            self.logger.debug("insert: %s"%(args,))
            SimpleSearch(self.db, self.addQuery, args, ascii=self.ascii_db)

        if (self.rowcnt % 100) == 0:
            # progress report and intermediate commit
            self.logger.info(" row:"+"%d (id:%s)"%(self.rowcnt,id_val))
            self.dbCon.commit()

        self.logger.debug("END ROW")
        return
! 312:
! 313:
! 314:
def importASCII(options):
    """Import a simple text file (separated values) into a database table.

    The comma separated field list in options.update_fields is turned
    into a dict of TableColumn objects keyed by column position and
    written back to options.update_fields.  Contradictory option
    combinations are reported with logging.error and abort the import.

    @param options: object carrying the options as attributes
    @param options.dsn: database connection string
    @param options.table: name of the table the text file shall be imported into
    @param options.filename: textfile filename
    @param options.update_fields: comma separated list of "name" or "name:type" entries
    @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
    @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in the text file
    @param options.update_mode: (optional) only update existing rows
    @param options.replace_table: (optional) delete and re-insert data
    @param options.backup_table: (optional) create backup of old table (breaks indices)
    """
    # turn the field list into a dict: column number in text file -> TableColumn
    columns = {}
    for (pos, spec) in enumerate(options.update_fields.split(',')):
        col_name = spec
        col_type = None
        if spec.find(':') > 0:
            (col_name, col_type) = spec.split(':')

        # empty entries only occupy a column position
        if col_name:
            columns[pos] = TableColumn(col_name, col_type)

    options.update_fields = columns

    id_field = getattr(options,'id_field',None)
    update_mode = getattr(options,'update_mode',None)

    # refuse option combinations that contradict each other
    if id_field and getattr(options,'replace_table',None):
        logging.error("ABORT: sorry, you can't do both sync (id_field) and replace")
        return

    if getattr(options,'sync_mode',None) and update_mode:
        logging.error("ABORT: sorry, you can't do both sync-mode and update-mode")
        return

    if update_mode and not id_field:
        logging.error("ABORT: sorry, you can't do update-mode without id-field")
        return

    # the handler does the actual work: connect, prepare, read the file
    handler = ASCII_handler(options)
    handler.setup()
    handler.parse(options.filename)
! 363:
! 364:
if __name__ == "__main__":
    from optparse import OptionParser

    # command line interface: options that take a value ...
    opars = OptionParser()
    opars.add_option("-f", "--file", metavar="FILE", dest="filename",
                     help="text file name")
    opars.add_option("-c", "--dsn", dest="dsn",
                     help="database connection string")
    opars.add_option("-t", "--table", dest="table",
                     help="database table name")
    opars.add_option("--fields", metavar="LIST", dest="update_fields",
                     help="list of fields in the text file (comma separated, empty fields are not updated, sql-names)")
    opars.add_option("--id-field", metavar="NAME", dest="id_field", default=None,
                     help="name of id field for synchronisation (only appends data otherwise, sql-name)")
    # ... and on/off switches
    opars.add_option("--sync", "--sync-mode", dest="sync_mode",
                     action="store_true", default=False,
                     help="do full sync based on id field (remove unmatched fields from db)")
    opars.add_option("--update-only", "--update-mode", dest="update_mode",
                     action="store_true", default=False,
                     help="only update existing rows based on id field")
    opars.add_option("--ascii-db", dest="ascii_db",
                     action="store_true", default=False,
                     help="the SQL database stores ASCII instead of unicode")
    opars.add_option("--replace", dest="replace_table",
                     action="store_true", default=False,
                     help="replace table i.e. delete and re-insert data")
    opars.add_option("--backup", dest="backup_table",
                     action="store_true", default=False,
                     help="create backup of old table (breaks indices)")
    opars.add_option("-d", "--debug", dest="debug",
                     action="store_true", default=False,
                     help="debug mode (more output)")

    (options, args) = opars.parse_args()

    # file, dsn, field list and table are mandatory
    required = (options.filename, options.dsn,
                options.update_fields, options.table)
    if [r for r in required if r is None]:
        # not enough parameters: show version and usage, then give up
        print("importASCII "+version_string)
        opars.print_help()
        sys.exit(1)

    loglevel = logging.INFO
    if options.debug:
        loglevel = logging.DEBUG

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        datefmt='%H:%M:%S',
                        level=loglevel)

    importASCII(options)
! 423: importASCII(options)
! 424:
! 425:
! 426:
! 427:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>