src/p4l/management/commands/import_record.py
changeset 0 81e7900b06a7
child 6 ff4d2d4f1fb0
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/p4l/management/commands/import_record.py	Tue Aug 27 11:01:32 2013 +0200
@@ -0,0 +1,393 @@
+# -*- coding: utf-8 -*-
+
+from p4l.utils import show_progress, get_code_from_language_uri
+from p4l.models import Record, Language
+from rdflib import Graph, Namespace, BNode, URIRef
+from rdflib.plugins.sparql import prepareQuery
+from django.core.management import BaseCommand
+from optparse import make_option
+import xml.etree.cElementTree as ET
+from django.db import reset_queries, transaction
+import traceback
+import logging
+import sys
+
+# Module-level logger, named after this module.
+logger = logging.getLogger(__name__)
+
+
+# RDF namespaces. IIEP is also used below to build the "{namespace}Record"
+# element tag that is matched while streaming the XML input.
+RDF = Namespace("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
+RDFS = Namespace("http://www.w3.org/2000/01/rdf-schema#")
+DCT = Namespace("http://purl.org/dc/terms/")
+IIEP = Namespace("http://www.iiep.unesco.org/plan4learning/model.owl#")
+UNESCO = Namespace("http://www.iiep.unesco.org/Ontology/")
+
+# Fallback language URI, used by get_record_default_language when the graph
+# yields no language for a record or when no code is found for that language.
+DEFAULT_LANGUAGE_URI = "http://psi.oasis-open.org/iso/639/#eng"
+
+# Selects ?lang (dct:language) when bound, otherwise ?other_lang
+# (iiep:otherLanguage). ?s is supplied by the caller through initial bindings.
+DEFAULT_LANGUAGE_QUERY =  """SELECT ( COALESCE(?lang, ?other_lang) as ?main_lang) WHERE {
+    OPTIONAL { ?s dct:language ?lang }.
+    OPTIONAL { ?s iiep:otherLanguage ?other_lang }.
+}"""
+
+
+# Management command importing p4l records from the RDF/XML sources given as
+# positional arguments.
+class Command(BaseCommand):
+
+    # Usage text for the positional arguments.
+    args = "record_url ..."
+
+    help = "Import p4l record rdf format"
+
+    # Adds -b/--batch-size (int, default 50) to the base command options;
+    # process_url calls reset_queries() every batch_size records.
+    option_list = BaseCommand.option_list + (
+        make_option('-b', '--batch-size',
+            dest= 'batch_size',
+            type='int',
+            default= 50,
+            help= 'number of object to import in bulk operations' 
+        ),
+    )
+
+    def __init__(self, *args, **kwargs):
+        """Initialise the base command, then start with an empty query cache."""
+        super(Command, self).__init__(*args, **kwargs)
+        # Maps SPARQL query text to its prepared form.
+        self.__query_cache = dict()
+
+
+    def __get_sparql_query(self, query, namespaces):
+        """Return the prepared form of `query`, preparing it on first use.
+
+        The cache is keyed by the query text only; `namespaces` is passed as
+        initNs the first time a given text is seen.
+        """
+        cache = self.__query_cache
+        if query not in cache:
+            cache[query] = prepareQuery(query, initNs=namespaces)
+        return cache[query]
+
+    def get_empty_graph(self):
+        """Return a new Graph with the 'iiep' and 'dct' prefixes bound."""
+        prefixes = (
+            ('iiep', "http://www.iiep.unesco.org/plan4learning/model.owl#"),
+            ('dct', "http://purl.org/dc/terms/"),
+        )
+        empty_graph = Graph()
+        for prefix, namespace_uri in prefixes:
+            empty_graph.bind(prefix, namespace_uri)
+        return empty_graph
+
+    def extract_single_value_form_graph(self, graph, q, bindings={}, index=0, convert=lambda v:unicode(v) if v is not None else None):
+        """Return the first value produced by `q` on `graph`, or None.
+
+        Arguments are passed unchanged to extract_multiple_values_from_graph;
+        only its first yielded value is consumed.
+        """
+        for first_value in self.extract_multiple_values_from_graph(graph, q, bindings, index, convert):
+            return first_value
+        return None
+
+    def extract_multiple_values_from_graph(self, graph, q, bindings={}, index=0, convert=lambda v:unicode(v) if v is not None else None):
+        """Run `q` on `graph` and lazily yield the converted values of each row.
+
+        index -- an int N keeps the first N+1 columns and yields column N only;
+                 any other value is used as the list of keys naming the
+                 columns, and a dict keyed by them is yielded per row.
+        convert -- a callable applied to every column, or a dict mapping a key
+                   to its converter; keys without a callable converter fall
+                   back to "unicode, or None for None".
+
+        Iteration stops at the first row shorter than the requested keys.
+        """
+        def to_unicode(v):
+            return unicode(v) if v is not None else None
+
+        single_column = isinstance(index, int)
+        keys = range(index+1) if single_column else index
+
+        if hasattr(convert, '__call__'):
+            # One converter shared by every requested column.
+            converters = dict((key, convert) for key in keys)
+        else:
+            # Per-key converters; entries that are not callable are replaced.
+            converters = dict(
+                (key, func if hasattr(func, '__call__') else to_unicode)
+                for key, func in convert.iteritems()
+            )
+
+        prepared = self.__get_sparql_query(q, dict(graph.namespaces()))
+        for row in graph.query(prepared, initBindings=bindings):
+            if len(row) < len(keys):
+                return
+            values = dict(
+                (key, converters.get(key, to_unicode)(cell))
+                for key, cell in zip(keys, row)
+            )
+            if single_column:
+                yield values[index]
+            else:
+                yield values
+
+
+    def convert_bool(self, val):
+        """Coerce `val` to a boolean-like value.
+
+        Values equal to True or False are returned unchanged. None and empty
+        strings give False. Other strings give True only when their first
+        character, lower-cased, is one of 't', 'y', '1' or 'o'. Anything else
+        goes through bool().
+        """
+        if val == True or val == False:
+            return val
+        if val is None:
+            return False
+        if not isinstance(val, basestring):
+            return bool(val)
+        if len(val) == 0:
+            return False
+        return val[0].lower() in ['t','y','1','o']
+
+    def convert_lang(self, val, default_lang):
+        """Return `val` as unicode, or `default_lang` when it is None or empty."""
+        if val is None:
+            return default_lang
+        text = unicode(val)
+        if len(text) > 0:
+            return text
+        return default_lang
+
+
+    def get_record_default_language(self, g, record_uri):
+        """Return the language code to use by default for a record.
+
+        The language URI comes from DEFAULT_LANGUAGE_QUERY run on `g` with ?s
+        bound to `record_uri`; DEFAULT_LANGUAGE_URI is used when the query
+        gives nothing. When get_code_from_language_uri returns None for that
+        URI, a warning is logged and the code of DEFAULT_LANGUAGE_URI is
+        returned instead.
+        """
+        lang_uri = self.extract_single_value_form_graph(g, DEFAULT_LANGUAGE_QUERY, bindings={'s': URIRef(record_uri)})
+        if not lang_uri:
+            lang_uri = DEFAULT_LANGUAGE_URI
+        lang_code = get_code_from_language_uri(lang_uri)
+        if lang_code is None:
+            # Logger.warn is a deprecated alias; arguments are passed lazily so
+            # the message is only formatted when the record is actually emitted.
+            logger.warning("get_record_default_language: no code found for %s in record %s", lang_uri, record_uri)
+            return get_code_from_language_uri(DEFAULT_LANGUAGE_URI)
+        return lang_code
+
+
+    def add_to_related_collection(self, coll, graph, fields, q, bindings={},  convert=lambda v: unicode(v) if v is not None else None, through_fields=None):
+        """Create one related object in `coll` per row returned by `q`.
+
+        coll -- the related collection to fill (presumably a Django related
+                manager; verify against the Record model).
+        fields -- names given, in order, to the columns selected by `q`.
+        bindings, convert -- passed to extract_multiple_values_from_graph.
+        through_fields -- names from `fields` stored on the intermediate
+                          `coll.through` object instead of the related object.
+
+        When `coll` has a `through` attribute the related object is fetched or
+        created on `coll.model`, then linked either with an explicit
+        `coll.through` row (when through_fields is given) or with coll.add().
+        Otherwise the object is created with coll.create().
+        """
+        for val in self.extract_multiple_values_from_graph(graph, q, bindings=bindings, index=fields, convert=convert):
+
+            if through_fields:
+                through_vals = dict((k, v) for k, v in val.iteritems() if k in through_fields)
+                new_obj_val = dict((k, v) for k, v in val.iteritems() if k not in through_fields)
+            else:
+                through_vals = {}
+                new_obj_val = val
+
+            if not hasattr(coll, 'through'):
+                # create() already stores the new object in the collection, so
+                # it is not passed to add() a second time.
+                coll.create(**new_obj_val)
+                continue
+
+            related_obj, _ = coll.model.objects.get_or_create(**new_obj_val)
+            if through_fields:
+                link_vals = {coll.source_field_name: coll.instance, coll.target_field_name: related_obj}
+                link_vals.update(through_vals)
+                coll.through.objects.create(**link_vals)
+            else:
+                coll.add(related_obj)
+
+
+
+
+    def build_record(self, graph):
+        """Build and save a Record, with its related objects, from `graph`.
+
+        The record is the first subject typed iiep:Record found in `graph`.
+        Its scalar fields are read and the record is saved before any related
+        collection is filled. Values whose language tag is missing or empty
+        get the record's default language code.
+
+        Returns the saved Record. Raises ValueError when `graph` contains no
+        iiep:Record subject.
+        """
+        record_uri = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?s WHERE { ?s rdf:type iiep:Record .}")
+        if record_uri is None:
+            # Fail with an explicit message rather than an obscure error later.
+            raise ValueError("no iiep:Record resource found in graph")
+        default_language_code = self.get_record_default_language(graph, record_uri)
+        subject = URIRef(record_uri)
+
+        def lang_or_default(l):
+            return self.convert_lang(l, default_language_code)
+
+        def int_or_none(y):
+            return int(y) if y is not None else None
+
+        def single_value(predicate, **kwargs):
+            # One value of `predicate` for the record itself (?s is pre-bound).
+            query = "SELECT DISTINCT ?o WHERE { ?s %s ?o .}" % predicate
+            return self.extract_single_value_form_graph(graph, query, bindings={'s': subject}, **kwargs)
+
+        def add_related(coll, fields, query, bound=True, **kwargs):
+            # `bound` pre-binds ?s to the record. The blank-node queries are not
+            # tied to ?s, so they rely on `graph` describing one record only.
+            if bound:
+                kwargs['bindings'] = {'s': subject}
+            self.add_to_related_collection(coll, graph, fields, query, **kwargs)
+
+        record = Record()
+        record.uri = record_uri
+        record.identifier = single_value("dct:identifier")
+        record.notes = single_value("iiep:notes")
+        record.recordType = single_value("dct:type")
+        record.isDocumentPart = single_value("iiep:isDocumentPart", convert=self.convert_bool)
+        record.editionStatement = single_value("iiep:editionStatement")
+
+        language = single_value("dct:language")
+        if language:
+            record.language, _ = Language.objects.get_or_create(language=language)
+
+        # The record is saved before its related collections are filled.
+        record.save()
+
+        # Single-valued related objects attached directly to the record.
+        for coll, field, predicate in (
+            (record.otherLanguages, 'language', 'iiep:otherLanguage'),
+            (record.subjects, 'subject', 'dct:subject'),
+            (record.themes, 'theme', 'iiep:theme'),
+            (record.countries, 'country', 'iiep:country'),
+            (record.authors, 'name', 'iiep:author'),
+            (record.subjectPersons, 'name', 'iiep:subjectPerson'),
+        ):
+            add_related(coll, [field], "SELECT ?o WHERE { ?s %s ?o .}" % predicate)
+
+        add_related(
+            record.projectNames,
+            ['label','acronym'],
+            "SELECT ?l ?a WHERE { [ iiep:projectName ?bnode ]. ?bnode rdfs:label ?l. OPTIONAL { ?bnode iiep:acronym ?a } }",
+            bound=False
+        )
+
+        add_related(
+            record.periodicals,
+            ['label','lang'],
+            "SELECT DISTINCT ?o  ( lang(?o) as ?l) WHERE { ?s iiep:periodical ?o .}",
+            convert={'lang': lang_or_default},
+            through_fields=['lang']
+        )
+
+        # Fix: the COALESCE used the never-bound ?nm instead of ?mn, so the
+        # meeting number was ignored when looking for a language tag.
+        add_related(
+            record.meetings,
+            ['label', 'meetingNumber', 'meetingPlace', 'meetingDate', 'meetingYear', 'lang'],
+            "SELECT ?l ?mn ?mp ?md ?my (lang(COALESCE(?l,?mn, ?mp,?md,?my)) as ?lang) WHERE { [iiep:meeting ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:meetingNumber ?mn }. OPTIONAL { ?bnode iiep:meetingPlace ?mp }.  OPTIONAL { ?bnode iiep:meetingDate ?md }. OPTIONAL { ?bnode iiep:meetingYear ?my }}",
+            bound=False,
+            convert={'lang': lang_or_default, 'meetingYear': int_or_none},
+            through_fields=['lang']
+        )
+
+        add_related(
+            record.series,
+            ['title', 'volume', 'lang'],
+            "SELECT ?t ?vol (lang(COALESCE(?t,?vol)) as ?lang) WHERE { [iiep:serie ?bnode]. OPTIONAL { ?bnode dct:title ?t }. OPTIONAL { ?bnode iiep:volume ?vol } }",
+            bound=False,
+            convert={'lang': lang_or_default},
+            through_fields=['lang']
+        )
+
+        add_related(
+            record.subjectCorporateBodies,
+            ['label', 'acronym'],
+            "SELECT ?l ?a WHERE { [iiep:subjectCorporateBody ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:acronym ?a } }",
+            bound=False
+        )
+
+        # NOTE(review): this query matches iiep:meeting, exactly like the one
+        # used for record.meetings above -- confirm whether a dedicated
+        # subject-meeting predicate was intended.
+        add_related(
+            record.subjectMeetings,
+            ['label', 'meetingNumber', 'meetingPlace', 'meetingDate', 'meetingYear'],
+            "SELECT ?l ?mn ?mp ?md ?my WHERE { [iiep:meeting ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:meetingNumber ?mn }. OPTIONAL { ?bnode iiep:meetingPlace ?mp }.  OPTIONAL { ?bnode iiep:meetingDate ?md }. OPTIONAL { ?bnode iiep:meetingYear ?my }}",
+            bound=False,
+            convert={'meetingYear': int_or_none}
+        )
+
+        add_related(
+            record.corporateAuthors,
+            ['label', 'acronym'],
+            "SELECT ?l ?a WHERE { [iiep:corporateAuthor ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:acronym ?a } }",
+            bound=False
+        )
+
+        # Literals attached directly to the record, stored with their language.
+        for coll, field, query in (
+            (record.issns, 'issn', "SELECT ?issn (lang(COALESCE(?issn)) as ?lang) WHERE { ?s iiep:issn ?issn . }"),
+            (record.isbns, 'isbn', "SELECT ?isbn (lang(COALESCE(?isbn)) as ?lang) WHERE { ?s iiep:isbn ?isbn . }"),
+            (record.documentCodes, 'documentCode', "SELECT ?c (lang(COALESCE(?c)) as ?lang) WHERE { ?s iiep:documentCode ?c . }"),
+            (record.titles, 'title', "SELECT ?t (lang(COALESCE(?t)) as ?lang) WHERE { ?s dct:title ?t . }"),
+            (record.addedTitles, 'title', "SELECT ?t (lang(COALESCE(?t)) as ?lang) WHERE { ?s iiep:addedTitle ?t . }"),
+            (record.titlesMainDocument, 'title', "SELECT ?t (lang(COALESCE(?t)) as ?lang) WHERE { ?s iiep:titleMainDocument ?t . }"),
+        ):
+            add_related(coll, [field, 'lang'], query, convert={'lang': lang_or_default})
+
+        add_related(
+            record.imprints,
+            ['imprintCity', 'publisher', 'imprintDate', 'lang'],
+            "SELECT ?c ?p ?d (lang(COALESCE(?c, ?p, ?d)) as ?lang) WHERE { [ iiep:imprint ?bnode ]. OPTIONAL { ?bnode iiep:imprintCity ?c }. OPTIONAL { ?bnode dct:publisher ?p }. OPTIONAL { ?bnode iiep:imprintDate ?d }}",
+            bound=False,
+            convert={'lang': lang_or_default}
+        )
+
+        add_related(
+            record.collations,
+            ['collation', 'lang'],
+            "SELECT ?c (lang(COALESCE(?c)) as ?lang) WHERE { ?s iiep:collation ?c . }",
+            convert={'lang': lang_or_default}
+        )
+
+        # Fix: iiep:number was bound to ?v, which left ?n (the 'number' field)
+        # always unbound and forced the volume to equal the number.
+        add_related(
+            record.volumeIssues,
+            ['volume', 'number', 'lang'],
+            "SELECT ?v ?n (lang(COALESCE(?v, ?n)) as ?lang) WHERE { [ iiep:volumeIssue ?bnode ]. OPTIONAL { ?bnode iiep:volume ?v }. OPTIONAL { ?bnode iiep:number ?n }}",
+            bound=False,
+            convert={'lang': lang_or_default}
+        )
+
+        add_related(
+            record.urls,
+            ['address', 'display', 'accessLevel'],
+            "SELECT ?a ?d ?al WHERE { [ iiep:url ?bnode ]. OPTIONAL { ?bnode iiep:address ?a }. OPTIONAL { ?bnode iiep:display ?d }. OPTIONAL { ?bnode iiep:accessLevel ?al }.}",
+            bound=False
+        )
+
+        return record
+
+
+    def filter_node(self, node, graph, res_graph):
+        """Copy into `res_graph` every triple reachable from `node`.
+
+        All triples having `node` as subject are copied from `graph`, and the
+        walk continues through objects that are blank nodes. Each node is
+        expanded once, so cyclic blank-node structures terminate, and the walk
+        is iterative, so long chains cannot exhaust the recursion limit.
+        """
+        pending = [node]
+        expanded = set()
+        while pending:
+            current = pending.pop()
+            if current in expanded:
+                continue
+            expanded.add(current)
+            for p, o in graph[current]:
+                res_graph.add((current, p, o))
+                if isinstance(o, BNode):
+                    pending.append(o)
+
+
+
+    def calculate_records_nb(self, records_url):
+        """Return the number of iiep Record elements found in `records_url`.
+
+        The source is streamed with iterparse and only element ends are
+        inspected.
+        """
+        record_tag = "{%s}Record" % IIEP
+        count = 0
+        for _, elem in ET.iterparse(records_url, events=("end",)):
+            if elem.tag == record_tag:
+                count += 1
+                # iterparse still builds the whole tree; emptying each counted
+                # record keeps memory use flat on large files.
+                elem.clear()
+        return count
+
+    def process_url(self, records_url, options):
+        """Import every Record element of `records_url`, one transaction each.
+
+        The source is read twice: once to count the records for the progress
+        display, once to import them. Each record is parsed into its own graph
+        and handed to build_record; the transaction is committed on success
+        and rolled back on any exception, which is logged and collected.
+
+        `options` is accepted for interface compatibility and is not read.
+
+        Returns a list of (record number, records_url, message) tuples, one per
+        record that failed.
+        """
+        total_records = self.calculate_records_nb(records_url)
+        record_tag = "{%s}Record" % IIEP
+        writer = None
+        errors = []
+
+        i = 0
+        for _, elem in ET.iterparse(records_url, events=("end",)):
+            if elem.tag != record_tag:
+                continue
+
+            i += 1
+            writer = show_progress(i, total_records, "Processing record nb %d " % i, 50, writer=writer)
+            try:
+                record_graph = self.get_empty_graph()
+                record_graph.parse(data=ET.tostring(elem, encoding='utf-8'), format='xml')
+                self.build_record(record_graph)
+            except Exception as e:
+                transaction.rollback()
+                msg = "Error processing resource %d in %s : %s" % (i, records_url, repr(e))
+                logger.exception(msg)
+                errors.append((i, records_url, msg))
+            else:
+                transaction.commit()
+
+            # The record has been serialised above; emptying it stops the tree
+            # built by iterparse from growing with every record.
+            elem.clear()
+
+            # A batch size of 0 disables the periodic reset instead of raising
+            # ZeroDivisionError.
+            if self.batch_size and i % self.batch_size == 0:
+                reset_queries()
+
+        return errors
+
+
+    # def process_url(self, records_url, options):
+    #     #open graph with rdflib
+    #     #TODO: manage memory        
+    #     g = Graph()
+    #     print("Loading %s" % records_url)
+    #     g.parse(records_url)
+    #     print("Parsing %s done" % records_url)
+    #     for i,record_uri in enumerate(g[:RDF.type:IIEP.Record]):
+    #         print(i, repr(record_uri))
+    #         record_graph = self.get_empty_graph()
+    #         self.filter_node(record_uri, g, record_graph)
+    #         self.build_record(record_graph)
+    #         if i > 3:
+    #             break
+
+
+
+
+    def handle(self, *args, **options):
+        """Import every source given in `args` under manual transaction control.
+
+        The 'batch_size' option (default 50) is stored on the command for
+        process_url. Transaction management is always left when this method
+        ends, including when processing a source raises.
+        """
+        self.batch_size = options.get('batch_size', 50)
+        transaction.enter_transaction_management()
+        transaction.managed(True)
+
+        try:
+            for records_url in args:
+                print("Processing %s" % records_url)
+                errors = self.process_url(records_url, options)
+                print("Processing %s Done" % records_url)
+                if errors:
+                    print("%d error(s) when processing %s, check your log file." % (len(errors), records_url))
+        except Exception:
+            # Discard whatever the failed run left pending before transaction
+            # management is left, then let the original error propagate.
+            transaction.rollback()
+            raise
+        finally:
+            transaction.leave_transaction_management()
+