Annotation of ZSQLExtend/importASCII.py, revision 1.2

1.1       casties     1: #!/usr/local/bin/python
                      2: #
                      3: 
                      4: import string
                      5: import logging
                      6: import sys
                      7: import types
                      8: import time
                      9: 
                     10: try:
                     11:     import psycopg2 as psycopg
                     12:     psyco = 2
                     13: except:
                     14:     import psycopg
                     15:     psyco = 1
                     16: 
                     17: version_string = "V0.1 ROC 4.12.2007"
                     18: 
                     19: 
                     20: def sql_quote(v):
                     21:     # quote dictionary
                     22:     quote_dict = {"\'": "''", "\\": "\\\\"}
                     23:     for dkey in quote_dict.keys():
                     24:         if string.find(v, dkey) >= 0:
                     25:             v=string.join(string.split(v,dkey),quote_dict[dkey])
                     26:     return "'%s'"%v
                     27: 
1.2     ! casties    28: def SimpleSearch(curs,query, args=None, ascii=False, result=True):
1.1       casties    29:     """execute sql query and return data"""
                     30:     #logging.debug("executing: "+query+" "+repr(args))
                     31:     if ascii:
                     32:         # encode all in UTF-8
                     33:         query = query.encode("UTF-8")
                     34:         if args is not None:
                     35:             encargs = []
                     36:             for a in args:
                     37:                 if a is not None:
                     38:                     a = a.encode("UTF-8")
                     39:                 encargs.append(a)
                     40:             
                     41:             args = encargs
                     42: 
                     43:     curs.execute(query, args)
                     44:     #logger.debug("sql done")
1.2     ! casties    45:     if result:
        !            46:         try:
        !            47:             return curs.fetchall()
        !            48:         except:
        !            49:             return None
1.1       casties    50: 
                     51: 
                     52: class TableColumn:
                     53:     """simple type for storing sql column name and type"""
                     54:     
                     55:     def __init__(self, name, type=None):
                     56:         #print "new tablecolumn(%s,%s)"%(name, type)
                     57:         self.name = name
                     58:         self.type = type
                     59:         
                     60:     def getName(self):
                     61:         return self.name
                     62:     
                     63:     def getType(self):
                     64:         if self.type is not None:
                     65:             return self.type
                     66:         else:
                     67:             return "text"
                     68: 
                     69:     def __str__(self):
                     70:         return self.name
                     71:     
                     72:     
                     73: class ASCII_handler:
                     74:     def __init__(self,options):
                     75:         """Handler to import text format file (separated values format) into the table.
                     76:         @param options: dict of options
                     77:         @param options.dsn: database connection string
                     78:         @param options.table: name of the table the xml shall be imported into
                     79:         @param options.filename: xmlfile filename
                     80:         @param options.update_fields: (optional) list of fields to update; default is to create all fields
                     81:         @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
                     82:         @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
                     83:         @param options.ascii_db: (optional) assume ascii encoding in db
                     84:         @param options.replace_table: (optional) delete and re-insert data
                     85:         @param options.backup_table: (optional) create backup of old table (breaks indices)
                     86:         @param options.use_logger_instance: (optional) use this instance of a logger
                     87:         """
                     88:         
                     89:         # set up logger
                     90:         if hasattr(options, 'use_logger_instance'):
                     91:             self.logger = options.use_logger_instance
                     92:         else:
                     93:             self.logger = logging.getLogger('db.import.ascii')
                     94: 
                     95:         
                     96:         # connect database
                     97:         self.dbCon = psycopg.connect(options.dsn)
                     98:         self.db = self.dbCon.cursor()
                     99:         assert self.db, "AIIEE no db cursor for %s!!"%options.dsn
                    100:     
                    101:         self.table = getattr(options,"table",None)
                    102:         self.update_fields = getattr(options,"update_fields",None)
                    103:         self.id_field = getattr(options,"id_field",None)
                    104:         self.sync_mode = getattr(options,"sync_mode",None)
                    105:         self.update_mode = getattr(options,"update_mode",None)
                    106:         self.ascii_db = getattr(options,"ascii_db",None)
                    107:         self.replace_table = getattr(options,"replace_table",None)
                    108:         self.backup_table = getattr(options,"backup_table",None)
                    109: 
                    110:         self.logger.debug("dsn: "+repr(getattr(options,"dsn",None)))
                    111:         self.logger.debug("table: "+repr(self.table))
                    112:         self.logger.debug("update_fields: "+repr(self.update_fields))
                    113:         self.logger.debug("id_field: "+repr(self.id_field))
                    114:         self.logger.debug("sync_mode: "+repr(self.sync_mode))
                    115:         self.logger.debug("update_mode: "+repr(self.update_mode))
                    116:         self.logger.debug("ascii_db: "+repr(self.ascii_db))
                    117:         self.logger.debug("replace_table: "+repr(self.replace_table))
                    118:         self.logger.debug("backup_table: "+repr(self.backup_table))
                    119:         
                    120:         self.dbIDs = {}
                    121:         self.rowcnt = 0
                    122:         
                    123:         if self.id_field is not None:
                    124:             # prepare a list of ids for sync mode
                    125:             qstr="select %s from %s"%(self.id_field,self.table)
                    126:             for id in SimpleSearch(self.db, qstr):
                    127:                 # value 0: not updated
                    128:                 self.dbIDs[id[0]] = 0;
                    129:                 self.rowcnt += 1
                    130:                 
                    131:             self.logger.info("%d entries in DB to sync"%self.rowcnt)
                    132:         
                    133:         # map XML field names to SQL field names
                    134:         self.xml_field_map = {}
                    135:         # and vice versa
                    136:         self.sql_field_map = {}
                    137:         
                    138:         return
                    139: 
                    140: 
                    141:     def setup(self):
                    142:         """initialisation"""
                    143:         # rename table for backup
                    144:         if self.backup_table:
                    145:             self.orig_table = self.table
                    146:             self.table = self.table + "_tmp"
                    147:             # remove old temp table
                    148:             qstr = "DROP TABLE %s"%(self.table)
                    149:             try:
                    150:                 self.db.execute(qstr)
                    151:             except:
                    152:                 pass
                    153:             
                    154:             self.dbCon.commit()
                    155:            
                    156:             if self.id_field:
                    157:                 # sync mode -- copy table
                    158:                 self.logger.info("copy table %s to %s"%(self.orig_table,self.table))
                    159:                 qstr = "CREATE TABLE %s AS (SELECT * FROM %s)"%(self.table,self.orig_table)
                    160: 
                    161:             else:
                    162:                 # rename table and create empty new one
                    163:                 self.logger.info("create empty table %s"%(self.table))
                    164:                 qstr = "CREATE TABLE %s AS (SELECT * FROM %s WHERE 1=0)"%(self.table,self.orig_table)
                    165:             
                    166:             self.db.execute(qstr)
                    167:             self.dbCon.commit()
                    168:         
                    169:         # delete data from table for replace
                    170:         if self.replace_table:
                    171:             self.logger.info("delete data from table %s"%(self.table))
                    172:             qstr = "TRUNCATE TABLE %s"%(self.table)
                    173:             self.db.execute(qstr)
                    174:             self.dbCon.commit()
                    175:            
                    176:         # try to match date style with XML
                    177:         self.db.execute("set datestyle to 'german'")
                    178:         
                    179:         # get list of fields and types of db table
                    180:         qstr="select attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) from pg_attribute, pg_class where attrelid = pg_class.oid and pg_attribute.attnum > 0 and relname = '%s'"
                    181:         self.sql_fields={}
                    182:         for f in SimpleSearch(self.db, qstr%self.table):
                    183:             n = f[0]
                    184:             t = f[1]
                    185:             #print "SQL fields: %s (%s)"%(n,t)
                    186:             self.sql_fields[n] = TableColumn(n,t)
                    187:         
                    188:         self.xml_update_list = []
                    189: 
                    190:         # map fields in text file
                    191:         for (k,v) in self.update_fields.items():
                    192:             # map to sql name and default text type
                    193:             self.xml_field_map[k] = v
                    194:             self.sql_field_map[v.getName()] = k
                    195:             # add to list of updateable fields (without id_field)
                    196:             if v.getName() != self.id_field:
                    197:                 self.xml_update_list.append(k)
                    198:         
                    199:         if self.id_field:
                    200:             self.xml_id = self.sql_field_map[self.id_field]
                    201: 
                    202:         # and translate to list of xml fields
                    203:         self.xml_update_list.sort()
                    204:         
                    205:         # prepare sql statements for update
                    206:         setStr=string.join(["%s = %%s"%self.xml_field_map[f].getName() for f in self.xml_update_list], ', ')
                    207:         self.updQuery="UPDATE %s SET %s WHERE %s = %%s"%(self.table,setStr,self.id_field)
                    208:         # and insert
                    209:         fields=string.join([self.xml_field_map[x].getName() for x in self.xml_update_list], ',')
                    210:         values=string.join(['%s' for f in self.xml_update_list], ',')
                    211:         self.addQuery="INSERT INTO %s (%s) VALUES (%s)"%(self.table,fields,values)
                    212:         self.logger.debug("update-query: "+self.updQuery)
                    213:         self.logger.debug("add-query: "+self.addQuery)
                    214:         return
                    215: 
                    216: 
                    217:     def parse(self, filename):
                    218:         """open file and read data"""
                    219:         self.logger.info("reading data...")
                    220:         self.rowcnt = 0
                    221: 
                    222:         fh = open(filename,"r")
                    223:         self.logger.debug("BEGIN RESULTSET")
                    224:         # parse line-wise
                    225:         for line in fh:
                    226:             self.handle_line(line)
                    227: 
                    228:         # done. Wrap up
                    229:         self.logger.debug("END RESULTSET")
                    230:         self.dbCon.commit()
                    231:         
                    232:         if self.sync_mode:
                    233:             # delete unmatched entries in db
                    234:             self.logger.info("deleting unmatched rows from db")
                    235:             delQuery = "DELETE FROM %s WHERE %s = %%s"%(self.table,self.id_field)
                    236:             for id in self.dbIDs.keys():
                    237:                 # find all not-updated fields
                    238:                 if self.dbIDs[id] == 0:
                    239:                     self.logger.info(" delete:"+id)
1.2     ! casties   240:                     SimpleSearch(self.db, delQuery, [id], ascii=self.ascii_db, result=False)
1.1       casties   241:                     sys.exit(1)
                    242:                     
                    243:                 elif self.dbIDs[id] > 1:
                    244:                     self.logger.info(" sync: ID %s used more than once?"%id)
                    245:             
                    246:             self.dbCon.commit()
                    247:             
                    248:         # reinstate backup tables
                    249:         if self.backup_table:
                    250:             backup_name = "%s_%s"%(self.orig_table,time.strftime('%Y_%m_%d_%H_%M_%S'))
                    251:             self.logger.info("rename backup table %s to %s"%(self.orig_table,backup_name))
                    252:             qstr = "ALTER TABLE %s RENAME TO %s"%(self.orig_table,backup_name)
                    253:             self.db.execute(qstr)
                    254:             self.logger.info("rename working table %s to %s"%(self.table,self.orig_table))
                    255:             qstr = "ALTER TABLE %s RENAME TO %s"%(self.table,self.orig_table)
                    256:             self.db.execute(qstr)
                    257:             self.dbCon.commit()
                    258:         
                    259:         return
                    260: 
                    261:     def handle_line(self, line):
                    262:         """process single line of text data"""
                    263:         self.logger.debug("START ROW")
                    264: 
                    265:         content = line.split()
                    266:         self.xml_data = content
                    267:         self.rowcnt += 1
                    268:         # process collected row data
                    269:         update=False
                    270:         id_val=''
                    271:         # synchronize by id_field
                    272:         if self.id_field:
                    273:             id_val = self.xml_data[self.xml_id]
                    274:             if id_val in self.dbIDs:
                    275:                 self.dbIDs[id_val] += 1
                    276:                 update=True
                    277: 
                    278:         # collect all values
                    279:         args = []
                    280:         for fn in self.xml_update_list:
                    281:             f = self.xml_field_map[fn]
                    282:             val = self.xml_data[fn]
                    283:             type = self.sql_fields[f.getName()].getType()
                    284:             if type == "date" and len(val) == 0: 
                    285:                 # empty date field
                    286:                 val = None
                    287:                 
                    288:             elif type == "integer" and len(val) == 0: 
                    289:                 # empty int field
                    290:                 val = None
                    291:                 
                    292:             args.append(val)
                    293:                     
                    294:         if update:
                    295:             # update existing row (by id_field)
                    296:             # last argument is ID match
                    297:             args.append(id_val)
                    298:             self.logger.debug("update: %s = %s"%(id_val, args))
1.2     ! casties   299:             SimpleSearch(self.db, self.updQuery, args, ascii=self.ascii_db, result=False)
1.1       casties   300: 
                    301:         elif not self.update_mode:
                    302:             # create new row
                    303:             self.logger.debug("insert: %s"%args)
1.2     ! casties   304:             SimpleSearch(self.db, self.addQuery, args, ascii=self.ascii_db, result=False)
1.1       casties   305: 
                    306:         #self.logger.info(" row:"+"%d (%s)"%(self.rowcnt,id_val))
                    307:         if (self.rowcnt % 100) == 0:
                    308:             self.logger.info(" row:"+"%d (id:%s)"%(self.rowcnt,id_val))
                    309:             self.dbCon.commit()
                    310:             
                    311:         self.logger.debug("END ROW")
                    312:         return
                    313: 
                    314: 
                    315: 
                    316: def importASCII(options):
                    317:     """import simple text file (separated values) into the table.     
                    318:         @param options: dict of options
                    319:         @param options.dsn: database connection string
                    320:         @param options.table: name of the table the xml shall be imported into
                    321:         @param options.filename: textfile filename
                    322:         @param options.update_fields: list of fields to update
                    323:         @param options.id_field: (optional) field which uniquely identifies an entry for updating purposes.
                    324:         @param options.sync_mode: (optional) really synchronise, i.e. delete entries not in XML file
                    325:         @param options.replace_table: (optional) delete and re-insert data
                    326:         @param options.backup_table: (optional) create backup of old table (breaks indices)
                    327:         """
                    328: 
                    329:     # process list of fields into hash indexed by column number in text file
                    330:     uf = {}
                    331:     i = 0
                    332:     for f in options.update_fields.split(','):
                    333:         if f.find(':') > 0:
                    334:             (n,t) = f.split(':')
                    335:         else:
                    336:             n = f
                    337:             t = None
                    338:         
                    339:         if n:
                    340:             uf[i] = TableColumn(n,t)
                    341: 
                    342:         i += 1
                    343: 
                    344:     options.update_fields = uf
                    345:     
                    346:     if getattr(options,'id_field',None) and getattr(options,'replace_table',None):
                    347:         logging.error("ABORT: sorry, you can't do both sync (id_field) and replace")
                    348:         return
                    349:         
                    350:     if getattr(options,'sync_mode',None) and getattr(options,'update_mode',None):
                    351:         logging.error("ABORT: sorry, you can't do both sync-mode and update-mode")
                    352:         return
                    353:         
                    354:     if not getattr(options,'id_field',None) and getattr(options,'update_mode',None):
                    355:         logging.error("ABORT: sorry, you can't do update-mode without id-field")
                    356:         return
                    357:         
                    358:     # The "parser" is our own handler
                    359:     parser = ASCII_handler(options)
                    360:     # Initialize handler
                    361:     parser.setup()
                    362:     # run the file
                    363:     parser.parse(options.filename)  
                    364:     
                    365: 
                    366: if __name__ == "__main__":
                    367:     from optparse import OptionParser
                    368: 
                    369:     opars = OptionParser()
                    370:     opars.add_option("-f", "--file", 
                    371:                      dest="filename",
                    372:                      help="text file name", metavar="FILE")
                    373:     opars.add_option("-c", "--dsn", 
                    374:                      dest="dsn", 
                    375:                      help="database connection string")
                    376:     opars.add_option("-t", "--table", 
                    377:                      dest="table", 
                    378:                      help="database table name")
                    379:     opars.add_option("--fields",
                    380:                      dest="update_fields", 
                    381:                      help="list of fields in the text file (comma separated, empty fields are not updated, sql-names)", metavar="LIST")
                    382:     opars.add_option("--id-field", default=None,
                    383:                      dest="id_field", 
                    384:                      help="name of id field for synchronisation (only appends data otherwise, sql-name)", metavar="NAME")
                    385:     opars.add_option("--sync", "--sync-mode", default=False, action="store_true", 
                    386:                      dest="sync_mode", 
                    387:                      help="do full sync based on id field (remove unmatched fields from db)")
                    388:     opars.add_option("--update-only", "--update-mode", default=False, action="store_true", 
                    389:                      dest="update_mode",
                    390:                      help="only update existing rows based on id field")
                    391:     opars.add_option("--ascii-db", default=False, action="store_true", 
                    392:                      dest="ascii_db", 
                    393:                      help="the SQL database stores ASCII instead of unicode")
                    394:     opars.add_option("--replace", default=False, action="store_true", 
                    395:                      dest="replace_table", 
                    396:                      help="replace table i.e. delete and re-insert data")
                    397:     opars.add_option("--backup", default=False, action="store_true", 
                    398:                      dest="backup_table", 
                    399:                      help="create backup of old table (breaks indices)")
                    400:     opars.add_option("-d", "--debug", default=False, action="store_true", 
                    401:                      dest="debug", 
                    402:                      help="debug mode (more output)")
                    403:     
                    404:     (options, args) = opars.parse_args()
                    405:     
                    406:     if (options.filename is None 
                    407:         or options.dsn is None 
                    408:         or options.update_fields is None
                    409:         or options.table is None):
                    410:         # not enough parameters
                    411:         print "importASCII "+version_string
                    412:         opars.print_help()
                    413:         sys.exit(1)
                    414:     
                    415:     if options.debug:
                    416:         loglevel = logging.DEBUG
                    417:     else:
                    418:         loglevel = logging.INFO
                    419:     
                    420:     logging.basicConfig(level=loglevel, 
                    421:                         format='%(asctime)s %(levelname)s %(message)s',
                    422:                         datefmt='%H:%M:%S')
                    423: 
                    424:     importASCII(options)
                    425: 
                    426: 
                    427:     
                    428: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>