src/core/wp_utils.py
changeset 0 4095911a7830
child 3 221af1052ec4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/core/wp_utils.py	Sat Jun 15 01:33:28 2013 +0200
@@ -0,0 +1,212 @@
+# -*- coding: utf-8 -*-
+from .models import Term, TermLabel, TERM_URL_STATUS_DICT
+from django.conf import settings
+from django.utils.http import urlquote
+from wikitools import api, wiki
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def normalize_term(term):
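+    # Normalize a raw label: trim it, replace underscores with spaces, collapse
+    # repeated whitespace and upper-case the first character.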
+    term = term.strip()
+    if not term:
+        return term
+    term = term.replace("_", " ")
+    term = " ".join(term.split())
+    term = term[0].upper() + term[1:]
+    return term
+
+def switch_case_group(term):
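+    # Handle labels that start with a run of all-uppercase words (typically an
+    # acronym): that run is lower-cased, re-capitalized and moved after any
+    # remaining words. Labels not matching this pattern are returned unchanged.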
+    seg_group = term.split()
+    uc_group = []
+    lc_group = []
+    for seg in seg_group:
+        is_all_upper = all(c.isupper() or not c.isalpha() for c in seg)
+        if is_all_upper and not lc_group:
+            uc_group.append(seg)
+        elif not is_all_upper and uc_group:
+            lc_group.append(seg)
+        else:
+            return term
+
+    if uc_group and lc_group and len(uc_group) + len(lc_group) == len(seg_group):
+        return " ".join(lc_group + [normalize_term(t.lower()) for t in uc_group])
+    elif uc_group and not lc_group and len(uc_group) == len(seg_group):
+        return " ".join([normalize_term(t.lower()) for t in uc_group])
+    else:
+        return term
+    
+
+def urlize_for_wikipedia(label):
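+    # Wikipedia URLs use underscores instead of spaces; percent-encode the rest.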
+    return urlquote(label.replace(" ", "_"))
+
+
+def __is_homonymie(page_dict):
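+    # True if the page belongs to a disambiguation ("homonymie") category.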
+    for cat in page_dict.get(u"categories", []):
+        title = cat.get(u"title", u"")
+        if u'Catégorie:Homonymie' in title or u'Category:Disambiguation pages' in title:
+            return True
+    return False
+
+
+def query_wikipedia_title(site, label=None, pageid=None):
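+    # Query the MediaWiki API for a single title or page id and return a dict
+    # describing the match: resolved label, status (match / redirection /
+    # homonyme / null_result), Wikipedia URL, page id, DBpedia URI, revision id,
+    # plus the raw API response.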
+    params = {'action': 'query', 'prop': 'info|categories|langlinks',
+              'inprop': 'url', 'lllimit': '500', 'cllimit': '500'}
+
+    if label:
+        params['titles'] = label
+    else:
+        params['pageids'] = pageid
+
+    response = None
+
+    def return_null_result():
+        return {'new_label': None, 'alternative_label': None,
+                'status': TERM_URL_STATUS_DICT["null_result"],
+                'wikipedia_url': None, 'pageid': None,
+                'alternative_wikipedia_url': None, 'alternative_pageid': None,
+                'dbpedia_uri': None, 'revision_id': None,
+                'response': response}
+    
+    try:
+        wpquery = api.APIRequest(site, params) #@UndefinedVariable
+        response = wpquery.query()
+    except Exception:
+        logger.exception("Exception when querying wikipedia")
+        return return_null_result()
+        
+    original_response = response
+    
+
+    query_dict = response['query']
+    # If the response contains zero or several pages, give up with a null result.
+    pages = query_dict.get("pages", {})
+    if len(pages) != 1:
+        return return_null_result()
+    
+    page = pages.values()[0]
+    
+    if u"invalid" in page or u"missing" in page:
+        return return_null_result()
+
+    url = page.get(u'fullurl', None)
+    pageid = page.get(u'pageid', None)
+    new_label = page[u'title']
+    alternative_label = None
+    alternative_url = None
+    alternative_pageid = None
+    
+    if __is_homonymie(page):
+        status = TERM_URL_STATUS_DICT["homonyme"]
+    elif u"redirect" in page:
+        status = TERM_URL_STATUS_DICT["redirection"]
+    else:
+        status = TERM_URL_STATUS_DICT["match"]
+    
+    if status == TERM_URL_STATUS_DICT["redirection"]:
+        params['redirects'] = True
+        try:
+            wpquery = api.APIRequest(site, params) #@UndefinedVariable    
+            response = wpquery.query()
+        except Exception:
+            logger.exception("Exception when querying wikipedia for redirects")
+            return return_null_result()
+        query_dict = response['query']
+        pages = query_dict.get("pages", {})
+        # The redirect query should return exactly one page; bail out otherwise.
+        if len(pages) != 1:
+            return return_null_result()
+        page = pages.values()[0]
+        alternative_label = page.get('title', None)
+        alternative_url = page.get('fullurl', None)
+        alternative_pageid = page.get('pageid',None)
+        
+
+    revision_id = page.get('lastrevid', None)
+    
+    
+    if status in (TERM_URL_STATUS_DICT['match'], TERM_URL_STATUS_DICT['redirection']):
+        dbpedia_uri = settings.DBPEDIA_URI_TEMPLATE % (urlize_for_wikipedia(new_label))
+    else:
+        dbpedia_uri = None
+            
+
+    return {'new_label': new_label, 'alternative_label': alternative_label,
+            'status': status, 'wikipedia_url': url, 'pageid': pageid,
+            'alternative_wikipedia_url': alternative_url,
+            'alternative_pageid': alternative_pageid,
+            'dbpedia_uri': dbpedia_uri, 'revision_id': revision_id,
+            'response': original_response}
+
+
+
+def get_or_create_term(term_label, term_uri, term_lang, thesaurus, wp_label_transform=(lambda l:l)):
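+    # Find an existing Term by URI or by normalized label, or create a new one
+    # and resolve it against Wikipedia.
+    # Returns (term, wikipedia_revision_id, created).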
+    
+    term_label_normalized = normalize_term(term_label)
+    # Look up an existing term, preferring one whose previous Wikipedia lookup
+    # did not end in a null result.
+    
+    term = None
+    if term_uri is not None and Term.objects.filter(uri=term_uri).exists():
+        term = Term.objects.get(uri=term_uri)
+    else:
+        for t in Term.objects.filter(normalized_label__iexact=term_label_normalized, lang=term_lang, thesaurus=thesaurus):
+            if term is None or t.url_status != TERM_URL_STATUS_DICT['null_result']:
+                term = t
+                if term.url_status != TERM_URL_STATUS_DICT['null_result']:
+                    break
+ 
+    if term is None:
+        term = Term(label=term_label, thesaurus=thesaurus,
+                    normalized_label=term_label_normalized,
+                    uri=term_uri, lang=term_lang)
+        created = True
+    else:
+        created = False
+    
+    site = wiki.Wiki(settings.WIKIPEDIA_API_URL) #@UndefinedVariable
+
+    if created:
+        process_term(site, term, label=wp_label_transform(term_label_normalized))
+        # process_term() stores the revision id found on Wikipedia on the term.
+        wikipedia_revision_id = term.wikipedia_revision_id
+
+        term_label_obj = TermLabel(label=term_label, term=term, lang=term_lang)
+        term_label_obj.save()
+        
+    elif term.wikipedia_pageid:
+        wp_res = query_wikipedia_title(site, pageid=term.wikipedia_pageid)
+        wikipedia_revision_id = wp_res['revision_id']
+        term.wikipedia_revision_id = wikipedia_revision_id
+        term.save()
+    else:
+        wikipedia_revision_id = None
+        
+
+    return term, wikipedia_revision_id, created
+
+
+def process_term(site, term, label=None, verbosity=0):
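+    # Resolve the term (or the explicit label) against Wikipedia and store the
+    # resulting status, URLs, page ids, DBpedia URI and revision id on the term.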
+    
+    if site is None:
+        site = wiki.Wiki(settings.WIKIPEDIA_API_URL)  # @UndefinedVariable
+        
+    if not label:
+        label = term.label
+    wp_res = query_wikipedia_title(site, label=label)
+    new_label = wp_res['new_label']
+    alternative_label = wp_res['alternative_label']
+    status = wp_res['status']
+    url = wp_res['wikipedia_url']
+    alternative_url = wp_res['alternative_wikipedia_url']
+    pageid = wp_res['pageid']
+    alternative_pageid = wp_res['alternative_pageid']
+    response = wp_res['response']
+    dbpedia_uri = wp_res['dbpedia_uri']
+    revision_id = wp_res['revision_id']
+    
+    if verbosity >= 2:
+        print "response from query to %s for label %s:" % (site.apibase, repr(label))
+        print repr(response)
+    
+    if new_label is not None:
+        term.wp_label = new_label
+    if status is not None:
+        term.url_status = status
+    term.wikipedia_url = url
+    term.wikipedia_pageid = pageid
+    term.dbpedia_uri = dbpedia_uri
+    term.alternative_label = alternative_label
+    term.alternative_wikipedia_url = alternative_url
+    term.alternative_wikipedia_pageid = alternative_pageid
+    term.wikipedia_revision_id = revision_id
+        
+    term.save()
+