script/stream/recorder_tweetstream.py
changeset 528 7fb5a7b0d35c
parent 464 b9243ade95e2
child 693 2ef837069108
equal deleted inserted replaced
527:80e5b9543cac 528:7fb5a7b0d35c
    62         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
    62         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
    63         auth.set_access_token(*access_token)
    63         auth.set_access_token(*access_token)
    64     return auth
    64     return auth
    65 
    65 
    66 
    66 
    67 class ReconnectingTweetStream(tweetstream.FilterStream):
       
    68     """TweetStream class that automatically tries to reconnect if the
       
    69     connection goes down. Reconnecting, and waiting for reconnecting, is
       
    70     blocking.
       
    71 
       
    72     :param username: See :TweetStream:
       
    73 
       
    74     :param password: See :TweetStream:
       
    75 
       
    76     :keyword url: See :TweetStream:
       
    77 
       
    78     :keyword reconnects: Number of reconnects before a ConnectionError is
       
    79         raised. Default is 3
       
    80 
       
    81     :keyword error_cb: Optional callable that will be called just before trying to
       
    82         reconnect. The callback will be called with a single argument, the
       
    83         exception that caused the reconnect attempt. Default is None
       
    84 
       
    85     :keyword retry_wait: Time to wait before reconnecting in seconds. Default is 5
       
    86 
       
    87     """
       
    88 
       
    89     def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
       
    90         self.max_reconnects = reconnects
       
    91         self.retry_wait = retry_wait
       
    92         self._reconnects = 0
       
    93         self._error_cb = error_cb
       
    94         super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
       
    95 
       
    96     def next(self):
       
    97         while True:
       
    98             try:
       
    99                 utils.get_logger().debug("return super.next")
       
   100                 return super(ReconnectingTweetStream, self).next()
       
   101             except tweetstream.ConnectionError, e:
       
   102                 utils.get_logger().debug("connection error :" + str(e))
       
   103                 self._reconnects += 1
       
   104                 if self._reconnects > self.max_reconnects:
       
   105                     raise tweetstream.ConnectionError("Too many retries")
       
   106 
       
   107                 # Note: error_cb is not called on the last error since we
       
   108                 # raise a ConnectionError instead
       
   109                 if  callable(self._error_cb):
       
   110                     self._error_cb(e)
       
   111 
       
   112                 time.sleep(self.retry_wait)
       
   113         # Don't listen to auth error, since we can't reasonably reconnect
       
   114         # when we get one.
       
   115 
       
   116 def add_process_event(type, args, session_maker):
    67 def add_process_event(type, args, session_maker):
   117     session = session_maker()
    68     session = session_maker()
   118     try:
    69     try:
   119         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
    70         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
   120         session.add(evt)
    71         session.add(evt)
   168 
   119 
   169 class SourceProcess(BaseProcess):
   120 class SourceProcess(BaseProcess):
   170     
   121     
   171     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   122     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   172         self.track = options.track
   123         self.track = options.track
   173         self.reconnects = options.reconnects
       
   174         self.token_filename = options.token_filename
   124         self.token_filename = options.token_filename
       
   125         self.catchup = options.catchup
       
   126         self.timeout = options.timeout
   175         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   127         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   176 
   128 
   177     def do_run(self):
   129     def do_run(self):
   178         
   130         
   179         #import pydevd
   131         #import pydevd
   180         #pydevd.settrace(suspend=False)
   132         #pydevd.settrace(suspend=True)
   181 
   133 
   182         self.logger = set_logging_process(self.options, self.logger_queue)
   134         self.logger = set_logging_process(self.options, self.logger_queue)
   183         self.auth = get_auth(self.options, self.access_token) 
   135         self.auth = get_auth(self.options, self.access_token) 
   184         
   136         
   185         self.logger.debug("SourceProcess : run")
   137         self.logger.debug("SourceProcess : run ")
   186         track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
   138         track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
   187         track_list = [k for k in track_list.split(',')]
   139         self.logger.debug("SourceProcess : track list " + track_list)
       
   140         
       
   141         track_list = [k.strip() for k in track_list.split(',')]
   188 
   142 
   189         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   143         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   190         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True, url=self.options.url)
   144         stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
   191         self.logger.debug("SourceProcess : after connecting to stream")
   145         self.logger.debug("SourceProcess : after connecting to stream")
   192         stream.muststop = lambda: self.stop_event.is_set()
   146         stream.muststop = lambda: self.stop_event.is_set()
   193         
   147         
   194         session = self.session_maker()
   148         session = self.session_maker()
   195         
   149         
   290     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   244     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   291     Session = scoped_session(Session)
   245     Session = scoped_session(Session)
   292     return Session, engine, metadata
   246     return Session, engine, metadata
   293 
   247 
   294             
   248             
   295 def process_leftovers(session, access_token, logger):
   249 def process_leftovers(session, access_token, twitter_query_user, logger):
   296     
   250     
   297     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
   251     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
   298     
   252     
   299     for src in sources:
   253     for src in sources:
   300         tweet_txt = src.original_json
   254         tweet_txt = src.original_json
   301         process_tweet(tweet_txt, src.id, session, access_token, logger)
   255         process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
   302         session.commit()
   256         session.commit()
   303 
   257 
   304         
   258         
   305     
   259     
   306     #get tweet source that do not match any message
   260     #get tweet source that do not match any message
   334                       help="Twitter track", metavar="TRACK")
   288                       help="Twitter track", metavar="TRACK")
   335     parser.add_option("-n", "--new", dest="new", action="store_true",
   289     parser.add_option("-n", "--new", dest="new", action="store_true",
   336                       help="new database", default=False)
   290                       help="new database", default=False)
   337     parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
   291     parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
   338                       help="launch daemon", default=False)
   292                       help="launch daemon", default=False)
   339     parser.add_option("-r", "--reconnects", dest="reconnects",
       
   340                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
       
   341     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   293     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   342                       help="Token file name")
   294                       help="Token file name")
   343     parser.add_option("-d", "--duration", dest="duration",
   295     parser.add_option("-d", "--duration", dest="duration",
   344                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   296                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   345     parser.add_option("-N", "--nb-process", dest="process_nb",
   297     parser.add_option("-N", "--nb-process", dest="process_nb",
   346                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   298                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   347     parser.add_option("--url", dest="url",
   299     parser.add_option("--url", dest="url",
   348                       help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
   300                       help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
   349     parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
   301     parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
   350                       help="Query twitter for users", default=False, metavar="QUERY_USER")
   302                       help="Query twitter for users", default=False, metavar="QUERY_USER")
       
   303     parser.add_option("--catchup", dest="catchup",
       
   304                       help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
       
   305     parser.add_option("--timeout", dest="timeout",
       
   306                       help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
       
   307     
   351 
   308 
   352 
   309 
   353 
   310 
   354     utils.set_logging_options(parser)
   311     utils.set_logging_options(parser)
   355 
   312 
   364     if not options.username or not options.password:
   321     if not options.username or not options.password:
   365         access_token = utils.get_oauth_token(options.token_filename)
   322         access_token = utils.get_oauth_token(options.token_filename)
   366     
   323     
   367     session = session_maker()
   324     session = session_maker()
   368     try:
   325     try:
   369         process_leftovers(session, access_token, utils.get_logger())
   326         process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
   370         session.commit()
   327         session.commit()
   371     finally:
   328     finally:
   372         session.rollback()
   329         session.rollback()
   373         session.close()
   330         session.close()
   374     
   331     
   475     
   432     
   476     if options.process_nb > 1:
   433     if options.process_nb > 1:
   477         utils.get_logger().debug("Processing leftovers")
   434         utils.get_logger().debug("Processing leftovers")
   478         session = session_maker()
   435         session = session_maker()
   479         try:
   436         try:
   480             process_leftovers(session, access_token, utils.get_logger())
   437             process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
   481             session.commit()
   438             session.commit()
   482         finally:
   439         finally:
   483             session.rollback()
   440             session.rollback()
   484             session.close()
   441             session.close()
   485 
   442