src/jocondelab/management/commands/import_csv.py
author ymh <ymh.work@gmail.com>
Tue, 25 Jun 2013 00:00:03 +0200
changeset 33 61c3ffd94f11
parent 28 5918a9d353d0
child 34 b1fd0e0197c8
permissions -rw-r--r--
- correct imports. - optimize import csv - add srep thesaurus

# -*- coding: utf-8 -*-
'''
Created on Jun 10, 2013

@author: ymh
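
Usage (sketch, path is illustrative; options are defined in Command below):
    python manage.py import_csv /path/to/mistral_export.csv --batch-size 100 -e latin1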
'''

from ..utils import show_progress
from core.import_processor import (CharFieldProcessor, DateFieldProcessor, 
    BooleanFieldProcessor, TermProcessor, TrimCharFieldProcessor)
from core.models import (Notice, AutrNoticeTerm, DomnNoticeTerm, EcolNoticeTerm, 
    EpoqNoticeTerm, LieuxNoticeTerm, PeriNoticeTerm, ReprNoticeTerm)
from core.settings import (AUTR_CONTEXT, DOMN_CONTEXT, ECOL_CONTEXT, EPOQ_CONTEXT, 
    LIEUX_CONTEXT, PERI_CONTEXT, REPR_CONTEXT, SREP_CONTEXT)
from core.models.notice import SrepNoticeTerm
from django.core.management import BaseCommand, CommandError
from django.db import transaction
from optparse import make_option
import csv
import datetime
import logging
import os.path
import sys

logger = logging.getLogger(__name__)

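# Map each lower-cased CSV column name to the processor that turns its raw value
# into model data.  Columns with no entry here fall back to
# DEFAULT_FIELD_PROCESSOR_KLASS (a plain CharFieldProcessor).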
NOTICE_FIELD_PROCESSORS = {
    'ref'  : TrimCharFieldProcessor('ref'),
    'dmaj' : DateFieldProcessor('dmaj'),
    'dmis' : DateFieldProcessor('dmis'),
    'image': BooleanFieldProcessor('image'),
    'autr_terms' : TermProcessor('autr' , AUTR_CONTEXT , AutrNoticeTerm),
    'domn_terms' : TermProcessor('domn' , DOMN_CONTEXT , DomnNoticeTerm),
    'ecol_terms' : TermProcessor('ecol' , ECOL_CONTEXT , EcolNoticeTerm),
    'epoq_terms' : TermProcessor('epoq' , EPOQ_CONTEXT , EpoqNoticeTerm),
    'lieux_terms': TermProcessor('lieux', LIEUX_CONTEXT, LieuxNoticeTerm),
    'peri_terms' : TermProcessor('peri' , PERI_CONTEXT , PeriNoticeTerm),    
    'repr_terms' : TermProcessor('repr' , REPR_CONTEXT , ReprNoticeTerm, re_sub = None, re_split = r"[;,:()#]"),
    'srep_terms' : TermProcessor('srep' , SREP_CONTEXT , SrepNoticeTerm, re_sub = None, re_split = r"[;,:()#]"),
}

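# Term columns are processed a second time in handle(), after the notices have
# been bulk-created, so the corresponding *NoticeTerm rows can then be saved.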
POST_NOTICE_FIELDS = ['autr_terms','domn_terms','ecol_terms','epoq_terms','lieux_terms','peri_terms','repr_terms', 'srep_terms']
DEFAULT_FIELD_PROCESSOR_KLASS = CharFieldProcessor

class Command(BaseCommand):

    args = "csv_file"
    
    help = "Import Mistral csv file"

    option_list = BaseCommand.option_list + (
        make_option('--check-id',
            action= 'store_true',
            dest= 'check_id',
            default= False,
            help= 'check a notice id before trying to insert it, may be a lot slower' 
        ),
        make_option('-n', '--max-lines',
            dest= 'max_lines',
            type='int',
            default= sys.maxint,
            help= 'maximum number of lines to process (default: whole file)'
        ),
        make_option('-b', '--batch-size',
            dest= 'batch_size',
            type='int',
            default= 50,
            help= 'number of objects to insert per bulk operation'
        ),
        make_option('-e', '--encoding',
            dest= 'encoding',
            default= 'latin1',
            help= 'csv file encoding'
        ),
        make_option('--skip',
            dest= 'skip',
            type='int',
            default= 0,
            help= 'number of entries to skip'
        ),
        make_option('--stop',
            dest= 'cont',
            action= 'store_false',
            default= True,
            help= 'stop on error' 
        ),
    )

    def __safe_get(self, dict_arg, key, conv = lambda x: x, default= None):
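        """Return conv(dict_arg[key]) when the value is present and truthy, else default."""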
        val = dict_arg.get(key, default)
        return conv(val) if val else default

    def __safe_decode(self, s):
        """Decode a byte string as utf-8, falling back to latin1, then utf-8 with replacement."""
        if not isinstance(s, basestring):
            return s
        try:
            return s.decode('utf8')
        except UnicodeError:
            try:
                return s.decode('latin1')
            except UnicodeError:
                return s.decode('utf8', 'replace')


    def __init__(self):
        super(Command, self).__init__()
        
    def handle(self, *args, **options):
        
        if not args:
            raise CommandError("A csv file path argument is required")
        filepath = os.path.abspath(args[0])
        self.stdout.write("Importing %s" % filepath)
        self.encoding = options.get('encoding', 'latin1')
        
        max_lines = options.get('max_lines', sys.maxint)        
        
        self.stdout.write("Calculating size")
        with open(filepath,'rb') as csv_file:
            dialect = csv.Sniffer().sniff(csv_file.read(1024))
            dialect.doublequote = True
            dialect.delimiter = '\t'
            dialect.quoting = csv.QUOTE_NONE
            csv_file.seek(0)
            
            reader = csv.DictReader(csv_file, dialect=dialect)

            i = -1  # stays -1 when the file has no data rows
            for i, _ in enumerate(reader):
                if i >= (max_lines - 1):
                    break
                
        
        # Pre-Django-1.6 manual transaction handling: each batch below is
        # committed explicitly with transaction.commit().
        transaction.enter_transaction_management()
        transaction.managed(True)
        
        objects_buffer = {}
        nb_lines = min(max_lines, i+1)
        
        self.stdout.write("Importing %d lines" % (nb_lines))
        batch_size = options.get('batch_size', 50)
        cont_on_error = options.get('cont', True)
        
        with open(filepath,'rb') as csvfile:
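            # Second pass over the file: build Notice objects and buffer them for
            # bulk_create; restkey="EXTRA" gathers any columns beyond the header.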
            reader = csv.DictReader(csvfile, dialect=dialect, restkey="EXTRA")
            writer = None
            
            for i,row in enumerate(reader):
                try:
                    if i+1 > nb_lines:
                        break
                    
                    writer = show_progress(i+1, nb_lines, u"Processing line %s" % (row['REF'].strip()), 50, writer)
                    
                    def safe_decode(val, encoding):
                        # csv yields byte strings; leave None and the "EXTRA"
                        # overflow list untouched.
                        if val and isinstance(val, str):
                            return val.decode(encoding)
                        return val

                    row = dict([(safe_decode(key, self.encoding), safe_decode(value, self.encoding))
                                for key, value in row.items()])

                    notice_obj = Notice()
                    objects_buffer.setdefault(Notice, []).append(notice_obj)
                    
                    for k, v in row.items():
                        # Unknown columns fall back to a plain CharFieldProcessor.
                        processor = NOTICE_FIELD_PROCESSORS.get(k.lower(), DEFAULT_FIELD_PROCESSOR_KLASS(k.lower()))
                        new_objs = processor.process(notice_obj, v) if processor else None
                        if new_objs:
                            # Extend the per-class buffers rather than replacing them,
                            # so earlier rows of the batch are kept for bulk_create().
                            for klass, objs in new_objs.iteritems():
                                objects_buffer.setdefault(klass, []).extend(objs)
                    
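                    # Flush buffered objects with bulk_create and commit once per batch_size rows.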
                    if not ((i+1)%batch_size):
                        for klass, obj_list in objects_buffer.iteritems():
                            klass.objects.bulk_create(obj_list)
                        objects_buffer = {}
                        transaction.commit()
                    
                except Exception as e:
                    error_msg = "%s - Error treating line %d/%d: id %s : %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),i+1, reader.line_num, row['REF'] if (row and 'REF' in row and row['REF']) else 'n/a', repr(e) )
                    logger.exception(error_msg)
                    if not cont_on_error:
                        raise
                    
        if objects_buffer:
            try:
                for klass, obj_list in objects_buffer.iteritems():
                    klass.objects.bulk_create(obj_list)
                objects_buffer = {}
                transaction.commit()
            except Exception as e:
                error_msg = "%s - Error treating line : %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), repr(e) )
                logger.exception(error_msg)
                if not cont_on_error:
                    raise

        
        notice_count = Notice.objects.count()
        
        self.stdout.write("Processing %d notices" % notice_count)

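        # Re-run the term processors on every stored notice so the *NoticeTerm
        # relation rows can be created now that the notices have primary keys.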
        writer = None        
        for i,notice_obj in enumerate(Notice.objects.iterator()):
            writer = show_progress(i+1, notice_count, u"Processing notice %s" % notice_obj.ref, 50, writer)
            for field in POST_NOTICE_FIELDS:
                processor = NOTICE_FIELD_PROCESSORS.get(field, DEFAULT_FIELD_PROCESSOR_KLASS(field))
                new_objs = processor.process(notice_obj, None) if processor else None
                if new_objs:
                    for k,v in new_objs.iteritems():                        
                        objects_buffer.setdefault(k,[]).extend(v)
            if not ((i+1) % batch_size):
                # Save the buffered relation objects and commit every batch_size notices.
                for _, obj_list in objects_buffer.iteritems():
                    for obj in obj_list:
                        obj.save()
                objects_buffer = {}
                transaction.commit()

        if objects_buffer:
            try:
                for _, obj_list in objects_buffer.iteritems():
                    for obj in obj_list:
                        obj.save()
                objects_buffer = {}
                transaction.commit()
            except Exception as e:
                error_msg = "%s - Error treating line: %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), repr(e) )
                logger.exception(error_msg)