|
1 from models import * |
|
2 import datetime |
|
3 import email.utils |
|
4 import json |
|
5 import logging |
|
6 import sys |
|
7 import twitter |
|
8 import twitter_text |
|
9 import os.path |
|
10 import twitter.oauth |
|
11 |
|
12 |
|
def get_oauth_token(token_file_path=None):
    """Return the ``(token_key, token_secret)`` pair used for Twitter OAuth.

    The token is looked up in this order:

    1. ``token_file_path``, when it is given and names an existing file;
    2. the module-level ``ACCESS_TOKEN_KEY`` / ``ACCESS_TOKEN_SECRET``
       constants, when both are defined and non-empty;
    3. the interactive OAuth "dance", which is asked to store the resulting
       token in ``token_file_path`` (nothing is stored when it is ``None``).

    :param token_file_path: optional path of a token file.
    :returns: a ``(token_key, token_secret)`` tuple.
    """
    if token_file_path and os.path.isfile(token_file_path):
        logging.debug("reading token from file %s", token_file_path)
        return twitter.oauth.read_token_file(token_file_path)

    # The constants are optional, so they are looked up by name in the module
    # namespace instead of being referenced directly (which would raise
    # NameError when they are not configured).
    module_namespace = globals()
    token_key = module_namespace.get("ACCESS_TOKEN_KEY")
    token_secret = module_namespace.get("ACCESS_TOKEN_SECRET")
    if token_key and token_secret:
        return token_key, token_secret

    # Imported here because the file only imports ``twitter`` and
    # ``twitter.oauth``; the ``oauth_dance`` submodule is not guaranteed to be
    # reachable as an attribute of the package.
    from twitter.oauth_dance import oauth_dance

    return oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_file_path)
|
24 |
|
def parse_date(date_str):
    """Parse an RFC 2822 style date string into a naive ``datetime``.

    The timezone offset present in ``date_str`` is not applied: the returned
    value carries the date and time fields exactly as written in the string.

    :param date_str: date string, e.g. ``"Mon, 20 Nov 1995 19:12:08 -0500"``.
    :returns: a ``datetime.datetime`` without tzinfo.
    :raises ValueError: if ``date_str`` cannot be parsed.
    """
    parsed = email.utils.parsedate_tz(date_str)
    if parsed is None:
        raise ValueError("unparseable date: %r" % (date_str,))
    # Only year..second are meaningful; the remaining tuple slots are
    # documented as unusable and must not leak into the microsecond field.
    return datetime.datetime(*parsed[:6])
|
28 |
|
29 |
|
# Converters applied by adapt_fields(), indexed as
# fields_adapter[api][object_kind][field_name] -> callable(value).
# Fields that are not listed are copied through unchanged.
fields_adapter = {
    "stream": {
        "tweet": {
            "created_at": adapt_date,
            "coordinates": adapt_json,
            "place": adapt_json,
            "geo": adapt_json,
            # "original_json": adapt_json,
        },
        "user": {
            "created_at": adapt_date,
        },
    },
    "rest": {
        "tweet": {
            "created_at": adapt_date,
            "place": adapt_json,
            "geo": adapt_json,
            # "original_json": adapt_json,
        },
    },
}
|
52 |
|
53 # |
|
54 # adapt fields, return a copy of the field_dict with adapted fields |
|
55 # |
|
def adapt_fields(fields_dict, adapter_mapping):
    """Return a converted copy of ``fields_dict``; the input is not modified.

    Every key of the copy is ``str(key)``. A value is passed through the
    callable registered for its field in ``adapter_mapping``; fields that are
    absent from the mapping, or mapped to ``None``, keep their value as is.

    :param fields_dict: mapping of field name to raw value.
    :param adapter_mapping: mapping of field name to a one-argument callable
        (or ``None``).
    :returns: a new ``dict`` with string keys and adapted values.
    """
    converted = {}
    for name, raw_value in fields_dict.items():
        convert = adapter_mapping[name] if name in adapter_mapping else None
        converted[str(name)] = raw_value if convert is None else convert(raw_value)
    return converted
|
63 |
|
64 |
|
65 |
|
class TwitterProcessorException(Exception):
    """Raised by TwitterProcessor when it is given no JSON or JSON without an id."""
|
68 |
|
class TwitterProcessor(object):
    """Store one tweet, its author and its entities through a database session.

    The tweet is supplied as a decoded dict, as raw JSON text, or both.
    ``process()`` selects the import path: payloads that carry a "metadata"
    key go through the REST path, every other payload through the stream path.
    """

    def __init__(self, json_dict, json_txt, session):
        """Keep the tweet in both decoded and textual form.

        :param json_dict: decoded tweet, or ``None`` to decode ``json_txt``.
        :param json_txt: raw JSON text, or a falsy value to serialize
            ``json_dict``.
        :param session: session object used for every query, add and flush.
        :raises TwitterProcessorException: if both ``json_dict`` and
            ``json_txt`` are ``None``, or if the tweet has no "id" key.
        """
        if json_dict is None and json_txt is None:
            raise TwitterProcessorException("No json")

        if json_dict is None:
            self.json_dict = json.loads(json_txt)
        else:
            self.json_dict = json_dict

        if not json_txt:
            self.json_txt = json.dumps(json_dict)
        else:
            self.json_txt = json_txt

        if "id" not in self.json_dict:
            raise TwitterProcessorException("No id in json")

        self.session = session

    def __get_user(self, user_dict):
        """Return the stored ``User`` described by ``user_dict``, creating it if needed.

        The user is searched by "id" when present, otherwise by "screen_name"
        (falling back to "name"). When no row exists and ``user_dict`` has no
        "created_at", the profile is requested from the Twitter API first.

        :param user_dict: mapping describing the user.
        :returns: a ``User`` instance, or ``None`` when the user cannot be
            identified (no id and no name, or no "id" in the final data).
        """
        logging.debug("Get user : %r", user_dict)

        user_id = user_dict.get("id", None)
        user_name = user_dict.get("screen_name", user_dict.get("name", None))

        if user_id is None and user_name is None:
            return None

        if user_id:
            user = self.session.query(User).filter(User.id == user_id).first()
        else:
            user = self.session.query(User).filter(User.screen_name == user_name).first()

        if user is not None:
            return user

        if user_dict.get("created_at", None) is None:
            # Partial description (e.g. a mention): ask Twitter for the profile.
            access_token_key, access_token_secret = get_oauth_token()
            t = twitter.Twitter(
                auth=twitter.OAuth(
                    access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET
                )
            )
            try:
                if user_id:
                    user_dict = t.users.show(user_id=user_id)
                else:
                    user_dict = t.users.show(screen_name=user_name)
            except Exception as e:
                # Best effort: on API failure carry on with the partial data.
                logging.info("get_user : TWITTER ERROR : " + repr(e))
                logging.info("get_user : TWITTER ERROR : " + str(e))

        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
        if "id" not in user_dict:
            return None

        user = User(**user_dict)

        self.session.add(user)
        self.session.flush()

        return user

    def __build_hashtag_entity(self, ind, entity_dict):
        """Return an ``EntityHashtag`` for ``ind``, or ``None`` if it has no text."""
        text = ind.get("text", ind.get("hashtag", None))
        if text is None:
            return None
        hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
        if not hashtag:
            # The tag may have been found under the "hashtag" key; store it
            # under "text" before the dict is used as constructor arguments.
            ind["text"] = text
            hashtag = Hashtag(**ind)
            self.session.add(hashtag)
            self.session.flush()
        entity_dict['hashtag'] = hashtag
        entity_dict['hashtag_id'] = hashtag.id
        return EntityHashtag(**entity_dict)

    def __build_user_mention_entity(self, ind, entity_dict):
        """Return an ``EntityUser`` for ``ind``; the user is ``None`` when unresolved."""
        user_mention = self.__get_user(ind)
        if user_mention is None:
            entity_dict['user'] = None
            entity_dict['user_id'] = None
        else:
            entity_dict['user'] = user_mention
            entity_dict['user_id'] = user_mention.id
        return EntityUser(**entity_dict)

    def __build_url_entity(self, ind, entity_dict):
        """Return an ``EntityUrl`` for ``ind``, creating the ``Url`` row if needed."""
        url = self.session.query(Url).filter(Url.url == ind["url"]).first()
        if url is None:
            url = Url(**ind)
            self.session.add(url)
            self.session.flush()
        entity_dict['url'] = url
        entity_dict['url_id'] = url.id
        return EntityUrl(**entity_dict)

    def __process_entity(self, ind, ind_type):
        """Store one entity of the current tweet (``self.tweet``).

        :param ind: entity description; must hold an "indices" pair.
        :param ind_type: "hashtags", "user_mentions" or "urls". Any other type
            is logged and skipped instead of raising ``KeyError``.
        """
        logging.debug("Process_entity : %r : %r", ind, ind_type)

        builders = {
            'hashtags': self.__build_hashtag_entity,
            'user_mentions': self.__build_user_mention_entity,
            'urls': self.__build_url_entity,
        }
        builder = builders.get(ind_type)
        if builder is None:
            logging.debug("Process_entity : unsupported entity type %r, skipped", ind_type)
            return

        entity_dict = {
            "indice_start": ind["indices"][0],
            "indice_end": ind["indices"][1],
            "tweet_id": self.tweet.id,
            "tweet": self.tweet,
        }

        entity = builder(ind, entity_dict)

        logging.debug("Process_entity entity_dict: %r", entity_dict)
        if entity:
            self.session.add(entity)
            self.session.flush()

    def __process_twitter_stream(self):
        """Store a stream payload: the tweet, its user and its listed entities.

        Does nothing when a tweet with the same id is already stored.
        """
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return

        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])

        # get or create user
        user = self.__get_user(self.json_dict["user"])
        if user is None:
            logging.warning("USER not found " + repr(self.json_dict["user"]))
            ts_copy["user"] = None
            ts_copy["user_id"] = None
        else:
            ts_copy["user"] = user
            ts_copy["user_id"] = user.id
        ts_copy["original_json"] = self.json_txt

        self.tweet = Tweet(**ts_copy)
        self.session.add(self.tweet)
        self.session.flush()

        # get entities
        for ind_type, entity_list in self.json_dict["entities"].items():
            for ind in entity_list:
                self.__process_entity(ind, ind_type)

    def __process_twitter_rest(self):
        """Store a REST payload; entities are extracted from the tweet text.

        Does nothing when a tweet with the same id is already stored.
        """
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return

        tweet_fields = {
            'created_at': self.json_dict["created_at"],
            'favorited': False,
            'id': self.json_dict["id"],
            'id_str': self.json_dict["id_str"],
            'in_reply_to_user_id': self.json_dict["to_user_id"],
            'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
            'source': self.json_dict["source"],
            'text': self.json_dict["text"],
            'truncated': False,
            'original_json': self.json_txt,
        }

        # The REST payload describes the author with flat "from_user*" keys.
        user_fields = {
            'id': self.json_dict['from_user_id'],
            'id_str': self.json_dict['from_user_id_str'],
            'lang': self.json_dict['iso_language_code'],
            'profile_image_url': self.json_dict["profile_image_url"],
            'screen_name': self.json_dict["from_user"],
        }

        user = self.__get_user(user_fields)
        if user is None:
            logging.warning("USER not found " + repr(user_fields))
            tweet_fields["user"] = None
            tweet_fields["user_id"] = None
        else:
            tweet_fields["user"] = user
            tweet_fields["user_id"] = user.id

        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
        self.tweet = Tweet(**tweet_fields)
        self.session.add(self.tweet)

        extractor = twitter_text.Extractor(self.tweet.text)

        for ind in extractor.extract_hashtags_with_indices():
            self.__process_entity(ind, "hashtags")

        for ind in extractor.extract_mentioned_screen_names_with_indices():
            self.__process_entity(ind, "user_mentions")

        for ind in extractor.extract_urls_with_indices():
            self.__process_entity(ind, "urls")

        self.session.flush()

    def process(self):
        """Store the tweet, choosing the REST path when "metadata" is present."""
        if "metadata" in self.json_dict:
            self.__process_twitter_rest()
        else:
            self.__process_twitter_stream()
|
286 |
|
287 |
|
def set_logging(options):
    """Configure the ``logging`` module from parsed command-line options.

    Reads ``options.logfile`` ("stdout", "stderr" or a file name),
    ``options.verbose`` and ``options.quiet`` (counters). The level starts at
    WARNING, drops by 10 per ``verbose`` and rises by 10 per ``quiet``, and is
    clamped to the NOTSET..CRITICAL range.

    Also sets ``options.debug`` to whether ``verbose`` exceeds ``quiet``.

    :param options: option object, as filled by ``set_logging_options``.
    """
    requested_level = logging.WARNING - 10 * options.verbose + 10 * options.quiet
    if requested_level > logging.CRITICAL:
        level = logging.CRITICAL
    elif requested_level < logging.NOTSET:
        level = logging.NOTSET
    else:
        level = requested_level

    destination = options.logfile
    if destination == "stdout":
        target = {"stream": sys.stdout}
    elif destination == "stderr":
        target = {"stream": sys.stderr}
    else:
        target = {"filename": destination}

    logging.basicConfig(level=level, **target)

    options.debug = (options.verbose - options.quiet > 0)
|
303 |
|
def set_logging_options(parser):
    """Register the logging command-line options on ``parser``.

    Adds ``-l/--log`` (destination, stored as ``logfile``, default "stderr")
    and the two counters ``-v`` (``verbose``) and ``-q`` (``quiet``), both
    starting at 0.

    :param parser: option parser exposing ``add_option``.
    """
    parser.add_option(
        "-l", "--log",
        dest="logfile",
        metavar="LOG",
        default="stderr",
        help="log to file",
    )
    # The two counters only differ by their flag and destination name.
    for flag, dest in (("-v", "verbose"), ("-q", "quiet")):
        parser.add_option(
            flag,
            dest=dest,
            action="count",
            metavar=dest.upper(),
            default=0,
            help=dest,
        )