@@ -1,8 +1,8 @@
-from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \
-    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \
-    ACCESS_TOKEN_SECRET, adapt_date, adapt_json
+from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
+    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
+    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
 from sqlalchemy.sql import select, or_ #@UnresolvedImport
 import anyjson #@UnresolvedImport
 import datetime
 import email.utils
 import logging #@UnresolvedImport
@@ -75,17 +75,71 @@
         else:
             return value
     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
 
 
+class ObjectBufferProxy(object):
+    def __init__(self, klass, args, kwargs, must_flush, instance=None):
+        self.klass = klass
+        self.args = args
+        self.kwargs = kwargs
+        self.must_flush = must_flush
+        self.instance = instance
+
+    def persists(self, session):
+        new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
+        new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
+
+        self.instance = self.klass(*new_args, **new_kwargs)
+        session.add(self.instance)
+        if self.must_flush:
+            session.flush()
+
+    def __getattr__(self, name):
+        return lambda : getattr(self.instance, name) if self.instance else None
+
+
+
+
+class ObjectsBuffer(object):
+
+    def __init__(self):
+        self.__bufferlist = []
+
+    def persists(self, session):
+        for object_proxy in self.__bufferlist:
+            object_proxy.persists(session)
+
+    def add_object(self, klass, args, kwargs, must_flush):
+        new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
+        self.__bufferlist.append(new_proxy)
+        return new_proxy
+
+    def get(self, klass, **kwargs):
+        for proxy in self.__bufferlist:
+            if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
+                continue
+            found = True
+            for k,v in kwargs.items():
+                if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
+                    found = False
+                    break
+            if found:
+                return proxy
+
+        return None
+
+
+
+
 
 class TwitterProcessorException(Exception):
     pass
 
 class TwitterProcessor(object):
 
-    def __init__(self, json_dict, json_txt, session, token_filename=None):
+    def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
 
         if json_dict is None and json_txt is None:
             raise TwitterProcessorException("No json")
 
         if json_dict is None:
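Note on the hunk above: ObjectBufferProxy and ObjectsBuffer form a small write buffer. Constructor arguments are stored rather than applied, attribute access on a proxy yields a zero-argument callable, and nothing reaches the database until persists() replays the buffer into a session (calling any callable args/kwargs at that point). A minimal, self-contained sketch of that behaviour; the Person model and the in-memory SQLite engine are hypothetical stand-ins for the models in models.py:

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class Person(Base):
        __tablename__ = "person"
        id = Column(Integer, primary_key=True)
        name = Column(String)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    buf = ObjectsBuffer()
    # Queue a row; must_flush=True makes persists() flush so the primary key exists.
    p = buf.add_object(Person, None, {"name": "alice"}, True)
    # Attribute access is deferred: p.id is a callable, not a value.
    print(callable(p.id))                        # True
    # get() finds the queued proxy again by its constructor kwargs, without a query.
    print(buf.get(Person, name="alice") is p)    # True
    buf.persists(session)
    print(p.id())                                # the flushed primary key, e.g. 1

must_flush=True is what lets a later proxy pick up a generated primary key through a deferred attribute such as p.id.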
@@ -99,28 +153,43 @@
         self.json_txt = json_txt
 
         if "id" not in self.json_dict:
             raise TwitterProcessorException("No id in json")
 
+        self.source_id = source_id
         self.session = session
         self.token_filename = token_filename
+        self.obj_buffer = ObjectsBuffer()
+
 
     def __get_user(self, user_dict):
-        logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+        logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
 
         user_id = user_dict.get("id",None)
         user_name = user_dict.get("screen_name", user_dict.get("name", None))
 
         if user_id is None and user_name is None:
             return None
 
+        user = None
         if user_id:
-            user = self.session.query(User).filter(User.id == user_id).first()
+            user = self.obj_buffer.get(User, id=user_id)
         else:
-            user = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
+            user = self.obj_buffer.get(User, screen_name=user_name)
 
         if user is not None:
+            return user
+
+        #todo : add methods to objectbuffer to get buffered user
+        user_obj = None
+        if user_id:
+            user_obj = self.session.query(User).filter(User.id == user_id).first()
+        else:
+            user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
+
+        if user_obj is not None:
+            user = ObjectBufferProxy(User, None, None, False, user_obj)
             return user
 
         user_created_at = user_dict.get("created_at", None)
 
         if user_created_at is None:
@@ -130,90 +199,85 @@
                 if user_id:
                     user_dict = t.users.show(user_id=user_id)
                 else:
                     user_dict = t.users.show(screen_name=user_name)
             except Exception as e:
-                logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
-                logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+                logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+                logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
                 return None
 
         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
         if "id" not in user_dict:
             return None
 
+        #TODO filter get, wrap in proxy
         user = self.session.query(User).filter(User.id == user_dict["id"]).first()
 
         if user is not None:
             return user
 
-        user = User(**user_dict)
+        user = self.obj_buffer.add_object(User, None, user_dict, True)
 
-        self.session.add(user)
-        self.session.flush()
-
-        return user
+        return user
+
 
     def __process_entity(self, ind, ind_type):
-        logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+        logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
 
         ind = clean_keys(ind)
 
         entity_dict = {
             "indice_start": ind["indices"][0],
             "indice_end" : ind["indices"][1],
             "tweet_id" : self.tweet.id,
-            "tweet" : self.tweet
         }
 
         def process_hashtags():
             text = ind.get("text", ind.get("hashtag", None))
             if text is None:
                 return None
-            hashtag = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
+            hashtag = self.obj_buffer.get(Hashtag, text=text)
+            if hashtag is None:
+                hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
+                if hashtag_obj is not None:
+                    hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
+
             if hashtag is None:
                 ind["text"] = text
-                hashtag = Hashtag(**ind)
-                self.session.add(hashtag)
-                self.session.flush()
-            entity_dict['hashtag'] = hashtag
+                hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
             entity_dict['hashtag_id'] = hashtag.id
-            entity = EntityHashtag(**entity_dict)
-            return entity
+            return EntityHashtag, entity_dict
 
         def process_user_mentions():
             user_mention = self.__get_user(ind)
             if user_mention is None:
-                entity_dict['user'] = None
                 entity_dict['user_id'] = None
             else:
-                entity_dict['user'] = user_mention
                 entity_dict['user_id'] = user_mention.id
-            entity = EntityUser(**entity_dict)
-            return entity
+            return EntityUser, entity_dict
 
         def process_urls():
-            url = self.session.query(Url).filter(Url.url == ind["url"]).first()
+            url = self.obj_buffer.get(Url, url=ind["url"])
             if url is None:
-                url = Url(**ind)
-                self.session.add(url)
-                self.session.flush()
-            entity_dict['url'] = url
+                url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
+                if url_obj is not None:
+                    url = ObjectBufferProxy(Url, None, None, False, url_obj)
+            if url is None:
+                url = self.obj_buffer.add_object(Url, None, ind, True)
             entity_dict['url_id'] = url.id
-            entity = EntityUrl(**entity_dict)
-            return entity
+            return EntityUrl, entity_dict
 
         #{'': lambda }
-        entity = {
+        entity_klass, entity_dict = {
             'hashtags': process_hashtags,
             'user_mentions' : process_user_mentions,
             'urls' : process_urls
         }[ind_type]()
 
-        logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
-        if entity:
-            self.session.add(entity)
-            self.session.flush()
+        logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+        if entity_klass:
+            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
 
 
     def __process_twitter_stream(self):
 
         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
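In the reworked __process_entity above, each process_* helper returns a (model class, kwargs) pair instead of a constructed entity, and the pair is queued with must_flush=False; foreign keys such as self.tweet.id or hashtag.id are therefore stored as zero-argument callables and only resolved when ObjectBufferProxy.persists() runs, after the parent rows have been flushed. A condensed, self-contained sketch of that dispatch-and-defer idea (the stub classes and the literal values are hypothetical):

    class EntityHashtag(object):
        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

    class EntityUrl(object):
        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

    tweet_id = lambda: 123   # stands in for the deferred tweet proxy attribute

    def process_hashtags():
        return EntityHashtag, {"indice_start": 6, "indice_end": 12, "tweet_id": tweet_id}

    def process_urls():
        return EntityUrl, {"indice_start": 13, "indice_end": 33, "tweet_id": tweet_id}

    entity_klass, entity_kwargs = {
        "hashtags": process_hashtags,
        "urls": process_urls,
    }["hashtags"]()

    # ObjectBufferProxy.persists() applies the same rule: call any callable kwarg first.
    resolved = dict((k, v() if callable(v) else v) for k, v in entity_kwargs.items())
    print(entity_klass(**resolved).tweet_id)   # 123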
@@ -223,20 +287,19 @@
         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
 
         # get or create user
         user = self.__get_user(self.json_dict["user"])
         if user is None:
-            logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
-            ts_copy["user"] = None
+            logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
             ts_copy["user_id"] = None
         else:
-            ts_copy["user"] = user
-            ts_copy["user_id"] = ts_copy["user"].id
-        ts_copy["original_json"] = self.json_txt
-
-        self.tweet = Tweet(**ts_copy)
-        self.session.add(self.tweet)
+            ts_copy["user_id"] = user.id
+
+        del(ts_copy['user'])
+        ts_copy["tweet_source_id"] = self.source_id
+
+        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
 
         # get entities
         if "entities" in self.json_dict:
             for ind_type, entity_list in self.json_dict["entities"].items():
                 for ind in entity_list:
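With the new constructor signature (json_dict, json_txt, source_id, session) and the buffering shown here, a caller hands over the raw JSON, passes source_id=None when no TweetSource row exists yet, and process() (rewritten in the last hunk below) queues everything and writes it through a single obj_buffer.persists(session) call. A minimal driving sketch; the session is assumed to be a SQLAlchemy session bound to the models' schema, and the payload values are illustrative:

    import anyjson

    json_txt = '{"id": 123, "text": "hello #world", "created_at": "Mon Sep 24 03:35:21 +0000 2012", "user": {"id": 42, "screen_name": "alice", "created_at": "Mon Sep 24 03:35:21 +0000 2012"}, "entities": {}}'
    processor = TwitterProcessor(anyjson.deserialize(json_txt), json_txt, None, session)
    processor.process()
    session.commit()   # committing (or rolling back) is left to the caller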
@@ -284,57 +348,83 @@
             'screen_name' : self.json_dict["from_user"],
         }
 
         user = self.__get_user(user_fields)
         if user is None:
-            logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
-            tweet_fields["user"] = None
+            logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
             tweet_fields["user_id"] = None
         else:
-            tweet_fields["user"] = user
             tweet_fields["user_id"] = user.id
 
         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
-        self.tweet = Tweet(**tweet_fields)
-        self.session.add(self.tweet)
+        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
 
         text = self.tweet.text
 
         extractor = twitter_text.Extractor(text)
 
         for ind in extractor.extract_hashtags_with_indices():
             self.__process_entity(ind, "hashtags")
 
+        for ind in extractor.extract_urls_with_indices():
+            self.__process_entity(ind, "urls")
+
         for ind in extractor.extract_mentioned_screen_names_with_indices():
             self.__process_entity(ind, "user_mentions")
 
-        for ind in extractor.extract_urls_with_indices():
-            self.__process_entity(ind, "urls")
-
-        self.session.flush()
 
 
     def process(self):
+
+        if self.source_id is None:
+            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
+            self.source_id = tweet_source.id
+
         if "metadata" in self.json_dict:
             self.__process_twitter_rest()
         else:
             self.__process_twitter_stream()
 
-
-def set_logging(options):
-
-    logging_config = {}
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
+
+        self.obj_buffer.persists(self.session)
+
+
+def set_logging(options, plogger=None):
+
+    logging_config = {
+        "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
+        "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
+    }
 
     if options.logfile == "stdout":
         logging_config["stream"] = sys.stdout
     elif options.logfile == "stderr":
         logging_config["stream"] = sys.stderr
     else:
         logging_config["filename"] = options.logfile
 
-    logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable
-    logging.basicConfig(**logging_config) #@UndefinedVariable
+    logger = plogger
+    if logger is None:
+        logger = logging.getLogger() #@UndefinedVariable
+
+    if len(logger.handlers) == 0:
+        filename = logging_config.get("filename")
+        if filename:
+            mode = logging_config.get("filemode", 'a')
+            hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
+        else:
+            stream = logging_config.get("stream")
+            hdlr = logging.StreamHandler(stream) #@UndefinedVariable
+        fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
+        dfs = logging_config.get("datefmt", None)
+        fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
+        hdlr.setFormatter(fmt)
+        logger.addHandler(hdlr)
+        level = logging_config.get("level")
+        if level is not None:
+            logger.setLevel(level)
 
     options.debug = (options.verbose-options.quiet > 0)
 
 def set_logging_options(parser):
     parser.add_option("-l", "--log", dest="logfile",