# -*- coding: utf-8 -*-
# src/p4l/management/commands/import_record.py
"""Management command importing p4l records from RDF/XML files.

Each ``iiep:Record`` element found in the input is parsed into its own small
rdflib graph, queried with SPARQL and stored as a ``Record`` model instance
together with its related objects.  This module targets Python 2 (it relies
on ``unicode``, ``basestring`` and ``dict.iteritems``).
"""

from p4l.utils import show_progress, get_code_from_language_uri
from p4l.models import Record, Language
from rdflib import Graph, Namespace, BNode, URIRef
from rdflib.plugins.sparql import prepareQuery
from django.core.management import BaseCommand
from optparse import make_option
import xml.etree.cElementTree as ET
from django.db import reset_queries, transaction
import traceback
import logging
import sys

logger = logging.getLogger(__name__)


RDF = Namespace("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
RDFS = Namespace("http://www.w3.org/2000/01/rdf-schema#")
DCT = Namespace("http://purl.org/dc/terms/")
IIEP = Namespace("http://www.iiep.unesco.org/plan4learning/model.owl#")
UNESCO = Namespace("http://www.iiep.unesco.org/Ontology/")

# Language URI used when a record declares no language at all.
DEFAULT_LANGUAGE_URI = "http://psi.oasis-open.org/iso/639/#eng"

# Picks dct:language when present, otherwise falls back to iiep:otherLanguage.
DEFAULT_LANGUAGE_QUERY = """SELECT ( COALESCE(?lang, ?other_lang) as ?main_lang) WHERE {
    OPTIONAL { ?s dct:language ?lang }.
    OPTIONAL { ?s iiep:otherLanguage ?other_lang }.
}"""


def _to_unicode(value):
    """Return ``unicode(value)``, passing ``None`` through unchanged."""
    return unicode(value) if value is not None else None


class Command(BaseCommand):
    """Import p4l records from one or more RDF/XML files or URLs.

    Every positional argument is handed to ``xml.etree`` ``iterparse``; each
    ``iiep:Record`` element is imported separately and committed (or rolled
    back on error) on its own.
    """

    args = "record_url ..."

    help = "Import p4l record rdf format"

    option_list = BaseCommand.option_list + (
        make_option('-b', '--batch-size',
            dest='batch_size',
            type='int',
            default=50,
            help='number of object to import in bulk operations'
        ),
    )

    def __init__(self, *args, **kwargs):
        super(Command, self).__init__(*args, **kwargs)
        # Prepared SPARQL queries, keyed by their query text.
        self.__query_cache = {}
        # Overwritten by handle(); set here so process_url() also works when
        # it is called directly.
        self.batch_size = 50

    def __get_sparql_query(self, query, namespaces):
        """Return the prepared form of ``query``, preparing it on first use.

        The cache key is the query text only: ``namespaces`` is used the
        first time a given text is seen and ignored afterwards.
        """
        try:
            return self.__query_cache[query]
        except KeyError:
            prepared = prepareQuery(query, initNs=namespaces)
            self.__query_cache[query] = prepared
            return prepared

    def get_empty_graph(self):
        """Return a new graph with the ``iiep`` and ``dct`` prefixes bound."""
        record_graph = Graph()
        record_graph.bind('iiep', "http://www.iiep.unesco.org/plan4learning/model.owl#")
        record_graph.bind('dct', "http://purl.org/dc/terms/")
        return record_graph

    def extract_single_value_form_graph(self, graph, q, bindings=None, index=0, convert=_to_unicode):
        """Return the first result of ``extract_multiple_values_from_graph``.

        Returns ``None`` when the query yields nothing.  The method name
        (including the "form" typo) is kept as is for existing callers.
        """
        return next(self.extract_multiple_values_from_graph(graph, q, bindings, index, convert), None)

    def extract_multiple_values_from_graph(self, graph, q, bindings=None, index=0, convert=_to_unicode):
        """Run SPARQL query ``q`` on ``graph`` and yield converted results.

        :param graph: graph to query.
        :param q: SPARQL query text.
        :param bindings: optional initial variable bindings (name -> term).
        :param index: either an int ``n`` - the first ``n + 1`` columns are
            read and the converted value of column ``n`` is yielded - or a
            sequence of field names mapped positionally onto the selected
            columns, in which case a ``{name: value}`` dict is yielded.
        :param convert: a callable applied to every value, or a dict mapping
            keys of ``index`` to callables.  Missing or non-callable entries
            fall back to unicode conversion (``None`` is kept as ``None``).

        Iteration stops at the first row that has fewer columns than
        requested.
        """
        if isinstance(index, int):
            index_list = range(index + 1)
        else:
            index_list = index

        if callable(convert):
            convert_dict = dict((k, convert) for k in index_list)
        else:
            # ``convert or {}`` tolerates an explicit ``convert=None``.
            convert_dict = dict(
                (k, f if callable(f) else _to_unicode)
                for k, f in (convert or {}).iteritems()
            )

        prepared = self.__get_sparql_query(q, dict(graph.namespaces()))
        # A copy is passed so the caller's dict can never be altered.
        for row in graph.query(prepared, initBindings=dict(bindings or {})):
            if len(row) < len(index_list):
                break
            res = dict(
                (k, convert_dict.get(k, _to_unicode)(v))
                for k, v in zip(index_list, row)
            )
            if isinstance(index, int):
                yield res[index]
            else:
                yield res

    def convert_bool(self, val):
        """Convert ``val`` to a real ``bool``.

        ``None`` and empty strings are ``False``.  Other strings are ``True``
        when their first character, lower-cased, is one of ``t``, ``y``,
        ``1`` or ``o`` (presumably "true", "yes", "1" and French "oui").
        Anything else goes through ``bool()``.
        """
        if val is None:
            return False
        if isinstance(val, bool):
            return val
        if isinstance(val, basestring):
            return len(val) > 0 and val[0].lower() in ('t', 'y', '1', 'o')
        return bool(val)

    def convert_lang(self, val, default_lang):
        """Return ``val`` as unicode, or ``default_lang`` if it is None or empty."""
        if val is None:
            return default_lang
        text = unicode(val)
        return text if text else default_lang

    def get_record_default_language(self, g, record_uri):
        """Return the language code to use by default for ``record_uri``.

        The language URI comes from ``DEFAULT_LANGUAGE_QUERY`` and falls back
        to ``DEFAULT_LANGUAGE_URI``.  When no code is known for that URI a
        warning is logged and the code of ``DEFAULT_LANGUAGE_URI`` is used.
        """
        lang_uri = self.extract_single_value_form_graph(
            g, DEFAULT_LANGUAGE_QUERY, bindings={'s': URIRef(record_uri)})
        if not lang_uri:
            lang_uri = DEFAULT_LANGUAGE_URI
        lang_code = get_code_from_language_uri(lang_uri)
        if lang_code is None:
            logger.warning(
                "get_record_default_language: no code found for %s in record %s",
                lang_uri, record_uri)
            return get_code_from_language_uri(DEFAULT_LANGUAGE_URI)
        return lang_code

    def add_to_related_collection(self, coll, graph, fields, q, bindings=None, convert=_to_unicode, through_fields=None):
        """Create one related object in ``coll`` per row returned by ``q``.

        :param coll: related manager of the record being built.
        :param graph: graph to query.
        :param fields: field names mapped positionally onto the query columns.
        :param q: SPARQL query text.
        :param bindings: optional initial variable bindings.
        :param convert: see ``extract_multiple_values_from_graph``.
        :param through_fields: names in ``fields`` that are stored on the
            intermediate ("through") object instead of the related object.

        When ``coll`` has a ``through`` attribute the related object is
        fetched or created with ``get_or_create``; it is then linked either
        through an explicitly created ``through`` row (when ``through_fields``
        is given) or with ``coll.add``.  Otherwise the object is created with
        ``coll.create`` and then passed to ``coll.add``.
        """
        through_fields = through_fields or ()
        rows = self.extract_multiple_values_from_graph(
            graph, q, bindings=bindings, index=fields, convert=convert)

        for val in rows:
            obj_vals = dict((k, v) for k, v in val.iteritems() if k not in through_fields)

            if hasattr(coll, 'through'):
                related, _ = coll.model.objects.get_or_create(**obj_vals)
                if through_fields:
                    through_vals = {
                        coll.source_field_name: coll.instance,
                        coll.target_field_name: related,
                    }
                    through_vals.update(
                        dict((k, v) for k, v in val.iteritems() if k in through_fields))
                    coll.through.objects.create(**through_vals)
                    # The link already exists through the explicit row.
                    new_obj = None
                else:
                    new_obj = related
            else:
                new_obj = coll.create(**obj_vals)

            if new_obj:
                coll.add(new_obj)

    def build_record(self, graph):
        """Build, save and return the ``Record`` described by ``graph``.

        ``graph`` is expected to hold a single ``iiep:Record`` resource; the
        first one returned by the query is used.

        :raises ValueError: if ``graph`` contains no ``iiep:Record``.
        """
        record_uri = self.extract_single_value_form_graph(
            graph, "SELECT DISTINCT ?s WHERE { ?s rdf:type iiep:Record .}")
        if record_uri is None:
            raise ValueError("no iiep:Record resource found in graph")
        default_language_code = self.get_record_default_language(graph, record_uri)

        subject = {'s': URIRef(record_uri)}

        def single(predicate, **kwargs):
            # First object of ``<record> predicate ?o``, or None.
            return self.extract_single_value_form_graph(
                graph,
                "SELECT DISTINCT ?o WHERE { ?s %s ?o .}" % predicate,
                bindings=subject,
                **kwargs)

        def to_lang(value):
            return self.convert_lang(value, default_language_code)

        def to_int(value):
            return int(value) if value is not None else None

        record = Record()
        record.uri = record_uri
        record.identifier = single("dct:identifier")
        record.notes = single("iiep:notes")
        record.recordType = single("dct:type")
        record.isDocumentPart = single("iiep:isDocumentPart", convert=self.convert_bool)
        record.editionStatement = single("iiep:editionStatement")

        language = single("dct:language")
        if language:
            record.language, _ = Language.objects.get_or_create(language=language)

        # The record must be saved before related objects can be attached.
        record.save()

        # Plain values hanging directly off the record: one field each.
        simple_relations = (
            (record.otherLanguages, 'language', "iiep:otherLanguage"),
            (record.subjects, 'subject', "dct:subject"),
            (record.themes, 'theme', "iiep:theme"),
            (record.countries, 'country', "iiep:country"),
            (record.authors, 'name', "iiep:author"),
            (record.subjectPersons, 'name', "iiep:subjectPerson"),
        )
        for coll, field, predicate in simple_relations:
            self.add_to_related_collection(
                coll,
                graph,
                [field],
                "SELECT ?o WHERE { ?s %s ?o .}" % predicate,
                bindings=subject
            )

        self.add_to_related_collection(
            record.projectNames,
            graph,
            ['label', 'acronym'],
            "SELECT ?l ?a WHERE { [ iiep:projectName ?bnode ]. ?bnode rdfs:label ?l. OPTIONAL { ?bnode iiep:acronym ?a } }"
        )

        self.add_to_related_collection(
            record.periodicals,
            graph,
            ['label', 'lang'],
            "SELECT DISTINCT ?o ( lang(?o) as ?l) WHERE { ?s iiep:periodical ?o .}",
            bindings=subject,
            convert={'lang': to_lang},
            through_fields=['lang']
        )

        self.add_to_related_collection(
            record.meetings,
            graph,
            ['label', 'meetingNumber', 'meetingPlace', 'meetingDate', 'meetingYear', 'lang'],
            # The COALESCE previously named ?nm, which is never bound; the
            # meeting number variable is ?mn.
            "SELECT ?l ?mn ?mp ?md ?my (lang(COALESCE(?l,?mn, ?mp,?md,?my)) as ?lang) WHERE { [iiep:meeting ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:meetingNumber ?mn }. OPTIONAL { ?bnode iiep:meetingPlace ?mp }. OPTIONAL { ?bnode iiep:meetingDate ?md }. OPTIONAL { ?bnode iiep:meetingYear ?my }}",
            convert={'lang': to_lang, 'meetingYear': to_int},
            through_fields=['lang']
        )

        self.add_to_related_collection(
            record.series,
            graph,
            ['title', 'volume', 'lang'],
            "SELECT ?t ?vol (lang(COALESCE(?t,?vol)) as ?lang) WHERE { [iiep:serie ?bnode]. OPTIONAL { ?bnode dct:title ?t }. OPTIONAL { ?bnode iiep:volume ?vol } }",
            convert={'lang': to_lang},
            through_fields=['lang']
        )

        self.add_to_related_collection(
            record.subjectCorporateBodies,
            graph,
            ['label', 'acronym'],
            "SELECT ?l ?a WHERE { [iiep:subjectCorporateBody ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:acronym ?a } }",
        )

        # NOTE(review): this queries iiep:meeting, the same predicate as
        # record.meetings above.  It looks like it should be a dedicated
        # subject-meeting predicate - confirm against the ontology.
        self.add_to_related_collection(
            record.subjectMeetings,
            graph,
            ['label', 'meetingNumber', 'meetingPlace', 'meetingDate', 'meetingYear'],
            "SELECT ?l ?mn ?mp ?md ?my WHERE { [iiep:meeting ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:meetingNumber ?mn }. OPTIONAL { ?bnode iiep:meetingPlace ?mp }. OPTIONAL { ?bnode iiep:meetingDate ?md }. OPTIONAL { ?bnode iiep:meetingYear ?my }}",
            convert={'meetingYear': to_int}
        )

        self.add_to_related_collection(
            record.corporateAuthors,
            graph,
            ['label', 'acronym'],
            "SELECT ?l ?a WHERE { [iiep:corporateAuthor ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:acronym ?a } }",
        )

        # Literals hanging directly off the record, stored with their
        # language tag (or the record's default language).
        tagged_literals = (
            (record.issns, 'issn', "iiep:issn"),
            (record.isbns, 'isbn', "iiep:isbn"),
            (record.documentCodes, 'documentCode', "iiep:documentCode"),
            (record.titles, 'title', "dct:title"),
            (record.addedTitles, 'title', "iiep:addedTitle"),
            (record.titlesMainDocument, 'title', "iiep:titleMainDocument"),
            (record.collations, 'collation', "iiep:collation"),
        )
        for coll, field, predicate in tagged_literals:
            self.add_to_related_collection(
                coll,
                graph,
                [field, 'lang'],
                "SELECT ?v (lang(COALESCE(?v)) as ?lang) WHERE { ?s %s ?v . }" % predicate,
                bindings=subject,
                convert={'lang': to_lang}
            )

        self.add_to_related_collection(
            record.imprints,
            graph,
            ['imprintCity', 'publisher', 'imprintDate', 'lang'],
            "SELECT ?c ?p ?d (lang(COALESCE(?c, ?p, ?d)) as ?lang) WHERE { [ iiep:imprint ?bnode ]. OPTIONAL { ?bnode iiep:imprintCity ?c }. OPTIONAL { ?bnode dct:publisher ?p }. OPTIONAL { ?bnode iiep:imprintDate ?d }}",
            convert={'lang': to_lang}
        )

        self.add_to_related_collection(
            record.volumeIssues,
            graph,
            ['volume', 'number', 'lang'],
            # iiep:number is bound to ?n; it was previously bound to ?v, so
            # the selected ?n was never populated.
            "SELECT ?v ?n (lang(COALESCE(?v, ?n)) as ?lang) WHERE { [ iiep:volumeIssue ?bnode ]. OPTIONAL { ?bnode iiep:volume ?v }. OPTIONAL { ?bnode iiep:number ?n }}",
            convert={'lang': to_lang}
        )

        self.add_to_related_collection(
            record.urls,
            graph,
            ['address', 'display', 'accessLevel'],
            "SELECT ?a ?d ?al WHERE { [ iiep:url ?bnode ]. OPTIONAL { ?bnode iiep:address ?a }. OPTIONAL { ?bnode iiep:display ?d }. OPTIONAL { ?bnode iiep:accessLevel ?al }.}",
        )

        return record

    def filter_node(self, node, graph, res_graph):
        """Copy into ``res_graph`` every triple of ``graph`` reachable from ``node``.

        All triples whose subject is ``node`` are copied; blank-node objects
        are followed recursively.  A blank node that already has triples in
        ``res_graph`` is not followed again, so cycles terminate.
        """
        for p, o in graph[node]:
            follow = isinstance(o, BNode) and (o, None, None) not in res_graph
            res_graph.add((node, p, o))
            if follow:
                self.filter_node(o, graph, res_graph)

    def calculate_records_nb(self, records_url):
        """Return the number of ``iiep:Record`` elements in ``records_url``."""
        record_tag = "{%s}Record" % IIEP
        count = 0
        for _, elem in ET.iterparse(records_url, events=("end",)):
            if elem.tag == record_tag:
                count += 1
                # Drop the parsed subtree so memory use stays bounded.
                elem.clear()
        return count

    def process_url(self, records_url, options):
        """Import every ``iiep:Record`` element found in ``records_url``.

        Each record is committed on success.  On failure the transaction is
        rolled back, the exception is logged and the import continues with
        the next record.

        :param records_url: source passed to ``iterparse``.
        :param options: command options (currently unused here).
        :returns: a list of ``(record_number, records_url, message)`` tuples,
            one per failed record.
        """
        total_records = self.calculate_records_nb(records_url)
        writer = None
        errors = []
        record_tag = "{%s}Record" % IIEP

        i = 0
        for _, elem in ET.iterparse(records_url, events=("end",)):
            if elem.tag != record_tag:
                continue

            i += 1
            writer = show_progress(i, total_records, "Processing record nb %d " % i, 50, writer=writer)
            try:
                record_graph = self.get_empty_graph()
                record_graph.parse(data=ET.tostring(elem, encoding='utf-8'), format='xml')
                self.build_record(record_graph)
            except Exception as e:
                transaction.rollback()
                msg = "Error processing resource %d in %s : %s" % (i, records_url, repr(e))
                logger.exception(msg)
                errors.append((i, records_url, msg))
            else:
                transaction.commit()
            finally:
                # The element is fully processed; free its subtree so the
                # whole document is not kept in memory.
                elem.clear()

            # A batch size of 0 disables the periodic reset instead of
            # raising ZeroDivisionError.
            if self.batch_size and i % self.batch_size == 0:
                reset_queries()

        return errors

    # Earlier implementation loading the whole file in a single graph, kept
    # for reference (it is the only user of filter_node).
    #
    # def process_url(self, records_url, options):
    #     #open graph with rdflib
    #     #TODO: manage memory
    #     g = Graph()
    #     print("Loading %s" % records_url)
    #     g.parse(records_url)
    #     print("Parsing %s done" % records_url)
    #     for i,record_uri in enumerate(g[:RDF.type:IIEP.Record]):
    #         print(i, repr(record_uri))
    #         record_graph = self.get_empty_graph()
    #         self.filter_node(record_uri, g, record_graph)
    #         self.build_record(record_graph)
    #         if i > 3:
    #             break

    def handle(self, *args, **options):
        """Import every file or URL given on the command line.

        Transactions are managed manually: ``process_url`` commits or rolls
        back after each record.  Transaction management is always left, even
        when an import aborts with an exception.
        """
        self.batch_size = options.get('batch_size', 50)
        transaction.enter_transaction_management()
        transaction.managed(True)

        try:
            for records_url in args:
                print("Processing %s" % records_url)
                errors = self.process_url(records_url, options)
                print("Processing %s Done" % records_url)
                if errors:
                    print("%d error(s) when processing %s, check your log file." % (len(errors), records_url))
        except BaseException:
            # Discard pending changes so that leaving transaction management
            # below does not hide the original error.
            transaction.rollback()
            raise
        finally:
            transaction.leave_transaction_management()