32 get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable |
29 get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable |
33 res = twitter.oauth.read_token_file(token_file_path) |
30 res = twitter.oauth.read_token_file(token_file_path) |
34 |
31 |
35 if res is not None and check_access_token: |
32 if res is not None and check_access_token: |
36 get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable |
33 get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable |
37 t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET)) |
34 t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret)) |
38 status = None |
35 status = None |
39 try: |
36 try: |
40 status = t.account.rate_limit_status() |
37 status = t.application.rate_limit_status(resources="account") |
41 except Exception as e: |
38 except Exception as e: |
42 get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e)) |
39 get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e)) |
43 get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e)) |
40 get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e)) |
44 status = None |
41 status = None |
45 get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable |
42 get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable |
46 if status is None or status['remaining_hits'] == 0: |
43 if status is None or status.get("resources",{}).get("account",{}).get('/account/verify_credentials',{}).get('remaining',0) == 0: |
47 get_logger().debug("get_oauth_token : Problem with status %s" % repr(status)) |
44 get_logger().debug("get_oauth_token : Problem with status %s" % repr(status)) |
48 res = None |
45 res = None |
49 |
46 |
50 if res is None: |
47 if res is None: |
51 get_logger().debug("get_oauth_token : doing the oauth dance") |
48 get_logger().debug("get_oauth_token : doing the oauth dance") |
52 res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) |
49 res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) |
|
50 |
53 |
51 |
54 CACHE_ACCESS_TOKEN[application_name] = res |
52 CACHE_ACCESS_TOKEN[application_name] = res |
55 |
53 |
|
54 get_logger().debug("get_oauth_token : done got %s" % repr(res)) |
56 return res |
55 return res |
57 |
56 |
def parse_date(date_str):
    """Parse an RFC 2822 style date string into a naive ``datetime``.

    The string is parsed with ``email.utils.parsedate_tz``. Only the
    year, month, day, hour, minute and second fields are used; the
    timezone offset reported by the parser is ignored, so the result
    carries the wall-clock time exactly as written in ``date_str``.

    :param date_str: date string, e.g. ``"Mon, 20 Nov 1995 19:12:08 -0500"``.
    :returns: a timezone-naive ``datetime.datetime``.
    :raises TypeError: if ``date_str`` cannot be parsed (the parser then
        returns ``None``, which cannot be sliced).
    """
    ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    # Only the first six fields are meaningful; index 6 (weekday) is
    # documented as unusable and must not be passed as microseconds.
    return datetime.datetime(*ts[:6])
167 break |
166 break |
168 if found: |
167 if found: |
169 return proxy |
168 return proxy |
170 return None |
169 return None |
171 |
170 |
class TwitterProcessorException(Exception):
    """Raised by TwitterProcessor when the tweet JSON payload is unusable."""
|
174 |
|
class TwitterProcessor(object):
    """Convert one tweet JSON payload into buffered database objects.

    A processor is built for a single tweet. Calling :meth:`process`
    registers the tweet source, the tweet, its author and its entities
    in an ``ObjectsBuffer`` and finally persists the buffer through the
    session given at construction time.
    """

    def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False):
        """Store the payload and the collaborators needed to process it.

        :param json_dict: decoded tweet, or ``None`` to decode ``json_txt``.
        :param json_txt: raw JSON text, or a falsy value to serialize
            ``json_dict``.
        :param source_id: id of an existing tweet source row, or ``None``
            to have :meth:`process` create one.
        :param session: database session used for queries and persistence.
        :param access_token: optional ``(key, secret)`` pair used when
            querying Twitter for user details.
        :param token_filename: token file handed to ``get_oauth_token``
            when ``access_token`` is not provided.
        :param user_query_twitter: when true, users lacking a
            ``created_at`` field are looked up through the Twitter API.
        :raises TwitterProcessorException: if neither ``json_dict`` nor
            ``json_txt`` is given, or if the decoded payload has no ``id``.
        """
        if json_dict is None and json_txt is None:
            raise TwitterProcessorException("No json")

        if json_dict is None:
            self.json_dict = anyjson.deserialize(json_txt)
        else:
            self.json_dict = json_dict

        if not json_txt:
            # Serialize the dict actually kept on the instance.
            self.json_txt = anyjson.serialize(self.json_dict)
        else:
            self.json_txt = json_txt

        if "id" not in self.json_dict:
            raise TwitterProcessorException("No id in json")

        self.source_id = source_id
        self.session = session
        self.token_filename = token_filename
        self.access_token = access_token
        self.obj_buffer = ObjectsBuffer()
        self.user_query_twitter = user_query_twitter

    def __get_user(self, user_dict, do_merge):
        """Return a buffer proxy for the user described by ``user_dict``.

        Lookup order: the object buffer, then the database, then
        (optionally) the Twitter API. The user is searched by ``id`` when
        present, otherwise by screen name / name.

        :param user_dict: user fields as found in the tweet payload.
        :param do_merge: when true, fields from ``user_dict`` may be merged
            into an already known user that has no ``created_at`` yet.
        :returns: a proxy for the user, or ``None`` when the user cannot be
            identified or the Twitter lookup fails.
        """
        get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable

        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])

        user_id = user_dict.get("id", None)
        user_name = user_dict.get("screen_name", user_dict.get("name", None))

        if user_id is None and user_name is None:
            return None

        # 1. Already buffered during this run?
        if user_id:
            user = self.obj_buffer.get(User, id=user_id)
        else:
            user = self.obj_buffer.get(User, screen_name=user_name)

        #to do update user id needed
        if user is not None:
            user_created_at = None
            if user.args is not None:
                user_created_at = user.args.get('created_at', None)
            if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
                if user.args is None:
                    user.args = user_dict
                else:
                    user.args.update(user_dict)
            return user

        # 2. Already stored in the database?
        #todo : add methpds to objectbuffer to get buffer user
        if user_id:
            user_obj = self.session.query(User).filter(User.id == user_id).first()
        else:
            user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()

        #todo update user if needed
        if user_obj is not None:
            if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge:
                user = ObjectBufferProxy(User, None, None, False, user_obj)
            else:
                user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
            return user

        # 3. Unknown user: optionally complete the data from Twitter.
        user_created_at = user_dict.get("created_at", None)

        if user_created_at is None and self.user_query_twitter:

            if self.access_token is not None:
                access_token_key, access_token_secret = self.access_token
            else:
                access_token_key, access_token_secret = get_oauth_token(self.token_filename)
            t = twitter.Twitter(auth=twitter.OAuth(access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
            try:
                if user_id:
                    user_dict = t.users.show(user_id=user_id)
                else:
                    user_dict = t.users.show(screen_name=user_name)
            except Exception as e:
                # Best effort: a failed lookup only means "user unknown".
                get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
                get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
                return None

        if "id" not in user_dict:
            return None

        #TODO filter get, wrap in proxy
        user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()

        if user_obj is None:
            return self.obj_buffer.add_object(User, None, user_dict, True)
        if not do_merge:
            return ObjectBufferProxy(User, None, None, False, user_obj)
        # Merge into the existing row instead of buffering a second User
        # with the same id (same pattern as the database branch above).
        return self.obj_buffer.add_object(User, None, user_dict, True, user_obj)

    def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge):
        """Return a proxy for an existing ``klass`` object or buffer a new one.

        The object buffer is searched first (with ``filter_by_kwargs``),
        then the database (with ``filter_arg`` when given, otherwise with
        ``filter_by_kwargs``). If nothing is found a new object is buffered
        from ``creation_kwargs``.

        :param do_merge: when true and a database row exists, the row is
            handed to the buffer together with ``creation_kwargs``;
            otherwise the row is only wrapped in a proxy.
        """
        obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
        if obj_proxy is None:
            query = self.session.query(klass)
            if filter_arg is not None:
                query = query.filter(filter_arg)
            else:
                query = query.filter_by(**filter_by_kwargs)
            obj_instance = query.first()
            if obj_instance is not None:
                if not do_merge:
                    obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
                else:
                    obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
        if obj_proxy is None:
            obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
        return obj_proxy

    def __process_entity(self, ind, ind_type):
        """Buffer one entity (hashtag, url, mention, media...) of the tweet.

        :param ind: entity description; must contain ``indices``.
        :param ind_type: entity kind, e.g. ``"hashtags"`` or ``"urls"``.
            Unknown kinds are stored as a generic ``Entity``.
        """
        get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable

        ind = clean_keys(ind)

        entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)

        entity_dict = {
            "indice_start"   : ind["indices"][0],
            "indice_end"     : ind["indices"][1],
            "tweet_id"       : self.tweet.id,
            "entity_type_id" : entity_type.id,
            "source"         : adapt_json(ind)
        }

        # Each handler returns (entity class, entity fields); a class of
        # None means "nothing to store for this entity".
        def process_medias():

            media_id = ind.get('id', None)
            if media_id is None:
                return None, None

            type_str = ind.get("type", "photo")
            media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
            media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
            if "type" in media_ind:
                del media_ind["type"]
            media_ind['type_id'] = media_type.id
            media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)

            entity_dict['media_id'] = media.id
            return EntityMedia, entity_dict

        def process_hashtags():
            text = ind.get("text", ind.get("hashtag", None))
            if text is None:
                return None, None
            ind['text'] = text
            hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
            entity_dict['hashtag_id'] = hashtag.id
            return EntityHashtag, entity_dict

        def process_user_mentions():
            user_mention = self.__get_user(ind, False)
            if user_mention is None:
                entity_dict['user_id'] = None
            else:
                entity_dict['user_id'] = user_mention.id
            return EntityUser, entity_dict

        def process_urls():
            url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
            entity_dict['url_id'] = url.id
            return EntityUrl, entity_dict

        #{'': lambda }
        entity_klass, entity_dict = {
            'hashtags': process_hashtags,
            'user_mentions' : process_user_mentions,
            'urls' : process_urls,
            'media': process_medias,
        }.get(ind_type, lambda: (Entity, entity_dict))()

        get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
        if entity_klass:
            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)

    def __process_twitter_stream(self):
        """Buffer the tweet, its author and entities for a stream payload.

        Does nothing when a tweet with the same id is already stored.
        """
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return

        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])

        # get or create user
        user = self.__get_user(self.json_dict["user"], True)
        if user is None:
            get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
            ts_copy["user_id"] = None
        else:
            ts_copy["user_id"] = user.id

        del ts_copy['user']
        ts_copy["tweet_source_id"] = self.source_id

        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)

        self.__process_entities()

    def __process_entities(self):
        """Process every entity of the current tweet.

        Entities listed in the payload are used when present; otherwise
        hashtags, urls and mentions are extracted from the tweet text with
        ``twitter_text``.
        """
        if "entities" in self.json_dict:
            for ind_type, entity_list in self.json_dict["entities"].items():
                for ind in entity_list:
                    self.__process_entity(ind, ind_type)
        else:

            text = self.tweet.text
            extractor = twitter_text.Extractor(text)
            for ind in extractor.extract_hashtags_with_indices():
                self.__process_entity(ind, "hashtags")

            for ind in extractor.extract_urls_with_indices():
                self.__process_entity(ind, "urls")

            for ind in extractor.extract_mentioned_screen_names_with_indices():
                self.__process_entity(ind, "user_mentions")

    def __process_twitter_rest(self):
        """Buffer the tweet, its author and entities for a REST/search payload.

        Does nothing when a tweet with the same id is already stored. The
        author is rebuilt from the ``from_user*`` fields of the payload.
        """
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return

        tweet_fields = {
            'created_at': self.json_dict["created_at"],
            'favorited': False,
            'id': self.json_dict["id"],
            'id_str': self.json_dict["id_str"],
            #'in_reply_to_screen_name': ts["to_user"],
            'in_reply_to_user_id': self.json_dict.get("in_reply_to_user_id",None),
            'in_reply_to_user_id_str': self.json_dict.get("in_reply_to_user_id_str", None),
            #'place': ts["place"],
            'source': self.json_dict["source"],
            'text': self.json_dict["text"],
            'truncated': False,
            'tweet_source_id' : self.source_id,
        }

        #user

        user_fields = {
            'lang' : self.json_dict.get('iso_language_code',None),
            'profile_image_url' : self.json_dict["profile_image_url"],
            'screen_name' : self.json_dict["from_user"],
            'id' : self.json_dict["from_user_id"],
            'id_str' : self.json_dict["from_user_id_str"],
            'name' : self.json_dict['from_user_name'],
        }

        user = self.__get_user(user_fields, do_merge=False)
        if user is None:
            get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
            tweet_fields["user_id"] = None
        else:
            tweet_fields["user_id"] = user.id

        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)

        self.__process_entities()

    def process(self):
        """Process the tweet and persist everything that was buffered.

        A tweet source row is buffered first when no ``source_id`` was
        given. Payloads containing a ``metadata`` key go through the REST
        path, all others through the stream path. A ``TweetLog`` entry with
        status ``OK`` is buffered before the buffer is persisted.
        """
        if self.source_id is None:
            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
            self.source_id = tweet_source.id

        if "metadata" in self.json_dict:
            self.__process_twitter_rest()
        else:
            self.__process_twitter_stream()

        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)

        self.obj_buffer.persists(self.session)
|
467 |
171 |
468 |
172 |
469 def set_logging(options, plogger=None, queue=None): |
173 def set_logging(options, plogger=None, queue=None): |
470 |
174 |
471 logging_config = { |
175 logging_config = { |