script/stream/recorder_tweetstream.py
changeset 15 5d552b6a0e55
parent 11 54d7f1486ac4
child 82 210dc265c70f
equal deleted inserted replaced
12:4daf47fcf792 15:5d552b6a0e55
     6 import logging
     6 import logging
     7 import os
     7 import os
     8 import socket
     8 import socket
     9 import sys
     9 import sys
    10 import tweetstream
    10 import tweetstream
       
    11 import tweetstream.auth
    11 socket._fileobject.default_bufsize = 0
    12 socket._fileobject.default_bufsize = 0
    12 
    13 
    13 
    14 
    14 
    15 
    15 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    16 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    39 
    40 
    40     :retry_wait: Time to wait before reconnecting in seconds. Default is 5
    41     :retry_wait: Time to wait before reconnecting in seconds. Default is 5
    41 
    42 
    42     """
    43     """
    43 
    44 
    44     def __init__(self, user, password, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs):
    45     def __init__(self, auth, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs):
    45         self.max_reconnects = reconnects
    46         self.max_reconnects = reconnects
    46         self.retry_wait = retry_wait
    47         self.retry_wait = retry_wait
    47         self._reconnects = 0
    48         self._reconnects = 0
    48         self._error_cb = error_cb
    49         self._error_cb = error_cb
    49         super(ReconnectingTweetStream,self).__init__(user, password, keywords, url, **kwargs)
    50         super(ReconnectingTweetStream,self).__init__(auth, keywords, url, **kwargs)
    50 
    51 
    51     def next(self):
    52     def next(self):
    52         while True:
    53         while True:
    53             try:
    54             try:
    54                 return super(ReconnectingTweetStream,self).next()
    55                 return super(ReconnectingTweetStream,self).next()
    55             except tweetstream.ConnectionError, e:
    56             except tweetstream.ConnectionError, e:
       
    57                 logging.debug("connection error :" + str(e))
    56                 self._reconnects += 1
    58                 self._reconnects += 1
    57                 if self._reconnects > self.max_reconnects:
    59                 if self._reconnects > self.max_reconnects:
    58                     raise ConnectionError("Too many retries")
    60                     raise tweetstream.ConnectionError("Too many retries")
    59 
    61 
    60                 # Note: error_cb is not called on the last error since we
    62                 # Note: error_cb is not called on the last error since we
    61                 # raise a ConnectionError instead
    63                 # raise a ConnectionError instead
    62                 if  callable(self._error_cb):
    64                 if  callable(self._error_cb):
    63                     self._error_cb(e)
    65                     self._error_cb(e)
    66         # Don't listen to auth error, since we can't reasonably reconnect
    68         # Don't listen to auth error, since we can't reasonably reconnect
    67         # when we get one.
    69         # when we get one.
    68 
    70 
    69 
    71 
    70 
    72 
    71 def process_tweet(tweet, session, debug):
    73 def process_tweet(tweet, session, debug, token_filename):
    72     
    74     
    73     logging.debug("Process_tweet :" + repr(tweet))
    75     logging.debug("Process_tweet :" + repr(tweet))
    74     processor = utils.TwitterProcessor(tweet, None, session)
    76     processor = utils.TwitterProcessor(tweet, None, session, token_filename)
    75     processor.process()
    77     processor.process()
    76 
    78 
    77 def main(username, password, track, session, debug, reconnects):
    79 def main(username, password, track, session, debug, reconnects, token_filename):
    78 
    80 
    79     username = username or raw_input('Twitter username: ')
    81     #username = username or raw_input('Twitter username: ')
    80     password = password or getpass('Twitter password: ')
    82     #password = password or getpass('Twitter password: ')
    81 
    83 
    82     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
    84     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
    83     track_list = [k for k in track_list.split(',')]
    85     track_list = [k for k in track_list.split(',')]
    84 
    86     
    85     stream = ReconnectingTweetStream(username, password, track_list, reconnects=reconnects)
    87     if username and password:
       
    88         auth = tweetstream.auth.BasicAuthHandler(username, password)        
       
    89     else:
       
    90         consumer_key = models.CONSUMER_KEY
       
    91         consumer_secret = models.CONSUMER_SECRET
       
    92         auth = tweetstream.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
       
    93         auth.set_access_token(*(utils.get_oauth_token(token_filename)))
       
    94     
       
    95     stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
    86     try:
    96     try:
    87         for tweet in stream:
    97         for tweet in stream:
    88             process_tweet(tweet, session, debug)
    98             process_tweet(tweet, session, debug, token_filename)
    89             session.commit()
    99             session.commit()
    90     finally:
   100     finally:
    91         stream.close()
   101         stream.close()
    92         
   102         
    93 def get_options():
   103 def get_options():
    96                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
   106                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
    97     parser.add_option("-u", "--user", dest="username",
   107     parser.add_option("-u", "--user", dest="username",
    98                       help="Twitter user", metavar="USER", default=None)
   108                       help="Twitter user", metavar="USER", default=None)
    99     parser.add_option("-w", "--password", dest="password",
   109     parser.add_option("-w", "--password", dest="password",
   100                       help="Twitter password", metavar="PASSWORD", default=None)
   110                       help="Twitter password", metavar="PASSWORD", default=None)
   101     parser.add_option("-t", "--track", dest="track",
   111     parser.add_option("-T", "--track", dest="track",
   102                       help="Twitter track", metavar="TRACK")
   112                       help="Twitter track", metavar="TRACK")
   103     parser.add_option("-n", "--new", dest="new", action="store_true",
   113     parser.add_option("-n", "--new", dest="new", action="store_true",
   104                       help="new database", default=False)
   114                       help="new database", default=False)
   105     parser.add_option("-r", "--reconnects", dest="reconnects",
   115     parser.add_option("-r", "--reconnects", dest="reconnects",
   106                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
   116                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
   107     
   117     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
       
   118                       help="Token file name")
       
   119 
   108     utils.set_logging_options(parser)
   120     utils.set_logging_options(parser)
   109 
   121 
   110     return parser.parse_args()
   122     return parser.parse_args()
   111     
   123     
   112 
   124 
   128     Session = sessionmaker(bind=engine)
   140     Session = sessionmaker(bind=engine)
   129     session = Session()
   141     session = Session()
   130 
   142 
   131     try:
   143     try:
   132         try:
   144         try:
   133             main(options.username, options.password, options.track, session, options.debug, options.reconnects)
   145             main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename)
   134         except KeyboardInterrupt:
   146         except KeyboardInterrupt:
   135             print '\nGoodbye!'
   147             print '\nGoodbye!'
   136         session.commit()
   148         session.commit()
   137     finally:
   149     finally:
   138         session.close()
   150         session.close()