--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/core/wp_utils.py Sat Jun 15 01:33:28 2013 +0200
@@ -0,0 +1,212 @@
+# -*- coding: utf-8 -*-
+from .models import Term, TermLabel, TERM_URL_STATUS_DICT
+from django.conf import settings
+from django.utils.http import urlquote
+from wikitools import api, wiki
+import logging
+
+logger = logging.getLogger(__name__)
+
+
def normalize_term(term):
    """Normalize a raw term label.

    Replaces underscores with spaces, collapses all runs of whitespace
    into single spaces (also trimming the ends), and upper-cases the
    first character.

    :param term: raw label string.
    :return: the normalized label; the empty string if *term* is empty
        or contains only whitespace/underscores.

    Fix: the previous version indexed ``term[0]`` after normalization,
    which raised IndexError for all-whitespace input such as ``"   "``.
    """
    # replace + split/join makes the explicit strip() redundant:
    # split() discards leading/trailing whitespace as well.
    term = " ".join(term.replace("_", " ").split())
    if not term:
        return term
    return term[0].upper() + term[1:]
+
def switch_case_group(term):
    """Reorder a label of the shape "UPPER... lower..." into
    "lower... Upper...".

    The label must consist of a run of all-uppercase segments followed
    by a run of non-all-uppercase segments (either run may stand alone,
    but the uppercase run must come first).  The uppercase segments are
    lower-cased, normalized, and moved after the lowercase ones.  Any
    label not matching that shape is returned unchanged.
    """
    segments = term.split()
    upper_run = []
    lower_run = []
    for segment in segments:
        # a segment counts as "all upper" when no alphabetic character
        # in it is lowercase (digits/punctuation are ignored)
        seg_is_upper = not any(ch.isalpha() and not ch.isupper() for ch in segment)
        if seg_is_upper and not lower_run:
            upper_run.append(segment)
        elif not seg_is_upper and upper_run:
            lower_run.append(segment)
        else:
            # shape broken (e.g. lowercase before any uppercase, or
            # uppercase after the lowercase run started)
            return term

    if upper_run and len(upper_run) + len(lower_run) == len(segments):
        recased = [normalize_term(seg.lower()) for seg in upper_run]
        return " ".join(lower_run + recased)
    return term
+
+
def urlize_for_wikipedia(label):
    """Turn a page label into its URL-quoted Wikipedia path segment
    (spaces become underscores, then the result is percent-encoded)."""
    underscored = "_".join(label.split(" "))
    return urlquote(underscored)
+
+
def __is_homonymie(page_dict):
    """Return True when the page's category list marks it as a
    disambiguation ("homonymie") page, in French or English."""
    disambiguation_markers = (u'Catégorie:Homonymie',
                              u'Category:Disambiguation pages')
    for category in page_dict.get(u"categories", []):
        title = category.get(u"title", u"")
        if any(marker in title for marker in disambiguation_markers):
            return True
    return False
+
+
def query_wikipedia_title(site, label=None, pageid=None):
    """Resolve a Wikipedia page by title or by pageid.

    Issues a MediaWiki ``action=query`` request for info, categories and
    language links; classifies the page as a match, a redirection or a
    disambiguation ("homonyme"); and follows redirects to fetch the
    target page's data.

    :param site: a wikitools ``wiki.Wiki`` connection.
    :param label: page title to look up (takes precedence when given).
    :param pageid: page id to look up when *label* is not given.
    :return: a dict with keys ``new_label``, ``alternative_label``,
        ``status``, ``wikipedia_url``, ``pageid``,
        ``alternative_wikipedia_url``, ``alternative_pageid``,
        ``dbpedia_uri``, ``revision_id`` and ``response`` (the raw API
        answer).  On any failure a "null result" dict is returned
        instead of raising.
    """
    params = {'action': 'query',
              'prop': 'info|categories|langlinks',
              'inprop': 'url',
              'lllimit': '500',
              'cllimit': '500'}

    if label:
        params['titles'] = label
    else:
        params['pageids'] = pageid

    response = None

    def return_null_result():
        # closes over `response` so the caller still receives whatever
        # raw answer (or None) we got before giving up
        return {'new_label': None, 'alternative_label': None,
                'status': TERM_URL_STATUS_DICT["null_result"],
                'wikipedia_url': None, 'pageid': None,
                'alternative_wikipedia_url': None,
                'alternative_pageid': None, 'dbpedia_uri': None,
                'revision_id': None, 'response': response}

    def single_page(resp):
        # Return the unique page dict of an API answer, or None when the
        # answer does not contain exactly one page.
        pages = resp.get('query', {}).get("pages", {})
        if len(pages) != 1:
            return None
        # list(...) keeps this working on both Python 2 and 3
        # (Py3 dict views are not indexable)
        return list(pages.values())[0]

    try:
        wpquery = api.APIRequest(site, params)  #@UndefinedVariable
        response = wpquery.query()
    except Exception:
        # narrowed from a bare "except:" so KeyboardInterrupt/SystemExit
        # are no longer swallowed
        logger.exception("Exception when querying wikipedia")
        return return_null_result()

    original_response = response

    page = single_page(response)
    if page is None:
        return return_null_result()

    if u"invalid" in page or u"missing" in page:
        return return_null_result()

    url = page.get(u'fullurl', None)
    pageid = page.get(u'pageid', None)
    new_label = page[u'title']
    alternative_label = None
    alternative_url = None
    alternative_pageid = None

    if __is_homonymie(page):
        status = TERM_URL_STATUS_DICT["homonyme"]
    elif u"redirect" in page:
        status = TERM_URL_STATUS_DICT["redirection"]
    else:
        status = TERM_URL_STATUS_DICT["match"]

    if status == TERM_URL_STATUS_DICT["redirection"]:
        # re-issue the query with redirect resolution to reach the target
        params['redirects'] = True
        try:
            wpquery = api.APIRequest(site, params)  #@UndefinedVariable
            response = wpquery.query()
        except Exception:
            logger.exception("Exception when querying wikipedia for redirects")
            return return_null_result()
        page = single_page(response)
        if page is None:
            return return_null_result()
        alternative_label = page.get('title', None)
        alternative_url = page.get('fullurl', None)
        alternative_pageid = page.get('pageid', None)

    # on a redirection this is the revision id of the redirect target
    revision_id = page.get('lastrevid', None)

    if status in (TERM_URL_STATUS_DICT['match'], TERM_URL_STATUS_DICT['redirection']):
        dbpedia_uri = settings.DBPEDIA_URI_TEMPLATE % (urlize_for_wikipedia(new_label))
    else:
        dbpedia_uri = None

    return {'new_label': new_label, 'alternative_label': alternative_label,
            'status': status, 'wikipedia_url': url, 'pageid': pageid,
            'alternative_wikipedia_url': alternative_url,
            'alternative_pageid': alternative_pageid,
            'dbpedia_uri': dbpedia_uri, 'revision_id': revision_id,
            'response': original_response}
+
+
+
def get_or_create_term(term_label, term_uri, term_lang, thesaurus, wp_label_transform=(lambda l:l)):
    """Fetch an existing Term or create (and wikipedia-process) a new one.

    Lookup order: by exact ``uri`` first, then by case-insensitive
    normalized label within the same language and thesaurus, preferring a
    candidate whose wikipedia lookup succeeded over one with a
    ``null_result`` status.

    :param term_label: raw label of the term.
    :param term_uri: optional external URI identifying the term.
    :param term_lang: language code of the term.
    :param thesaurus: thesaurus the term belongs to.
    :param wp_label_transform: callable applied to the normalized label
        before querying wikipedia (identity by default).
    :return: ``(term, wikipedia_revision_id, created)``.

    Fix: when a new term was created, ``wikipedia_revision_id`` was
    never assigned, so the final ``return`` raised UnboundLocalError; it
    is now read back from the term after :func:`process_term` stores it.
    """
    term_label_normalized = normalize_term(term_label)

    term = None
    if term_uri is not None and Term.objects.filter(uri=term_uri).exists():
        term = Term.objects.get(uri=term_uri)
    else:
        for t in Term.objects.filter(normalized_label__iexact=term_label_normalized,
                                     lang=term_lang, thesaurus=thesaurus):
            # keep the first candidate, but let any later candidate with a
            # resolved wikipedia status override a null_result one
            if term is None or t.url_status != TERM_URL_STATUS_DICT['null_result']:
                term = t
            if term.url_status != TERM_URL_STATUS_DICT['null_result']:
                break

    created = term is None
    if created:
        term = Term(label=term_label, thesaurus=thesaurus,
                    normalized_label=term_label_normalized,
                    uri=term_uri, lang=term_lang)

    site = wiki.Wiki(settings.WIKIPEDIA_API_URL)  #@UndefinedVariable

    if created:
        # process_term queries wikipedia, fills in the term's wikipedia
        # fields (including wikipedia_revision_id) and saves it
        process_term(site, term, label=wp_label_transform(term_label_normalized))
        wikipedia_revision_id = term.wikipedia_revision_id

        term_label_obj = TermLabel(label=term_label, term=term, lang=term_lang)
        term_label_obj.save()

    elif term.wikipedia_pageid:
        # existing term already linked to a page: only refresh the revision id
        wp_res = query_wikipedia_title(site, pageid=term.wikipedia_pageid)
        wikipedia_revision_id = wp_res['revision_id']
        term.wikipedia_revision_id = wikipedia_revision_id
        term.save()
    else:
        wikipedia_revision_id = None

    return term, wikipedia_revision_id, created
+
+
def process_term(site, term, label=None, verbosity=0):
    """Query wikipedia for a term's label and store the result on *term*.

    Fills in the term's wikipedia-related fields (wp_label, url_status,
    wikipedia_url, wikipedia_pageid, dbpedia_uri, alternative_* fields,
    wikipedia_revision_id) from :func:`query_wikipedia_title` and saves
    the term.

    :param site: a wikitools ``wiki.Wiki`` connection, or None to create
        one from ``settings.WIKIPEDIA_API_URL``.
    :param term: the Term model instance to update and save.
    :param label: label to query with; defaults to ``term.label``.
    :param verbosity: at >= 2 the raw API response is printed.
    """
    if site is None:  # was "site == None"; identity check is the correct idiom
        site = wiki.Wiki(settings.WIKIPEDIA_API_URL)  # @UndefinedVariable

    if not label:
        label = term.label
    wp_res = query_wikipedia_title(site, label=label)
    new_label = wp_res['new_label']
    alternative_label = wp_res['alternative_label']
    status = wp_res['status']
    url = wp_res['wikipedia_url']
    alternative_url = wp_res['alternative_wikipedia_url']
    pageid = wp_res['pageid']
    alternative_pageid = wp_res['alternative_pageid']
    response = wp_res['response']
    dbpedia_uri = wp_res["dbpedia_uri"]
    revision_id = wp_res["revision_id"]

    if verbosity >= 2:
        # single-argument parenthesized form: same output on Python 2,
        # forward-compatible with Python 3
        print("response from query to %s with parameters %s :" % (site.apibase, repr(new_label)))
        print(repr(response))

    if new_label is not None:
        term.wp_label = new_label
    if status is not None:
        term.url_status = status
    term.wikipedia_url = url
    term.wikipedia_pageid = pageid
    term.dbpedia_uri = dbpedia_uri
    term.alternative_label = alternative_label
    term.alternative_wikipedia_url = alternative_url
    term.alternative_wikipedia_pageid = alternative_pageid
    term.wikipedia_revision_id = revision_id

    term.save()
+