14 |
15 |
15 |
16 |
16 |
17 |
# In-memory cache of OAuth access tokens, keyed by application name.
# Filled by get_oauth_token().
CACHE_ACCESS_TOKEN = {}
18 |
19 |
def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
    """Return OAuth access credentials for ``application_name``.

    Resolution order:

    1. The module-level ``ACCESS_TOKEN_KEY`` / ``ACCESS_TOKEN_SECRET`` pair,
       when both names exist in the module globals and are truthy.
    2. The entry stored in ``CACHE_ACCESS_TOKEN`` for ``application_name``.
    3. The token read from ``token_file_path``, when that file exists.
    4. A new token obtained through ``twitter.oauth_dance.oauth_dance``.

    When ``check_access_token`` is true, a token found in step 2 or 3 is
    verified with a ``rate_limit_status`` call; if the call fails or reports
    zero remaining hits the token is discarded and step 4 runs.

    The token selected in steps 2-4 is written back to
    ``CACHE_ACCESS_TOKEN`` before returning.

    :param token_file_path: optional path of a token file; also forwarded to
        ``oauth_dance`` (presumably where the new token gets saved -- confirm
        against the ``twitter`` package).
    :param check_access_token: verify a cached/file token before using it.
    :param application_name: cache key, also forwarded to ``oauth_dance``.
    :param consumer_key: consumer key used for verification and the dance.
    :param consumer_secret: consumer secret used for verification and the dance.
    :return: ``(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET)`` in step 1, otherwise
        the token object from the cache, the file or the dance (indexed here
        as ``token[0]`` = key, ``token[1]`` = secret).
    """
    if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
        return ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET

    res = CACHE_ACCESS_TOKEN.get(application_name, None)

    if res is None and token_file_path and os.path.exists(token_file_path):
        get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
        res = twitter.oauth.read_token_file(token_file_path)

    if res is not None and check_access_token:
        get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
        # Verify with the consumer credentials this call was given, not the
        # module defaults, so tokens of a custom application validate correctly.
        t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret))
        status = None
        try:
            status = t.account.rate_limit_status()
        except Exception as e:
            # Best-effort check: any failure just means "token not usable".
            get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e))
            status = None
        get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
        # NOTE(review): assumes the response always carries 'remaining_hits';
        # a response without that key raises KeyError here -- confirm.
        if status is None or status['remaining_hits'] == 0:
            get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
            res = None

    if res is None:
        get_logger().debug("get_oauth_token : doing the oauth dance")
        res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)

    CACHE_ACCESS_TOKEN[application_name] = res

    return res
37 |
54 |
def parse_date(date_str):
    """Parse an e-mail style (RFC 2822) date string into a naive ``datetime``.

    The timezone offset found by ``email.utils.parsedate_tz`` is ignored: the
    returned value carries the date and time exactly as written in the string.

    :param date_str: date string understood by ``email.utils.parsedate_tz``.
    :return: ``datetime.datetime`` built from year, month, day, hour, minute
        and second.
    :raises TypeError: when ``date_str`` cannot be parsed (``parsedate_tz``
        returns ``None``).
    """
    ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    # Only the first six fields are usable; index 6 is the weekday slot and
    # must not be forwarded as the ``microsecond`` argument.
    return datetime.datetime(*ts[0:6])
41 |
58 |
102 |
131 |
class ObjectsBuffer(object):
    """Ordered buffer of object proxies with a per-class lookup index.

    Proxies are kept in insertion order (used by ``persists``) and are also
    grouped by their ``klass`` attribute so that ``get`` only scans the
    proxies of the requested class.
    """

    def __init__(self):
        # Every buffered proxy, in insertion order.
        self.__bufferlist = []
        # proxy.klass -> list of buffered proxies of that class.
        self.__bufferdict = {}

    def __add_proxy_object(self, proxy):
        """Register ``proxy`` in both the ordered list and the class index."""
        self.__bufferdict.setdefault(proxy.klass, []).append(proxy)
        self.__bufferlist.append(proxy)

    def persists(self, session):
        """Call ``persists(session)`` on every buffered proxy, in insertion order."""
        for object_proxy in self.__bufferlist:
            object_proxy.persists(session)

    def add_object(self, klass, args, kwargs, must_flush, instance=None):
        """Create an ``ObjectBufferProxy``, buffer it and return it.

        All arguments are forwarded unchanged to ``ObjectBufferProxy``.
        """
        new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
        self.__add_proxy_object(new_proxy)
        return new_proxy

    def get(self, klass, **kwargs):
        """Return the first buffered proxy of ``klass`` matching ``kwargs``.

        A proxy matches when each given key is present in its ``kwargs`` with
        an equal value. Proxies whose ``kwargs`` is ``None`` or empty are
        never returned. Returns ``None`` when nothing matches.
        """
        for proxy in self.__bufferdict.get(klass, ()):
            if not proxy.kwargs or proxy.klass != klass:
                continue
            if all(k in proxy.kwargs and v == proxy.kwargs[k] for k, v in kwargs.items()):
                return proxy
        return None
130 |
168 |
131 |
|
132 |
|
133 |
|
134 |
|
class TwitterProcessorException(Exception):
    """Error raised by TwitterProcessor, e.g. when no JSON input is supplied."""
137 |
171 |
138 class TwitterProcessor(object): |
172 class TwitterProcessor(object): |
139 |
173 |
140 def __init__(self, json_dict, json_txt, source_id, session, token_filename=None): |
174 def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None): |
141 |
175 |
142 if json_dict is None and json_txt is None: |
176 if json_dict is None and json_txt is None: |
143 raise TwitterProcessorException("No json") |
177 raise TwitterProcessorException("No json") |
144 |
178 |
145 if json_dict is None: |
179 if json_dict is None: |
190 user = ObjectBufferProxy(User, None, None, False, user_obj) |
226 user = ObjectBufferProxy(User, None, None, False, user_obj) |
191 return user |
227 return user |
192 |
228 |
193 user_created_at = user_dict.get("created_at", None) |
229 user_created_at = user_dict.get("created_at", None) |
194 |
230 |
195 if user_created_at is None: |
231 if user_created_at is None and query_twitter: |
196 acess_token_key, access_token_secret = get_oauth_token(self.token_filename) |
232 |
|
233 if self.access_token is not None: |
|
234 acess_token_key, access_token_secret = self.access_token |
|
235 else: |
|
236 acess_token_key, access_token_secret = get_oauth_token(self.token_filename) |
197 t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) |
237 t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) |
198 try: |
238 try: |
199 if user_id: |
239 if user_id: |
200 user_dict = t.users.show(user_id=user_id) |
240 user_dict = t.users.show(user_id=user_id) |
201 else: |
241 else: |
202 user_dict = t.users.show(screen_name=user_name) |
242 user_dict = t.users.show(screen_name=user_name) |
203 except Exception as e: |
243 except Exception as e: |
204 logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable |
244 get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable |
205 logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable |
245 get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable |
206 return None |
246 return None |
207 |
247 |
208 user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) |
248 user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) |
209 if "id" not in user_dict: |
249 if "id" not in user_dict: |
210 return None |
250 return None |
211 |
251 |
212 #TODO filter get, wrap in proxy |
252 #TODO filter get, wrap in proxy |
213 user = self.session.query(User).filter(User.id == user_dict["id"]).first() |
253 user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first() |
214 |
254 |
215 if user is not None: |
255 if user_obj is not None: |
216 return user |
256 if not do_merge: |
|
257 return ObjectBufferProxy(User, None, None, False, user_obj) |
217 |
258 |
218 user = self.obj_buffer.add_object(User, None, user_dict, True) |
259 user = self.obj_buffer.add_object(User, None, user_dict, True) |
219 |
260 |
220 return user |
261 return user |
221 |
262 |
|
def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge):
    """Return a proxy for a ``klass`` object, creating one when none exists.

    Lookup order:

    1. the object buffer, matched on ``filter_by_kwargs``;
    2. the database session, queried with ``filter`` when given, otherwise
       with ``filter_by(**filter_by_kwargs)``;
    3. a new buffered object built from ``creation_kwargs``.

    :param klass: class of the object to look up or create.
    :param filter_by_kwargs: key/value pairs identifying the object.
    :param filter: optional query criterion used instead of
        ``filter_by_kwargs`` for the database lookup.
    :param creation_kwargs: kwargs given to the buffer when an object is added.
    :param must_flush: forwarded to ``obj_buffer.add_object``.
    :param do_merge: when false, an instance found in the database is only
        wrapped in a proxy; when true it is added to the buffer together with
        ``creation_kwargs``.
    :return: the proxy found or created.
    """
    obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
    if obj_proxy is not None:
        return obj_proxy

    query = self.session.query(klass)
    if filter is not None:
        query = query.filter(filter)
    else:
        query = query.filter_by(**filter_by_kwargs)
    obj_instance = query.first()

    if obj_instance is not None:
        if not do_merge:
            # Wrap the existing row without registering it in the buffer.
            obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
        else:
            obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)

    if obj_proxy is None:
        obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
    return obj_proxy
|
281 |
222 |
282 |
def __process_entity(self, ind, ind_type):
    """Buffer the entity object described by ``ind`` for the current tweet.

    :param ind: dict describing one entity; must contain ``"indices"``
        (start, end) plus the fields specific to ``ind_type``.
    :param ind_type: entity kind. ``'hashtags'``, ``'user_mentions'``,
        ``'urls'`` and ``'media'`` have dedicated handling; any other value
        is stored as a generic ``Entity``.
    """
    get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable

    ind = clean_keys(ind)

    entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)

    # Fields shared by every entity kind; the handlers below add the
    # kind-specific foreign key to this same dict.
    entity_dict = {
        "indice_start" : ind["indices"][0],
        "indice_end" : ind["indices"][1],
        "tweet_id" : self.tweet.id,
        "entity_type_id" : entity_type.id,
        "source" : adapt_json(ind)
    }

    # Each handler returns (entity class, entity_dict), or (None, None) when
    # the entity must be skipped.

    def process_medias():
        media_id = ind.get('id', None)
        if media_id is None:
            return None, None

        type_str = ind.get("type", "photo")
        media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
        media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
        # The textual type is replaced by a reference to the MediaType row.
        if "type" in media_ind:
            del(media_ind["type"])
        media_ind['type_id'] = media_type.id
        media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)

        entity_dict['media_id'] = media.id
        return EntityMedia, entity_dict

    def process_hashtags():
        text = ind.get("text", ind.get("hashtag", None))
        if text is None:
            return None, None
        ind['text'] = text
        # NOTE(review): ilike() treats '%' and '_' in `text` as wildcards, so
        # a hashtag containing '_' may match a different stored hashtag --
        # confirm whether the pattern needs escaping.
        hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
        entity_dict['hashtag_id'] = hashtag.id
        return EntityHashtag, entity_dict

    def process_user_mentions():
        user_mention = self.__get_user(ind, False, False)
        if user_mention is None:
            entity_dict['user_id'] = None
        else:
            entity_dict['user_id'] = user_mention.id
        return EntityUser, entity_dict

    def process_urls():
        url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
        entity_dict['url_id'] = url.id
        return EntityUrl, entity_dict

    # Dispatch on the entity kind; unknown kinds fall back to a plain Entity.
    entity_klass, entity_dict = {
        'hashtags': process_hashtags,
        'user_mentions' : process_user_mentions,
        'urls' : process_urls,
        'media': process_medias,
    }.get(ind_type, lambda: (Entity, entity_dict))()

    get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
    if entity_klass:
        self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
279 |
348 |
280 |
349 |
281 def __process_twitter_stream(self): |
350 def __process_twitter_stream(self): |
285 return |
354 return |
286 |
355 |
287 ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) |
356 ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) |
288 |
357 |
289 # get or create user |
358 # get or create user |
290 user = self.__get_user(self.json_dict["user"]) |
359 user = self.__get_user(self.json_dict["user"], True) |
291 if user is None: |
360 if user is None: |
292 logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable |
361 get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable |
293 ts_copy["user_id"] = None |
362 ts_copy["user_id"] = None |
294 else: |
363 else: |
295 ts_copy["user_id"] = user.id |
364 ts_copy["user_id"] = user.id |
296 |
365 |
297 del(ts_copy['user']) |
366 del(ts_copy['user']) |
298 ts_copy["tweet_source_id"] = self.source_id |
367 ts_copy["tweet_source_id"] = self.source_id |
299 |
368 |
300 self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) |
369 self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) |
301 |
370 |
302 # get entities |
371 self.__process_entities() |
|
372 |
|
373 |
|
def __process_entities(self):
    """Process every entity of the current tweet.

    When the tweet JSON carries an ``"entities"`` mapping, each entry of each
    list is handed to ``__process_entity`` with its kind. Otherwise hashtags,
    URLs and user mentions are extracted from the tweet text with
    ``twitter_text.Extractor`` and processed in that order.
    """
    if "entities" in self.json_dict:
        for ind_type, entity_list in self.json_dict["entities"].items():
            for ind in entity_list:
                self.__process_entity(ind, ind_type)
        return

    # No entities supplied: fall back to extracting them from the text.
    text = self.tweet.text
    extractor = twitter_text.Extractor(text)

    for ind in extractor.extract_hashtags_with_indices():
        self.__process_entity(ind, "hashtags")

    for ind in extractor.extract_urls_with_indices():
        self.__process_entity(ind, "urls")

    for ind in extractor.extract_mentioned_screen_names_with_indices():
        self.__process_entity(ind, "user_mentions")
|
320 |
|
321 |
391 |
322 def __process_twitter_rest(self): |
392 def __process_twitter_rest(self): |
323 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
393 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
324 if tweet_nb > 0: |
394 if tweet_nb > 0: |
325 return |
395 return |