1 from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, |
|
2 EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, |
|
3 ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, |
|
4 Media, EntityMedia, Entity, EntityType) |
|
5 from sqlalchemy.sql import select, or_ #@UnresolvedImport |
|
6 import Queue #@UnresolvedImport |
|
7 import anyjson #@UnresolvedImport |
|
8 import datetime |
|
9 import email.utils |
|
10 import logging |
|
11 import os.path |
|
12 import sys |
|
13 import math |
|
14 import twitter.oauth #@UnresolvedImport |
|
15 import twitter.oauth_dance #@UnresolvedImport |
|
16 import twitter_text #@UnresolvedImport |
|
17 |
|
18 |
|
19 CACHE_ACCESS_TOKEN = {} |
|
20 |
|
21 def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET): |
|
22 |
|
23 global CACHE_ACCESS_TOKEN |
|
24 |
|
25 if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: |
|
26 return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET |
|
27 |
|
28 res = CACHE_ACCESS_TOKEN.get(application_name, None) |
|
29 |
|
30 if res is None and token_file_path and os.path.exists(token_file_path): |
|
31 get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable |
|
32 res = twitter.oauth.read_token_file(token_file_path) |
|
33 |
|
34 if res is not None and check_access_token: |
|
35 get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable |
|
36 t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET)) |
|
37 status = None |
|
38 try: |
|
39 status = t.account.rate_limit_status() |
|
40 except Exception as e: |
|
41 get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e)) |
|
42 status = None |
|
43 get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable |
|
44 if status is None or status['remaining_hits'] == 0: |
|
45 get_logger().debug("get_oauth_token : Problem with status %s" % repr(status)) |
|
46 res = None |
|
47 |
|
48 if res is None: |
|
49 get_logger().debug("get_oauth_token : doing the oauth dance") |
|
50 res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) |
|
51 |
|
52 CACHE_ACCESS_TOKEN[application_name] = res |
|
53 |
|
54 return res |
|
55 |
|
56 def parse_date(date_str): |
|
57 ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable |
|
58 return datetime.datetime(*ts[0:7]) |
|
59 |
|
60 def clean_keys(dict_val): |
|
61 return dict([(str(key),value) for key,value in dict_val.items()]) |
|
62 |
|
63 fields_adapter = { |
|
64 'stream': { |
|
65 "tweet": { |
|
66 "created_at" : adapt_date, |
|
67 "coordinates" : adapt_json, |
|
68 "place" : adapt_json, |
|
69 "geo" : adapt_json, |
|
70 # "original_json" : adapt_json, |
|
71 }, |
|
72 "user": { |
|
73 "created_at" : adapt_date, |
|
74 }, |
|
75 |
|
76 }, |
|
77 |
|
78 'entities' : { |
|
79 "medias": { |
|
80 "sizes" : adapt_json, |
|
81 }, |
|
82 }, |
|
83 'rest': { |
|
84 "tweet" : { |
|
85 "place" : adapt_json, |
|
86 "geo" : adapt_json, |
|
87 "created_at" : adapt_date, |
|
88 # "original_json" : adapt_json, |
|
89 }, |
|
90 }, |
|
91 } |
|
92 |
|
93 # |
|
94 # adapt fields, return a copy of the field_dict with adapted fields |
|
95 # |
|
96 def adapt_fields(fields_dict, adapter_mapping): |
|
97 def adapt_one_field(field, value): |
|
98 if field in adapter_mapping and adapter_mapping[field] is not None: |
|
99 return adapter_mapping[field](value) |
|
100 else: |
|
101 return value |
|
102 return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) |
|
103 |
|
104 |
|
105 class ObjectBufferProxy(object): |
|
106 def __init__(self, klass, args, kwargs, must_flush, instance=None): |
|
107 self.klass= klass |
|
108 self.args = args |
|
109 self.kwargs = kwargs |
|
110 self.must_flush = must_flush |
|
111 self.instance = instance |
|
112 |
|
113 def persists(self, session): |
|
114 new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] |
|
115 new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {} |
|
116 |
|
117 if self.instance is None: |
|
118 self.instance = self.klass(*new_args, **new_kwargs) |
|
119 else: |
|
120 self.instance = self.klass(*new_args, **new_kwargs) |
|
121 self.instance = session.merge(self.instance) |
|
122 |
|
123 session.add(self.instance) |
|
124 if self.must_flush: |
|
125 session.flush() |
|
126 |
|
127 def __getattr__(self, name): |
|
128 return lambda : getattr(self.instance, name) if self.instance else None |
|
129 |
|
130 |
|
131 |
|
132 |
|
133 class ObjectsBuffer(object): |
|
134 |
|
135 def __init__(self): |
|
136 self.__bufferlist = [] |
|
137 self.__bufferdict = {} |
|
138 |
|
139 def __add_proxy_object(self, proxy): |
|
140 proxy_list = self.__bufferdict.get(proxy.klass, None) |
|
141 if proxy_list is None: |
|
142 proxy_list = [] |
|
143 self.__bufferdict[proxy.klass] = proxy_list |
|
144 proxy_list.append(proxy) |
|
145 self.__bufferlist.append(proxy) |
|
146 |
|
147 def persists(self, session): |
|
148 for object_proxy in self.__bufferlist: |
|
149 object_proxy.persists(session) |
|
150 |
|
151 def add_object(self, klass, args, kwargs, must_flush, instance=None): |
|
152 new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance) |
|
153 self.__add_proxy_object(new_proxy) |
|
154 return new_proxy |
|
155 |
|
156 def get(self, klass, **kwargs): |
|
157 if klass in self.__bufferdict: |
|
158 for proxy in self.__bufferdict[klass]: |
|
159 if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass: |
|
160 continue |
|
161 found = True |
|
162 for k,v in kwargs.items(): |
|
163 if (k not in proxy.kwargs) or v != proxy.kwargs[k]: |
|
164 found = False |
|
165 break |
|
166 if found: |
|
167 return proxy |
|
168 return None |
|
169 |
|
170 class TwitterProcessorException(Exception): |
|
171 pass |
|
172 |
|
173 class TwitterProcessor(object): |
|
174 |
|
175 def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False): |
|
176 |
|
177 if json_dict is None and json_txt is None: |
|
178 raise TwitterProcessorException("No json") |
|
179 |
|
180 if json_dict is None: |
|
181 self.json_dict = anyjson.deserialize(json_txt) |
|
182 else: |
|
183 self.json_dict = json_dict |
|
184 |
|
185 if not json_txt: |
|
186 self.json_txt = anyjson.serialize(json_dict) |
|
187 else: |
|
188 self.json_txt = json_txt |
|
189 |
|
190 if "id" not in self.json_dict: |
|
191 raise TwitterProcessorException("No id in json") |
|
192 |
|
193 self.source_id = source_id |
|
194 self.session = session |
|
195 self.token_filename = token_filename |
|
196 self.access_token = access_token |
|
197 self.obj_buffer = ObjectsBuffer() |
|
198 self.user_query_twitter = user_query_twitter |
|
199 |
|
200 |
|
201 |
|
202 def __get_user(self, user_dict, do_merge): |
|
203 get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable |
|
204 |
|
205 user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) |
|
206 |
|
207 user_id = user_dict.get("id",None) |
|
208 user_name = user_dict.get("screen_name", user_dict.get("name", None)) |
|
209 |
|
210 if user_id is None and user_name is None: |
|
211 return None |
|
212 |
|
213 user = None |
|
214 if user_id: |
|
215 user = self.obj_buffer.get(User, id=user_id) |
|
216 else: |
|
217 user = self.obj_buffer.get(User, screen_name=user_name) |
|
218 |
|
219 #to do update user id needed |
|
220 if user is not None: |
|
221 user_created_at = None |
|
222 if user.args is not None: |
|
223 user_created_at = user.args.get('created_at', None) |
|
224 if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge: |
|
225 if user.args is None: |
|
226 user.args = user_dict |
|
227 else: |
|
228 user.args.update(user_dict) |
|
229 return user |
|
230 |
|
231 #todo : add methpds to objectbuffer to get buffer user |
|
232 user_obj = None |
|
233 if user_id: |
|
234 user_obj = self.session.query(User).filter(User.id == user_id).first() |
|
235 else: |
|
236 user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() |
|
237 |
|
238 #todo update user if needed |
|
239 if user_obj is not None: |
|
240 if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge : |
|
241 user = ObjectBufferProxy(User, None, None, False, user_obj) |
|
242 else: |
|
243 user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj) |
|
244 return user |
|
245 |
|
246 user_created_at = user_dict.get("created_at", None) |
|
247 |
|
248 if user_created_at is None and self.user_query_twitter: |
|
249 |
|
250 if self.access_token is not None: |
|
251 acess_token_key, access_token_secret = self.access_token |
|
252 else: |
|
253 acess_token_key, access_token_secret = get_oauth_token(self.token_filename) |
|
254 t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) |
|
255 try: |
|
256 if user_id: |
|
257 user_dict = t.users.show(user_id=user_id) |
|
258 else: |
|
259 user_dict = t.users.show(screen_name=user_name) |
|
260 except Exception as e: |
|
261 get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable |
|
262 get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable |
|
263 return None |
|
264 |
|
265 if "id" not in user_dict: |
|
266 return None |
|
267 |
|
268 #TODO filter get, wrap in proxy |
|
269 user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first() |
|
270 |
|
271 if user_obj is not None and not do_merge: |
|
272 return ObjectBufferProxy(User, None, None, False, user_obj) |
|
273 else: |
|
274 return self.obj_buffer.add_object(User, None, user_dict, True) |
|
275 |
|
276 def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge): |
|
277 |
|
278 obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs) |
|
279 if obj_proxy is None: |
|
280 query = self.session.query(klass) |
|
281 if filter is not None: |
|
282 query = query.filter(filter) |
|
283 else: |
|
284 query = query.filter_by(**filter_by_kwargs) |
|
285 obj_instance = query.first() |
|
286 if obj_instance is not None: |
|
287 if not do_merge: |
|
288 obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance) |
|
289 else: |
|
290 obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance) |
|
291 if obj_proxy is None: |
|
292 obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush) |
|
293 return obj_proxy |
|
294 |
|
295 |
|
296 def __process_entity(self, ind, ind_type): |
|
297 get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable |
|
298 |
|
299 ind = clean_keys(ind) |
|
300 |
|
301 entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False) |
|
302 |
|
303 entity_dict = { |
|
304 "indice_start" : ind["indices"][0], |
|
305 "indice_end" : ind["indices"][1], |
|
306 "tweet_id" : self.tweet.id, |
|
307 "entity_type_id" : entity_type.id, |
|
308 "source" : adapt_json(ind) |
|
309 } |
|
310 |
|
311 def process_medias(): |
|
312 |
|
313 media_id = ind.get('id', None) |
|
314 if media_id is None: |
|
315 return None, None |
|
316 |
|
317 type_str = ind.get("type", "photo") |
|
318 media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False) |
|
319 media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"]) |
|
320 if "type" in media_ind: |
|
321 del(media_ind["type"]) |
|
322 media_ind['type_id'] = media_type.id |
|
323 media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False) |
|
324 |
|
325 entity_dict['media_id'] = media.id |
|
326 return EntityMedia, entity_dict |
|
327 |
|
328 def process_hashtags(): |
|
329 text = ind.get("text", ind.get("hashtag", None)) |
|
330 if text is None: |
|
331 return None, None |
|
332 ind['text'] = text |
|
333 hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False) |
|
334 entity_dict['hashtag_id'] = hashtag.id |
|
335 return EntityHashtag, entity_dict |
|
336 |
|
337 def process_user_mentions(): |
|
338 user_mention = self.__get_user(ind, False) |
|
339 if user_mention is None: |
|
340 entity_dict['user_id'] = None |
|
341 else: |
|
342 entity_dict['user_id'] = user_mention.id |
|
343 return EntityUser, entity_dict |
|
344 |
|
345 def process_urls(): |
|
346 url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False) |
|
347 entity_dict['url_id'] = url.id |
|
348 return EntityUrl, entity_dict |
|
349 |
|
350 #{'': lambda } |
|
351 entity_klass, entity_dict = { |
|
352 'hashtags': process_hashtags, |
|
353 'user_mentions' : process_user_mentions, |
|
354 'urls' : process_urls, |
|
355 'media': process_medias, |
|
356 }.get(ind_type, lambda: (Entity, entity_dict))() |
|
357 |
|
358 get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable |
|
359 if entity_klass: |
|
360 self.obj_buffer.add_object(entity_klass, None, entity_dict, False) |
|
361 |
|
362 |
|
363 def __process_twitter_stream(self): |
|
364 |
|
365 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
|
366 if tweet_nb > 0: |
|
367 return |
|
368 |
|
369 ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) |
|
370 |
|
371 # get or create user |
|
372 user = self.__get_user(self.json_dict["user"], True) |
|
373 if user is None: |
|
374 get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable |
|
375 ts_copy["user_id"] = None |
|
376 else: |
|
377 ts_copy["user_id"] = user.id |
|
378 |
|
379 del(ts_copy['user']) |
|
380 ts_copy["tweet_source_id"] = self.source_id |
|
381 |
|
382 self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) |
|
383 |
|
384 self.__process_entities() |
|
385 |
|
386 |
|
387 def __process_entities(self): |
|
388 if "entities" in self.json_dict: |
|
389 for ind_type, entity_list in self.json_dict["entities"].items(): |
|
390 for ind in entity_list: |
|
391 self.__process_entity(ind, ind_type) |
|
392 else: |
|
393 |
|
394 text = self.tweet.text |
|
395 extractor = twitter_text.Extractor(text) |
|
396 for ind in extractor.extract_hashtags_with_indices(): |
|
397 self.__process_entity(ind, "hashtags") |
|
398 |
|
399 for ind in extractor.extract_urls_with_indices(): |
|
400 self.__process_entity(ind, "urls") |
|
401 |
|
402 for ind in extractor.extract_mentioned_screen_names_with_indices(): |
|
403 self.__process_entity(ind, "user_mentions") |
|
404 |
|
405 def __process_twitter_rest(self): |
|
406 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
|
407 if tweet_nb > 0: |
|
408 return |
|
409 |
|
410 |
|
411 tweet_fields = { |
|
412 'created_at': self.json_dict["created_at"], |
|
413 'favorited': False, |
|
414 'id': self.json_dict["id"], |
|
415 'id_str': self.json_dict["id_str"], |
|
416 #'in_reply_to_screen_name': ts["to_user"], |
|
417 'in_reply_to_user_id': self.json_dict["to_user_id"], |
|
418 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"], |
|
419 #'place': ts["place"], |
|
420 'source': self.json_dict["source"], |
|
421 'text': self.json_dict["text"], |
|
422 'truncated': False, |
|
423 'tweet_source_id' : self.source_id, |
|
424 } |
|
425 |
|
426 #user |
|
427 |
|
428 user_fields = { |
|
429 'lang' : self.json_dict.get('iso_language_code',None), |
|
430 'profile_image_url' : self.json_dict["profile_image_url"], |
|
431 'screen_name' : self.json_dict["from_user"], |
|
432 'id' : self.json_dict["from_user_id"], |
|
433 'id_str' : self.json_dict["from_user_id_str"], |
|
434 'name' : self.json_dict['from_user_name'], |
|
435 } |
|
436 |
|
437 user = self.__get_user(user_fields, do_merge=False) |
|
438 if user is None: |
|
439 get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable |
|
440 tweet_fields["user_id"] = None |
|
441 else: |
|
442 tweet_fields["user_id"] = user.id |
|
443 |
|
444 tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) |
|
445 self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) |
|
446 |
|
447 self.__process_entities() |
|
448 |
|
449 |
|
450 |
|
451 def process(self): |
|
452 |
|
453 if self.source_id is None: |
|
454 tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True) |
|
455 self.source_id = tweet_source.id |
|
456 |
|
457 if "metadata" in self.json_dict: |
|
458 self.__process_twitter_rest() |
|
459 else: |
|
460 self.__process_twitter_stream() |
|
461 |
|
462 self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True) |
|
463 |
|
464 self.obj_buffer.persists(self.session) |
|
465 |
|
466 |
|
467 def set_logging(options, plogger=None, queue=None): |
|
468 |
|
469 logging_config = { |
|
470 "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s', |
|
471 "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable |
|
472 } |
|
473 |
|
474 if options.logfile == "stdout": |
|
475 logging_config["stream"] = sys.stdout |
|
476 elif options.logfile == "stderr": |
|
477 logging_config["stream"] = sys.stderr |
|
478 else: |
|
479 logging_config["filename"] = options.logfile |
|
480 |
|
481 logger = plogger |
|
482 if logger is None: |
|
483 logger = get_logger() #@UndefinedVariable |
|
484 |
|
485 if len(logger.handlers) == 0: |
|
486 filename = logging_config.get("filename") |
|
487 if queue is not None: |
|
488 hdlr = QueueHandler(queue, True) |
|
489 elif filename: |
|
490 mode = logging_config.get("filemode", 'a') |
|
491 hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable |
|
492 else: |
|
493 stream = logging_config.get("stream") |
|
494 hdlr = logging.StreamHandler(stream) #@UndefinedVariable |
|
495 |
|
496 fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable |
|
497 dfs = logging_config.get("datefmt", None) |
|
498 fmt = logging.Formatter(fs, dfs) #@UndefinedVariable |
|
499 hdlr.setFormatter(fmt) |
|
500 logger.addHandler(hdlr) |
|
501 level = logging_config.get("level") |
|
502 if level is not None: |
|
503 logger.setLevel(level) |
|
504 |
|
505 options.debug = (options.verbose-options.quiet > 0) |
|
506 return logger |
|
507 |
|
508 def set_logging_options(parser): |
|
509 parser.add_option("-l", "--log", dest="logfile", |
|
510 help="log to file", metavar="LOG", default="stderr") |
|
511 parser.add_option("-v", dest="verbose", action="count", |
|
512 help="verbose", metavar="VERBOSE", default=0) |
|
513 parser.add_option("-q", dest="quiet", action="count", |
|
514 help="quiet", metavar="QUIET", default=0) |
|
515 |
|
516 def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): |
|
517 |
|
518 query = query.join(EntityHashtag).join(Hashtag) |
|
519 |
|
520 if tweet_exclude_table is not None: |
|
521 query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable |
|
522 |
|
523 if start_date: |
|
524 query = query.filter(Tweet.created_at >= start_date) |
|
525 if end_date: |
|
526 query = query.filter(Tweet.created_at <= end_date) |
|
527 |
|
528 if user_whitelist: |
|
529 query = query.join(User).filter(User.screen_name.in_(user_whitelist)) |
|
530 |
|
531 |
|
532 if hashtags : |
|
533 def merge_hash(l,h): |
|
534 l.extend(h.split(",")) |
|
535 return l |
|
536 htags = reduce(merge_hash, hashtags, []) |
|
537 |
|
538 query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable |
|
539 |
|
540 return query |
|
541 |
|
542 |
|
543 |
|
544 def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): |
|
545 |
|
546 query = session.query(Tweet) |
|
547 query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist) |
|
548 return query.order_by(Tweet.created_at) |
|
549 |
|
550 |
|
551 def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table): |
|
552 |
|
553 query = session.query(User).join(Tweet) |
|
554 |
|
555 query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None) |
|
556 |
|
557 return query.distinct() |
|
558 |
|
559 logger_name = "iri.tweet" |
|
560 |
|
561 def get_logger(): |
|
562 global logger_name |
|
563 return logging.getLogger(logger_name) #@UndefinedVariable |
|
564 |
|
565 |
|
566 # Next two import lines for this demo only |
|
567 |
|
568 class QueueHandler(logging.Handler): #@UndefinedVariable |
|
569 """ |
|
570 This is a logging handler which sends events to a multiprocessing queue. |
|
571 """ |
|
572 |
|
573 def __init__(self, queue, ignore_full): |
|
574 """ |
|
575 Initialise an instance, using the passed queue. |
|
576 """ |
|
577 logging.Handler.__init__(self) #@UndefinedVariable |
|
578 self.queue = queue |
|
579 self.ignore_full = True |
|
580 |
|
581 def emit(self, record): |
|
582 """ |
|
583 Emit a record. |
|
584 |
|
585 Writes the LogRecord to the queue. |
|
586 """ |
|
587 try: |
|
588 ei = record.exc_info |
|
589 if ei: |
|
590 dummy = self.format(record) # just to get traceback text into record.exc_text |
|
591 record.exc_info = None # not needed any more |
|
592 if not self.ignore_full or not self.queue.full(): |
|
593 self.queue.put_nowait(record) |
|
594 except Queue.Full: |
|
595 if self.ignore_full: |
|
596 pass |
|
597 else: |
|
598 raise |
|
599 except (KeyboardInterrupt, SystemExit): |
|
600 raise |
|
601 except: |
|
602 self.handleError(record) |
|
603 |
|
604 def show_progress(current_line, total_line, label, width): |
|
605 |
|
606 percent = (float(current_line) / float(total_line)) * 100.0 |
|
607 |
|
608 marks = math.floor(width * (percent / 100.0)) |
|
609 spaces = math.floor(width - marks) |
|
610 |
|
611 loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']' |
|
612 |
|
613 sys.stdout.write(u"%s %d%% %d/%d - %r\r" % (loader, percent, current_line - 1, total_line - 1, label[:50].rjust(50))) #takes the header into account |
|
614 if percent >= 100: |
|
615 sys.stdout.write("\n") |
|
616 sys.stdout.flush() |
|