# -*- coding: utf-8 -*-
'''
Created on May 25, 2011
- after `import_rdf`, run the commands `import_hdabo_db -c` and then `rebuild_index`
@author: ymh
'''
import base64
import collections
import logging
import math
from optparse import make_option
import os
import shutil
import sys
import tempfile
import RDF
from django.core.management.base import BaseCommand, CommandError
from django.core.management.color import no_style
from django.db import connections, DEFAULT_DB_ALIAS
from django.db import models
from django.db import transaction
import isodate
import pytz
from hdabo.models import (Datasheet, DocumentFormat, Domain, Organisation,
Tag, TaggedSheet, TimePeriod, Location, TagCategory)
logger = logging.getLogger(__name__)
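# Map RDF file extensions to Redland parser names (default is 'rdfxml', see Command.handle below)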
RDF_EXT_MAP = {
    '.xml': 'rdfxml',
    '.rdf': 'rdfxml',
'.ttl': 'turtle',
'.nt': 'ntriples'
}
class ProcessedObjects(models.Model):
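    """Bookkeeping model recording every object touched by the import.
    The table is created as TEMPORARY in Command.handle and used at the end of the
    run to delete database rows that were not present in the RDF dump."""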
obj_name = models.CharField(max_length=1024, db_index=True, null=False, blank=False)
obj_id = models.IntegerField(db_index=True, null=False, blank=False)
def __unicode__(self):
return "ProcessedObject -> id : %d, obj_name: %s, obj_id: %d" % (self.id or "None", self.obj_name, self.obj_id)
class Meta:
app_label = 'import_rdf'
class KeyDefaultdict(collections.defaultdict):
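    """defaultdict variant whose default_factory receives the missing key as argument."""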
def __missing__(self, key):
if self.default_factory:
dict.__setitem__(self, key, self.default_factory(key))
return self[key]
        else:
            return collections.defaultdict.__missing__(self, key)
class GraphCacheMap(collections.defaultdict):
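    """Nested mapping mirroring the on-disk layout of the graph cache.
    ``key`` holds the path components leading to this level and ``get_index``
    generates unique hexadecimal names for files and sub folders."""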
    def __init__(self, key=None):
        self.key = key if key is not None else []
super(GraphCacheMap, self).__init__(GraphCacheMap)
self.key_index = 0
def get_index(self):
self.key_index += 1
return "%016x" % self.key_index
class GraphCache(object):
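    """Disk-backed cache of RDF graphs: each graph is serialized to its own file under a
    temporary folder, while ``filename_map`` keeps the key-to-path mapping in memory."""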
def __init__(self, temp_folder=tempfile.tempdir, fmt='turtle'):
self.base_temp_folder = temp_folder
self.temp_folder = None
self.format = fmt
self.filename_map = GraphCacheMap()
def clear_filename_cache(self, keys=None):
"""Clear from filename cache (memory)
:param tuple keys: the key to clear. ``None`` will clear all.
"""
if not keys:
self.filename_map.clear()
return
if not isinstance(keys, (list,tuple)):
keys = (keys,)
current_cache_dict = self.filename_map
for k in keys[:-1]:
current_cache_dict = current_cache_dict.get(k)
if not isinstance(current_cache_dict,dict):
raise KeyError("%r not found " % keys)
current_cache_dict.pop(keys[-1])
def clear(self, keys=None):
"""Clear from disk cache
:param tuple keys: the key to clear. ``None`` will clear all.
"""
if not keys:
path = self.temp_folder
self.temp_folder = None
self.filename_map.clear()
else:
path = self.__build_path(keys)
if not os.path.exists(path):
raise KeyError("%r not found" % keys)
if os.path.isfile(path):
os.remove(path)
else:
shutil.rmtree(path)
self.clear_filename_cache(keys)
def get_from_filename_cache(self, keys):
"""get from filename cache.
:raises: KeyError if not found
"""
if not keys:
raise KeyError("Keys is None or empty")
if not isinstance(keys, (tuple,list)):
keys = (keys,)
#print("FILENAME MAP : " + repr(self.filename_map))
try:
res = reduce(lambda d,k: d[k], keys, self.filename_map)
if(isinstance(res,GraphCacheMap)):
return res.key
else:
return res
        except Exception:
            raise KeyError("%r not found" % keys)
def __build_path(self, keys):
if not keys:
raise KeyError("Keys is None or empty")
if not isinstance(keys, (tuple,list)):
keys = (keys,)
file_parts = self.get_from_filename_cache(keys)
if file_parts:
return os.path.join(self.temp_folder,os.path.join(*file_parts))
else:
return self.temp_folder
def get_len(self, keys):
if not isinstance(keys, (tuple,list)):
keys = [keys,]
elif isinstance(keys, tuple):
keys = list(keys)
res = reduce(lambda d,k: d[k], keys, self.filename_map)
return len(res)
    #TODO: handle errors
def get_iter(self,keys):
if not isinstance(keys, (tuple,list)):
keys = [keys,]
elif isinstance(keys, tuple):
keys = list(keys)
res = reduce(lambda d,k: d[k], keys, self.filename_map)
for graphfile_key in res.keys():
g = RDF.Model()
parser=RDF.Parser(name=self.format)
parser.parse_into_model(g, "file://"+self.__build_path(keys + [graphfile_key,]))
yield g
def get(self, keys, default=None):
"""get graph from disk cache
:raises: KeyError if file not found"""
path = self.__build_path(keys)
if not os.path.exists(path):
return default
if not os.path.isfile(path):
raise KeyError("%r found but not a file" % keys)
#g = rdflib.Graph()
g = RDF.Model()
try:
# g.parse(path, format=self.format)
parser=RDF.Parser(name=self.format)
parser.parse_into_model(g, "file://"+path)
except Exception as e:
raise KeyError("Bad key %r. Error when reading file %r" % (keys, e))
return g
    def get_content(self, keys, default=None):
        """get dir content from disk cache
        :returns: list of (keys, name, is_dir) with is_dir = True when the item is a sub folder. ``default`` when nothing is found.
        :raises: KeyError if the entry exists but is not a folder"""
        try:
            vir_folder = self.get_from_filename_cache(keys)
        except KeyError:
            return default if default is not None else []
        if not isinstance(vir_folder, dict):
            raise KeyError("Bad key %r. path is not a folder" % keys)
        return [(tuple(keys)+(k,),k,isinstance(v, dict)) for k,v in vir_folder.items()]
def put_in_filename_cache(self, keys, val):
if not keys:
raise KeyError("Keys is None or empty")
if not isinstance(keys, (tuple,list)):
keys = (keys,)
        reduce(lambda d,k: d.setdefault(k,{}), keys[:-1], self.filename_map)[keys[-1]] = val
def put(self, keys, g):
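        """Serialize graph ``g`` to disk under the path derived from ``keys``, creating the
        temporary folder, intermediate directories and filename cache entries as needed."""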
if g is None:
raise Exception("Null graph")
if not self.temp_folder or not os.path.exists(self.temp_folder):
self.temp_folder = tempfile.mkdtemp(dir=self.base_temp_folder)
if not os.path.isdir(self.temp_folder):
raise Exception("Temp folder for disk cache is not a dir")
path_parts = []
current_map = self.filename_map
for k in keys[:-1]:
if k in current_map:
current_map = current_map[k]
else:
new_key = current_map.key + [current_map.get_index(),]
current_map = current_map.setdefault(k, GraphCacheMap(new_key))
path_parts.append(current_map.key[-1])
current_index = current_map.get_index()
path_parts.append(current_index)
        current_map[keys[-1]] = path_parts
if len(keys)>1:
path_dir = self.__build_path(keys[:-1])
if not os.path.isdir(path_dir):
os.makedirs(path_dir)
path = self.__build_path(keys)
ser = RDF.Serializer(name=self.format)
ser.serialize_model_to_file(path, g)
HDA_ONTOLOGY_BASE_URL = 'http://data.culture.fr/ontologies/hda/0.1#'
TYPES_LIST = [
'Categorie',
'Tag',
'Site',
'Ville',
'Institution',
'Theme',
'Domaine',
'SousDomaine',
'Periode',
'Notice'
]
TARGET_TYPE_LIST = [
TaggedSheet,
Datasheet,
Domain,
TimePeriod,
Tag,
TagCategory,
Organisation,
Location,
DocumentFormat
]
RDF_NS = RDF.NS('http://www.w3.org/1999/02/22-rdf-syntax-ns#')
RDFS_NS = RDF.NS('http://www.w3.org/2000/01/rdf-schema#')
HDA_NS = RDF.NS(HDA_ONTOLOGY_BASE_URL)
DC_NS = RDF.NS("http://purl.org/dc/terms/")
INSEE_NS = RDF.NS("http://rdf.insee.fr/def/geo#")
XMLSCHEMA_NS = RDF.NS("http://www.w3.org/2001/XMLSchema#")
FOAF_NS = RDF.NS("http://xmlns.com/foaf/0.1/")
DBPEDIA_NS = RDF.NS("http://dbpedia.org/ontology/")
def show_progress(current_line, total_line, width, message=""):
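    """Draw a simple text progress bar of the given width on stdout."""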
percent = (float(current_line) / float(total_line)) * 100.0
marks = math.floor(width * (percent / 100.0))
spaces = math.floor(width - marks)
loader = '[' + ('=' * int(marks)) + (' ' * int(spaces)) + ']'
message_fill = " " * max(0,width - len(message))
sys.stdout.write("%s %d%% %d/%d %s%s\r" % (loader, percent, current_line, total_line, message, message_fill)) #takes the header into account
if percent >= 100:
sys.stdout.write("\n")
sys.stdout.flush()
class RdfImporter(object):
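    """Base importer: ``attr_rdf_map`` maps attribute names to RDF predicates and ``__getattr__``
    lazily resolves those attributes from the entity graph. Subclasses implement
    ``do_import_graph`` and return (instance, created, dirty) tuples."""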
importer_stats = KeyDefaultdict(lambda k: {'object_name': k, 'created': 0, 'updated': 0, 'untouched': 0, 'deleted': 0})
def __init__(self, g, obj_type):
self.graph = g
self.obj_type = obj_type
self.obj_uri_node = self.graph.get_source(RDF_NS.type, HDA_NS[self.obj_type])
if self.obj_uri_node is None:
raise Exception("For importer %s type node is not foud" % self.obj_type)
self.attr_rdf_map = {'id': DC_NS.identifier}
self.attr_cache = {}
def import_graph(self):
for obj_instance, created, dirty in self.do_import_graph():
obj_name = type(obj_instance).__name__
ProcessedObjects.objects.create(obj_name = obj_name, obj_id = obj_instance.pk)
obj_stat = self.importer_stats[obj_name]
if created:
obj_stat['created'] += 1
elif dirty:
obj_stat['updated'] += 1
else:
obj_stat['untouched'] += 1
def do_import_graph(self):
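        """Import the entity graph into the database; subclasses return a list of (instance, created, dirty) tuples."""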
pass
def _process_attr_node(self, attr_node):
if attr_node is None:
return None
if attr_node.is_resource():
return str(attr_node.uri)
if attr_node.is_blank():
new_g = RDF.Model()
qs = RDF.Statement(subject=attr_node, predicate=None, object=None)
new_g.add_statements(self.graph.find_statements(qs))
return new_g
        # attr_node is a literal
lit_val = attr_node.literal_value
res_type = lit_val['datatype']
res_raw = lit_val['string']
res = res_raw
if res_type == XMLSCHEMA_NS.dateTime.uri:
res = isodate.parse_datetime(res_raw)
elif res_type == XMLSCHEMA_NS.integer.uri:
res = int(res_raw)
return res
def __getattr__(self, name):
        if name == 'attr_cache':
            raise AttributeError(name)
if name in self.attr_cache:
return self.attr_cache[name]
if name not in self.attr_rdf_map:
raise AttributeError("%s not in attributes", name)
res = None
if isinstance(self.attr_rdf_map[name], (list, tuple, set)):
res = reduce(
lambda r, pred: r + [self._process_attr_node(attr_node) for attr_node in self.graph.get_targets(self.obj_uri_node, pred)],
self.attr_rdf_map[name],
[]
)
else:
attr_node = self.graph.get_target(self.obj_uri_node, self.attr_rdf_map[name])
res = self._process_attr_node(attr_node)
self.attr_cache[name] = res
return res
def RdfImporterFactory(type_name):
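    """Build a generic RdfImporter subclass for ``type_name``; used as the default factory of RDF_IMPORTERS."""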
newImporter = type(type_name+"RdfImporter", (RdfImporter,), {"__init__": lambda self,g: RdfImporter.__init__(self, g, type_name)})
return newImporter
class VilleImporter(RdfImporter):
def __init__(self, g):
super(VilleImporter, self).__init__(g, 'Ville')
self.attr_rdf_map.update({
'codeINSEE': INSEE_NS.codeINSEE,
'nom': INSEE_NS.nom
})
def do_import_graph(self):
loc, created = Location.objects.get_or_create(insee=self.codeINSEE)
dirty = False
if loc.name != self.nom:
loc.name = self.nom
dirty = True
if dirty:
loc.save()
return [(loc, created, dirty), ]
class CategorieImporter(RdfImporter):
def __init__(self, g):
super(CategorieImporter, self).__init__(g, 'Categorie')
self.attr_rdf_map.update({
'label': RDFS_NS.label
})
def do_import_graph(self):
cat, created = TagCategory.objects.get_or_create(label=self.label)
        dirty = False
val = self.id
if cat.natural_key != val:
cat.natural_key = val
dirty = True
        if dirty:
cat.save()
return [(cat, created, dirty), ]
class TagImporter(RdfImporter):
def __init__(self, g):
super(TagImporter, self).__init__(g, 'Tag')
self.attr_rdf_map.update({
"label": RDFS_NS.label,
"alternative_label": HDA_NS.alternativeLabel,
"normalized_label": HDA_NS.normalizedLabel,
"original_label": HDA_NS.originalLabel,
"alias": HDA_NS.alias,
"category": HDA_NS.category,
"wikipedia_url": HDA_NS.wikipediaURL,
"wikipedia_pageid": HDA_NS.wikipediaPageId,
"alternative_wikipedia_url": HDA_NS.alternativeWikipediaURL,
"alternative_wikipedia_pageid": HDA_NS.alternativeWikipediaPageId,
"url_status": HDA_NS.urlStatus,
"dbpedia_uri": HDA_NS.dbpediaURI,
"popularity": HDA_NS.popularity,
"created_at": DC_NS.created
})
def do_import_graph(self):
if self.id == "___":
return []
tag, created = Tag.objects.get_or_create(label=self.label, original_label=self.original_label, url_status=self.url_status, defaults={'natural_key': self.id})
dirty = False
tag.force_natural_key = True
tag.natural_key = self.id
for field in [f for f in self.attr_rdf_map.keys() if f not in ['label', 'original_label', 'url_status']]:
if field == 'id':
val = self.id
if tag.natural_key != val:
dirty = True
tag.natural_key = val
elif field == 'created_at':
val = self.created_at
if tag.created_at.replace(tzinfo=pytz.UTC) != val:
dirty = True
tag.created_at = val
elif field in ['alternative_wikipedia_pageid', 'wikipedia_pageid']:
val = getattr(self, field)
if val == '':
val = None
if getattr(tag, field) != val:
dirty = True
setattr(tag, field, val)
elif field != 'category':
val = getattr(self, field)
if getattr(tag, field) != val:
dirty = True
setattr(tag, field, val)
elif self.category:
catid = self.category.split("/")[-1]
newCat = TagCategory.objects.filter(natural_key=catid).first()
if tag.category != newCat:
tag.category = newCat
dirty = True
if dirty:
tag.save()
return [(tag, created, dirty), ]
class DomaineImporter(RdfImporter):
def __init__(self, g, obj_type='Domaine'):
super(DomaineImporter, self).__init__(g, obj_type)
self.attr_rdf_map.update({
'label': RDFS_NS.label,
'value': RDF_NS.value
})
def do_import_graph(self):
domain, created = Domain.objects.get_or_create(natural_key=self.id, defaults={'label': self.value, 'school_period': Domain.DOMAIN_PERIOD_DICT[u'Global']})
dirty = False
if domain.label != self.value:
domain.label=self.value
dirty = True
if domain.school_period != Domain.DOMAIN_PERIOD_DICT[u'Global']:
domain.school_period = Domain.DOMAIN_PERIOD_DICT[u'Global']
dirty = True
if dirty:
domain.save()
return [(domain, created, dirty),]
class SousDomaineImporter(DomaineImporter):
def __init__(self, g):
super(SousDomaineImporter, self).__init__(g, 'SousDomaine')
class ThemeImporter(DomaineImporter):
LEVEL_MAP = {
1: Domain.DOMAIN_PERIOD_DICT[u'Primaire'],
2: Domain.DOMAIN_PERIOD_DICT[u'Collège'],
3: Domain.DOMAIN_PERIOD_DICT[u'Lycée']
}
def __init__(self, g):
super(ThemeImporter, self).__init__(g, 'Theme')
self.attr_rdf_map.update({
'level': HDA_NS.niveau
})
def do_import_graph(self):
domain, created = Domain.objects.get_or_create(natural_key=self.id, defaults={'label': self.value, 'school_period': ThemeImporter.LEVEL_MAP[self.level]})
dirty = False
if domain.label != self.value:
domain.label=self.value
dirty = True
if domain.school_period != ThemeImporter.LEVEL_MAP[self.level]:
domain.school_period = ThemeImporter.LEVEL_MAP[self.level]
dirty = True
if dirty:
domain.save()
return [(domain, created, dirty),]
class PeriodeImporter(RdfImporter):
LEVEL_MAP = {
1: Domain.DOMAIN_PERIOD_DICT[u'Primaire'],
2: Domain.DOMAIN_PERIOD_DICT[u'Collège'],
3: Domain.DOMAIN_PERIOD_DICT[u'Lycée']
}
def __init__(self, g):
super(PeriodeImporter, self).__init__(g, 'Periode')
self.attr_rdf_map.update({
'value': RDF_NS.value,
'level': HDA_NS.niveau
})
def do_import_graph(self):
        period, created = TimePeriod.objects.get_or_create(natural_key=self.id, defaults={'label': self.value, 'school_period': PeriodeImporter.LEVEL_MAP[self.level]})
dirty = False
if period.label != self.value:
period.label=self.value
dirty = True
        if period.school_period != PeriodeImporter.LEVEL_MAP[self.level]:
            period.school_period = PeriodeImporter.LEVEL_MAP[self.level]
dirty = True
if dirty:
period.save()
return [(period, created, dirty),]
class SiteImporter(RdfImporter):
def __init__(self, g):
super(SiteImporter, self).__init__(g, 'Site')
self.attr_rdf_map.update({
'name': RDFS_NS.label,
'website': FOAF_NS.homepage,
})
def do_import_graph(self):
org, created = Organisation.objects.get_or_create(hda_id = self.id)
dirty = False
if self.name != org.name:
org.name = self.name
dirty = True
if self.website != org.website:
org.website = self.website
dirty = True
if dirty:
org.save()
return [(org, created, dirty),]
class InstitutionImporter(RdfImporter):
def __init__(self, g):
super(InstitutionImporter, self).__init__(g, 'Institution')
self.attr_rdf_map.update({
'name': RDFS_NS.label,
'website': FOAF_NS.homepage,
'location': HDA_NS.ville
})
def do_import_graph(self):
org, created = Organisation.objects.get_or_create(hda_id = self.id)
dirty = False
if self.name != org.name:
org.name = self.name
dirty = True
if self.website != org.website:
org.website = self.website
dirty = True
ville = self.location
if ville:
ville_id = ville.split("/")[-1]
location = Location.objects.filter(insee=ville_id).first()
if location and location.name != org.location:
org.location = location.name
dirty = True
if dirty:
org.save()
return [(org, created, dirty),]
class DocumentTagImporter(RdfImporter):
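    """Importer for the hda:documentTag sub-graphs of a Notice; creates or updates the TaggedSheet linking a Tag to the given datasheet."""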
def __init__(self, g, datasheet):
super(DocumentTagImporter, self).__init__(g, 'DocumentTag')
self.datasheet = datasheet
self.attr_rdf_map.update({
'created_at': DC_NS.created,
'order': HDA_NS.order,
'original_order': HDA_NS.originalOrder,
'index_note': HDA_NS.indexNote,
'wikipedia_revision_id': DBPEDIA_NS.wikiPageRevisionID,
'tag': HDA_NS.tag
})
def do_import_graph(self):
tag = Tag.objects.filter(natural_key=self.tag.split("/")[-1]).first()
if tag is None:
logger.warn("Tag %r not found for datasheet %r", self.tag.split("/")[-1], self.datasheet.hda_id)
ts = TaggedSheet.objects.filter(tag=tag, datasheet=self.datasheet).first()
created = False
if ts is None:
ts = TaggedSheet.objects.create(tag=tag, datasheet=self.datasheet)
created = True
dirty = False
val = self.created_at
if ts.created_at.replace(tzinfo=pytz.UTC) != val:
dirty = True
ts.created_at = val
for field in ['order', 'original_order', 'index_note', 'wikipedia_revision_id']:
val = getattr(self, field)
if val == '':
val = None
if getattr(ts, field) != val:
setattr(ts, field, val)
dirty = True
if dirty:
ts.save()
return [(ts,created,dirty),]
class NoticeImporter(RdfImporter):
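    """Importer for hda:Notice entities, mapped to Datasheet objects together with their related domains, periods, themes and tags."""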
def __init__(self, g):
super(NoticeImporter, self).__init__(g, 'Notice')
#print("NOTICE IMPORTER")
#ser = RDF.Serializer(name='turtle')
#print(ser.serialize_model_to_string(g))
self.attr_rdf_map.update({
'title': DC_NS.title,
'description': DC_NS.description,
'url': HDA_NS.url,
'organisation': HDA_NS.institution,
'original_creation_date': DC_NS.created,
'original_modification_date': DC_NS.issued,
'town': HDA_NS.ville,
'format': DC_NS.format,
'domains': [HDA_NS.domaine, HDA_NS.sousDomainePrimaire],
'primary_periods': [HDA_NS.periodePrimaire,],
'college_periods': [HDA_NS.periodeCollege,],
'highschool_periods': [HDA_NS.periodeLycee,],
'primary_themes': [HDA_NS.themePrimaire,],
'college_themes': [HDA_NS.themeCollege,],
'highschool_themes': [HDA_NS.themeLycee,],
'tags': [HDA_NS.documentTag,]
})
@transaction.atomic
def do_import_graph(self):
ds, created = Datasheet.objects.get_or_create(hda_id=self.id, defaults={'original_creation_date': self.original_creation_date, 'original_modification_date': self.original_modification_date})
dirty=False
res = []
for field in ['title', 'description', 'url']:
val = getattr(self, field)
if getattr(ds, field) != val:
setattr(ds, field, val)
dirty = True
for field in [ 'original_creation_date', 'original_modification_date']:
val = getattr(self, field).date()
if getattr(ds, field) != val:
setattr(ds, field, val)
dirty = True
org_url = self.organisation
org = None
if org_url:
org_id = org_url.split("/")[-1]
org = Organisation.objects.filter(hda_id=org_id).first()
if org != ds.organisation:
ds.organisation = org
dirty = True
town_url = self.town
town = None
if town_url:
town_id = town_url.split("/")[-1]
town = Location.objects.filter(insee=town_id).first()
if town != ds.town:
ds.town = town
dirty = True
fmt = self.format
format_obj = None
if fmt:
format_obj, f_created = DocumentFormat.objects.get_or_create(label=fmt)
res.append((format_obj, f_created, False))
if format_obj != ds.format:
ds.format = format_obj
dirty = True
if not ds.validated:
ds.validated = True
dirty = True
for (field, ObjKlass) in [('domains',Domain), ('primary_periods', TimePeriod), ('college_periods', TimePeriod), ('highschool_periods', TimePeriod), ('primary_themes', Domain), ('college_themes', Domain), ('highschool_themes', Domain)]:
tgt_obj_ids = [obj_url.split("/")[-1] for obj_url in getattr(self, field)]
tgt_obj_ids.sort()
ds_objs_ids = [d.natural_key for d in getattr(ds, field).all()]
ds_objs_ids.sort()
if ds_objs_ids != tgt_obj_ids:
                # get the through model of the many-to-many relation
ThroughKlass = getattr(Datasheet, field).through
ThroughKlass.objects.filter(datasheet=ds).delete()
for i,tgt_obj in enumerate(ObjKlass.objects.filter(natural_key__in = tgt_obj_ids)):
link_args = {'datasheet': ds, ObjKlass.__name__.lower(): tgt_obj, 'sort_value': i}
ThroughKlass.objects.create(**link_args)
dirty = True
if dirty:
ds.save()
res.append((ds, created, dirty))
#create TaggedSheet
for tagged_sheet_graph in self.tags:
importer = DocumentTagImporter(tagged_sheet_graph, ds)
importer.import_graph()
return res
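# Importer class per RDF type name; missing types fall back to a generated generic importer (see RdfImporterFactory)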
RDF_IMPORTERS = KeyDefaultdict(RdfImporterFactory, {
'Ville': VilleImporter,
'Categorie': CategorieImporter,
'Tag': TagImporter,
'Domaine': DomaineImporter,
'SousDomaine': SousDomaineImporter,
'Theme': ThemeImporter,
'Periode': PeriodeImporter,
'Site': SiteImporter,
'Institution': InstitutionImporter,
'Notice': NoticeImporter
})
class Command(BaseCommand):
'''
    Command to import an RDF file
'''
args = '<path_to_rdf_file>'
options = '[--type TYPE]'
help = """Import of a rdf file for hdabo
Options:
--type TYPE : type of object to import
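    Example (assumed invocation): ./manage.py import_rdf --type Tag --type Notice /path/to/dump.ttl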
"""
option_list = BaseCommand.option_list + (
make_option('-t', '--type',
action='append',
type='string',
dest='types',
default=[],
help='types to process.'),
)
def get_entity_cache_keys(self, s):
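        """Split an entity URI on its last '/' and base64-encode both parts, yielding filesystem-safe cache keys."""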
return [base64.urlsafe_b64encode(p) for p in s.rstrip("/").rsplit("/",1)]
def handle(self, *args, **options):
if len(args) == 0:
raise CommandError("Gives at last one rdf file to import")
self.types_list = options.get('types', TYPES_LIST) or TYPES_LIST
if any([t not in TYPES_LIST for t in self.types_list]):
raise CommandError("Types is %r : all types mus be in %r" % (self.types_list, TYPES_LIST))
with transaction.atomic():
connection = connections[DEFAULT_DB_ALIAS]
sql_temp, _ = connection.creation.sql_create_model(ProcessedObjects, no_style(), [])
cursor = connection.cursor()
for stmt in sql_temp:
cursor.execute(stmt.replace("CREATE TABLE", "CREATE TEMPORARY TABLE"))
#lines = options.get('lines', 0)
#self.ignore_existing = options.get('ignore_existing', False)
#open rdf file
src_uri = RDF.Uri("file://" + args[0])
_, fileExtension = os.path.splitext(args[0])
parser = RDF.Parser(name=RDF_EXT_MAP.get(fileExtension, 'rdfxml'))
current_graph = RDF.Model()
current_entity = None
current_id = None
current_type = None
blank_node_statements = {}
graph_disk_cache = GraphCache()
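        # Stream the statements and group them per subject: each entity's statements accumulate in
        # current_graph and are flushed to the disk cache (keyed by rdf:type and dc:identifier) when
        # the subject changes; blank node statements are buffered until their parent subject references them.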
for statement in parser.parse_as_stream(uri=src_uri):
if statement.predicate.uri == RDF.Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type") and statement.subject.type != RDF.node_type('NODE_TYPE_BLANK'):
current_type = unicode(statement.object.uri)
if statement.predicate.uri == RDF.Uri("http://purl.org/dc/terms/identifier") and statement.subject.type != RDF.node_type('NODE_TYPE_BLANK'):
current_id = statement.object.literal[0]
if statement.subject.type == RDF.node_type('NODE_TYPE_RESOURCE'):
if current_entity is not None and current_entity != statement.subject:
cache_keys = [current_type, current_id]
graph_disk_cache.put(cache_keys, current_graph)
current_graph = RDF.Model()
current_id = None
current_type = None
current_graph.append(RDF.Statement(statement=statement))
current_entity = statement.subject
elif statement.subject.type == RDF.node_type('NODE_TYPE_BLANK'):
if not current_graph.find_statements(RDF.Statement(None, None, statement.subject)).end():
current_graph.append(RDF.Statement(statement=statement))
else:
blank_node_statements.setdefault(statement.subject,[]).append(statement)
else:
current_graph.append(RDF.Statement(statement=statement))
if statement.object.type == RDF.node_type('NODE_TYPE_BLANK') and statement.object in blank_node_statements:
for stmt in blank_node_statements.pop(statement.object):
current_graph.append(RDF.Statement(statement=stmt))
if current_entity is not None and current_type is not None and current_id is not None:
cache_keys = [current_type, current_id]
graph_disk_cache.put(cache_keys, current_graph)
if blank_node_statements:
raise CommandError("Orphan rdf statements for blank nodes %r", blank_node_statements)
for i, obj_type in enumerate(self.types_list):
print("stage %d/%d : processing %s :" % (i+1, len(self.types_list), obj_type))
obj_type_url = HDA_ONTOLOGY_BASE_URL+obj_type
obj_len = graph_disk_cache.get_len(obj_type_url)
for i,g in enumerate(graph_disk_cache.get_iter(obj_type_url)):
importer_class = RDF_IMPORTERS[obj_type]
importer = importer_class(g)
obj_id = importer.id
show_progress(i+1, obj_len, 80, "ID: %s" % obj_id[:76])
importer.import_graph()
print("")
for target_type in TARGET_TYPE_LIST:
delete_qs = target_type.objects.exclude(id__in = ProcessedObjects.objects.filter(obj_name=target_type.__name__).values_list('obj_id', flat=True))
RdfImporter.importer_stats[target_type.__name__]['deleted'] = delete_qs.count()
print("Deleting %s : %d " % (target_type.__name__, RdfImporter.importer_stats[target_type.__name__]['deleted']))
delete_qs.delete()
print("Import stats :")
for obj_stat in RdfImporter.importer_stats.values():
print(" Obj: {object_name} -> created: {created:d}, updated: {updated:d}, deleted: {deleted:d}, untouched: {untouched:d}".format(**obj_stat))
print("")
shutil.rmtree(graph_disk_cache.temp_folder)