Correction in CSV import.
authorymh <ymh.work@gmail.com>
Mon, 24 Jun 2013 00:38:29 +0200
changeset 28 5918a9d353d0
parent 27 d5f4dde387a9
child 29 6de87539985e
Correction in CSV import: trim the notice REF field, tolerate empty dates, precompile the term-splitting regexes, decode CSV rows with the configured encoding, scope term URI lookups to their named graph, strip whitespace from SKOS labels, add an import_term_labels command, and skip the AUTR thesaurus in import_terms.
.settings/org.eclipse.core.resources.prefs
src/core/import_processor.py
src/core/rdf_models.py
src/jocondelab/management/commands/import_csv.py
src/jocondelab/management/commands/import_skos.py
src/jocondelab/management/commands/import_term_labels.py
src/jocondelab/management/commands/import_terms.py
--- a/.settings/org.eclipse.core.resources.prefs	Fri Jun 21 17:09:03 2013 +0200
+++ b/.settings/org.eclipse.core.resources.prefs	Mon Jun 24 00:38:29 2013 +0200
@@ -11,8 +11,10 @@
 encoding//src/jocondelab/forms.py=utf-8
 encoding//src/jocondelab/management/commands/import_csv.py=utf-8
 encoding//src/jocondelab/management/commands/import_skos.py=utf-8
+encoding//src/jocondelab/management/commands/import_term_labels.py=utf-8
 encoding//src/jocondelab/management/commands/import_terms.py=utf-8
 encoding//src/jocondelab/management/utils.py=utf-8
 encoding//src/jocondelab/models.py=utf-8
 encoding//src/jocondelab/utils.py=utf-8
 encoding//src/jocondelab/views.py=utf-8
+encoding//virtualenv/web/env/venv_jocondelab/lib/python2.7/site-packages/rdflib_sqlalchemy/__init__.py=utf-8
--- a/src/core/import_processor.py	Fri Jun 21 17:09:03 2013 +0200
+++ b/src/core/import_processor.py	Mon Jun 24 00:38:29 2013 +0200
@@ -24,6 +24,11 @@
         setattr(obj, self.field, value)
         return {}
 
+class TrimCharFieldProcessor(CharFieldProcessor):
+    
+    def process(self, obj, value):
+        return super(TrimCharFieldProcessor, self).process(obj, value.strip())
+
 class BooleanFieldProcessor(ImportProcessor):
     
     def process(self, obj, value):
@@ -32,14 +37,14 @@
 class DateFieldProcessor(ImportProcessor):
     
     def process(self, obj, value):
-        setattr(obj, self.field, parser.parse(value))
+        setattr(obj, self.field, parser.parse(value) if value else None)
 
 class TermProcessor(ImportProcessor):
     
-    def __init__(self, field, context, notice_term_klass, re_split = "[;,:]", re_sub = "\(.+?\)"):
+    def __init__(self, field, context, notice_term_klass, re_split = r"[\;\,\:\(\)]", re_sub = r"\(.+?\)"):
         ImportProcessor.__init__(self, field)
-        self.re_split = re_split
-        self.re_sub = re_sub
+        self.re_split = re.compile(re_split)
+        self.re_sub = re.compile(re_sub) if re_sub else None
         self.context = context
         self.notice_term_klass = notice_term_klass
     
@@ -56,11 +61,10 @@
         #remove everything between ()
         value = getattr(obj, self.field)
         if self.re_sub:
-            value = re.sub(self.re_sub, "", value)
-        for token in re.split(self.re_split, value):
+            value = self.re_sub.sub("", value)
+        for token in self.re_split.split(value):
             token = token.strip()
             nt = self.build_notice_term(token, obj)
             if nt is not None:
                 res.setdefault(self.notice_term_klass,[]).append(nt)
-                
         return res
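
The compiled expressions above first strip any parenthesised run, then split the remaining value on ';', ',', ':', '(' and ')'. A minimal sketch of the resulting tokenisation (the sample value is made up):

    # -*- coding: utf-8 -*-
    # Sketch of the new TermProcessor splitting logic; the value is hypothetical.
    import re

    re_sub = re.compile(r"\(.+?\)")         # remove everything between ()
    re_split = re.compile(r"[\;\,\:\(\)]")  # then split on the separators

    value = "DAVID Jacques Louis (dit); PARIS"
    tokens = [t.strip() for t in re_split.split(re_sub.sub("", value))]
    print([t for t in tokens if t])         # ['DAVID Jacques Louis', 'PARIS']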
--- a/src/core/rdf_models.py	Fri Jun 21 17:09:03 2013 +0200
+++ b/src/core/rdf_models.py	Mon Jun 24 00:38:29 2013 +0200
@@ -33,7 +33,7 @@
         return ConjunctiveGraph.open(self, connect_config, create=create)
 
     def get_uri_for_term(self, term, context):
-        c = URIRef(context)
+        c = self.get_context(URIRef(context))
         tl = Literal(term)
         
         for s,p,_ in self.triples((None, None, tl), context=c):
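
Wrapping the context URI with get_context() scopes the literal lookup to that named graph instead of the whole store. A small rdflib sketch of the same pattern (URIs are hypothetical):

    from rdflib import ConjunctiveGraph, Literal, URIRef

    g = ConjunctiveGraph()
    ctx = g.get_context(URIRef("http://example.org/thesaurus/repr"))
    ctx.add((URIRef("http://example.org/term/42"),
             URIRef("http://www.w3.org/2004/02/skos/core#prefLabel"),
             Literal("portrait")))

    # matches only triples inside the named graph, as in get_uri_for_term
    for s, _p, _o in g.triples((None, None, Literal("portrait")), context=ctx):
        print(s)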
--- a/src/jocondelab/management/commands/import_csv.py	Fri Jun 21 17:09:03 2013 +0200
+++ b/src/jocondelab/management/commands/import_csv.py	Mon Jun 24 00:38:29 2013 +0200
@@ -7,7 +7,7 @@
 
 from ..utils import show_progress
 from core.import_processor import (CharFieldProcessor, DateFieldProcessor, 
-    BooleanFieldProcessor, TermProcessor)
+    BooleanFieldProcessor, TermProcessor, TrimCharFieldProcessor)
 from core.models import (Notice, AutrNoticeTerm, DomnNoticeTerm, EcolNoticeTerm, 
     EpoqNoticeTerm, LieuxNoticeTerm, PeriNoticeTerm, ReprNoticeTerm)
 from core.settings import (AUTR_CONTEXT, DOMN_CONTEXT, ECOL_CONTEXT, EPOQ_CONTEXT, 
@@ -24,8 +24,9 @@
 logger = logging.getLogger(__name__)
 
 NOTICE_FIELD_PROCESSORS = {
-    'dmaj': DateFieldProcessor('dmaj'),
-    'dmis': DateFieldProcessor('dmis'),
+    'ref'  : TrimCharFieldProcessor('ref'),
+    'dmaj' : DateFieldProcessor('dmaj'),
+    'dmis' : DateFieldProcessor('dmis'),
     'image': BooleanFieldProcessor('image'),
     'autr_terms' : TermProcessor('autr' , AUTR_CONTEXT , AutrNoticeTerm),
     'domn_terms' : TermProcessor('domn' , DOMN_CONTEXT , DomnNoticeTerm),
@@ -33,7 +34,7 @@
     'epoq_terms' : TermProcessor('epoq' , EPOQ_CONTEXT , EpoqNoticeTerm),
     'lieux_terms': TermProcessor('lieux', LIEUX_CONTEXT, LieuxNoticeTerm),
     'peri_terms' : TermProcessor('peri' , PERI_CONTEXT , PeriNoticeTerm),
-    'repr_terms' : TermProcessor('repr' , REPR_CONTEXT , ReprNoticeTerm, re_sub = None, re_split = "[;,:\(\)\#]")
+    'repr_terms' : TermProcessor('repr' , REPR_CONTEXT , ReprNoticeTerm, re_sub = None, re_split = r"[\;\,\:\(\)\#]")
 }
 
 POST_NOTICE_FIELDS = ['autr_terms','domn_terms','ecol_terms','epoq_terms','lieux_terms','peri_terms','repr_terms']
@@ -61,7 +62,7 @@
         make_option('-b', '--batch-size',
             dest= 'batch_size',
             type='int',
-            default= 5000,
+            default= 50,
             help= 'number of object to import in bulk operations' 
         ),
         make_option('-e', '--encoding',
@@ -106,9 +107,9 @@
         
         filepath = os.path.abspath(args[0])
         self.stdout.write("Importing %s" % filepath)
+        self.encoding = options.get('encoding', "latin-1")
         
-        total = 0
-        max_lines = options.get('max_lines', sys.maxint)
+        max_lines = options.get('max_lines', sys.maxint)        
         
         self.stdout.write("Calculating size")
         with open(filepath,'rb') as csv_file:
@@ -118,49 +119,60 @@
             dialect.quoting = csv.QUOTE_NONE
             csv_file.seek(0)
             
-            for _ in csv.DictReader(csv_file, dialect=dialect):
-                total += 1
-                if total > max_lines:
+            reader = csv.DictReader(csv_file, dialect=dialect)
+            
+            for i,_ in enumerate(reader):
+                if i >= (max_lines-1):
                     break
-        
-        self.stdout.write("Importing %d lines" % total)
+                
         
         transaction.enter_transaction_management()
         transaction.managed()
         
         objects_buffer = {}
-        nb_lines = min(max_lines, total)
+        nb_lines = min(max_lines, i+1)
+        
+        self.stdout.write("Importing %d lines" % (nb_lines))
         batch_size = options.get('batch_size', 5000)
         cont_on_error = options.get('cont', True)
         
         with open(filepath,'rb') as csvfile:
-            counter = 0
             reader = csv.DictReader(csvfile, dialect=dialect, restkey="EXTRA")
+            writer = None
             
-            for row in reader:
+            for i,row in enumerate(reader):
                 try:
-                    counter += 1
-                    show_progress(counter, total, u"Processing line %s" % row['REF'], 50, self.stdout)
+                    if i+1 > nb_lines:
+                        break
+                    
+                    writer = show_progress(i+1, nb_lines, u"Processing line %s" % (row['REF'].strip()), 50, writer)
+                    
+                    def safe_decode(val, encoding):
+                        if val:
+                            return val.decode(encoding)
+                        else:
+                            return val
+                                                        
+                    row = dict([(safe_decode(key, self.encoding), safe_decode(value, self.encoding)) for key, value in row.items()])
+
                     notice_obj = Notice()
                     objects_buffer.setdefault(Notice, []).append(notice_obj)
                     
                     for k,v in row.items():
-                        processor = NOTICE_FIELD_PROCESSORS.get(k.lower(), None) #TODO : put default processor
+                        processor = NOTICE_FIELD_PROCESSORS.get(k.lower(), DEFAULT_FIELD_PROCESSOR_KLASS(k.lower()))
                         new_objs = processor.process(notice_obj, v) if processor else None
                         if new_objs:
                             objects_buffer.update(new_objs)
                     
-                    if not (counter%batch_size):
+                    if not ((i+1)%batch_size):
                         for klass, obj_list in objects_buffer.iteritems():
                             klass.objects.bulk_create(obj_list)
                         objects_buffer = {}
                         transaction.commit()
                     
-                    if counter > nb_lines:
-                        break
                 except Exception as e:
-                    error_msg = "%s - Error treating line %d/%d: id %s : %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),counter, reader.line_num, row['REF'] if (row and 'REF' in row and row['REF']) else 'n/a', repr(e) )
-                    logger.exception(error_msg, e)
+                    error_msg = "%s - Error treating line %d/%d: id %s : %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),i+1, reader.line_num, row['REF'] if (row and 'REF' in row and row['REF']) else 'n/a', repr(e) )
+                    logger.exception(error_msg)
                     if not cont_on_error:
                         raise
                     
@@ -171,36 +183,40 @@
                 objects_buffer = {}
                 transaction.commit()
             except Exception as e:
-                error_msg = "%s - Error treating line %d/%d: %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),counter, reader.line_num, repr(e) )
-                logger.exception(error_msg, e)
+                error_msg = "%s - Error treating line : %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), repr(e) )
+                logger.exception(error_msg)
+                if not cont_on_error:
+                    raise
+
         
         notice_count = Notice.objects.count()
         
         self.stdout.write("Processing %d notices" % notice_count)
-        
+
+        writer = None        
         for i,notice_obj in enumerate(Notice.objects.iterator()):
-            show_progress(i+1, notice_count, u"Processing notice %s" % notice_obj.ref, 50, self.stdout)
+            writer = show_progress(i+1, notice_count, u"Processing notice %s" % notice_obj.ref, 50, writer)
             for field in POST_NOTICE_FIELDS:
                 processor = NOTICE_FIELD_PROCESSORS.get(field, DEFAULT_FIELD_PROCESSOR_KLASS(field))
                 new_objs = processor.process(notice_obj, None) if processor else None
                 if new_objs:
-                    objects_buffer.update(new_objs)
-                    
+                    for k,v in new_objs.iteritems():                        
+                        objects_buffer.setdefault(k,[]).extend(v)
             if not ((i+1)%batch_size):
-                for klass, obj_list in objects_buffer.iteritems():
-                    klass.objects.bulk_create(obj_list)
+                for _, obj_list in objects_buffer.iteritems():
+                    map(lambda o: o.save(), obj_list)
                 objects_buffer = {}
                 transaction.commit()
 
         if objects_buffer:
             try:
-                for klass, obj_list in objects_buffer.iteritems():
-                    klass.objects.bulk_create(obj_list)
+                for _, obj_list in objects_buffer.iteritems():
+                    map(lambda o: o.save(), obj_list)
                 objects_buffer = {}
                 transaction.commit()
             except Exception as e:
-                error_msg = "%s - Error treating line %d/%d: %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),counter, reader.line_num, repr(e) )
-                logger.exception(error_msg, e)
+                error_msg = "%s - Error treating line: %s\n" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), repr(e) )
+                logger.exception(error_msg)
 
         
         
\ No newline at end of file
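
Python 2's csv module yields byte strings, so the handler now decodes every key and value with the configured encoding (default latin-1) before building the Notice. A condensed sketch of that step (the sample row is made up):

    # Python 2 sketch: csv.DictReader yields bytes; decode them up front
    def safe_decode(val, encoding):
        if val:
            return val.decode(encoding)
        return val

    raw_row = {'REF': '000PE025604 ', 'TITR': 'T\xeate de femme'}
    row = dict((safe_decode(k, 'latin-1'), safe_decode(v, 'latin-1'))
               for k, v in raw_row.items())
    # row['TITR'] == u'Tête de femme'; REF is trimmed later
    # by TrimCharFieldProcessor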
--- a/src/jocondelab/management/commands/import_skos.py	Fri Jun 21 17:09:03 2013 +0200
+++ b/src/jocondelab/management/commands/import_skos.py	Mon Jun 24 00:38:29 2013 +0200
@@ -50,9 +50,27 @@
             self.stdout.write("graph size %d" % len(self.graph))
             self.graph.commit()
 
-        self.graph.close()
+        self.graph.close()        
         self.store = plugin.get("SQLAlchemy", Store)(identifier=self.ident)
         self.graph = ConjunctiveGraph(self.store, identifier=self.ident)
         self.graph.open(self.connect_config, create=False)
+        
+        self.stdout.write("correct alt labels")
+        litteral_statements = self.store.tables['literal_statements']
+        with self.store.engine.connect() as connection:
+            q = litteral_statements.select().where(litteral_statements.c.predicate == "http://www.w3.org/2004/02/skos/core#altLabel")
+            for row in connection.execute(q):
+                if row['object'] and row['object'] != row['object'].strip():                  
+                    u_q = litteral_statements.update().where(
+                        (litteral_statements.c.subject == row['subject']) &
+                        (litteral_statements.c.predicate == row['predicate']) &
+                        (litteral_statements.c.object == row['object']) &
+                        (litteral_statements.c.context == row['context']) &
+                        (litteral_statements.c.termComb == row['termcomb']) &
+                        (litteral_statements.c.objLanguage == row['objlanguage']) &
+                        (litteral_statements.c.objDatatype == row['objdatatype'])
+                        ).values(object = row['object'].strip() )
+                    connection.execute(u_q)
+
         self.stdout.write("graph size %d" % len(self.graph))
         self.stdout.write("graph contexts %s" % repr([g for g in self.graph.contexts()]))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jocondelab/management/commands/import_term_labels.py	Mon Jun 24 00:38:29 2013 +0200
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+'''
+Created on Jun 11, 2013
+
+@author: ymh
+'''
+
+from core.models import Term, Thesaurus
+from core.models.term import TermLabel
+from core.rdf_models import graph
+from django.core.management.base import NoArgsCommand
+from django.db import transaction
+from optparse import make_option
+from rdflib.term import URIRef
+
+
+class Command(NoArgsCommand):
+
+    help = "Import graph terms in relational database, to be run after import_skos"
+    
+    option_list = NoArgsCommand.option_list + (
+        make_option('-b', '--batch-size',
+            dest= 'batch_size',
+            type='int',
+            default= 50,
+            help= 'number of objects to import in bulk operations'
+        ),                                               
+    )
+
+    
+    def handle_noargs(self, **options):
+        
+        batch_size = options.get('batch_size', 50)
+        
+        transaction.enter_transaction_management()
+        transaction.managed()
+
+                
+        for thes in Thesaurus.objects.all():
+            context = graph.get_context(URIRef(thes.uri))
+            for i,(s,_,o) in enumerate(graph.triples((None, URIRef("http://www.w3.org/2004/02/skos/core#prefLabel"), None), context=context)):
+                self.stdout.write("%d - Thesaurus %s term pref label %s" % (i+1,thes.label, repr(o)))
+                try:                
+                    term = Term.objects.get(uri=unicode(s))
+                    lang = getattr(o, 'language', None)
+                    pref_label = unicode(o).strip()
+                    if not TermLabel.objects.filter(label=pref_label, term=term, lang=lang).exists():
+                        TermLabel.objects.create(label=pref_label, term=term, lang=lang)
+                except Term.DoesNotExist:
+                    self.stdout.write("Thesaurus %s term pref label %s for %s does not exists" % (thes.label, repr(o), repr(s)))                
+                if not ((i+1) % batch_size):
+                    transaction.commit()
+            transaction.commit()
+            
+            for i,(s,_,o) in enumerate(graph.triples((None, URIRef("http://www.w3.org/2004/02/skos/core#altLabel"), None), context=context)):
+                self.stdout.write("%d - Thesaurus %s term alt label %s for %s" % (i+1, thes.label, repr(o), repr(s)))
+                try:
+                    term = Term.objects.get(uri=unicode(s))
+                    alt_label = unicode(o).strip()
+                    lang = getattr(o, 'language', None)
+                    if not TermLabel.objects.filter(label=alt_label, term=term, lang=lang).exists():
+                        TermLabel.objects.create(label=alt_label, term=term, lang=lang)
+                except Term.DoesNotExist:
+                    self.stdout.write("Thesaurus %s term alt label %s for %s does not exists" % (thes.label, repr(o), repr(s)))
+                if not ((i+1) % batch_size):
+                    transaction.commit()
+
+            transaction.commit()
+            
\ No newline at end of file
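
The filter().exists() guards above keep each (label, term, lang) combination unique before inserting; Django's get_or_create expresses the same check-then-create in one call (a sketch reusing the command's own names):

    label_obj, created = TermLabel.objects.get_or_create(
        label=pref_label, term=term, lang=lang)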
--- a/src/jocondelab/management/commands/import_terms.py	Fri Jun 21 17:09:03 2013 +0200
+++ b/src/jocondelab/management/commands/import_terms.py	Mon Jun 24 00:38:29 2013 +0200
@@ -74,6 +74,8 @@
         transaction.commit()
         
         for thes in Thesaurus.objects.all():
+            if thes.label == "AUTR":
+                continue
             self.stdout.write("Processing Thesaurus %s" % thes.label)
             for _,p,o in graph.triples((URIRef(thes.uri), None, None)):
                 if p == URIRef("http://purl.org/dc/elements/1.1/title"):
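
Since import_term_labels reads Term rows and the SKOS graph, a plausible refresh order is import_skos, then import_terms, then the new command; a hedged sketch using Django's call_command (arguments to the first two commands are omitted here and may be required):

    from django.core.management import call_command

    call_command('import_skos')
    call_command('import_terms')
    call_command('import_term_labels', batch_size=100)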