script/stream/recorder_tweetstream.py
changeset 9 bb44692e09ee
child 11 54d7f1486ac4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/stream/recorder_tweetstream.py	Tue Jan 11 11:17:17 2011 +0100
@@ -0,0 +1,125 @@
+import tweetstream
+from getpass import getpass
+import socket
+socket._fileobject.default_bufsize = 0
+from sqlite3 import *
+from optparse import OptionParser
+import os
+
+
+#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
+columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
+#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
+columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
+#just put it in a sqlite3 tqble
+
+
+class ReconnectingTweetStream(tweetstream.TrackStream):
+    """TweetStream class that automatically tries to reconnect if the
+    connecting goes down. Reconnecting, and waiting for reconnecting, is
+    blocking.
+
+    :param username: See :TweetStream:
+
+    :param password: See :TweetStream:
+
+    :keyword url: See :TweetStream:
+
+    :keyword reconnects: Number of reconnects before a ConnectionError is
+        raised. Default is 3
+
+    :error_cb: Optional callable that will be called just before trying to
+        reconnect. The callback will be called with a single argument, the
+        exception that caused the reconnect attempt. Default is None
+
+    :retry_wait: Time to wait before reconnecting in seconds. Default is 5
+
+    """
+
+    def __init__(self, user, password, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs):
+        self.max_reconnects = reconnects
+        self.retry_wait = retry_wait
+        self._reconnects = 0
+        self._error_cb = error_cb
+        super(ReconnectingTweetStream,self).__init__(user, password, keywords, url, **kwargs)
+
+    def next(self):
+        while True:
+            try:
+                return super(ReconnectingTweetStream,self).next()
+            except tweetstream.ConnectionError, e:
+                self._reconnects += 1
+                if self._reconnects > self.max_reconnects:
+                    raise ConnectionError("Too many retries")
+
+                # Note: error_cb is not called on the last error since we
+                # raise a ConnectionError instead
+                if  callable(self._error_cb):
+                    self._error_cb(e)
+
+                time.sleep(self.retry_wait)
+        # Don't listen to auth error, since we can't reasonably reconnect
+        # when we get one.
+
+
+
+def process_tweet(tweet, cursor, debug):
+    print tweet
+    cursor.execute("insert into tweet_tweet (json) values (:json);", {"json":unicode(tweet)});
+
+def main(username, password, track, curs, debug, reconnects):
+
+    username = username or raw_input('Twitter username: ')
+    password = password or getpass('Twitter password: ')
+
+    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
+    track_list = [k for k in track_list.split(',')]
+
+    stream = ReconnectingTweetStream(username, password, track_list, reconnects=reconnects)
+    try:
+        for tweet in stream:
+            process_tweet(tweet, curs, debug)
+    finally:
+        stream.close()
+
+if __name__ == '__main__':
+    
+    parser = OptionParser()
+    parser.add_option("-f", "--file", dest="filename",  
+                      help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
+    parser.add_option("-u", "--user", dest="username",
+                      help="Twitter user", metavar="USER", default=None)
+    parser.add_option("-w", "--password", dest="password",
+                      help="Twitter password", metavar="PASSWORD", default=None)
+    parser.add_option("-t", "--track", dest="track",
+                      help="Twitter track", metavar="TRACK")
+    parser.add_option("-n", "--new", dest="new", action="store_true",
+                      help="new database", default=False)
+    parser.add_option("-d", "--debug", dest="debug", action="store_true",
+                      help="debug", default=False)
+    parser.add_option("-r", "--reconnects", dest="reconnects",
+                      help="Reconnects", metavar="RECONNECTS", default=10, type='int')
+
+
+    (options, args) = parser.parse_args()
+    
+    if options.debug:
+        print "OPTIONS : "
+        print repr(options)
+    
+    if options.new and os.path.exists(options.filename):
+        os.remove(options.filename)
+    
+    conn = connect(options.filename)
+    try:
+        conn.row_factory = Row
+        curs = conn.cursor()
+    
+        curs.execute("create table if not exists tweet_tweet (json);")
+    
+        try:
+            main(options.username, options.password, options.track, curs, options.debug, options.reconnects)
+        except KeyboardInterrupt:
+            print '\nGoodbye!'
+    finally:
+        conn.close()