1 from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \ |
1 from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, |
2 EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \ |
2 EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, |
3 ACCESS_TOKEN_SECRET, adapt_date, adapt_json |
3 ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog) |
4 from sqlalchemy.sql import select, or_ #@UnresolvedImport |
4 from sqlalchemy.sql import select, or_ #@UnresolvedImport |
5 import anyjson #@UnresolvedImport |
5 import anyjson #@UnresolvedImport |
6 import datetime |
6 import datetime |
7 import email.utils |
7 import email.utils |
8 import logging #@UnresolvedImport |
8 import logging #@UnresolvedImport |
81 class TwitterProcessorException(Exception): |
81 class TwitterProcessorException(Exception): |
82 pass |
82 pass |
83 |
83 |
84 class TwitterProcessor(object): |
84 class TwitterProcessor(object): |
85 |
85 |
86 def __init__(self, json_dict, json_txt, session, token_filename=None): |
86 def __init__(self, json_dict, json_txt, source_id, session, token_filename=None): |
87 |
87 |
88 if json_dict is None and json_txt is None: |
88 if json_dict is None and json_txt is None: |
89 raise TwitterProcessorException("No json") |
89 raise TwitterProcessorException("No json") |
90 |
90 |
91 if json_dict is None: |
91 if json_dict is None: |
99 self.json_txt = json_txt |
99 self.json_txt = json_txt |
100 |
100 |
101 if "id" not in self.json_dict: |
101 if "id" not in self.json_dict: |
102 raise TwitterProcessorException("No id in json") |
102 raise TwitterProcessorException("No id in json") |
103 |
103 |
|
104 self.source_id = source_id |
104 self.session = session |
105 self.session = session |
105 self.token_filename = token_filename |
106 self.token_filename = token_filename |
106 |
107 |
107 def __get_user(self, user_dict): |
108 def __get_user(self, user_dict): |
108 logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable |
109 logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable |
223 ts_copy["user"] = None |
224 ts_copy["user"] = None |
224 ts_copy["user_id"] = None |
225 ts_copy["user_id"] = None |
225 else: |
226 else: |
226 ts_copy["user"] = user |
227 ts_copy["user"] = user |
227 ts_copy["user_id"] = ts_copy["user"].id |
228 ts_copy["user_id"] = ts_copy["user"].id |
228 ts_copy["original_json"] = self.json_txt |
229 |
|
230 ts_copy["tweet_source_id"] = self.source_id |
229 |
231 |
230 self.tweet = Tweet(**ts_copy) |
232 self.tweet = Tweet(**ts_copy) |
231 self.session.add(self.tweet) |
233 self.session.add(self.tweet) |
232 self.session.flush() |
234 self.session.flush() |
233 |
235 |
239 |
241 |
240 def __process_twitter_rest(self): |
242 def __process_twitter_rest(self): |
241 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
243 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
242 if tweet_nb > 0: |
244 if tweet_nb > 0: |
243 return |
245 return |
244 |
246 |
|
247 |
245 tweet_fields = { |
248 tweet_fields = { |
246 'created_at': self.json_dict["created_at"], |
249 'created_at': self.json_dict["created_at"], |
247 'favorited': False, |
250 'favorited': False, |
248 'id': self.json_dict["id"], |
251 'id': self.json_dict["id"], |
249 'id_str': self.json_dict["id_str"], |
252 'id_str': self.json_dict["id_str"], |
251 'in_reply_to_user_id': self.json_dict["to_user_id"], |
254 'in_reply_to_user_id': self.json_dict["to_user_id"], |
252 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"], |
255 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"], |
253 #'place': ts["place"], |
256 #'place': ts["place"], |
254 'source': self.json_dict["source"], |
257 'source': self.json_dict["source"], |
255 'text': self.json_dict["text"], |
258 'text': self.json_dict["text"], |
256 'truncated': False, |
259 'truncated': False, |
257 'original_json' : self.json_txt, |
260 'tweet_source_id' : self.source_id, |
258 } |
261 } |
259 |
262 |
260 #user |
263 #user |
261 |
264 |
262 user_fields = { |
265 user_fields = { |
293 |
296 |
294 self.session.flush() |
297 self.session.flush() |
295 |
298 |
296 |
299 |
297 def process(self): |
300 def process(self): |
298 if "metadata" in self.json_dict: |
301 |
299 self.__process_twitter_rest() |
302 if self.source_id is None: |
300 else: |
303 tweet_source = TweetSource(original_json=self.json_txt); |
301 self.__process_twitter_stream() |
304 self.session.add(tweet_source) |
|
305 self.session.flush() |
|
306 self.source_id = tweet_source.id |
|
307 |
|
308 try: |
|
309 if "metadata" in self.json_dict: |
|
310 self.__process_twitter_rest() |
|
311 else: |
|
312 self.__process_twitter_stream() |
|
313 |
|
314 tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK']) |
|
315 except: |
|
316 |
|
317 raise |
302 |
318 |
303 |
319 |
304 def set_logging(options): |
320 def set_logging(options): |
305 |
321 |
306 logging_config = {} |
322 logging_config = {} |