108 |
109 |
109 time.sleep(self.retry_wait) |
110 time.sleep(self.retry_wait) |
110 # Don't listen to auth error, since we can't reasonably reconnect |
111 # Don't listen to auth error, since we can't reasonably reconnect |
111 # when we get one. |
112 # when we get one. |
112 |
113 |
113 |
def add_process_event(type, args, session_maker):
    """Persist a ProcessEvent row describing a lifecycle event.

    ``args`` (a dict or None) is serialized to JSON before storage.
    The session is always closed, even when the commit fails.

    NOTE(review): the parameter name ``type`` shadows the builtin, but it
    is kept because callers pass it as a keyword argument (``type=...``).
    """
    session = session_maker()
    try:
        payload = None if args is None else anyjson.serialize(args)
        session.add(ProcessEvent(args=payload, type=type))
        session.commit()
    finally:
        session.close()
|
class BaseProcess(Process):
    """Common plumbing shared by the source and consumer worker processes.

    Stores the shared queue / session / options wiring, and wraps ``run``
    so that start/stop events are recorded in the database around the
    subclass-provided ``do_run``.
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        # identity of the spawning process, used to detect orphaning
        self.parent_pid = parent_pid
        # database / work-queue wiring shared with the other processes
        self.session_maker = session_maker
        self.queue = queue
        self.options = options
        self.access_token = access_token
        # coordination primitives
        self.stop_event = stop_event
        self.logger_queue = logger_queue

        super(BaseProcess, self).__init__()

    #
    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
    #
    def parent_is_alive(self):
        """Return True while the parent process still exists."""
        try:
            # signal 0 probes for process existence without delivering anything
            os.kill(self.parent_pid, 0)
        except OSError:
            # parent is gone
            return False
        return True

    def __get_process_event_args(self):
        """Build the payload recorded with the start/stop worker events."""
        return {
            'name': self.name,
            'pid': self.pid,
            'parent_pid': self.parent_pid,
            'options': self.options.__dict__,
            'access_token': self.access_token,
        }

    def run(self):
        """Record a start event, delegate to ``do_run``, always record a stop event."""
        try:
            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
            self.do_run()
        finally:
            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)

    def do_run(self):
        """Subclasses implement the actual work loop here."""
        raise NotImplementedError()
|
167 class SourceProcess(BaseProcess): |
|
168 |
|
169 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
121 self.track = options.track |
170 self.track = options.track |
122 self.reconnects = options.reconnects |
171 self.reconnects = options.reconnects |
123 self.token_filename = options.token_filename |
172 self.token_filename = options.token_filename |
124 self.stop_event = stop_event |
173 super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
125 self.options = options |
174 |
126 self.access_token = access_token |
175 def do_run(self): |
127 self.logger_queue = logger_queue |
|
128 super(SourceProcess, self).__init__() |
|
129 |
|
130 def run(self): |
|
131 |
176 |
132 #import pydevd |
177 #import pydevd |
133 #pydevd.settrace(suspend=False) |
178 #pydevd.settrace(suspend=False) |
134 |
179 |
135 self.logger = set_logging_process(self.options, self.logger_queue) |
180 self.logger = set_logging_process(self.options, self.logger_queue) |
146 |
191 |
147 session = self.session_maker() |
192 session = self.session_maker() |
148 |
193 |
149 try: |
194 try: |
150 for tweet in stream: |
195 for tweet in stream: |
151 self.logger.debug("tweet " + repr(tweet)) |
196 if not self.parent_is_alive(): |
|
197 sys.exit() |
|
198 self.logger.debug("SourceProcess : tweet " + repr(tweet)) |
152 source = TweetSource(original_json=tweet) |
199 source = TweetSource(original_json=tweet) |
153 self.logger.debug("source created") |
200 self.logger.debug("SourceProcess : source created") |
154 add_retries = 0 |
201 add_retries = 0 |
155 while add_retries < 10: |
202 while add_retries < 10: |
156 try: |
203 try: |
157 add_retries += 1 |
204 add_retries += 1 |
158 session.add(source) |
205 session.add(source) |
159 session.flush() |
206 session.flush() |
160 break |
207 break |
161 except OperationalError as e: |
208 except OperationalError as e: |
162 session.rollback() |
209 session.rollback() |
163 self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries)) |
210 self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) |
164 if add_retries == 10: |
211 if add_retries == 10: |
165 raise e |
212 raise e |
166 |
213 |
167 source_id = source.id |
214 source_id = source.id |
168 self.logger.debug("before queue + source id " + repr(source_id)) |
215 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) |
169 self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) |
216 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) |
170 session.commit() |
217 session.commit() |
171 self.queue.put((source_id, tweet), False) |
218 self.queue.put((source_id, tweet), False) |
172 |
219 |
173 except Exception as e: |
220 except Exception as e: |
174 self.logger.error("Error when processing tweet " + repr(e)) |
221 self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) |
175 finally: |
222 finally: |
176 session.rollback() |
223 session.rollback() |
177 stream.close() |
224 stream.close() |
178 session.close() |
225 session.close() |
179 self.queue.close() |
226 self.queue.close() |
194 logger.debug(u"Process_tweet :" + repr(tweet)) |
241 logger.debug(u"Process_tweet :" + repr(tweet)) |
195 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) |
242 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) |
196 processor.process() |
243 processor.process() |
197 except Exception as e: |
244 except Exception as e: |
198 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
245 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
199 logger.error(message) |
246 logger.exception(message) |
200 output = StringIO.StringIO() |
247 output = StringIO.StringIO() |
201 traceback.print_exception(Exception, e, None, None, output) |
248 try: |
202 error_stack = output.getvalue() |
249 traceback.print_exc(file=output) |
203 output.close() |
250 error_stack = output.getvalue() |
|
251 finally: |
|
252 output.close() |
204 session.rollback() |
253 session.rollback() |
205 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack) |
254 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack) |
206 session.add(tweet_log) |
255 session.add(tweet_log) |
207 session.commit() |
256 session.commit() |
208 |
257 |
209 |
258 |
210 |
259 |
211 class TweetProcess(Process): |
260 class TweetProcess(BaseProcess): |
212 |
261 |
213 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): |
262 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
214 self.session_maker = session_maker |
263 super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
215 self.queue = queue |
264 |
216 self.stop_event = stop_event |
265 |
217 self.options = options |
266 def do_run(self): |
218 self.access_token = access_token |
|
219 self.logger_queue = logger_queue |
|
220 super(TweetProcess, self).__init__() |
|
221 |
|
222 |
|
223 def run(self): |
|
224 |
267 |
225 self.logger = set_logging_process(self.options, self.logger_queue) |
268 self.logger = set_logging_process(self.options, self.logger_queue) |
226 session = self.session_maker() |
269 session = self.session_maker() |
227 try: |
270 try: |
228 while not self.stop_event.is_set(): |
271 while not self.stop_event.is_set() and self.parent_is_alive(): |
229 try: |
272 try: |
230 source_id, tweet_txt = queue.get(True, 3) |
273 source_id, tweet_txt = self.queue.get(True, 3) |
231 self.logger.debug("Processing source id " + repr(source_id)) |
274 self.logger.debug("Processing source id " + repr(source_id)) |
232 except Exception as e: |
275 except Exception as e: |
233 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
276 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
234 continue |
277 continue |
235 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger) |
278 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger) |
291 parser.add_option("-d", "--duration", dest="duration", |
338 parser.add_option("-d", "--duration", dest="duration", |
292 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
339 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
293 parser.add_option("-N", "--nb-process", dest="process_nb", |
340 parser.add_option("-N", "--nb-process", dest="process_nb", |
294 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
341 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
295 |
342 |
296 |
|
297 |
|
298 utils.set_logging_options(parser) |
343 utils.set_logging_options(parser) |
299 |
344 |
300 return parser.parse_args() |
345 return parser.parse_args() |
301 |
346 |
|
347 |
|
def do_run(options, session_maker):
    """Run the recorder: process leftovers, then spawn and supervise workers.

    Spawns one SourceProcess (stream reader) and ``options.process_nb - 1``
    TweetProcess consumers, plus a logging thread, then loops until a
    signal, the configured duration, or the death of the source process
    stops the run.

    Returns a dict describing why the run stopped, or None when only the
    leftovers were processed (``process_nb <= 0``).

    NOTE(review): reads the module-level ``conn_str`` set in ``__main__``
    rather than receiving it as a parameter — presumably intentional, but
    worth confirming.
    """

    stop_args = {}

    access_token = None
    if not options.username or not options.password:
        access_token = utils.get_oauth_token(options.token_filename)

    # Process tweets left unprocessed by a previous run before starting.
    session = session_maker()
    try:
        process_leftovers(session, access_token, utils.get_logger())
        session.commit()
    finally:
        session.rollback()
        session.close()

    if options.process_nb <= 0:
        utils.get_logger().debug("Leftovers processed. Exiting.")
        return None

    queue = mQueue()
    stop_event = Event()

    # workaround for bug on using urllib2 and multiprocessing:
    # prime urllib2 once before forking; failure here is harmless.
    req = urllib2.Request('http://localhost')
    conn = None
    try:
        conn = urllib2.urlopen(req)
    except:
        utils.get_logger().debug("could not open localhost")
        # do nothing: best-effort priming only
    finally:
        if conn is not None:
            conn.close()

    process_engines = []
    logger_queues = []

    # One dedicated engine/sessionmaker and log queue per child process.
    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
    process_engines.append(engine_process)
    lqueue = mQueue(1)
    logger_queues.append(lqueue)
    pid = os.getpid()
    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)

    tweet_processes = []

    for i in range(options.process_nb - 1):
        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
        process_engines.append(engine_process)
        lqueue = mQueue(1)
        logger_queues.append(lqueue)
        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
        tweet_processes.append(cprocess)

    def interupt_handler(signum, frame):
        # record why we are stopping, then ask every worker to wind down
        utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
        stop_event.set()

    signal.signal(signal.SIGINT , interupt_handler)
    signal.signal(signal.SIGHUP , interupt_handler)
    signal.signal(signal.SIGALRM, interupt_handler)
    signal.signal(signal.SIGTERM, interupt_handler)

    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
    log_thread.daemon = True
    log_thread.start()

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)

    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

    # Supervision loop: stop on signal, elapsed duration, or source death.
    while not stop_event.is_set():
        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
            stop_event.set()
            break
        if sprocess.is_alive():
            time.sleep(1)
        else:
            stop_args.update({'message': 'Source process killed'})
            stop_event.set()
            break

    utils.get_logger().debug("Joining Source Process")
    try:
        sprocess.join(10)
    except:
        utils.get_logger().debug("Pb joining Source Process - terminating")
        sprocess.terminate()

    for i, cprocess in enumerate(tweet_processes):
        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
        try:
            cprocess.join(3)
        except:
            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
            cprocess.terminate()

    utils.get_logger().debug("Close queues")
    try:
        queue.close()
        for lqueue in logger_queues:
            lqueue.close()
    except Exception as e:
        # BUGFIX: was `except exception as e:` (lowercase) — an undefined
        # name, so any failure while closing the queues would have raised
        # NameError instead of being logged here.
        utils.get_logger().error("error when closing queues %s", repr(e))
        # deliberately swallowed: shutdown should proceed regardless

    # A second leftovers pass: consumers may have exited with items queued.
    if options.process_nb > 1:
        utils.get_logger().debug("Processing leftovers")
        session = session_maker()
        try:
            process_leftovers(session, access_token, utils.get_logger())
            session.commit()
        finally:
            session.rollback()
            session.close()

    for pengine in process_engines:
        pengine.dispose()

    return stop_args
|
479 |
302 |
480 |
303 if __name__ == '__main__': |
481 if __name__ == '__main__': |
304 |
482 |
305 (options, args) = get_options() |
483 (options, args) = get_options() |
306 |
484 |
307 set_logging(options) |
485 set_logging(options) |
308 |
486 |
309 if options.debug: |
487 utils.get_logger().debug("OPTIONS : " + repr(options)) |
310 print "OPTIONS : " |
|
311 print repr(options) |
|
312 |
|
313 |
488 |
314 conn_str = options.conn_str.strip() |
489 conn_str = options.conn_str.strip() |
315 if not re.match("^\w+://.+", conn_str): |
490 if not re.match("^\w+://.+", conn_str): |
316 conn_str = 'sqlite:///' + options.conn_str |
491 conn_str = 'sqlite:///' + options.conn_str |
317 |
492 |
318 if conn_str.startswith("sqlite") and options.new: |
493 if conn_str.startswith("sqlite") and options.new: |
319 filepath = conn_str[conn_str.find(":///")+4:] |
494 filepath = conn_str[conn_str.find(":///") + 4:] |
320 if os.path.exists(filepath): |
495 if os.path.exists(filepath): |
321 i = 1 |
496 i = 1 |
322 basename, extension = os.path.splitext(filepath) |
497 basename, extension = os.path.splitext(filepath) |
323 new_path = '%s.%d%s' % (basename, i, extension) |
498 new_path = '%s.%d%s' % (basename, i, extension) |
324 while i < 1000000 and os.path.exists(new_path): |
499 while i < 1000000 and os.path.exists(new_path): |
337 message = "Database %s not empty exiting" % conn_str |
512 message = "Database %s not empty exiting" % conn_str |
338 utils.get_logger().error(message) |
513 utils.get_logger().error(message) |
339 sys.exit(message) |
514 sys.exit(message) |
340 |
515 |
341 metadata.create_all(engine) |
516 metadata.create_all(engine) |
342 |
517 stop_args = {} |
343 access_token = None |
518 try: |
344 if not options.username or not options.password: |
519 add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) |
345 access_token = utils.get_oauth_token(options.token_filename) |
520 stop_args = do_run(options, Session) |
346 |
521 except Exception as e: |
347 session = Session() |
522 utils.get_logger().exception("Error in main thread") |
348 try: |
523 outfile = StringIO.StringIO() |
349 process_leftovers(session, access_token, utils.get_logger()) |
524 try: |
350 session.commit() |
525 traceback.print_exc(file=outfile) |
351 finally: |
526 stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()} |
352 session.rollback() |
|
353 session.close() |
|
354 |
|
355 if options.process_nb <= 0: |
|
356 utils.get_logger().debug("Leftovers processed. Exiting.") |
|
357 sys.exit() |
|
358 |
|
359 queue = mQueue() |
|
360 stop_event = Event() |
|
361 |
|
362 #workaround for bug on using urllib2 and multiprocessing |
|
363 req = urllib2.Request('http://localhost') |
|
364 conn = None |
|
365 try: |
|
366 conn = urllib2.urlopen(req) |
|
367 except: |
|
368 pass |
|
369 #donothing |
|
370 finally: |
|
371 if conn is not None: |
|
372 conn.close() |
|
373 |
|
374 process_engines = [] |
|
375 logger_queues = [] |
|
376 |
|
377 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
|
378 process_engines.append(engine_process) |
|
379 lqueue = mQueue(1) |
|
380 logger_queues.append(lqueue) |
|
381 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) |
|
382 |
|
383 tweet_processes = [] |
|
384 |
|
385 for i in range(options.process_nb - 1): |
|
386 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
|
387 process_engines.append(engine_process) |
|
388 lqueue = mQueue(1) |
|
389 logger_queues.append(lqueue) |
|
390 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) |
|
391 tweet_processes.append(cprocess) |
|
392 |
|
393 def interupt_handler(signum, frame): |
|
394 stop_event.set() |
|
395 |
|
396 signal.signal(signal.SIGINT, interupt_handler) |
|
397 |
|
398 log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,)) |
|
399 log_thread.daemon = True |
|
400 |
|
401 sprocess.start() |
|
402 for cprocess in tweet_processes: |
|
403 cprocess.start() |
|
404 |
|
405 log_thread.start() |
|
406 |
|
407 if options.duration >= 0: |
|
408 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
|
409 |
|
410 |
|
411 while not stop_event.is_set(): |
|
412 if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
|
413 stop_event.set() |
|
414 break |
|
415 if sprocess.is_alive(): |
|
416 time.sleep(1) |
|
417 else: |
|
418 stop_event.set() |
|
419 break |
|
420 utils.get_logger().debug("Joining Source Process") |
|
421 try: |
|
422 sprocess.join(10) |
|
423 except: |
|
424 utils.get_logger().debug("Pb joining Source Process - terminating") |
|
425 sprocess.terminate() |
|
426 |
|
427 for i, cprocess in enumerate(tweet_processes): |
|
428 utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) |
|
429 try: |
|
430 cprocess.join(3) |
|
431 except: |
|
432 utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) |
|
433 cprocess.terminate() |
|
434 |
|
435 |
|
436 utils.get_logger().debug("Close queues") |
|
437 try: |
|
438 queue.close() |
|
439 for lqueue in logger_queues: |
|
440 lqueue.close() |
|
441 except exception as e: |
|
442 utils.get_logger().error("error when closing queues %s", repr(e)) |
|
443 #do nothing |
|
444 |
|
445 |
|
446 if options.process_nb > 1: |
|
447 utils.get_logger().debug("Processing leftovers") |
|
448 session = Session() |
|
449 try: |
|
450 process_leftovers(session, access_token, utils.get_logger()) |
|
451 session.commit() |
|
452 finally: |
527 finally: |
453 session.rollback() |
528 outfile.close() |
454 session.close() |
529 raise |
455 |
530 finally: |
456 for pengine in process_engines: |
531 add_process_event(type="shutdown", args=stop_args, session_maker=Session) |
457 pengine.dispose() |
532 |
458 |
|
459 utils.get_logger().debug("Done. Exiting.") |
533 utils.get_logger().debug("Done. Exiting.") |
460 |
534 |