src/p4l/management/commands/import_record.py
changeset 0 81e7900b06a7
child 6 ff4d2d4f1fb0
equal deleted inserted replaced
-1:000000000000 0:81e7900b06a7
       
     1 # -*- coding: utf-8 -*-
       
     2 
       
     3 from p4l.utils import show_progress, get_code_from_language_uri
       
     4 from p4l.models import Record, Language
       
     5 from rdflib import Graph, Namespace, BNode, URIRef
       
     6 from rdflib.plugins.sparql import prepareQuery
       
     7 from django.core.management import BaseCommand
       
     8 from optparse import make_option
       
     9 import xml.etree.cElementTree as ET
       
    10 from django.db import reset_queries, transaction
       
    11 import traceback
       
    12 import logging
       
    13 import sys
       
    14 
       
    15 logger = logging.getLogger(__name__)
       
    16 
       
    17 
       
    18 RDF = Namespace("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
       
    19 RDFS = Namespace("http://www.w3.org/2000/01/rdf-schema#")
       
    20 DCT = Namespace("http://purl.org/dc/terms/")
       
    21 IIEP = Namespace("http://www.iiep.unesco.org/plan4learning/model.owl#")
       
    22 UNESCO = Namespace("http://www.iiep.unesco.org/Ontology/")
       
    23 
       
    24 DEFAULT_LANGUAGE_URI = "http://psi.oasis-open.org/iso/639/#eng"
       
    25 
       
    26 DEFAULT_LANGUAGE_QUERY =  """SELECT ( COALESCE(?lang, ?other_lang) as ?main_lang) WHERE {
       
    27     OPTIONAL { ?s dct:language ?lang }.
       
    28     OPTIONAL { ?s iiep:otherLanguage ?other_lang }.
       
    29 }"""
       
    30 
       
    31 
       
    32 class Command(BaseCommand):
       
    33 
       
    # Summary of the positional arguments: one or more record file locations.
    args = "record_url ..."

    # Short description of the command.
    help = "Import p4l record rdf format"

    # Adds -b/--batch-size on top of the standard command options.  The value
    # is read back in handle() and used by process_url() to decide how often
    # reset_queries() is called.
    option_list = BaseCommand.option_list + (
        make_option(
            '-b', '--batch-size',
            dest='batch_size',
            type='int',
            default=50,
            help='number of object to import in bulk operations',
        ),
    )
       
    46 
       
    def __init__(self, *args, **kwargs):
        """Initialise the base command and create an empty prepared-query cache."""
        super(Command, self).__init__(*args, **kwargs)
        # Maps SPARQL query text to its prepared form; filled lazily by
        # __get_sparql_query().
        self.__query_cache = dict()
       
    50 
       
    51 
       
    def __get_sparql_query(self, query, namespaces):
        """Return the prepared form of ``query``, preparing it on first use.

        The cache is keyed on the query text only: ``namespaces`` is used the
        first time a given query text is seen and ignored afterwards.
        """
        try:
            return self.__query_cache[query]
        except KeyError:
            prepared = prepareQuery(query, initNs=namespaces)
            self.__query_cache[query] = prepared
            return prepared
       
    57 
       
    def get_empty_graph(self):
        """Return a new empty Graph with the ``iiep`` and ``dct`` prefixes bound."""
        prefixes = (
            ('iiep', "http://www.iiep.unesco.org/plan4learning/model.owl#"),
            ('dct', "http://purl.org/dc/terms/"),
        )
        graph = Graph()
        for prefix, namespace_uri in prefixes:
            graph.bind(prefix, namespace_uri)
        return graph
       
    63 
       
    def extract_single_value_form_graph(self, graph, q, bindings={}, index=0, convert=lambda v:unicode(v) if v is not None else None):
        """Return the first value produced by ``q`` on ``graph``, or None.

        Takes the same arguments as extract_multiple_values_from_graph() and
        only consumes the first item it yields.
        """
        values = self.extract_multiple_values_from_graph(graph, q, bindings, index, convert)
        for first in values:
            return first
        return None
       
    66 
       
    def extract_multiple_values_from_graph(self, graph, q, bindings={}, index=0, convert=lambda v:unicode(v) if v is not None else None):
        """Run SPARQL query ``q`` on ``graph`` and yield converted results.

        ``index`` selects what is produced for each result row:

        * an int: the converted value of that column is yielded (columns
          0..index are converted, only column ``index`` is returned);
        * a sequence of names: a dict mapping each name to the converted value
          of the column at the same position is yielded.

        ``convert`` is either one callable applied to every column, or a dict
        mapping a column key to its converter.  Columns without a callable
        converter are turned into unicode text, with None left as None.
        Iteration stops at the first row that has fewer columns than
        requested.  ``bindings`` is passed to the query as initial bindings.
        """
        def to_text(v):
            # Fallback converter: unicode text, None preserved.
            return unicode(v) if v is not None else None

        single_column = isinstance(index, int)
        keys = range(index + 1) if single_column else index

        if hasattr(convert, '__call__'):
            converters = dict((key, convert) for key in keys)
        else:
            converters = dict(
                (key, func if hasattr(func, '__call__') else to_text)
                for key, func in convert.iteritems()
            )

        prepared = self.__get_sparql_query(q, dict(graph.namespaces()))
        for row in graph.query(prepared, initBindings=bindings):
            if len(row) < len(keys):
                break
            converted = {}
            for key, raw in zip(keys, row):
                converted[key] = converters.get(key, to_text)(raw)
            if single_column:
                yield converted[index]
            else:
                yield converted
       
    89 
       
    90 
       
    def convert_bool(self, val):
        """Interpret ``val`` as a boolean.

        Values equal to True or False (which includes 1 and 0) are returned
        unchanged, None gives False, and a string is True only when it is
        non-empty and starts, case-insensitively, with 't', 'y', '1' or 'o'.
        Anything else goes through bool().
        """
        # Equality rather than identity on purpose: 0/1 pass through as-is.
        if val == True or val == False:
            return val
        if val is None:
            return False
        if not isinstance(val, basestring):
            return bool(val)
        return bool(val) and val[0].lower() in ('t', 'y', '1', 'o')
       
   104 
       
   105     def convert_lang(self, val, default_lang):
       
   106         return unicode(val) if (val is not None and len(unicode(val))>0) else default_lang
       
   107 
       
   108 
       
   109     def get_record_default_language(self, g, record_uri):
       
   110         lang_uri = self.extract_single_value_form_graph(g, DEFAULT_LANGUAGE_QUERY, bindings={'s': URIRef(record_uri)})
       
   111         if not lang_uri:
       
   112             lang_uri = DEFAULT_LANGUAGE_URI
       
   113         lang_code = get_code_from_language_uri(lang_uri)
       
   114         if lang_code is None:
       
   115             logger.warn("get_record_default_language: no code found for %s in record %s" % (lang_uri, record_uri))
       
   116             return get_code_from_language_uri(DEFAULT_LANGUAGE_URI)
       
   117         return lang_code
       
   118 
       
   119 
       
   120     def add_to_related_collection(self, coll, graph, fields, q, bindings={},  convert=lambda v: unicode(v) if v is not None else None, through_fields=None):
       
   121         
       
   122         for val in self.extract_multiple_values_from_graph(graph, q, bindings=bindings, index=fields, convert=convert):
       
   123 
       
   124             if through_fields:                
       
   125                 new_obj_val = dict([(k,v) for k,v in val.iteritems() if k not in through_fields])
       
   126             else:
       
   127                 new_obj_val = val
       
   128 
       
   129             if hasattr(coll, 'through'):
       
   130                 new_obj_rel, _ = coll.model.objects.get_or_create(**new_obj_val)
       
   131                 if through_fields:
       
   132                     through_vals = {coll.source_field_name: coll.instance, coll.target_field_name: new_obj_rel}
       
   133                     through_vals.update(dict([(k,v) for k,v in val.iteritems() if k in through_fields]))
       
   134                     coll.through.objects.create(**through_vals)
       
   135                     new_obj = None
       
   136                 else:
       
   137                     new_obj = new_obj_rel
       
   138 
       
   139             else:
       
   140                 new_obj = coll.create(**new_obj_val)
       
   141             
       
   142             if new_obj:
       
   143                 coll.add(new_obj)
       
   144 
       
   145 
       
   146 
       
   147 
       
   148     def build_record(self, graph):
       
   149 
       
   150         record_uri = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?s WHERE { ?s rdf:type iiep:Record .}")
       
   151         default_language_code = self.get_record_default_language(graph, record_uri)
       
   152 
       
   153         record = Record()
       
   154         record.uri = record_uri
       
   155         record.identifier = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s dct:identifier ?o .}", bindings={'s':URIRef(record.uri)})
       
   156         record.notes = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s iiep:notes ?o .}", bindings={'s':URIRef(record.uri)})
       
   157         record.recordType = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s dct:type ?o .}", bindings={'s':URIRef(record.uri)})
       
   158         record.isDocumentPart = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s iiep:isDocumentPart ?o .}", bindings={'s':URIRef(record.uri)}, convert=self.convert_bool)
       
   159         record.editionStatement = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s iiep:editionStatement ?o .}", bindings={'s':URIRef(record.uri)})
       
   160 
       
   161         language = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s dct:language ?o .}", bindings={'s':URIRef(record.uri)})
       
   162         if language:
       
   163             record.language, _ = Language.objects.get_or_create(language=language)
       
   164 
       
   165         record.save()
       
   166 
       
   167         self.add_to_related_collection(record.otherLanguages, graph,  ['language'], "SELECT ?o WHERE { ?s iiep:otherLanguage ?o .}", bindings={'s':URIRef(record.uri)})
       
   168         self.add_to_related_collection(record.subjects, graph, ['subject'], "SELECT ?o WHERE { ?s dct:subject ?o .}", bindings={'s':URIRef(record.uri)})
       
   169         self.add_to_related_collection(record.themes, graph, ['theme'], "SELECT ?o WHERE { ?s iiep:theme ?o .}", bindings={'s':URIRef(record.uri)})
       
   170         self.add_to_related_collection(record.countries, graph,  ['country'], "SELECT ?o WHERE { ?s iiep:country ?o .}", bindings={'s':URIRef(record.uri)})
       
   171         self.add_to_related_collection(record.authors, graph, ['name'], "SELECT ?o WHERE { ?s iiep:author ?o .}", bindings={'s':URIRef(record.uri)})
       
   172         self.add_to_related_collection(record.subjectPersons, graph, ['name'], "SELECT ?o WHERE { ?s iiep:subjectPerson ?o .}", bindings={'s':URIRef(record.uri)})
       
   173         self.add_to_related_collection(record.projectNames, graph, ['label','acronym'], "SELECT ?l ?a WHERE { [ iiep:projectName ?bnode ]. ?bnode rdfs:label ?l. OPTIONAL { ?bnode iiep:acronym ?a } }")
       
   174 
       
   175         self.add_to_related_collection(
       
   176             record.periodicals,
       
   177             graph, 
       
   178             ['label','lang'],
       
   179             "SELECT DISTINCT ?o  ( lang(?o) as ?l) WHERE { ?s iiep:periodical ?o .}",
       
   180             bindings={'s':URIRef(record.uri)},
       
   181             convert={'lang':lambda l: self.convert_lang(l, default_language_code)},
       
   182             through_fields = ['lang']
       
   183         )
       
   184 
       
   185         self.add_to_related_collection(
       
   186             record.meetings,
       
   187             graph, 
       
   188             ['label', 'meetingNumber', 'meetingPlace', 'meetingDate', 'meetingYear', 'lang'],
       
   189             "SELECT ?l ?mn ?mp ?md ?my (lang(COALESCE(?l,?nm, ?mp,?md,?my)) as ?lang) WHERE { [iiep:meeting ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:meetingNumber ?mn }. OPTIONAL { ?bnode iiep:meetingPlace ?mp }.  OPTIONAL { ?bnode iiep:meetingDate ?md }. OPTIONAL { ?bnode iiep:meetingYear ?my }}",
       
   190             convert={'lang':lambda l: self.convert_lang(l, default_language_code), 'meetingYear' : lambda y: int(y) if y is not None else None},
       
   191             through_fields = ['lang']
       
   192         )
       
   193 
       
   194         self.add_to_related_collection(
       
   195             record.series,
       
   196             graph, 
       
   197             ['title', 'volume', 'lang'],
       
   198             "SELECT ?t ?vol (lang(COALESCE(?t,?vol)) as ?lang) WHERE { [iiep:serie ?bnode]. OPTIONAL { ?bnode dct:title ?t }. OPTIONAL { ?bnode iiep:volume ?vol } }",
       
   199             convert={'lang':lambda l: self.convert_lang(l, default_language_code)},
       
   200             through_fields = ['lang']
       
   201         )
       
   202 
       
   203         self.add_to_related_collection(
       
   204             record.subjectCorporateBodies,
       
   205             graph,
       
   206             ['label', 'acronym'],
       
   207             "SELECT ?l ?a WHERE { [iiep:subjectCorporateBody ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:acronym ?a } }",
       
   208         )
       
   209 
       
   210         self.add_to_related_collection(
       
   211             record.subjectMeetings,
       
   212             graph,
       
   213             ['label', 'meetingNumber', 'meetingPlace', 'meetingDate', 'meetingYear'],
       
   214             "SELECT ?l ?mn ?mp ?md ?my WHERE { [iiep:meeting ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:meetingNumber ?mn }. OPTIONAL { ?bnode iiep:meetingPlace ?mp }.  OPTIONAL { ?bnode iiep:meetingDate ?md }. OPTIONAL { ?bnode iiep:meetingYear ?my }}",            
       
   215             convert={'meetingYear' : lambda y: int(y) if y is not None else None}
       
   216         )
       
   217 
       
   218         self.add_to_related_collection(
       
   219             record.corporateAuthors,
       
   220             graph,
       
   221             ['label', 'acronym'],
       
   222             "SELECT ?l ?a WHERE { [iiep:corporateAuthor ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:acronym ?a } }",            
       
   223         )
       
   224 
       
   225         self.add_to_related_collection(
       
   226             record.issns,
       
   227             graph,
       
   228             ['issn', 'lang'],
       
   229             "SELECT ?issn (lang(COALESCE(?issn)) as ?lang) WHERE { ?s iiep:issn ?issn . }",
       
   230             bindings={'s':URIRef(record.uri)},
       
   231             convert={'lang':lambda l: self.convert_lang(l, default_language_code)}
       
   232         )
       
   233 
       
   234         self.add_to_related_collection(
       
   235             record.isbns,
       
   236             graph,
       
   237             ['isbn', 'lang'],
       
   238             "SELECT ?isbn (lang(COALESCE(?isbn)) as ?lang) WHERE { ?s iiep:isbn ?isbn . }",
       
   239             bindings={'s':URIRef(record.uri)},
       
   240             convert={'lang':lambda l: self.convert_lang(l, default_language_code)}
       
   241         )
       
   242 
       
   243         self.add_to_related_collection(
       
   244             record.documentCodes,
       
   245             graph,
       
   246             ['documentCode', 'lang'],
       
   247             "SELECT ?c (lang(COALESCE(?c)) as ?lang) WHERE { ?s iiep:documentCode ?c . }",
       
   248             bindings={'s':URIRef(record.uri)},
       
   249             convert={'lang':lambda l: self.convert_lang(l, default_language_code)}
       
   250         )
       
   251 
       
   252         self.add_to_related_collection(
       
   253             record.titles,
       
   254             graph,
       
   255             ['title', 'lang'],
       
   256             "SELECT ?t (lang(COALESCE(?t)) as ?lang) WHERE { ?s dct:title ?t . }",
       
   257             bindings={'s':URIRef(record.uri)},
       
   258             convert={'lang':lambda l: self.convert_lang(l, default_language_code)}
       
   259         )
       
   260 
       
   261         self.add_to_related_collection(
       
   262             record.addedTitles,
       
   263             graph,
       
   264             ['title', 'lang'],
       
   265             "SELECT ?t (lang(COALESCE(?t)) as ?lang) WHERE { ?s iiep:addedTitle ?t . }",
       
   266             bindings={'s':URIRef(record.uri)},
       
   267             convert={'lang':lambda l: self.convert_lang(l, default_language_code)}
       
   268         )
       
   269 
       
   270         self.add_to_related_collection(
       
   271             record.titlesMainDocument,
       
   272             graph,
       
   273             ['title', 'lang'],
       
   274             "SELECT ?t (lang(COALESCE(?t)) as ?lang) WHERE { ?s iiep:titleMainDocument ?t . }",
       
   275             bindings={'s':URIRef(record.uri)},
       
   276             convert={'lang':lambda l: self.convert_lang(l, default_language_code)}
       
   277         )
       
   278 
       
   279         self.add_to_related_collection(
       
   280             record.imprints,
       
   281             graph,
       
   282             ['imprintCity', 'publisher', 'imprintDate', 'lang'],
       
   283             "SELECT ?c ?p ?d (lang(COALESCE(?c, ?p, ?d)) as ?lang) WHERE { [ iiep:imprint ?bnode ]. OPTIONAL { ?bnode iiep:imprintCity ?c }. OPTIONAL { ?bnode dct:publisher ?p }. OPTIONAL { ?bnode iiep:imprintDate ?d }}",
       
   284             convert={'lang':lambda l: self.convert_lang(l, default_language_code)}
       
   285         )
       
   286 
       
   287         self.add_to_related_collection(
       
   288             record.collations,
       
   289             graph,
       
   290             ['collation', 'lang'],
       
   291             "SELECT ?c (lang(COALESCE(?c)) as ?lang) WHERE { ?s iiep:collation ?c . }",
       
   292             bindings={'s':URIRef(record.uri)},
       
   293             convert={'lang':lambda l: self.convert_lang(l, default_language_code)}
       
   294         )
       
   295 
       
   296         self.add_to_related_collection(
       
   297             record.volumeIssues,
       
   298             graph,
       
   299             ['volume', 'number', 'lang'],
       
   300             "SELECT ?v ?n (lang(COALESCE(?v, ?n)) as ?lang) WHERE { [ iiep:volumeIssue ?bnode ]. OPTIONAL { ?bnode iiep:volume ?v }. OPTIONAL { ?bnode iiep:number ?v }}",
       
   301             convert={'lang':lambda l: self.convert_lang(l, default_language_code)}
       
   302         )
       
   303 
       
   304         self.add_to_related_collection(
       
   305             record.urls,
       
   306             graph,
       
   307             ['address', 'display', 'accessLevel'],
       
   308             "SELECT ?a ?d ?al WHERE { [ iiep:url ?bnode ]. OPTIONAL { ?bnode iiep:address ?a }. OPTIONAL { ?bnode iiep:display ?d }. OPTIONAL { ?bnode iiep:accessLevel ?al }.}",
       
   309         )
       
   310 
       
   311         return record
       
   312 
       
   313 
       
   314     def filter_node(self, node, graph, res_graph):
       
   315         for p,o in graph[node]:
       
   316             res_graph.add((node,p,o))
       
   317             if isinstance(o, BNode):
       
   318                 self.filter_node(o, graph, res_graph)
       
   319 
       
   320 
       
   321 
       
   322     def calculate_records_nb(self, records_url):
       
   323         context = ET.iterparse(records_url, events=("end",))
       
   324         i = 0
       
   325         for _,elem in context:
       
   326             if elem.tag == "{%s}Record" % IIEP:
       
   327                 i += 1
       
   328         return i
       
   329 
       
   330     def process_url(self, records_url, options):
       
   331 
       
   332         total_records = self.calculate_records_nb(records_url)
       
   333         writer = None
       
   334         errors=[]
       
   335 
       
   336         context = ET.iterparse(records_url, events=("end",))
       
   337         i = 0
       
   338         for event,elem in context:
       
   339             if elem.tag == "{%s}Record" % IIEP:
       
   340                 i += 1
       
   341                 writer = show_progress(i, total_records, "Processing record nb %d " % i, 50, writer=writer)
       
   342                 try:
       
   343                     record_graph = self.get_empty_graph()
       
   344                     record_graph.parse(data=ET.tostring(elem, encoding='utf-8'), format='xml')
       
   345                     # add transaction management
       
   346                     self.build_record(record_graph)                    
       
   347                 except Exception as e:
       
   348                     transaction.rollback()
       
   349                     msg = "Error processing resource %d in %s : %s" % (i, records_url, repr(e))
       
   350                     logger.exception(msg)
       
   351                     errors.append((i, records_url, msg))
       
   352                 else:
       
   353                     transaction.commit()
       
   354 
       
   355                 if i%self.batch_size == 0:                    
       
   356                     reset_queries()
       
   357 
       
   358         return errors
       
   359 
       
   360 
       
   361     # def process_url(self, records_url, options):
       
   362     #     #open graph with rdflib
       
   363     #     #TODO: manage memory        
       
   364     #     g = Graph()
       
   365     #     print("Loading %s" % records_url)
       
   366     #     g.parse(records_url)
       
   367     #     print("Parsing %s done" % records_url)
       
   368     #     for i,record_uri in enumerate(g[:RDF.type:IIEP.Record]):
       
   369     #         print(i, repr(record_uri))
       
   370     #         record_graph = self.get_empty_graph()
       
   371     #         self.filter_node(record_uri, g, record_graph)
       
   372     #         self.build_record(record_graph)
       
   373     #         if i > 3:
       
   374     #             break
       
   375 
       
   376 
       
   377 
       
   378 
       
   379     def handle(self, *args, **options):
       
   380 
       
   381         self.batch_size = options.get('batch_size', 50)
       
   382         transaction.enter_transaction_management()
       
   383         transaction.managed(True)
       
   384 
       
   385         for records_url in args:
       
   386             print("Processing %s" % records_url)
       
   387             errors = self.process_url(records_url, options)
       
   388             print("Processing %s Done" % records_url)
       
   389             if errors:
       
   390                 print("%d error(s) when processing %s, check your log file." % (len(errors), records_url))
       
   391 
       
   392         transaction.leave_transaction_management()
       
   393