script/stream/recorder_tweetstream.py
changeset 464 b9243ade95e2
parent 425 b346fd32fc34
child 528 7fb5a7b0d35c
equal deleted inserted replaced
463:d3b86c65c980 464:b9243ade95e2
   227             session.close()
   227             session.close()
   228             self.queue.close()
   228             self.queue.close()
   229             self.stop_event.set()
   229             self.stop_event.set()
   230 
   230 
   231 
   231 
   232 def process_tweet(tweet, source_id, session, access_token, logger):
   232 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
   233     try:
   233     try:
   234         tweet_obj = anyjson.deserialize(tweet)
   234         tweet_obj = anyjson.deserialize(tweet)
   235         if 'text' not in tweet_obj:
   235         if 'text' not in tweet_obj:
   236             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
   236             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
   237             session.add(tweet_log)
   237             session.add(tweet_log)
   239         screen_name = ""
   239         screen_name = ""
   240         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
   240         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
   241             screen_name = tweet_obj['user']['screen_name']
   241             screen_name = tweet_obj['user']['screen_name']
   242         logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
   242         logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
   243         logger.debug(u"Process_tweet :" + repr(tweet))
   243         logger.debug(u"Process_tweet :" + repr(tweet))
   244         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
   244         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
   245         processor.process()
   245         processor.process()
   246     except Exception as e:
   246     except Exception as e:
   247         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   247         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   248         logger.exception(message)
   248         logger.exception(message)
   249         output = StringIO.StringIO()
   249         output = StringIO.StringIO()
   261         
   261         
   262 class TweetProcess(BaseProcess):
   262 class TweetProcess(BaseProcess):
   263     
   263     
   264     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   264     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   265         super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   265         super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
       
   266         self.twitter_query_user = options.twitter_query_user
   266 
   267 
   267 
   268 
   268     def do_run(self):
   269     def do_run(self):
   269         
   270         
   270         self.logger = set_logging_process(self.options, self.logger_queue)
   271         self.logger = set_logging_process(self.options, self.logger_queue)
   275                     source_id, tweet_txt = self.queue.get(True, 3)
   276                     source_id, tweet_txt = self.queue.get(True, 3)
   276                     self.logger.debug("Processing source id " + repr(source_id))
   277                     self.logger.debug("Processing source id " + repr(source_id))
   277                 except Exception as e:
   278                 except Exception as e:
   278                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   279                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   279                     continue
   280                     continue
   280                 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
   281                 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
   281                 session.commit()
   282                 session.commit()
   282         finally:
   283         finally:
   283             session.rollback()
   284             session.rollback()
   284             self.stop_event.set()
   285             self.stop_event.set()
   285             session.close()
   286             session.close()
   343                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   344                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   344     parser.add_option("-N", "--nb-process", dest="process_nb",
   345     parser.add_option("-N", "--nb-process", dest="process_nb",
   345                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   346                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   346     parser.add_option("--url", dest="url",
   347     parser.add_option("--url", dest="url",
   347                       help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
   348                       help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
       
   349     parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
       
   350                       help="Query twitter for users", default=False, metavar="QUERY_USER")
       
   351 
   348 
   352 
   349 
   353 
   350     utils.set_logging_options(parser)
   354     utils.set_logging_options(parser)
   351 
   355 
   352     return parser.parse_args()
   356     return parser.parse_args()