|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 from p4l.utils import show_progress, get_code_from_language_uri |
|
4 from p4l.models import Record, Language |
|
5 from rdflib import Graph, Namespace, BNode, URIRef |
|
6 from rdflib.plugins.sparql import prepareQuery |
|
7 from django.core.management import BaseCommand |
|
8 from optparse import make_option |
|
9 import xml.etree.cElementTree as ET |
|
10 from django.db import reset_queries, transaction |
|
11 import traceback |
|
12 import logging |
|
13 import sys |
|
14 |
|
15 logger = logging.getLogger(__name__) |
|
16 |
|
17 |
|
18 RDF = Namespace("http://www.w3.org/1999/02/22-rdf-syntax-ns#") |
|
19 RDFS = Namespace("http://www.w3.org/2000/01/rdf-schema#") |
|
20 DCT = Namespace("http://purl.org/dc/terms/") |
|
21 IIEP = Namespace("http://www.iiep.unesco.org/plan4learning/model.owl#") |
|
22 UNESCO = Namespace("http://www.iiep.unesco.org/Ontology/") |
|
23 |
|
24 DEFAULT_LANGUAGE_URI = "http://psi.oasis-open.org/iso/639/#eng" |
|
25 |
|
26 DEFAULT_LANGUAGE_QUERY = """SELECT ( COALESCE(?lang, ?other_lang) as ?main_lang) WHERE { |
|
27 OPTIONAL { ?s dct:language ?lang }. |
|
28 OPTIONAL { ?s iiep:otherLanguage ?other_lang }. |
|
29 }""" |
|
30 |
|
31 |
|
32 class Command(BaseCommand): |
|
33 |
|
34 args = "record_url ..." |
|
35 |
|
36 help = "Import p4l record rdf format" |
|
37 |
|
38 option_list = BaseCommand.option_list + ( |
|
39 make_option('-b', '--batch-size', |
|
40 dest= 'batch_size', |
|
41 type='int', |
|
42 default= 50, |
|
43 help= 'number of object to import in bulk operations' |
|
44 ), |
|
45 ) |
|
46 |
|
47 def __init__(self, *args, **kwargs): |
|
48 super(Command, self).__init__(*args, **kwargs) |
|
49 self.__query_cache = {} |
|
50 |
|
51 |
|
52 def __get_sparql_query(self, query, namespaces): |
|
53 |
|
54 return self.__query_cache[query] \ |
|
55 if query in self.__query_cache \ |
|
56 else self.__query_cache.setdefault(query, prepareQuery(query, initNs=namespaces)) |
|
57 |
|
58 def get_empty_graph(self): |
|
59 record_graph = Graph() |
|
60 record_graph.bind('iiep',"http://www.iiep.unesco.org/plan4learning/model.owl#") |
|
61 record_graph.bind('dct',"http://purl.org/dc/terms/") |
|
62 return record_graph |
|
63 |
|
64 def extract_single_value_form_graph(self, graph, q, bindings={}, index=0, convert=lambda v:unicode(v) if v is not None else None): |
|
65 return next(self.extract_multiple_values_from_graph(graph, q, bindings, index, convert), None) |
|
66 |
|
67 def extract_multiple_values_from_graph(self, graph, q, bindings={}, index=0, convert=lambda v:unicode(v) if v is not None else None): |
|
68 |
|
69 index_list = index |
|
70 if isinstance(index, int): |
|
71 index_list = range(index+1) |
|
72 |
|
73 if hasattr(convert, '__call__'): |
|
74 convert_dict = dict((k, convert) for k in index_list) |
|
75 else: |
|
76 convert_dict = convert |
|
77 |
|
78 convert_dict = dict((k, f if hasattr(f,'__call__') else lambda v:unicode(v) if v is not None else None) for k,f in convert_dict.iteritems()) |
|
79 |
|
80 for row in graph.query(self.__get_sparql_query(q, dict(graph.namespaces())), initBindings=bindings): |
|
81 if len(row) < len(index_list): |
|
82 break |
|
83 else: |
|
84 res = dict([ (k, convert_dict.get(k, lambda v:unicode(v) if v is not None else None)(v)) for k, v in zip(index_list, row)]) |
|
85 if isinstance(index, int): |
|
86 yield res[index] |
|
87 else: |
|
88 yield res |
|
89 |
|
90 |
|
91 def convert_bool(self, val): |
|
92 if val == True or val == False: |
|
93 return val |
|
94 if val is None: |
|
95 return False |
|
96 if isinstance(val, basestring): |
|
97 if len(val) == 0: |
|
98 return False |
|
99 if val[0].lower() in ['t','y','1','o']: |
|
100 return True |
|
101 else: |
|
102 return False |
|
103 return bool(val) |
|
104 |
|
105 def convert_lang(self, val, default_lang): |
|
106 return unicode(val) if (val is not None and len(unicode(val))>0) else default_lang |
|
107 |
|
108 |
|
109 def get_record_default_language(self, g, record_uri): |
|
110 lang_uri = self.extract_single_value_form_graph(g, DEFAULT_LANGUAGE_QUERY, bindings={'s': URIRef(record_uri)}) |
|
111 if not lang_uri: |
|
112 lang_uri = DEFAULT_LANGUAGE_URI |
|
113 lang_code = get_code_from_language_uri(lang_uri) |
|
114 if lang_code is None: |
|
115 logger.warn("get_record_default_language: no code found for %s in record %s" % (lang_uri, record_uri)) |
|
116 return get_code_from_language_uri(DEFAULT_LANGUAGE_URI) |
|
117 return lang_code |
|
118 |
|
119 |
|
120 def add_to_related_collection(self, coll, graph, fields, q, bindings={}, convert=lambda v: unicode(v) if v is not None else None, through_fields=None): |
|
121 |
|
122 for val in self.extract_multiple_values_from_graph(graph, q, bindings=bindings, index=fields, convert=convert): |
|
123 |
|
124 if through_fields: |
|
125 new_obj_val = dict([(k,v) for k,v in val.iteritems() if k not in through_fields]) |
|
126 else: |
|
127 new_obj_val = val |
|
128 |
|
129 if hasattr(coll, 'through'): |
|
130 new_obj_rel, _ = coll.model.objects.get_or_create(**new_obj_val) |
|
131 if through_fields: |
|
132 through_vals = {coll.source_field_name: coll.instance, coll.target_field_name: new_obj_rel} |
|
133 through_vals.update(dict([(k,v) for k,v in val.iteritems() if k in through_fields])) |
|
134 coll.through.objects.create(**through_vals) |
|
135 new_obj = None |
|
136 else: |
|
137 new_obj = new_obj_rel |
|
138 |
|
139 else: |
|
140 new_obj = coll.create(**new_obj_val) |
|
141 |
|
142 if new_obj: |
|
143 coll.add(new_obj) |
|
144 |
|
145 |
|
146 |
|
147 |
|
148 def build_record(self, graph): |
|
149 |
|
150 record_uri = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?s WHERE { ?s rdf:type iiep:Record .}") |
|
151 default_language_code = self.get_record_default_language(graph, record_uri) |
|
152 |
|
153 record = Record() |
|
154 record.uri = record_uri |
|
155 record.identifier = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s dct:identifier ?o .}", bindings={'s':URIRef(record.uri)}) |
|
156 record.notes = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s iiep:notes ?o .}", bindings={'s':URIRef(record.uri)}) |
|
157 record.recordType = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s dct:type ?o .}", bindings={'s':URIRef(record.uri)}) |
|
158 record.isDocumentPart = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s iiep:isDocumentPart ?o .}", bindings={'s':URIRef(record.uri)}, convert=self.convert_bool) |
|
159 record.editionStatement = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s iiep:editionStatement ?o .}", bindings={'s':URIRef(record.uri)}) |
|
160 |
|
161 language = self.extract_single_value_form_graph(graph,"SELECT DISTINCT ?o WHERE { ?s dct:language ?o .}", bindings={'s':URIRef(record.uri)}) |
|
162 if language: |
|
163 record.language, _ = Language.objects.get_or_create(language=language) |
|
164 |
|
165 record.save() |
|
166 |
|
167 self.add_to_related_collection(record.otherLanguages, graph, ['language'], "SELECT ?o WHERE { ?s iiep:otherLanguage ?o .}", bindings={'s':URIRef(record.uri)}) |
|
168 self.add_to_related_collection(record.subjects, graph, ['subject'], "SELECT ?o WHERE { ?s dct:subject ?o .}", bindings={'s':URIRef(record.uri)}) |
|
169 self.add_to_related_collection(record.themes, graph, ['theme'], "SELECT ?o WHERE { ?s iiep:theme ?o .}", bindings={'s':URIRef(record.uri)}) |
|
170 self.add_to_related_collection(record.countries, graph, ['country'], "SELECT ?o WHERE { ?s iiep:country ?o .}", bindings={'s':URIRef(record.uri)}) |
|
171 self.add_to_related_collection(record.authors, graph, ['name'], "SELECT ?o WHERE { ?s iiep:author ?o .}", bindings={'s':URIRef(record.uri)}) |
|
172 self.add_to_related_collection(record.subjectPersons, graph, ['name'], "SELECT ?o WHERE { ?s iiep:subjectPerson ?o .}", bindings={'s':URIRef(record.uri)}) |
|
173 self.add_to_related_collection(record.projectNames, graph, ['label','acronym'], "SELECT ?l ?a WHERE { [ iiep:projectName ?bnode ]. ?bnode rdfs:label ?l. OPTIONAL { ?bnode iiep:acronym ?a } }") |
|
174 |
|
175 self.add_to_related_collection( |
|
176 record.periodicals, |
|
177 graph, |
|
178 ['label','lang'], |
|
179 "SELECT DISTINCT ?o ( lang(?o) as ?l) WHERE { ?s iiep:periodical ?o .}", |
|
180 bindings={'s':URIRef(record.uri)}, |
|
181 convert={'lang':lambda l: self.convert_lang(l, default_language_code)}, |
|
182 through_fields = ['lang'] |
|
183 ) |
|
184 |
|
185 self.add_to_related_collection( |
|
186 record.meetings, |
|
187 graph, |
|
188 ['label', 'meetingNumber', 'meetingPlace', 'meetingDate', 'meetingYear', 'lang'], |
|
189 "SELECT ?l ?mn ?mp ?md ?my (lang(COALESCE(?l,?nm, ?mp,?md,?my)) as ?lang) WHERE { [iiep:meeting ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:meetingNumber ?mn }. OPTIONAL { ?bnode iiep:meetingPlace ?mp }. OPTIONAL { ?bnode iiep:meetingDate ?md }. OPTIONAL { ?bnode iiep:meetingYear ?my }}", |
|
190 convert={'lang':lambda l: self.convert_lang(l, default_language_code), 'meetingYear' : lambda y: int(y) if y is not None else None}, |
|
191 through_fields = ['lang'] |
|
192 ) |
|
193 |
|
194 self.add_to_related_collection( |
|
195 record.series, |
|
196 graph, |
|
197 ['title', 'volume', 'lang'], |
|
198 "SELECT ?t ?vol (lang(COALESCE(?t,?vol)) as ?lang) WHERE { [iiep:serie ?bnode]. OPTIONAL { ?bnode dct:title ?t }. OPTIONAL { ?bnode iiep:volume ?vol } }", |
|
199 convert={'lang':lambda l: self.convert_lang(l, default_language_code)}, |
|
200 through_fields = ['lang'] |
|
201 ) |
|
202 |
|
203 self.add_to_related_collection( |
|
204 record.subjectCorporateBodies, |
|
205 graph, |
|
206 ['label', 'acronym'], |
|
207 "SELECT ?l ?a WHERE { [iiep:subjectCorporateBody ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:acronym ?a } }", |
|
208 ) |
|
209 |
|
210 self.add_to_related_collection( |
|
211 record.subjectMeetings, |
|
212 graph, |
|
213 ['label', 'meetingNumber', 'meetingPlace', 'meetingDate', 'meetingYear'], |
|
214 "SELECT ?l ?mn ?mp ?md ?my WHERE { [iiep:meeting ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:meetingNumber ?mn }. OPTIONAL { ?bnode iiep:meetingPlace ?mp }. OPTIONAL { ?bnode iiep:meetingDate ?md }. OPTIONAL { ?bnode iiep:meetingYear ?my }}", |
|
215 convert={'meetingYear' : lambda y: int(y) if y is not None else None} |
|
216 ) |
|
217 |
|
218 self.add_to_related_collection( |
|
219 record.corporateAuthors, |
|
220 graph, |
|
221 ['label', 'acronym'], |
|
222 "SELECT ?l ?a WHERE { [iiep:corporateAuthor ?bnode]. OPTIONAL { ?bnode rdfs:label ?l }. OPTIONAL { ?bnode iiep:acronym ?a } }", |
|
223 ) |
|
224 |
|
225 self.add_to_related_collection( |
|
226 record.issns, |
|
227 graph, |
|
228 ['issn', 'lang'], |
|
229 "SELECT ?issn (lang(COALESCE(?issn)) as ?lang) WHERE { ?s iiep:issn ?issn . }", |
|
230 bindings={'s':URIRef(record.uri)}, |
|
231 convert={'lang':lambda l: self.convert_lang(l, default_language_code)} |
|
232 ) |
|
233 |
|
234 self.add_to_related_collection( |
|
235 record.isbns, |
|
236 graph, |
|
237 ['isbn', 'lang'], |
|
238 "SELECT ?isbn (lang(COALESCE(?isbn)) as ?lang) WHERE { ?s iiep:isbn ?isbn . }", |
|
239 bindings={'s':URIRef(record.uri)}, |
|
240 convert={'lang':lambda l: self.convert_lang(l, default_language_code)} |
|
241 ) |
|
242 |
|
243 self.add_to_related_collection( |
|
244 record.documentCodes, |
|
245 graph, |
|
246 ['documentCode', 'lang'], |
|
247 "SELECT ?c (lang(COALESCE(?c)) as ?lang) WHERE { ?s iiep:documentCode ?c . }", |
|
248 bindings={'s':URIRef(record.uri)}, |
|
249 convert={'lang':lambda l: self.convert_lang(l, default_language_code)} |
|
250 ) |
|
251 |
|
252 self.add_to_related_collection( |
|
253 record.titles, |
|
254 graph, |
|
255 ['title', 'lang'], |
|
256 "SELECT ?t (lang(COALESCE(?t)) as ?lang) WHERE { ?s dct:title ?t . }", |
|
257 bindings={'s':URIRef(record.uri)}, |
|
258 convert={'lang':lambda l: self.convert_lang(l, default_language_code)} |
|
259 ) |
|
260 |
|
261 self.add_to_related_collection( |
|
262 record.addedTitles, |
|
263 graph, |
|
264 ['title', 'lang'], |
|
265 "SELECT ?t (lang(COALESCE(?t)) as ?lang) WHERE { ?s iiep:addedTitle ?t . }", |
|
266 bindings={'s':URIRef(record.uri)}, |
|
267 convert={'lang':lambda l: self.convert_lang(l, default_language_code)} |
|
268 ) |
|
269 |
|
270 self.add_to_related_collection( |
|
271 record.titlesMainDocument, |
|
272 graph, |
|
273 ['title', 'lang'], |
|
274 "SELECT ?t (lang(COALESCE(?t)) as ?lang) WHERE { ?s iiep:titleMainDocument ?t . }", |
|
275 bindings={'s':URIRef(record.uri)}, |
|
276 convert={'lang':lambda l: self.convert_lang(l, default_language_code)} |
|
277 ) |
|
278 |
|
279 self.add_to_related_collection( |
|
280 record.imprints, |
|
281 graph, |
|
282 ['imprintCity', 'publisher', 'imprintDate', 'lang'], |
|
283 "SELECT ?c ?p ?d (lang(COALESCE(?c, ?p, ?d)) as ?lang) WHERE { [ iiep:imprint ?bnode ]. OPTIONAL { ?bnode iiep:imprintCity ?c }. OPTIONAL { ?bnode dct:publisher ?p }. OPTIONAL { ?bnode iiep:imprintDate ?d }}", |
|
284 convert={'lang':lambda l: self.convert_lang(l, default_language_code)} |
|
285 ) |
|
286 |
|
287 self.add_to_related_collection( |
|
288 record.collations, |
|
289 graph, |
|
290 ['collation', 'lang'], |
|
291 "SELECT ?c (lang(COALESCE(?c)) as ?lang) WHERE { ?s iiep:collation ?c . }", |
|
292 bindings={'s':URIRef(record.uri)}, |
|
293 convert={'lang':lambda l: self.convert_lang(l, default_language_code)} |
|
294 ) |
|
295 |
|
296 self.add_to_related_collection( |
|
297 record.volumeIssues, |
|
298 graph, |
|
299 ['volume', 'number', 'lang'], |
|
300 "SELECT ?v ?n (lang(COALESCE(?v, ?n)) as ?lang) WHERE { [ iiep:volumeIssue ?bnode ]. OPTIONAL { ?bnode iiep:volume ?v }. OPTIONAL { ?bnode iiep:number ?v }}", |
|
301 convert={'lang':lambda l: self.convert_lang(l, default_language_code)} |
|
302 ) |
|
303 |
|
304 self.add_to_related_collection( |
|
305 record.urls, |
|
306 graph, |
|
307 ['address', 'display', 'accessLevel'], |
|
308 "SELECT ?a ?d ?al WHERE { [ iiep:url ?bnode ]. OPTIONAL { ?bnode iiep:address ?a }. OPTIONAL { ?bnode iiep:display ?d }. OPTIONAL { ?bnode iiep:accessLevel ?al }.}", |
|
309 ) |
|
310 |
|
311 return record |
|
312 |
|
313 |
|
314 def filter_node(self, node, graph, res_graph): |
|
315 for p,o in graph[node]: |
|
316 res_graph.add((node,p,o)) |
|
317 if isinstance(o, BNode): |
|
318 self.filter_node(o, graph, res_graph) |
|
319 |
|
320 |
|
321 |
|
322 def calculate_records_nb(self, records_url): |
|
323 context = ET.iterparse(records_url, events=("end",)) |
|
324 i = 0 |
|
325 for _,elem in context: |
|
326 if elem.tag == "{%s}Record" % IIEP: |
|
327 i += 1 |
|
328 return i |
|
329 |
|
330 def process_url(self, records_url, options): |
|
331 |
|
332 total_records = self.calculate_records_nb(records_url) |
|
333 writer = None |
|
334 errors=[] |
|
335 |
|
336 context = ET.iterparse(records_url, events=("end",)) |
|
337 i = 0 |
|
338 for event,elem in context: |
|
339 if elem.tag == "{%s}Record" % IIEP: |
|
340 i += 1 |
|
341 writer = show_progress(i, total_records, "Processing record nb %d " % i, 50, writer=writer) |
|
342 try: |
|
343 record_graph = self.get_empty_graph() |
|
344 record_graph.parse(data=ET.tostring(elem, encoding='utf-8'), format='xml') |
|
345 # add transaction management |
|
346 self.build_record(record_graph) |
|
347 except Exception as e: |
|
348 transaction.rollback() |
|
349 msg = "Error processing resource %d in %s : %s" % (i, records_url, repr(e)) |
|
350 logger.exception(msg) |
|
351 errors.append((i, records_url, msg)) |
|
352 else: |
|
353 transaction.commit() |
|
354 |
|
355 if i%self.batch_size == 0: |
|
356 reset_queries() |
|
357 |
|
358 return errors |
|
359 |
|
360 |
|
361 # def process_url(self, records_url, options): |
|
362 # #open graph with rdflib |
|
363 # #TODO: manage memory |
|
364 # g = Graph() |
|
365 # print("Loading %s" % records_url) |
|
366 # g.parse(records_url) |
|
367 # print("Parsing %s done" % records_url) |
|
368 # for i,record_uri in enumerate(g[:RDF.type:IIEP.Record]): |
|
369 # print(i, repr(record_uri)) |
|
370 # record_graph = self.get_empty_graph() |
|
371 # self.filter_node(record_uri, g, record_graph) |
|
372 # self.build_record(record_graph) |
|
373 # if i > 3: |
|
374 # break |
|
375 |
|
376 |
|
377 |
|
378 |
|
379 def handle(self, *args, **options): |
|
380 |
|
381 self.batch_size = options.get('batch_size', 50) |
|
382 transaction.enter_transaction_management() |
|
383 transaction.managed(True) |
|
384 |
|
385 for records_url in args: |
|
386 print("Processing %s" % records_url) |
|
387 errors = self.process_url(records_url, options) |
|
388 print("Processing %s Done" % records_url) |
|
389 if errors: |
|
390 print("%d error(s) when processing %s, check your log file." % (len(errors), records_url)) |
|
391 |
|
392 transaction.leave_transaction_management() |
|
393 |