109 |
109 |
110 time.sleep(self.retry_wait) |
110 time.sleep(self.retry_wait) |
111 # Don't listen to auth error, since we can't reasonably reconnect |
111 # Don't listen to auth error, since we can't reasonably reconnect |
112 # when we get one. |
112 # when we get one. |
113 |
113 |
|
114 def add_process_event(type, args, session_maker): |
|
115 session = session_maker() |
|
116 try: |
|
117 evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) |
|
118 session.add(evt) |
|
119 session.commit() |
|
120 finally: |
|
121 session.close() |
|
122 |
114 |
123 |
115 class BaseProcess(Process): |
124 class BaseProcess(Process): |
116 |
125 |
117 def __init__(self, parent_pid): |
126 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
118 self.parent_pid = parent_pid |
127 self.parent_pid = parent_pid |
|
128 self.session_maker = session_maker |
|
129 self.queue = queue |
|
130 self.options = options |
|
131 self.logger_queue = logger_queue |
|
132 self.stop_event = stop_event |
|
133 self.access_token = access_token |
|
134 |
119 super(BaseProcess, self).__init__() |
135 super(BaseProcess, self).__init__() |
120 |
136 |
121 # |
137 # |
122 # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids |
138 # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids |
123 # |
139 # |
129 # *beeep* oh no! The phone's disconnected! |
145 # *beeep* oh no! The phone's disconnected! |
130 return False |
146 return False |
131 else: |
147 else: |
132 # *ring* Hi mom! |
148 # *ring* Hi mom! |
133 return True |
149 return True |
|
150 |
|
151 |
|
152 def __get_process_event_args(self): |
|
153 return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token} |
|
154 |
|
155 def run(self): |
|
156 try: |
|
157 add_process_event("start_worker", self.__get_process_event_args(), self.session_maker) |
|
158 self.do_run() |
|
159 finally: |
|
160 add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker) |
|
161 |
|
162 def do_run(self): |
|
163 raise NotImplementedError() |
|
164 |
134 |
165 |
135 |
166 |
136 class SourceProcess(BaseProcess): |
167 class SourceProcess(BaseProcess): |
137 |
168 |
138 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
169 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
139 self.session_maker = session_maker |
|
140 self.queue = queue |
|
141 self.track = options.track |
170 self.track = options.track |
142 self.reconnects = options.reconnects |
171 self.reconnects = options.reconnects |
143 self.token_filename = options.token_filename |
172 self.token_filename = options.token_filename |
144 self.stop_event = stop_event |
173 super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
145 self.options = options |
174 |
146 self.access_token = access_token |
175 def do_run(self): |
147 self.logger_queue = logger_queue |
|
148 super(SourceProcess, self).__init__(parent_pid) |
|
149 |
|
150 def run(self): |
|
151 |
176 |
152 #import pydevd |
177 #import pydevd |
153 #pydevd.settrace(suspend=False) |
178 #pydevd.settrace(suspend=False) |
154 |
179 |
155 self.logger = set_logging_process(self.options, self.logger_queue) |
180 self.logger = set_logging_process(self.options, self.logger_queue) |
216 logger.debug(u"Process_tweet :" + repr(tweet)) |
241 logger.debug(u"Process_tweet :" + repr(tweet)) |
217 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) |
242 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) |
218 processor.process() |
243 processor.process() |
219 except Exception as e: |
244 except Exception as e: |
220 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
245 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
221 logger.error(message) |
246 logger.exception(message) |
222 output = StringIO.StringIO() |
247 output = StringIO.StringIO() |
223 traceback.print_exception(Exception, e, None, None, output) |
248 try: |
224 error_stack = output.getvalue() |
249 traceback.print_exc(file=output) |
225 output.close() |
250 error_stack = output.getvalue() |
|
251 finally: |
|
252 output.close() |
226 session.rollback() |
253 session.rollback() |
227 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack) |
254 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack) |
228 session.add(tweet_log) |
255 session.add(tweet_log) |
229 session.commit() |
256 session.commit() |
230 |
257 |
231 |
258 |
232 |
259 |
233 class TweetProcess(BaseProcess): |
260 class TweetProcess(BaseProcess): |
234 |
261 |
235 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
262 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
236 self.session_maker = session_maker |
263 super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
237 self.queue = queue |
264 |
238 self.stop_event = stop_event |
265 |
239 self.options = options |
266 def do_run(self): |
240 self.access_token = access_token |
|
241 self.logger_queue = logger_queue |
|
242 super(TweetProcess, self).__init__(parent_pid) |
|
243 |
|
244 |
|
245 def run(self): |
|
246 |
267 |
247 self.logger = set_logging_process(self.options, self.logger_queue) |
268 self.logger = set_logging_process(self.options, self.logger_queue) |
248 session = self.session_maker() |
269 session = self.session_maker() |
249 try: |
270 try: |
250 while not self.stop_event.is_set() and self.parent_is_alive(): |
271 while not self.stop_event.is_set() and self.parent_is_alive(): |
251 try: |
272 try: |
252 source_id, tweet_txt = queue.get(True, 3) |
273 source_id, tweet_txt = self.queue.get(True, 3) |
253 self.logger.debug("Processing source id " + repr(source_id)) |
274 self.logger.debug("Processing source id " + repr(source_id)) |
254 except Exception as e: |
275 except Exception as e: |
255 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
276 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
256 continue |
277 continue |
257 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger) |
278 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger) |
320 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
341 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
321 |
342 |
322 utils.set_logging_options(parser) |
343 utils.set_logging_options(parser) |
323 |
344 |
324 return parser.parse_args() |
345 return parser.parse_args() |
325 |
346 |
326 def add_process_event(type, args, session_maker): |
347 |
|
348 def do_run(options, session_maker): |
|
349 |
|
350 stop_args = {} |
|
351 |
|
352 access_token = None |
|
353 if not options.username or not options.password: |
|
354 access_token = utils.get_oauth_token(options.token_filename) |
|
355 |
327 session = session_maker() |
356 session = session_maker() |
328 try: |
357 try: |
329 evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) |
358 process_leftovers(session, access_token, utils.get_logger()) |
330 session.add(evt) |
|
331 session.commit() |
359 session.commit() |
332 finally: |
360 finally: |
|
361 session.rollback() |
333 session.close() |
362 session.close() |
|
363 |
|
364 if options.process_nb <= 0: |
|
365 utils.get_logger().debug("Leftovers processed. Exiting.") |
|
366 return None |
|
367 |
|
368 queue = mQueue() |
|
369 stop_event = Event() |
|
370 |
|
371 #workaround for bug on using urllib2 and multiprocessing |
|
372 req = urllib2.Request('http://localhost') |
|
373 conn = None |
|
374 try: |
|
375 conn = urllib2.urlopen(req) |
|
376 except: |
|
377 utils.get_logger().debug("could not open localhost") |
|
378 #donothing |
|
379 finally: |
|
380 if conn is not None: |
|
381 conn.close() |
|
382 |
|
383 process_engines = [] |
|
384 logger_queues = [] |
|
385 |
|
386 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
|
387 process_engines.append(engine_process) |
|
388 lqueue = mQueue(1) |
|
389 logger_queues.append(lqueue) |
|
390 pid = os.getpid() |
|
391 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
|
392 |
|
393 tweet_processes = [] |
|
394 |
|
395 for i in range(options.process_nb - 1): |
|
396 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
|
397 process_engines.append(engine_process) |
|
398 lqueue = mQueue(1) |
|
399 logger_queues.append(lqueue) |
|
400 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
|
401 tweet_processes.append(cprocess) |
|
402 |
|
403 def interupt_handler(signum, frame): |
|
404 utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) |
|
405 stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) |
|
406 stop_event.set() |
|
407 |
|
408 signal.signal(signal.SIGINT , interupt_handler) |
|
409 signal.signal(signal.SIGHUP , interupt_handler) |
|
410 signal.signal(signal.SIGALRM, interupt_handler) |
|
411 signal.signal(signal.SIGTERM, interupt_handler) |
|
412 |
|
413 log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) |
|
414 log_thread.daemon = True |
|
415 |
|
416 log_thread.start() |
|
417 |
|
418 sprocess.start() |
|
419 for cprocess in tweet_processes: |
|
420 cprocess.start() |
|
421 |
|
422 add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker) |
|
423 |
|
424 if options.duration >= 0: |
|
425 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
|
426 |
|
427 |
|
428 while not stop_event.is_set(): |
|
429 if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
|
430 stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) |
|
431 stop_event.set() |
|
432 break |
|
433 if sprocess.is_alive(): |
|
434 time.sleep(1) |
|
435 else: |
|
436 stop_args.update({'message': 'Source process killed'}) |
|
437 stop_event.set() |
|
438 break |
|
439 utils.get_logger().debug("Joining Source Process") |
|
440 try: |
|
441 sprocess.join(10) |
|
442 except: |
|
443 utils.get_logger().debug("Pb joining Source Process - terminating") |
|
444 sprocess.terminate() |
|
445 |
|
446 for i, cprocess in enumerate(tweet_processes): |
|
447 utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) |
|
448 try: |
|
449 cprocess.join(3) |
|
450 except: |
|
451 utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) |
|
452 cprocess.terminate() |
|
453 |
|
454 |
|
455 utils.get_logger().debug("Close queues") |
|
456 try: |
|
457 queue.close() |
|
458 for lqueue in logger_queues: |
|
459 lqueue.close() |
|
460 except exception as e: |
|
461 utils.get_logger().error("error when closing queues %s", repr(e)) |
|
462 #do nothing |
|
463 |
|
464 |
|
465 if options.process_nb > 1: |
|
466 utils.get_logger().debug("Processing leftovers") |
|
467 session = session_maker() |
|
468 try: |
|
469 process_leftovers(session, access_token, utils.get_logger()) |
|
470 session.commit() |
|
471 finally: |
|
472 session.rollback() |
|
473 session.close() |
|
474 |
|
475 for pengine in process_engines: |
|
476 pengine.dispose() |
|
477 |
|
478 return stop_args |
334 |
479 |
335 |
480 |
336 if __name__ == '__main__': |
481 if __name__ == '__main__': |
337 |
482 |
338 stop_args = {} |
|
339 (options, args) = get_options() |
483 (options, args) = get_options() |
340 |
484 |
341 set_logging(options) |
485 set_logging(options) |
342 |
486 |
343 utils.get_logger().debug("OPTIONS : " + repr(options)) |
487 utils.get_logger().debug("OPTIONS : " + repr(options)) |
368 message = "Database %s not empty exiting" % conn_str |
512 message = "Database %s not empty exiting" % conn_str |
369 utils.get_logger().error(message) |
513 utils.get_logger().error(message) |
370 sys.exit(message) |
514 sys.exit(message) |
371 |
515 |
372 metadata.create_all(engine) |
516 metadata.create_all(engine) |
373 |
517 stop_args = {} |
374 add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) |
518 try: |
375 |
519 add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) |
376 access_token = None |
520 stop_args = do_run(options, Session) |
377 if not options.username or not options.password: |
521 except Exception as e: |
378 access_token = utils.get_oauth_token(options.token_filename) |
522 utils.get_logger().exception("Error in main thread") |
379 |
523 outfile = StringIO.StringIO() |
380 session = Session() |
524 try: |
381 try: |
525 traceback.print_exc(file=outfile) |
382 process_leftovers(session, access_token, utils.get_logger()) |
526 stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()} |
383 session.commit() |
|
384 finally: |
|
385 session.rollback() |
|
386 session.close() |
|
387 |
|
388 if options.process_nb <= 0: |
|
389 utils.get_logger().debug("Leftovers processed. Exiting.") |
|
390 add_process_event(type="shutdown", args=None, session_maker=Session) |
|
391 sys.exit() |
|
392 |
|
393 queue = mQueue() |
|
394 stop_event = Event() |
|
395 |
|
396 #workaround for bug on using urllib2 and multiprocessing |
|
397 req = urllib2.Request('http://localhost') |
|
398 conn = None |
|
399 try: |
|
400 conn = urllib2.urlopen(req) |
|
401 except: |
|
402 pass |
|
403 #donothing |
|
404 finally: |
|
405 if conn is not None: |
|
406 conn.close() |
|
407 |
|
408 process_engines = [] |
|
409 logger_queues = [] |
|
410 |
|
411 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
|
412 process_engines.append(engine_process) |
|
413 lqueue = mQueue(1) |
|
414 logger_queues.append(lqueue) |
|
415 pid = os.getpid() |
|
416 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
|
417 |
|
418 tweet_processes = [] |
|
419 |
|
420 for i in range(options.process_nb - 1): |
|
421 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
|
422 process_engines.append(engine_process) |
|
423 lqueue = mQueue(1) |
|
424 logger_queues.append(lqueue) |
|
425 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
|
426 tweet_processes.append(cprocess) |
|
427 |
|
428 def interupt_handler(signum, frame): |
|
429 global stop_args |
|
430 utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) |
|
431 stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)} |
|
432 stop_event.set() |
|
433 |
|
434 signal.signal(signal.SIGINT , interupt_handler) |
|
435 signal.signal(signal.SIGHUP , interupt_handler) |
|
436 signal.signal(signal.SIGALRM, interupt_handler) |
|
437 signal.signal(signal.SIGTERM, interupt_handler) |
|
438 |
|
439 log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) |
|
440 log_thread.daemon = True |
|
441 |
|
442 log_thread.start() |
|
443 |
|
444 sprocess.start() |
|
445 for cprocess in tweet_processes: |
|
446 cprocess.start() |
|
447 |
|
448 add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session) |
|
449 |
|
450 if options.duration >= 0: |
|
451 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
|
452 |
|
453 |
|
454 while not stop_event.is_set(): |
|
455 if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
|
456 stop_event.set() |
|
457 break |
|
458 if sprocess.is_alive(): |
|
459 time.sleep(1) |
|
460 else: |
|
461 stop_event.set() |
|
462 break |
|
463 utils.get_logger().debug("Joining Source Process") |
|
464 try: |
|
465 sprocess.join(10) |
|
466 except: |
|
467 utils.get_logger().debug("Pb joining Source Process - terminating") |
|
468 sprocess.terminate() |
|
469 |
|
470 for i, cprocess in enumerate(tweet_processes): |
|
471 utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) |
|
472 try: |
|
473 cprocess.join(3) |
|
474 except: |
|
475 utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) |
|
476 cprocess.terminate() |
|
477 |
|
478 |
|
479 utils.get_logger().debug("Close queues") |
|
480 try: |
|
481 queue.close() |
|
482 for lqueue in logger_queues: |
|
483 lqueue.close() |
|
484 except exception as e: |
|
485 utils.get_logger().error("error when closing queues %s", repr(e)) |
|
486 #do nothing |
|
487 |
|
488 |
|
489 if options.process_nb > 1: |
|
490 utils.get_logger().debug("Processing leftovers") |
|
491 session = Session() |
|
492 try: |
|
493 process_leftovers(session, access_token, utils.get_logger()) |
|
494 session.commit() |
|
495 finally: |
527 finally: |
496 session.rollback() |
528 outfile.close() |
497 session.close() |
529 raise |
498 |
530 finally: |
499 for pengine in process_engines: |
531 add_process_event(type="shutdown", args=stop_args, session_maker=Session) |
500 pengine.dispose() |
532 |
501 |
|
502 add_process_event(type="shutdown", args=stop_args, session_maker=Session) |
|
503 utils.get_logger().debug("Done. Exiting.") |
533 utils.get_logger().debug("Done. Exiting.") |
504 |
534 |