1 from models import * |
1 from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \ |
2 from sqlalchemy.sql import select, or_ |
2 EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \ |
3 import anyjson |
3 ACCESS_TOKEN_SECRET, adapt_date, adapt_json |
|
4 from sqlalchemy.sql import select, or_ #@UnresolvedImport |
|
5 import anyjson #@UnresolvedImport |
4 import datetime |
6 import datetime |
5 import email.utils |
7 import email.utils |
6 import logging |
8 import logging #@UnresolvedImport |
7 import os.path |
9 import os.path |
8 import sys |
10 import sys |
9 import twitter |
11 import twitter.oauth #@UnresolvedImport |
10 import twitter.oauth |
12 import twitter.oauth_dance #@UnresolvedImport |
11 import twitter.oauth_dance |
13 import twitter_text #@UnresolvedImport |
12 import twitter_text |
14 |
13 |
15 |
14 |
16 |
15 CACHE_ACCESS_TOKEN = {} |
17 CACHE_ACCESS_TOKEN = {} |
16 |
18 |
17 def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET): |
19 def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET): |
20 |
22 |
21 if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN: |
23 if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN: |
22 return CACHE_ACCESS_TOKEN[application_name] |
24 return CACHE_ACCESS_TOKEN[application_name] |
23 |
25 |
24 if token_file_path and os.path.exists(token_file_path): |
26 if token_file_path and os.path.exists(token_file_path): |
25 logging.debug("reading token from file %s" % token_file_path) |
27 logging.debug("reading token from file %s" % token_file_path) #@UndefinedVariable |
26 CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path) |
28 CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path) |
27 return CACHE_ACCESS_TOKEN[application_name] |
29 return CACHE_ACCESS_TOKEN[application_name] |
28 #read access token info from path |
30 #read access token info from path |
29 |
31 |
30 if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: |
32 if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: |
32 |
34 |
33 CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) |
35 CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) |
34 return CACHE_ACCESS_TOKEN[application_name] |
36 return CACHE_ACCESS_TOKEN[application_name] |
35 |
37 |
def parse_date(date_str):
    """Parse an RFC 2822 style date string into a naive ``datetime``.

    The string is parsed with ``email.utils.parsedate_tz``. Only the year,
    month, day, hour, minute and second fields of the result are used; the
    timezone offset reported by the parser is ignored, so the returned
    value carries the wall-clock time exactly as written in ``date_str``.

    Raises:
        ValueError: if ``date_str`` cannot be parsed as a date.
    """
    ts = email.utils.parsedate_tz(date_str)
    if ts is None:
        # parsedate_tz signals an unparseable string by returning None;
        # fail with a clear message instead of a subscript error on None.
        raise ValueError("unparseable date string: %r" % (date_str,))
    # Only the first six fields (year..second) are meaningful; the former
    # [0:7] slice also forwarded the unusable seventh field as microseconds.
    return datetime.datetime(*ts[:6])
39 |
41 |
def clean_keys(dict_val):
    """Return a new dict with every key of ``dict_val`` converted via ``str``.

    Values are carried over untouched. If two keys share the same string
    form, the one iterated last wins.
    """
    cleaned = {}
    for name, item in dict_val.items():
        cleaned[str(name)] = item
    return cleaned
42 |
44 |
101 |
103 |
102 self.session = session |
104 self.session = session |
103 self.token_filename = token_filename |
105 self.token_filename = token_filename |
104 |
106 |
105 def __get_user(self, user_dict): |
107 def __get_user(self, user_dict): |
106 logging.debug("Get user : " + repr(user_dict)) |
108 logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable |
107 |
109 |
108 user_id = user_dict.get("id",None) |
110 user_id = user_dict.get("id",None) |
109 user_name = user_dict.get("screen_name", user_dict.get("name", None)) |
111 user_name = user_dict.get("screen_name", user_dict.get("name", None)) |
110 |
112 |
111 if user_id is None and user_name is None: |
113 if user_id is None and user_name is None: |
128 if user_id: |
130 if user_id: |
129 user_dict = t.users.show(user_id=user_id) |
131 user_dict = t.users.show(user_id=user_id) |
130 else: |
132 else: |
131 user_dict = t.users.show(screen_name=user_name) |
133 user_dict = t.users.show(screen_name=user_name) |
132 except Exception as e: |
134 except Exception as e: |
133 logging.info("get_user : TWITTER ERROR : " + repr(e)) |
135 logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable |
134 logging.info("get_user : TWITTER ERROR : " + str(e)) |
136 logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable |
135 |
137 |
136 user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) |
138 user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) |
137 if "id" not in user_dict: |
139 if "id" not in user_dict: |
138 return None |
140 return None |
139 |
141 |
143 self.session.flush() |
145 self.session.flush() |
144 |
146 |
145 return user |
147 return user |
146 |
148 |
147 def __process_entity(self, ind, ind_type): |
149 def __process_entity(self, ind, ind_type): |
148 logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) |
150 logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable |
149 |
151 |
150 ind = clean_keys(ind) |
152 ind = clean_keys(ind) |
151 |
153 |
152 entity_dict = { |
154 entity_dict = { |
153 "indice_start": ind["indices"][0], |
155 "indice_start": ind["indices"][0], |
198 'hashtags': process_hashtags, |
200 'hashtags': process_hashtags, |
199 'user_mentions' : process_user_mentions, |
201 'user_mentions' : process_user_mentions, |
200 'urls' : process_urls |
202 'urls' : process_urls |
201 }[ind_type]() |
203 }[ind_type]() |
202 |
204 |
203 logging.debug("Process_entity entity_dict: " + repr(entity_dict)) |
205 logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable |
204 if entity: |
206 if entity: |
205 self.session.add(entity) |
207 self.session.add(entity) |
206 self.session.flush() |
208 self.session.flush() |
207 |
209 |
208 |
210 |
215 ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) |
217 ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) |
216 |
218 |
217 # get or create user |
219 # get or create user |
218 user = self.__get_user(self.json_dict["user"]) |
220 user = self.__get_user(self.json_dict["user"]) |
219 if user is None: |
221 if user is None: |
220 logging.warning("USER not found " + repr(self.json_dict["user"])) |
222 logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable |
221 ts_copy["user"] = None |
223 ts_copy["user"] = None |
222 ts_copy["user_id"] = None |
224 ts_copy["user_id"] = None |
223 else: |
225 else: |
224 ts_copy["user"] = user |
226 ts_copy["user"] = user |
225 ts_copy["user_id"] = ts_copy["user"].id |
227 ts_copy["user_id"] = ts_copy["user"].id |
263 'screen_name' : self.json_dict["from_user"], |
265 'screen_name' : self.json_dict["from_user"], |
264 } |
266 } |
265 |
267 |
266 user = self.__get_user(user_fields) |
268 user = self.__get_user(user_fields) |
267 if user is None: |
269 if user is None: |
268 logging.warning("USER not found " + repr(user_fields)) |
270 logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable |
269 tweet_fields["user"] = None |
271 tweet_fields["user"] = None |
270 tweet_fields["user_id"] = None |
272 tweet_fields["user_id"] = None |
271 else: |
273 else: |
272 tweet_fields["user"] = user |
274 tweet_fields["user"] = user |
273 tweet_fields["user_id"] = user.id |
275 tweet_fields["user_id"] = user.id |
308 elif options.logfile == "stderr": |
310 elif options.logfile == "stderr": |
309 logging_config["stream"] = sys.stderr |
311 logging_config["stream"] = sys.stderr |
310 else: |
312 else: |
311 logging_config["filename"] = options.logfile |
313 logging_config["filename"] = options.logfile |
312 |
314 |
313 logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) |
315 logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable |
314 logging.basicConfig(**logging_config) |
316 logging.basicConfig(**logging_config) #@UndefinedVariable |
315 |
317 |
316 options.debug = (options.verbose-options.quiet > 0) |
318 options.debug = (options.verbose-options.quiet > 0) |
317 |
319 |
318 def set_logging_options(parser): |
320 def set_logging_options(parser): |
319 parser.add_option("-l", "--log", dest="logfile", |
321 parser.add_option("-l", "--log", dest="logfile", |
326 |
328 |
def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table):
    """Build a Tweet query restricted by date range and, optionally, hashtags.

    Args:
        session: object whose ``query()`` starts the Tweet query.
        start_date: inclusive lower bound compared against ``Tweet.created_at``.
        end_date: inclusive upper bound compared against ``Tweet.created_at``.
        hashtags: iterable of strings, each of which may hold several
            comma-separated hashtag fragments; falsy means no hashtag filter.
        tweet_exclude_table: table whose ``c.id`` column lists tweet ids to
            leave out, or ``None`` to exclude nothing.

    Returns:
        The query over ``Tweet`` joined to ``EntityHashtag`` and ``Hashtag``
        with the filters above applied.
    """
    query = session.query(Tweet).join(EntityHashtag).join(Hashtag)
    if tweet_exclude_table is not None:
        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id])))

    query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date)

    if hashtags:
        # Flatten the comma-separated entries without relying on the
        # builtin ``reduce`` (which is not a builtin on Python 3).
        fragments = [part.strip() for entry in hashtags for part in entry.split(",")]
        # Drop empty fragments (e.g. from "a," or ",,"): contains("") would
        # match every hashtag and silently disable the whole filter.
        htags = [part for part in fragments if part]
        if htags:
            query = query.filter(or_(*[Hashtag.text.contains(tag) for tag in htags]))

    return query
344 |
346 |
345 |
347 |
def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
    """Build a distinct User query for authors of tweets matching the filters.

    Args:
        session: object whose ``query()`` starts the User query.
        start_date: inclusive lower bound compared against ``Tweet.created_at``.
        end_date: inclusive upper bound compared against ``Tweet.created_at``.
        hashtags: iterable of strings, each of which may hold several
            comma-separated hashtag fragments; falsy means no hashtag filter.
        tweet_exclude_table: table whose ``c.id`` column lists tweet ids to
            leave out, or ``None`` to exclude nothing.

    Returns:
        The query over ``User`` joined to ``Tweet``, ``EntityHashtag`` and
        ``Hashtag`` with the filters above applied and ``distinct()`` set.
    """
    query = session.query(User).join(Tweet).join(EntityHashtag).join(Hashtag)
    if tweet_exclude_table is not None:
        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id])))

    query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date)

    if hashtags:
        # Flatten the comma-separated entries without relying on the
        # builtin ``reduce`` (which is not a builtin on Python 3).
        fragments = [part.strip() for entry in hashtags for part in entry.split(",")]
        # Drop empty fragments (e.g. from "a," or ",,"): contains("") would
        # match every hashtag and silently disable the whole filter.
        htags = [part for part in fragments if part]
        if htags:
            query = query.filter(or_(*[Hashtag.text.contains(tag) for tag in htags]))

    return query.distinct()
363 |
365 |
364 |
366 |