|
# -*- coding: utf-8 -*-
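"""Helpers for linking thesaurus Term objects to Wikipedia pages and DBpedia URIs.

Labels are normalized, looked up through the MediaWiki API (via wikitools),
and the matching page, its redirect/disambiguation status, the derived
DBpedia URI and the revision id are stored on the Term model. A minimal
usage sketch (the label below is only an illustrative value):

    site = wiki.Wiki(settings.WIKIPEDIA_API_URL)
    result = query_wikipedia_title(site, label=u"Tour Eiffel")
"""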
|
from .models import Term, TermLabel, TERM_URL_STATUS_DICT
from django.conf import settings
from django.utils.http import urlquote
from wikitools import api, wiki
import logging

logger = logging.getLogger(__name__)
|
def normalize_term(term):
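    """Normalize a term label: strip it, replace underscores with spaces,
    collapse runs of whitespace and upper-case the first character."""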
|
    if len(term) == 0:
        return term
    term = term.strip()
    term = term.replace("_", " ")
    term = " ".join(term.split())
    if not term:
        # a whitespace-only input collapses to the empty string here
        return term
    term = term[0].upper() + term[1:]
    return term
|
def switch_case_group(term):
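    """Reorder labels written as an upper-case group followed by a lower-case
    group, e.g. "EIFFEL tour" -> "tour Eiffel" and "TOUR EIFFEL" -> "Tour Eiffel";
    labels that do not follow this pattern are returned unchanged."""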
|
    seg_group = term.split()
    uc_group = []
    lc_group = []
    for seg in seg_group:
        is_all_upper = all(c.isupper() or not c.isalpha() for c in seg)
        if is_all_upper and not lc_group:
            uc_group.append(seg)
        elif not is_all_upper and uc_group:
            lc_group.append(seg)
        else:
            return term

    if uc_group and lc_group and len(uc_group) + len(lc_group) == len(seg_group):
        return " ".join(lc_group + [normalize_term(t.lower()) for t in uc_group])
    elif uc_group and not lc_group and len(uc_group) == len(seg_group):
        return " ".join([normalize_term(t.lower()) for t in uc_group])
    else:
        return term
|
def urlize_for_wikipedia(label):
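    """Convert a label to its Wikipedia URL form: spaces become underscores
    and the result is percent-encoded."""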
|
    return urlquote(label.replace(" ", "_"))
|
def __is_homonymie(page_dict):
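    """Return True if the page belongs to a French 'Homonymie' or English
    'Disambiguation pages' category."""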
|
    for cat in page_dict.get(u"categories", []):
        title = cat.get(u"title", u"")
        if u'Catégorie:Homonymie' in title or u'Category:Disambiguation pages' in title:
            return True
    return False
|
def query_wikipedia_title(site, label=None, pageid=None):
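    """Query the Wikipedia API for a single page, by title (`label`) or by `pageid`.

    Returns a dict with the canonical label, page URL and id, a status from
    TERM_URL_STATUS_DICT (match, redirection, homonyme or null_result), the
    redirect target in the alternative_* entries, the derived DBpedia URI,
    the last revision id and the raw API response.
    """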
|
    params = {'action': 'query',
              'prop': 'info|categories|langlinks',
              'inprop': 'url',
              'lllimit': '500',
              'cllimit': '500'}

    if label:
        params['titles'] = label
    else:
        params['pageids'] = pageid

    response = None

    def return_null_result():
        return {'new_label': None,
                'alternative_label': None,
                'status': TERM_URL_STATUS_DICT["null_result"],
                'wikipedia_url': None,
                'pageid': None,
                'alternative_wikipedia_url': None,
                'alternative_pageid': None,
                'dbpedia_uri': None,
                'revision_id': None,
                'response': response}

    try:
        wpquery = api.APIRequest(site, params) #@UndefinedVariable
        response = wpquery.query()
    except Exception:
        logger.exception("Exception when querying wikipedia")
        return return_null_result()

    original_response = response

    query_dict = response['query']
    # the query must resolve to exactly one page, otherwise return a null result
    pages = query_dict.get("pages", {})
    if len(pages) != 1:
        return return_null_result()

    page = pages.values()[0]

    if u"invalid" in page or u"missing" in page:
        return return_null_result()

    url = page.get(u'fullurl', None)
    pageid = page.get(u'pageid', None)
    new_label = page[u'title']
    alternative_label = None
    alternative_url = None
    alternative_pageid = None

    if __is_homonymie(page):
        status = TERM_URL_STATUS_DICT["homonyme"]
    elif u"redirect" in page:
        status = TERM_URL_STATUS_DICT["redirection"]
    else:
        status = TERM_URL_STATUS_DICT["match"]

    if status == TERM_URL_STATUS_DICT["redirection"]:
        # re-run the query following the redirect to get the target page
        params['redirects'] = True
        try:
            wpquery = api.APIRequest(site, params) #@UndefinedVariable
            response = wpquery.query()
        except Exception:
            logger.exception("Exception when querying wikipedia for redirects")
            return return_null_result()
        query_dict = response['query']
        pages = query_dict.get("pages", {})
        # the redirect target must also resolve to exactly one page
        if len(pages) != 1:
            return return_null_result()
        page = pages.values()[0]
        alternative_label = page.get('title', None)
        alternative_url = page.get('fullurl', None)
        alternative_pageid = page.get('pageid', None)

    revision_id = page.get('lastrevid', None)

    if status in (TERM_URL_STATUS_DICT['match'], TERM_URL_STATUS_DICT['redirection']):
        dbpedia_uri = settings.DBPEDIA_URI_TEMPLATE % (urlize_for_wikipedia(new_label))
    else:
        dbpedia_uri = None

    return {'new_label': new_label,
            'alternative_label': alternative_label,
            'status': status,
            'wikipedia_url': url,
            'pageid': pageid,
            'alternative_wikipedia_url': alternative_url,
            'alternative_pageid': alternative_pageid,
            'dbpedia_uri': dbpedia_uri,
            'revision_id': revision_id,
            'response': original_response}
|
def get_or_create_term(term_label, term_uri, term_lang, thesaurus, wp_label_transform=(lambda l: l)):
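    """Get or create the Term for `term_label` in the given thesaurus and language.

    Terms are looked up by URI first, then by normalized label; a newly created
    term is resolved against Wikipedia through process_term(). Returns a
    (term, wikipedia_revision_id, created) tuple.
    """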
|
    term_label_normalized = normalize_term(term_label)
    # get or create the Term object, then fetch its Wikipedia references

    term = None
    if term_uri is not None and Term.objects.filter(uri=term_uri).exists():
        term = Term.objects.get(uri=term_uri)
    else:
        # prefer an existing term whose Wikipedia lookup already succeeded
        for t in Term.objects.filter(normalized_label__iexact=term_label_normalized, lang=term_lang, thesaurus=thesaurus):
            if term is None or t.url_status != TERM_URL_STATUS_DICT['null_result']:
                term = t
                if term.url_status != TERM_URL_STATUS_DICT['null_result']:
                    break

    if term is None:
        term = Term(label=term_label, thesaurus=thesaurus, normalized_label=term_label_normalized, uri=term_uri, lang=term_lang)
        created = True
    else:
        created = False

    site = wiki.Wiki(settings.WIKIPEDIA_API_URL) #@UndefinedVariable

    if created:
        process_term(site, term, label=wp_label_transform(term_label_normalized))
        # process_term() has stored the revision id on the term
        wikipedia_revision_id = term.wikipedia_revision_id

        term_label_obj = TermLabel(label=term_label, term=term, lang=term_lang)
        term_label_obj.save()
    elif term.wikipedia_pageid:
        wp_res = query_wikipedia_title(site, pageid=term.wikipedia_pageid)
        wikipedia_revision_id = wp_res['revision_id']
        term.wikipedia_revision_id = wikipedia_revision_id
        term.save()
    else:
        wikipedia_revision_id = None

    return term, wikipedia_revision_id, created
|
def process_term(site, term, label=None, verbosity=0):
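    """Resolve `term` against Wikipedia and store the result on the model.

    Queries query_wikipedia_title() with `label` (defaulting to term.label)
    and copies the returned status, URLs, page ids, DBpedia URI and revision
    id onto the term before saving it.
    """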
|
    if site is None:
        site = wiki.Wiki(settings.WIKIPEDIA_API_URL) # @UndefinedVariable

    if not label:
        label = term.label
    wp_res = query_wikipedia_title(site, label=label)
    new_label = wp_res['new_label']
    alternative_label = wp_res['alternative_label']
    status = wp_res['status']
    url = wp_res['wikipedia_url']
    alternative_url = wp_res['alternative_wikipedia_url']
    pageid = wp_res['pageid']
    alternative_pageid = wp_res['alternative_pageid']
    response = wp_res['response']
    dbpedia_uri = wp_res['dbpedia_uri']
    revision_id = wp_res['revision_id']

    if verbosity >= 2:
        print "response from query to %s with label %s :" % (site.apibase, repr(label))
        print repr(response)

    if new_label is not None:
        term.wp_label = new_label
    if status is not None:
        term.url_status = status
    term.wikipedia_url = url
    term.wikipedia_pageid = pageid
    term.dbpedia_uri = dbpedia_uri
    term.alternative_label = alternative_label
    term.alternative_wikipedia_url = alternative_url
    term.alternative_wikipedia_pageid = alternative_pageid
    term.wikipedia_revision_id = revision_id

    term.save()
|