# -*- coding: utf-8 -*-
"""Wikipedia / DBpedia lookup helpers for thesaurus terms.

Queries the MediaWiki API (via ``wikitools``) to resolve a term label to a
Wikipedia page, detect disambiguation ("homonymie") pages and redirects, and
persist the result on ``Term`` model instances.
"""
from .models import Term, TermLabel, TERM_URL_STATUS_DICT
from django.conf import settings
from django.utils.http import urlquote
from wikitools import api, wiki
import logging

logger = logging.getLogger(__name__)


def normalize_term(term):
    """Normalize a raw label: strip, collapse internal whitespace, replace
    underscores with spaces, and upper-case the first character.

    Returns the input unchanged when it is empty; returns "" for
    whitespace-only input (guards the ``term[0]`` access below).
    """
    if len(term) == 0:
        return term
    term = " ".join(term.strip().replace("_", " ").split())
    if not term:
        # Whitespace-only input collapsed to "": nothing to capitalize.
        return term
    return term[0].upper() + term[1:]


def switch_case_group(term):
    """Move a leading run of ALL-UPPERCASE words behind the remaining
    lowercase words, title-casing the moved words.

    e.g. ``"ABC def"`` -> ``"def Abc"``; any other word pattern (lowercase
    first, or uppercase after lowercase) returns ``term`` unchanged.
    """
    words = term.split()
    upper_run = []
    tail = []
    for word in words:
        word_is_upper = all(c.isupper() or not c.isalpha() for c in word)
        if word_is_upper and not tail:
            upper_run.append(word)
        elif not word_is_upper and upper_run:
            tail.append(word)
        else:
            # Pattern broken (lowercase first, or uppercase after the tail
            # started): leave the term as-is.
            return term

    total = len(upper_run) + len(tail)
    if upper_run and tail and total == len(words):
        return " ".join(tail + [normalize_term(w.lower()) for w in upper_run])
    elif upper_run and not tail and len(upper_run) == len(words):
        return " ".join([normalize_term(w.lower()) for w in upper_run])
    else:
        return term


def urlize_for_wikipedia(label):
    """Turn a page title into its URL form (spaces -> underscores, quoted)."""
    return urlquote(label.replace(" ", "_"))


def __is_homonymie(page_dict):
    """Return True when the page belongs to a disambiguation category
    (French 'Homonymie' or English 'Disambiguation pages')."""
    for cat in page_dict.get(u"categories", []):
        title = cat.get(u"title", u"")
        if u'Catégorie:Homonymie' in title or u'Category:Disambiguation pages' in title:
            return True
    return False


def query_wikipedia_title(site, label=None, pageid=None):
    """Resolve ``label`` (or ``pageid``) against the MediaWiki API.

    Returns a dict with keys: new_label, alternative_label, status,
    wikipedia_url, pageid, alternative_wikipedia_url, alternative_pageid,
    dbpedia_uri, revision_id, response.  On any failure (API error, missing
    or ambiguous page) every value is None except ``status`` which is
    ``TERM_URL_STATUS_DICT["null_result"]``.
    """
    params = {'action': 'query', 'prop': 'info|categories|langlinks',
              'inprop': 'url', 'lllimit': '500', 'cllimit': '500'}
    if label:
        params['titles'] = label
    else:
        params['pageids'] = pageid

    response = None

    def _null_result(resp):
        # Uniform "no usable answer" payload; carries the raw API response
        # (possibly None) for debugging by the caller.
        return {'new_label': None, 'alternative_label': None,
                'status': TERM_URL_STATUS_DICT["null_result"],
                'wikipedia_url': None, 'pageid': None,
                'alternative_wikipedia_url': None, 'alternative_pageid': None,
                'dbpedia_uri': None, 'revision_id': None, 'response': resp}

    try:
        wpquery = api.APIRequest(site, params)  # @UndefinedVariable
        response = wpquery.query()
    except Exception:
        logger.exception("Exception when querying wikipedia")
        return _null_result(response)

    original_response = response

    query_dict = response['query']
    # Exactly one page is expected; anything else is a null result.
    pages = query_dict.get("pages", {})
    if len(pages) != 1:
        return _null_result(response)

    page = list(pages.values())[0]

    # "invalid" / "missing" flag pages the API could not resolve.
    if u"invalid" in page or u"missing" in page:
        return _null_result(response)

    url = page.get(u'fullurl', None)
    pageid = page.get(u'pageid', None)
    new_label = page[u'title']
    alternative_label = None
    alternative_url = None
    alternative_pageid = None

    if __is_homonymie(page):
        status = TERM_URL_STATUS_DICT["homonyme"]
    elif u"redirect" in page:
        status = TERM_URL_STATUS_DICT["redirection"]
    else:
        status = TERM_URL_STATUS_DICT["match"]

    if status == TERM_URL_STATUS_DICT["redirection"]:
        # Re-query following the redirect to capture the target page.
        params['redirects'] = True
        try:
            wpquery = api.APIRequest(site, params)  # @UndefinedVariable
            response = wpquery.query()
        except Exception:
            logger.exception("Exception when querying wikipedia for redirects")
            return _null_result(response)
        query_dict = response['query']
        pages = query_dict.get("pages", {})
        # We know we had one answer before the redirect; require one again.
        if len(pages) != 1:
            return _null_result(response)
        page = list(pages.values())[0]
        alternative_label = page.get('title', None)
        alternative_url = page.get('fullurl', None)
        alternative_pageid = page.get('pageid', None)

    revision_id = page.get('lastrevid', None)

    # A DBpedia URI only makes sense for a direct match or a followed
    # redirect, never for a disambiguation page.
    if status in (TERM_URL_STATUS_DICT['match'], TERM_URL_STATUS_DICT['redirection']):
        dbpedia_uri = settings.DBPEDIA_URI_TEMPLATE % (urlize_for_wikipedia(new_label))
    else:
        dbpedia_uri = None

    return {'new_label': new_label, 'alternative_label': alternative_label,
            'status': status, 'wikipedia_url': url, 'pageid': pageid,
            'alternative_wikipedia_url': alternative_url,
            'alternative_pageid': alternative_pageid,
            'dbpedia_uri': dbpedia_uri, 'revision_id': revision_id,
            'response': original_response}


def get_or_create_term(term_label, term_uri, term_lang, thesaurus,
                       wp_label_transform=(lambda l: l)):
    """Fetch an existing ``Term`` (by URI, else by normalized label) or
    create one, resolving its Wikipedia metadata when newly created.

    Returns ``(term, wikipedia_revision_id, created)``.
    ``wp_label_transform`` lets the caller rewrite the label before the
    Wikipedia lookup (identity by default).
    """
    term_label_normalized = normalize_term(term_label)

    term = None
    if term_uri is not None and Term.objects.filter(uri=term_uri).exists():
        term = Term.objects.get(uri=term_uri)
    else:
        # Prefer a candidate that already has a resolved Wikipedia status.
        for t in Term.objects.filter(normalized_label__iexact=term_label_normalized,
                                     lang=term_lang, thesaurus=thesaurus):
            if term is None or t.url_status != TERM_URL_STATUS_DICT['null_result']:
                term = t
                if term.url_status != TERM_URL_STATUS_DICT['null_result']:
                    break

    if term is None:
        term = Term(label=term_label, thesaurus=thesaurus,
                    normalized_label=term_label_normalized,
                    uri=term_uri, lang=term_lang)
        created = True
    else:
        created = False

    site = wiki.Wiki(settings.WIKIPEDIA_API_URL)  # @UndefinedVariable

    if created:
        # process_term resolves Wikipedia data and saves the term.
        process_term(site, term, label=wp_label_transform(term_label_normalized))
        term_label_obj = TermLabel(label=term_label, term=term, lang=term_lang)
        term_label_obj.save()
        # BUGFIX: was unbound in this branch, raising NameError at return;
        # process_term has just stored the revision id on the term.
        wikipedia_revision_id = term.wikipedia_revision_id
    elif term.wikipedia_pageid:
        # Refresh only the revision id for an already-resolved term.
        wp_res = query_wikipedia_title(site, pageid=term.wikipedia_pageid)
        wikipedia_revision_id = wp_res['revision_id']
        term.wikipedia_revision_id = wikipedia_revision_id
        term.save()
    else:
        wikipedia_revision_id = None

    return term, wikipedia_revision_id, created


def process_term(site, term, label=None, verbosity=0):
    """Query Wikipedia for ``term`` (using ``label`` or ``term.label``) and
    store the resolved metadata on the term, then save it."""
    if site is None:
        site = wiki.Wiki(settings.WIKIPEDIA_API_URL)  # @UndefinedVariable

    if not label:
        label = term.label
    wp_res = query_wikipedia_title(site, label=label)
    new_label = wp_res['new_label']
    alternative_label = wp_res['alternative_label']
    status = wp_res['status']
    url = wp_res['wikipedia_url']
    alternative_url = wp_res['alternative_wikipedia_url']
    pageid = wp_res['pageid']
    alternative_pageid = wp_res['alternative_pageid']
    response = wp_res['response']
    dbpedia_uri = wp_res["dbpedia_uri"]
    revision_id = wp_res["revision_id"]

    if verbosity >= 2:
        print("response from query to %s with parameters %s :" % (site.apibase, repr(new_label)))
        print(repr(response))

    if new_label is not None:
        term.wp_label = new_label
    if status is not None:
        term.url_status = status
    term.wikipedia_url = url
    term.wikipedia_pageid = pageid
    term.dbpedia_uri = dbpedia_uri
    term.alternative_label = alternative_label
    term.alternative_wikipedia_url = alternative_url
    term.alternative_wikipedia_pageid = alternative_pageid
    term.wikipedia_revision_id = revision_id

    term.save()