| changeset 890 | 9c57883dbb9d |
| parent 888 | 6fc6637d8403 |
| child 893 | 10a19dd4e1c9 |
| 889:c774bdf7d3dd | 890:9c57883dbb9d |
|---|---|
195 self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) |
195 self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) |
196 raise |
196 raise |
197 finally: |
197 finally: |
198 session.rollback() |
198 session.rollback() |
199 session.close() |
199 session.close() |
200 self.logger_queue.close() |
|
201 self.queue.close() |
|
202 self.stream.close() |
200 self.stream.close() |
203 self.stream = None |
201 self.stream = None |
204 if not self.stop_event.is_set(): |
202 if not self.stop_event.is_set(): |
205 self.stop_event.set() |
203 self.stop_event.set() |
206 |
204 |
223 |
221 |
224 if self.stop_event.is_set() and self.stream: |
222 if self.stop_event.is_set() and self.stream: |
225 self.stream.close() |
223 self.stream.close() |
226 elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: |
224 elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: |
227 self.stop_event.set() |
225 self.stop_event.set() |
228 |
226 |
227 self.queue.cancel_join_thread() |
|
228 self.logger_queue.cancel_join_thread() |
|
229 self.logger.info("SourceProcess : join") |
229 self.logger.info("SourceProcess : join") |
230 source_stream_iter_thread.join(30) |
230 source_stream_iter_thread.join(30) |
231 |
231 |
232 |
232 |
233 def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger): |
233 def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger): |
328 tweet_txt = src.original_json |
328 tweet_txt = src.original_json |
329 process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger) |
329 process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger) |
330 session.commit() |
330 session.commit() |
331 |
331 |
332 |
332 |
333 # get tweet source that do not match any message |
|
334 # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; |
|
335 def process_log(logger_queues, stop_event): |
333 def process_log(logger_queues, stop_event): |
336 while not stop_event.is_set(): |
334 while not stop_event.is_set(): |
337 for lqueue in logger_queues: |
335 for lqueue in logger_queues: |
338 try: |
336 try: |
339 record = lqueue.get_nowait() |
337 record = lqueue.get_nowait() |
482 utils.get_logger().debug("Joining Source Process") |
480 utils.get_logger().debug("Joining Source Process") |
483 try: |
481 try: |
484 sprocess.join(10) |
482 sprocess.join(10) |
485 except: |
483 except: |
486 utils.get_logger().debug("Pb joining Source Process - terminating") |
484 utils.get_logger().debug("Pb joining Source Process - terminating") |
485 finally: |
|
487 sprocess.terminate() |
486 sprocess.terminate() |
488 |
487 |
489 for i, cprocess in enumerate(tweet_processes): |
488 for i, cprocess in enumerate(tweet_processes): |
490 utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) |
489 utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) |
491 try: |
490 try: |
493 except: |
492 except: |
494 utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) |
493 utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) |
495 cprocess.terminate() |
494 cprocess.terminate() |
496 |
495 |
497 |
496 |
498 utils.get_logger().debug("Close queues") |
497 utils.get_logger().debug("Close queues") |
499 |
|
500 if options.process_nb > 1: |
|
501 utils.get_logger().debug("Processing leftovers") |
|
502 session = session_maker() |
|
503 try: |
|
504 process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) |
|
505 session.commit() |
|
506 finally: |
|
507 session.rollback() |
|
508 session.close() |
|
509 |
|
510 for pengine in process_engines: |
|
511 pengine.dispose() |
|
512 |
|
513 try: |
498 try: |
514 queue.close() |
499 queue.close() |
515 for lqueue in logger_queues: |
500 for lqueue in logger_queues: |
516 lqueue.close() |
501 lqueue.close() |
517 except Exception as e: |
502 except Exception as e: |
518 utils.get_logger().error("error when closing queues %s", repr(e)) |
503 utils.get_logger().error("error when closing queues %s", repr(e)) |
519 # do nothing |
504 # do nothing |
520 |
505 |
506 |
|
507 if options.process_nb > 1: |
|
508 utils.get_logger().debug("Processing leftovers") |
|
509 session = session_maker() |
|
510 try: |
|
511 process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) |
|
512 session.commit() |
|
513 finally: |
|
514 session.rollback() |
|
515 session.close() |
|
516 |
|
517 for pengine in process_engines: |
|
518 pengine.dispose() |
|
519 |
|
521 return stop_args |
520 return stop_args |
522 |
521 |
523 |
522 |
524 def main(options): |
523 def main(options): |
525 |
524 |