script/stream/recorder_tweetstream.py
changeset 9 bb44692e09ee
child 11 54d7f1486ac4
equal deleted inserted replaced
8:b7f4b0554ef8 9:bb44692e09ee
       
import json
import os
import socket
import time
from getpass import getpass
from optparse import OptionParser
from sqlite3 import *

import tweetstream

# Make socket file objects unbuffered by default (Python 2 only:
# socket._fileobject does not exist in Python 3) -- presumably so streamed
# tweets are read as they arrive rather than when a buffer fills.
socket._fileobject.default_bufsize = 0
       
     8 
       
     9 
       
    10 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
       
    11 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
       
    12 #columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
       
    13 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
       
    14 #just put it in a sqlite3 tqble
       
    15 
       
    16 
       
    17 class ReconnectingTweetStream(tweetstream.TrackStream):
       
    18     """TweetStream class that automatically tries to reconnect if the
       
    19     connecting goes down. Reconnecting, and waiting for reconnecting, is
       
    20     blocking.
       
    21 
       
    22     :param username: See :TweetStream:
       
    23 
       
    24     :param password: See :TweetStream:
       
    25 
       
    26     :keyword url: See :TweetStream:
       
    27 
       
    28     :keyword reconnects: Number of reconnects before a ConnectionError is
       
    29         raised. Default is 3
       
    30 
       
    31     :error_cb: Optional callable that will be called just before trying to
       
    32         reconnect. The callback will be called with a single argument, the
       
    33         exception that caused the reconnect attempt. Default is None
       
    34 
       
    35     :retry_wait: Time to wait before reconnecting in seconds. Default is 5
       
    36 
       
    37     """
       
    38 
       
    39     def __init__(self, user, password, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs):
       
    40         self.max_reconnects = reconnects
       
    41         self.retry_wait = retry_wait
       
    42         self._reconnects = 0
       
    43         self._error_cb = error_cb
       
    44         super(ReconnectingTweetStream,self).__init__(user, password, keywords, url, **kwargs)
       
    45 
       
    46     def next(self):
       
    47         while True:
       
    48             try:
       
    49                 return super(ReconnectingTweetStream,self).next()
       
    50             except tweetstream.ConnectionError, e:
       
    51                 self._reconnects += 1
       
    52                 if self._reconnects > self.max_reconnects:
       
    53                     raise ConnectionError("Too many retries")
       
    54 
       
    55                 # Note: error_cb is not called on the last error since we
       
    56                 # raise a ConnectionError instead
       
    57                 if  callable(self._error_cb):
       
    58                     self._error_cb(e)
       
    59 
       
    60                 time.sleep(self.retry_wait)
       
    61         # Don't listen to auth error, since we can't reasonably reconnect
       
    62         # when we get one.
       
    63 
       
    64 
       
    65 
       
    66 def process_tweet(tweet, cursor, debug):
       
    67     print tweet
       
    68     cursor.execute("insert into tweet_tweet (json) values (:json);", {"json":unicode(tweet)});
       
    69 
       
    70 def main(username, password, track, curs, debug, reconnects):
       
    71 
       
    72     username = username or raw_input('Twitter username: ')
       
    73     password = password or getpass('Twitter password: ')
       
    74 
       
    75     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
       
    76     track_list = [k for k in track_list.split(',')]
       
    77 
       
    78     stream = ReconnectingTweetStream(username, password, track_list, reconnects=reconnects)
       
    79     try:
       
    80         for tweet in stream:
       
    81             process_tweet(tweet, curs, debug)
       
    82     finally:
       
    83         stream.close()
       
    84 
       
    85 if __name__ == '__main__':
       
    86     
       
    87     parser = OptionParser()
       
    88     parser.add_option("-f", "--file", dest="filename",  
       
    89                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
       
    90     parser.add_option("-u", "--user", dest="username",
       
    91                       help="Twitter user", metavar="USER", default=None)
       
    92     parser.add_option("-w", "--password", dest="password",
       
    93                       help="Twitter password", metavar="PASSWORD", default=None)
       
    94     parser.add_option("-t", "--track", dest="track",
       
    95                       help="Twitter track", metavar="TRACK")
       
    96     parser.add_option("-n", "--new", dest="new", action="store_true",
       
    97                       help="new database", default=False)
       
    98     parser.add_option("-d", "--debug", dest="debug", action="store_true",
       
    99                       help="debug", default=False)
       
   100     parser.add_option("-r", "--reconnects", dest="reconnects",
       
   101                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
       
   102 
       
   103 
       
   104     (options, args) = parser.parse_args()
       
   105     
       
   106     if options.debug:
       
   107         print "OPTIONS : "
       
   108         print repr(options)
       
   109     
       
   110     if options.new and os.path.exists(options.filename):
       
   111         os.remove(options.filename)
       
   112     
       
   113     conn = connect(options.filename)
       
   114     try:
       
   115         conn.row_factory = Row
       
   116         curs = conn.cursor()
       
   117     
       
   118         curs.execute("create table if not exists tweet_tweet (json);")
       
   119     
       
   120         try:
       
   121             main(options.username, options.password, options.track, curs, options.debug, options.reconnects)
       
   122         except KeyboardInterrupt:
       
   123             print '\nGoodbye!'
       
   124     finally:
       
   125         conn.close()