script/stream/recorder_tweetstream.py
changeset 199 514e0ee0c68a
parent 82 210dc265c70f
child 206 6d642d650470
equal deleted inserted replaced
198:249867084b9f 199:514e0ee0c68a
     1 from getpass import getpass
     1 from getpass import getpass
     2 from iri_tweet import models, utils
     2 from iri_tweet import models, utils
     3 from optparse import OptionParser
     3 from optparse import OptionParser
     4 from sqlalchemy.orm import sessionmaker
     4 from sqlalchemy.orm import sessionmaker
     5 from sqlite3 import *
     5 from sqlite3 import *
       
     6 import datetime
     6 import logging
     7 import logging
     7 import os
     8 import os
     8 import socket
     9 import socket
     9 import sys
    10 import sys
    10 import tweetstream
    11 import tweetstream
    45     def __init__(self, auth, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs):
    46     def __init__(self, auth, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs):
    46         self.max_reconnects = reconnects
    47         self.max_reconnects = reconnects
    47         self.retry_wait = retry_wait
    48         self.retry_wait = retry_wait
    48         self._reconnects = 0
    49         self._reconnects = 0
    49         self._error_cb = error_cb
    50         self._error_cb = error_cb
    50         super(ReconnectingTweetStream,self).__init__(auth, keywords, url, **kwargs)
    51         super(ReconnectingTweetStream, self).__init__(auth, keywords, url, **kwargs)
    51 
    52 
    52     def next(self):
    53     def next(self):
    53         while True:
    54         while True:
    54             try:
    55             try:
    55                 return super(ReconnectingTweetStream,self).next()
    56                 return super(ReconnectingTweetStream, self).next()
    56             except tweetstream.ConnectionError, e:
    57             except tweetstream.ConnectionError, e:
    57                 logging.debug("connection error :" + str(e))
    58                 logging.debug("connection error :" + str(e))
    58                 self._reconnects += 1
    59                 self._reconnects += 1
    59                 if self._reconnects > self.max_reconnects:
    60                 if self._reconnects > self.max_reconnects:
    60                     raise tweetstream.ConnectionError("Too many retries")
    61                     raise tweetstream.ConnectionError("Too many retries")
    72 
    73 
    73 def process_tweet(tweet, session, debug, token_filename):
    74 def process_tweet(tweet, session, debug, token_filename):
    74     screen_name = ""
    75     screen_name = ""
    75     if 'user' in tweet and 'screen_name' in tweet['user']:
    76     if 'user' in tweet and 'screen_name' in tweet['user']:
    76         screen_name = tweet['user']['screen_name']
    77         screen_name = tweet['user']['screen_name']
    77     logging.info("Process_tweet from %s : %s" % (screen_name,tweet['text']))
    78     logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text']))
    78     logging.debug("Process_tweet :" + repr(tweet))
    79     logging.debug("Process_tweet :" + repr(tweet))
    79     processor = utils.TwitterProcessor(tweet, None, session, token_filename)
    80     processor = utils.TwitterProcessor(tweet, None, session, token_filename)
    80     processor.process()
    81     processor.process()
    81 
    82 
    82 def main(username, password, track, session, debug, reconnects, token_filename):
    83 def main(username, password, track, session, debug, reconnects, token_filename, duration):
    83 
    84 
    84     #username = username or raw_input('Twitter username: ')
    85     #username = username or raw_input('Twitter username: ')
    85     #password = password or getpass('Twitter password: ')
    86     #password = password or getpass('Twitter password: ')
    86 
    87 
    87     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
    88     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
    93         consumer_key = models.CONSUMER_KEY
    94         consumer_key = models.CONSUMER_KEY
    94         consumer_secret = models.CONSUMER_SECRET
    95         consumer_secret = models.CONSUMER_SECRET
    95         auth = tweetstream.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
    96         auth = tweetstream.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
    96         auth.set_access_token(*(utils.get_oauth_token(token_filename)))
    97         auth.set_access_token(*(utils.get_oauth_token(token_filename)))
    97     
    98     
       
    99     if duration >= 0:
       
   100         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
       
   101     
    98     stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
   102     stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
    99     try:
   103     try:
   100         for tweet in stream:
   104         for tweet in stream:            
       
   105             if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   106                 print "Stop recording after %d seconds." % (duration)
       
   107                 break
   101             process_tweet(tweet, session, debug, token_filename)
   108             process_tweet(tweet, session, debug, token_filename)
   102             session.commit()
   109             session.commit()
   103     finally:
   110     finally:
   104         stream.close()
   111         stream.close()
   105         
   112         
   106 def get_options():
   113 def get_options():
   107     parser = OptionParser()
   114     parser = OptionParser()
   108     parser.add_option("-f", "--file", dest="filename",  
   115     parser.add_option("-f", "--file", dest="filename",
   109                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
   116                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
   110     parser.add_option("-u", "--user", dest="username",
   117     parser.add_option("-u", "--user", dest="username",
   111                       help="Twitter user", metavar="USER", default=None)
   118                       help="Twitter user", metavar="USER", default=None)
   112     parser.add_option("-w", "--password", dest="password",
   119     parser.add_option("-w", "--password", dest="password",
   113                       help="Twitter password", metavar="PASSWORD", default=None)
   120                       help="Twitter password", metavar="PASSWORD", default=None)
   117                       help="new database", default=False)
   124                       help="new database", default=False)
   118     parser.add_option("-r", "--reconnects", dest="reconnects",
   125     parser.add_option("-r", "--reconnects", dest="reconnects",
   119                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
   126                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
   120     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   127     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   121                       help="Token file name")
   128                       help="Token file name")
       
   129     parser.add_option("-d", "--duration", dest="duration",
       
   130                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
       
   131 
   122 
   132 
   123     utils.set_logging_options(parser)
   133     utils.set_logging_options(parser)
   124 
   134 
   125     return parser.parse_args()
   135     return parser.parse_args()
   126     
   136     
   137         print repr(options)
   147         print repr(options)
   138     
   148     
   139     if options.new and os.path.exists(options.filename):
   149     if options.new and os.path.exists(options.filename):
   140         os.remove(options.filename)
   150         os.remove(options.filename)
   141     
   151     
   142     engine, metadata = models.setup_database('sqlite:///'+options.filename, echo=(options.debug))
   152     engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug))
   143     Session = sessionmaker(bind=engine)
   153     Session = sessionmaker(bind=engine)
   144     session = Session()
   154     session = Session()
   145 
   155 
   146     try:
   156     try:
   147         try:
   157         try:
   148             main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename)
   158             main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration)
   149         except KeyboardInterrupt:
   159         except KeyboardInterrupt:
   150             print '\nGoodbye!'
   160             print '\nGoodbye!'
   151         session.commit()
   161         session.commit()
   152     finally:
   162     finally:
   153         session.close()
   163         session.close()