--- a/src/jocondelab/management/commands/import_csv.py Fri Jun 21 17:09:03 2013 +0200
+++ b/src/jocondelab/management/commands/import_csv.py Mon Jun 24 00:38:29 2013 +0200
@@ -7,7 +7,7 @@
from ..utils import show_progress
from core.import_processor import (CharFieldProcessor, DateFieldProcessor,
- BooleanFieldProcessor, TermProcessor)
+ BooleanFieldProcessor, TermProcessor, TrimCharFieldProcessor)
from core.models import (Notice, AutrNoticeTerm, DomnNoticeTerm, EcolNoticeTerm,
EpoqNoticeTerm, LieuxNoticeTerm, PeriNoticeTerm, ReprNoticeTerm)
from core.settings import (AUTR_CONTEXT, DOMN_CONTEXT, ECOL_CONTEXT, EPOQ_CONTEXT,
@@ -24,8 +24,9 @@
logger = logging.getLogger(__name__)
NOTICE_FIELD_PROCESSORS = {
- 'dmaj': DateFieldProcessor('dmaj'),
- 'dmis': DateFieldProcessor('dmis'),
+ 'ref' : TrimCharFieldProcessor('ref'),
+ 'dmaj' : DateFieldProcessor('dmaj'),
+ 'dmis' : DateFieldProcessor('dmis'),
'image': BooleanFieldProcessor('image'),
'autr_terms' : TermProcessor('autr' , AUTR_CONTEXT , AutrNoticeTerm),
'domn_terms' : TermProcessor('domn' , DOMN_CONTEXT , DomnNoticeTerm),
@@ -33,7 +34,7 @@
'epoq_terms' : TermProcessor('epoq' , EPOQ_CONTEXT , EpoqNoticeTerm),
'lieux_terms': TermProcessor('lieux', LIEUX_CONTEXT, LieuxNoticeTerm),
'peri_terms' : TermProcessor('peri' , PERI_CONTEXT , PeriNoticeTerm),
- 'repr_terms' : TermProcessor('repr' , REPR_CONTEXT , ReprNoticeTerm, re_sub = None, re_split = "[;,:\(\)\#]")
+ 'repr_terms' : TermProcessor('repr' , REPR_CONTEXT , ReprNoticeTerm, re_sub = None, re_split = "[\;\,\:\(\)\#]")
}
POST_NOTICE_FIELDS = ['autr_terms','domn_terms','ecol_terms','epoq_terms','lieux_terms','peri_terms','repr_terms']
@@ -61,7 +62,7 @@
make_option('-b', '--batch-size',
dest= 'batch_size',
type='int',
- default= 5000,
+ default= 50,
help= 'number of object to import in bulk operations'
),
make_option('-e', '--encoding',
@@ -106,9 +107,9 @@
filepath = os.path.abspath(args[0])
self.stdout.write("Importing %s" % filepath)
+ self.encoding = options.get('encoding', "latin-1")
- total = 0
- max_lines = options.get('max_lines', sys.maxint)
+ max_lines = options.get('max_lines', sys.maxint)
self.stdout.write("Calculating size")
with open(filepath,'rb') as csv_file:
@@ -118,49 +119,60 @@
dialect.quoting = csv.QUOTE_NONE
csv_file.seek(0)
- for _ in csv.DictReader(csv_file, dialect=dialect):
- total += 1
- if total > max_lines:
+ reader = csv.DictReader(csv_file, dialect=dialect)
+
+ for i,_ in enumerate(reader):
+ if i >= (max_lines-1):
break
-
- self.stdout.write("Importing %d lines" % total)
+
transaction.enter_transaction_management()
transaction.managed()
objects_buffer = {}
- nb_lines = min(max_lines, total)
+ nb_lines = min(max_lines, i+1)
+
+ self.stdout.write("Importing %d lines" % (nb_lines))
batch_size = options.get('batch_size', 5000)
cont_on_error = options.get('cont', True)
with open(filepath,'rb') as csvfile:
- counter = 0
reader = csv.DictReader(csvfile, dialect=dialect, restkey="EXTRA")
+ writer = None
- for row in reader:
+ for i,row in enumerate(reader):
try:
- counter += 1
- show_progress(counter, total, u"Processing line %s" % row['REF'], 50, self.stdout)
+ if i+1 > nb_lines:
+ break
+
+ writer = show_progress(i+1, nb_lines, u"Processing line %s" % (row['REF'].strip()), 50, writer)
+
+ def safe_decode(val, encoding):
+ if val:
+ return val.decode(encoding)
+ else:
+ return val
+
+ row = dict([(safe_decode(key, self.encoding), safe_decode(value, self.encoding)) for key, value in row.items()])
+
notice_obj = Notice()
objects_buffer.setdefault(Notice, []).append(notice_obj)
for k,v in row.items():
- processor = NOTICE_FIELD_PROCESSORS.get(k.lower(), None) #TODO : put default processor
+ processor = NOTICE_FIELD_PROCESSORS.get(k.lower(), DEFAULT_FIELD_PROCESSOR_KLASS(k.lower())) #TODO : put default processor
new_objs = processor.process(notice_obj, v) if processor else None
if new_objs:
objects_buffer.update(new_objs)
- if not (counter%batch_size):
+ if not ((i+1)%batch_size):
for klass, obj_list in objects_buffer.iteritems():
klass.objects.bulk_create(obj_list)
objects_buffer = {}
transaction.commit()
- if counter > nb_lines:
- break
except Exception as e:
- error_msg = "%s - Error treating line %d/%d: id %s : %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),counter, reader.line_num, row['REF'] if (row and 'REF' in row and row['REF']) else 'n/a', repr(e) )
- logger.exception(error_msg, e)
+ error_msg = "%s - Error treating line %d/%d: id %s : %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),i+1, reader.line_num, row['REF'] if (row and 'REF' in row and row['REF']) else 'n/a', repr(e) )
+ logger.exception(error_msg)
if not cont_on_error:
raise
@@ -171,36 +183,40 @@
objects_buffer = {}
transaction.commit()
except Exception as e:
- error_msg = "%s - Error treating line %d/%d: %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),counter, reader.line_num, repr(e) )
- logger.exception(error_msg, e)
+ error_msg = "%s - Error treating line : %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), repr(e) )
+ logger.exception(error_msg)
+ if not cont_on_error:
+ raise
+
notice_count = Notice.objects.count()
self.stdout.write("Processing %d notices" % notice_count)
-
+
+ writer = None
for i,notice_obj in enumerate(Notice.objects.iterator()):
- show_progress(i+1, notice_count, u"Processing notice %s" % notice_obj.ref, 50, self.stdout)
+ writer = show_progress(i+1, notice_count, u"Processing notice %s" % notice_obj.ref, 50, writer)
for field in POST_NOTICE_FIELDS:
processor = NOTICE_FIELD_PROCESSORS.get(field, DEFAULT_FIELD_PROCESSOR_KLASS(field))
new_objs = processor.process(notice_obj, None) if processor else None
if new_objs:
- objects_buffer.update(new_objs)
-
+ for k,v in new_objs.iteritems():
+ objects_buffer.setdefault(k,[]).extend(v)
if not ((i+1)%batch_size):
- for klass, obj_list in objects_buffer.iteritems():
- klass.objects.bulk_create(obj_list)
+ for _, obj_list in objects_buffer.iteritems():
+ map(lambda o: o.save(), obj_list)
objects_buffer = {}
transaction.commit()
if objects_buffer:
try:
- for klass, obj_list in objects_buffer.iteritems():
- klass.objects.bulk_create(obj_list)
+ for _, obj_list in objects_buffer.iteritems():
+ map(lambda o: o.save(), obj_list)
objects_buffer = {}
transaction.commit()
except Exception as e:
- error_msg = "%s - Error treating line %d/%d: %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),counter, reader.line_num, repr(e) )
- logger.exception(error_msg, e)
+ error_msg = "%s - Error treating line: %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), repr(e) )
+ logger.exception(error_msg)
\ No newline at end of file