# Names of the user attributes found in a tweet's "user" object, in the
# order they are stored.
columns_user = [
    u'follow_request_sent',
    u'profile_use_background_image',
    u'id',
    u'verified',
    u'profile_sidebar_fill_color',
    u'profile_text_color',
    u'followers_count',
    u'protected',
    u'location',
    u'profile_background_color',
    u'id_str',
    u'utc_offset',
    u'statuses_count',
    u'description',
    u'friends_count',
    u'profile_link_color',
    u'profile_image_url',
    u'notifications',
    u'show_all_inline_media',
    u'geo_enabled',
    u'profile_background_image_url',
    u'name',
    u'lang',
    u'following',
    u'profile_background_tile',
    u'favourites_count',
    u'screen_name',
    u'url',
    u'created_at',
    u'contributors_enabled',
    u'time_zone',
    u'profile_sidebar_border_color',
    u'is_translator',
    u'listed_count',
]
33 #just put it in a sqlite3 table |
36 #just put it in a sqlite3 table |
34 |
37 |
35 |
38 |
def set_logging(options):
    """Configure the loggers used by the main process.

    The 'iri.tweet' and 'multiprocessing' loggers are always configured
    through ``utils.set_logging``; the 'sqlalchemy.engine' logger is added
    only when ``options.debug`` is 2 or more.
    """
    for logger_name in ('iri.tweet', 'multiprocessing'):
        utils.set_logging(options, logging.getLogger(logger_name))

    if options.debug >= 2:
        # Other sqlalchemy loggers ('sqlalchemy.dialects', 'sqlalchemy.pool',
        # 'sqlalchemy.orm') are deliberately left unconfigured.
        utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
|
47 |
|
def set_logging_process(options, queue):
    """Configure and return the logger used inside a child process.

    The 'iri.tweet.p' logger is set up by ``utils.set_logging`` with the
    given ``queue``; propagation to parent loggers is switched off on the
    logger that call returns, and that logger is handed back to the caller.
    """
    process_logger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    process_logger.propagate = 0
    return process_logger
44 |
52 |
45 def get_auth(options, access_token): |
53 def get_auth(options, access_token): |
46 if options.username and options.password: |
54 if options.username and options.password: |
47 auth = tweepy.auth.BasicAuthHandler(options.username, options.password) |
55 auth = tweepy.auth.BasicAuthHandler(options.username, options.password) |
48 else: |
56 else: |
105 |
113 |
106 |
114 |
107 |
115 |
class SourceProcess(Process):
    """Producer process.

    Iterates over a ``ReconnectingTweetStream`` filtered on the tracked
    keywords, stores every raw tweet as a ``TweetSource`` row and then puts
    ``(source_id, tweet)`` on the shared queue.
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
        # Everything run() needs is kept on the instance.
        self.session_maker = session_maker
        self.queue = queue
        self.options = options
        self.track = options.track
        self.reconnects = options.reconnects
        self.token_filename = options.token_filename
        self.access_token = access_token
        self.stop_event = stop_event
        self.logger_queue = logger_queue
        super(SourceProcess, self).__init__()

    def run(self):
        """Read the stream until it ends, fails, or the stop event is set."""
        self.logger = set_logging_process(self.options, self.logger_queue)
        self.auth = get_auth(self.options, self.access_token)

        self.logger.debug("SourceProcess : run")
        # Tracked keywords arrive as one comma separated string.
        track_list = self.track.split(',')

        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
        self.logger.debug("SourceProcess : after connecting to stream")
        stream.muststop = lambda: self.stop_event.is_set()

        session = self.session_maker()

        try:
            for tweet in stream:
                self.logger.debug("tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                self.logger.debug("source created")

                # Up to ten attempts to store the source; the tenth
                # OperationalError is re-raised.
                for attempt in range(1, 11):
                    try:
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        self.logger.debug("Operational Error %s nb %d" % (repr(e), attempt))
                        if attempt == 10:
                            raise e

                source_id = source.id
                self.logger.debug("before queue + source id " + repr(source_id))
                running_for = int(time.time() - stream.starttime)
                self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, running_for))
                session.commit()
                # Non-blocking put: a full queue raises and ends the loop
                # through the handler below.
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            self.logger.error("Error when processing tweet " + repr(e))
        finally:
            session.rollback()
            stream.close()
            session.close()
            self.queue.close()
            self.stop_event.set()
171 |
181 |
172 |
182 |
173 def process_tweet(tweet, source_id, session, access_token): |
183 def process_tweet(tweet, source_id, session, access_token, logger): |
174 try: |
184 try: |
175 tweet_obj = anyjson.deserialize(tweet) |
185 tweet_obj = anyjson.deserialize(tweet) |
|
186 if 'text' not in tweet_obj: |
|
187 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) |
|
188 session.add(tweet_log) |
|
189 return |
176 screen_name = "" |
190 screen_name = "" |
177 if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: |
191 if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: |
178 screen_name = tweet_obj['user']['screen_name'] |
192 screen_name = tweet_obj['user']['screen_name'] |
179 utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) |
193 logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) |
180 utils.get_logger().debug(u"Process_tweet :" + repr(tweet)) |
194 logger.debug(u"Process_tweet :" + repr(tweet)) |
181 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) |
195 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) |
182 processor.process() |
196 processor.process() |
183 except Exception as e: |
197 except Exception as e: |
184 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
198 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
185 utils.get_logger().error(message) |
199 logger.error(message) |
186 output = StringIO.StringIO() |
200 output = StringIO.StringIO() |
187 traceback.print_exception(Exception, e, None, None, output) |
201 traceback.print_exception(Exception, e, None, None, output) |
188 error_stack = output.getvalue() |
202 error_stack = output.getvalue() |
189 output.close() |
203 output.close() |
190 session.rollback() |
204 session.rollback() |
194 |
208 |
195 |
209 |
196 |
210 |
class TweetProcess(Process):
    """Consumer process.

    Takes ``(source_id, tweet_txt)`` items from the shared queue and hands
    them to ``process_tweet``, committing the session after each tweet,
    until the stop event is set.
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
        self.session_maker = session_maker
        self.queue = queue
        self.stop_event = stop_event
        self.options = options
        self.access_token = access_token
        self.logger_queue = logger_queue
        super(TweetProcess, self).__init__()

    def run(self):
        """Consume the queue until the stop event is set.

        On exit (normal or not) the session is rolled back and closed and
        the stop event is set.
        """
        self.logger = set_logging_process(self.options, self.logger_queue)
        session = self.session_maker()
        try:
            while not self.stop_event.is_set():
                try:
                    # Use the queue given to the constructor, not a
                    # module-level name that may not exist in this process.
                    # The 3 second timeout lets the loop re-check the stop
                    # event regularly.
                    source_id, tweet_txt = self.queue.get(True, 3)
                    self.logger.debug("Processing source id " + repr(source_id))
                except Exception as e:
                    # Includes the "queue empty" timeout: just poll again.
                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
                session.commit()
        finally:
            session.rollback()
            self.stop_event.set()
            session.close()
229 engine, metadata = models.setup_database(conn_str, echo=False, create_all=False) |
244 engine, metadata = models.setup_database(conn_str, echo=False, create_all=False) |
230 Session = scoped_session(sessionmaker(bind=engine, autocommit=False)) |
245 Session = scoped_session(sessionmaker(bind=engine, autocommit=False)) |
231 return Session, engine, metadata |
246 return Session, engine, metadata |
232 |
247 |
233 |
248 |
def process_leftovers(session, access_token, logger):
    """Process every stored tweet source that has no log entry yet.

    Sources are selected with an outer join on ``TweetLog`` keeping the rows
    without a matching log; each one is passed to ``process_tweet`` and the
    session is committed after every source.
    """
    unlogged_sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)

    for source in unlogged_sources:
        process_tweet(source.original_json, source.id, session, access_token, logger)
        session.commit()
242 |
257 |
243 |
258 |
244 |
259 |
245 #get tweet sources that do not match any message |
260 #get tweet sources that do not match any message |
246 #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; |
261 #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; |
|
def process_log(logger_queues, stop_event):
    """Forward log records from the per-process queues to local loggers.

    Each pass takes at most one record from every queue in
    ``logger_queues`` and hands it to the logger named in the record. The
    loop ends once ``stop_event`` is set.

    The 0.1 second pause is only taken when a whole pass found every queue
    empty; pausing after every pass would cap each queue at about ten
    records per second.
    """
    while not stop_event.is_set():
        handled_any = False
        for lqueue in logger_queues:
            try:
                record = lqueue.get_nowait()
            except Queue.Empty:
                continue
            logging.getLogger(record.name).handle(record)
            handled_any = True
        if not handled_any:
            time.sleep(0.1)
247 |
271 |
248 |
272 |
249 def get_options(): |
273 def get_options(): |
250 parser = OptionParser() |
274 parser = OptionParser() |
251 parser.add_option("-f", "--file", dest="conn_str", |
275 parser.add_option("-f", "--file", dest="conn_str", |
342 pass |
366 pass |
343 #donothing |
367 #donothing |
344 finally: |
368 finally: |
345 if conn is not None: |
369 if conn is not None: |
346 conn.close() |
370 conn.close() |
347 |
371 |
348 |
# Engines and log queues created for the child processes; both lists are
# used again during shutdown.
process_engines = []
logger_queues = []


def _new_worker_resources():
    """Create and register a session maker and a log queue for one child process."""
    worker_session_maker, worker_engine, _worker_metadata = get_sessionmaker(conn_str)
    process_engines.append(worker_engine)
    worker_log_queue = mQueue(1)
    logger_queues.append(worker_log_queue)
    return worker_session_maker, worker_log_queue


# One producer process reading the stream...
source_session_maker, source_log_queue = _new_worker_resources()
sprocess = SourceProcess(source_session_maker, queue, options, access_token, stop_event, source_log_queue)

# ...and (process_nb - 1) consumer processes.
tweet_processes = []
for _ in range(options.process_nb - 1):
    consumer_session_maker, consumer_log_queue = _new_worker_resources()
    tweet_processes.append(
        TweetProcess(consumer_session_maker, queue, options, access_token, stop_event, consumer_log_queue))


def interupt_handler(signum, frame):
    # SIGINT only requests a stop.
    stop_event.set()

signal.signal(signal.SIGINT, interupt_handler)

log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
log_thread.daemon = True

sprocess.start()
for consumer in tweet_processes:
    consumer.start()

log_thread.start()

if options.duration >= 0:
    end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

# Supervise: stop when the requested duration has elapsed or when the
# source process is no longer alive.
while not stop_event.is_set():
    duration_elapsed = options.duration >= 0 and datetime.datetime.utcnow() >= end_ts
    if duration_elapsed or not sprocess.is_alive():
        stop_event.set()
        break
    time.sleep(1)
379 |
|
def _join_or_terminate(process, timeout, failure_message):
    """Join a child process, terminating it if it cannot be joined in time.

    ``Process.join(timeout)`` returns normally when the timeout expires, so
    the process is checked with ``is_alive()`` afterwards instead of relying
    on an exception.
    """
    try:
        process.join(timeout)
        joined = not process.is_alive()
    except Exception:
        joined = False
    if not joined:
        utils.get_logger().debug(failure_message)
        process.terminate()


utils.get_logger().debug("Joining Source Process")
_join_or_terminate(sprocess, 10, "Pb joining Source Process - terminating")

for i, cprocess in enumerate(tweet_processes):
    utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
    _join_or_terminate(cprocess, 3, "Pb joining consumer process Nb %d - terminating" % (i + 1))

utils.get_logger().debug("Close queues")
try:
    queue.close()
    for lqueue in logger_queues:
        lqueue.close()
except Exception as e:
    # Best effort: a failure to close a queue must not prevent the
    # remaining shutdown steps.
    utils.get_logger().error("error when closing queues %s", repr(e))

if options.process_nb > 1:
    # Process, in this process, the sources that still have no log entry.
    utils.get_logger().debug("Processing leftovers")
    session = Session()
    try:
        process_leftovers(session, access_token, utils.get_logger())
        session.commit()
    finally:
        session.rollback()
        session.close()

for pengine in process_engines:
    pengine.dispose()

utils.get_logger().debug("Done. Exiting.")
407 |
458 |