diff -r b7f4b0554ef8 -r bb44692e09ee script/stream/recorder_tweetstream.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/stream/recorder_tweetstream.py Tue Jan 11 11:17:17 2011 +0100 @@ -0,0 +1,125 @@ +import tweetstream +from getpass import getpass +import socket +socket._fileobject.default_bufsize = 0 +from sqlite3 import * +from optparse import OptionParser +import os + + +#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user'] +columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source'] +#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following'] +columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count'] +#just put it in a sqlite3 tqble + + +class ReconnectingTweetStream(tweetstream.TrackStream): + """TweetStream class that automatically tries to reconnect if the + connecting goes down. Reconnecting, and waiting for reconnecting, is + blocking. + + :param username: See :TweetStream: + + :param password: See :TweetStream: + + :keyword url: See :TweetStream: + + :keyword reconnects: Number of reconnects before a ConnectionError is + raised. Default is 3 + + :error_cb: Optional callable that will be called just before trying to + reconnect. The callback will be called with a single argument, the + exception that caused the reconnect attempt. Default is None + + :retry_wait: Time to wait before reconnecting in seconds. Default is 5 + + """ + + def __init__(self, user, password, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs): + self.max_reconnects = reconnects + self.retry_wait = retry_wait + self._reconnects = 0 + self._error_cb = error_cb + super(ReconnectingTweetStream,self).__init__(user, password, keywords, url, **kwargs) + + def next(self): + while True: + try: + return super(ReconnectingTweetStream,self).next() + except tweetstream.ConnectionError, e: + self._reconnects += 1 + if self._reconnects > self.max_reconnects: + raise ConnectionError("Too many retries") + + # Note: error_cb is not called on the last error since we + # raise a ConnectionError instead + if callable(self._error_cb): + self._error_cb(e) + + time.sleep(self.retry_wait) + # Don't listen to auth error, since we can't reasonably reconnect + # when we get one. + + + +def process_tweet(tweet, cursor, debug): + print tweet + cursor.execute("insert into tweet_tweet (json) values (:json);", {"json":unicode(tweet)}); + +def main(username, password, track, curs, debug, reconnects): + + username = username or raw_input('Twitter username: ') + password = password or getpass('Twitter password: ') + + track_list = track or raw_input('Keywords to track (comma seperated): ').strip() + track_list = [k for k in track_list.split(',')] + + stream = ReconnectingTweetStream(username, password, track_list, reconnects=reconnects) + try: + for tweet in stream: + process_tweet(tweet, curs, debug) + finally: + stream.close() + +if __name__ == '__main__': + + parser = OptionParser() + parser.add_option("-f", "--file", dest="filename", + help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") + parser.add_option("-u", "--user", dest="username", + help="Twitter user", metavar="USER", default=None) + parser.add_option("-w", "--password", dest="password", + help="Twitter password", metavar="PASSWORD", default=None) + parser.add_option("-t", "--track", dest="track", + help="Twitter track", metavar="TRACK") + parser.add_option("-n", "--new", dest="new", action="store_true", + help="new database", default=False) + parser.add_option("-d", "--debug", dest="debug", action="store_true", + help="debug", default=False) + parser.add_option("-r", "--reconnects", dest="reconnects", + help="Reconnects", metavar="RECONNECTS", default=10, type='int') + + + (options, args) = parser.parse_args() + + if options.debug: + print "OPTIONS : " + print repr(options) + + if options.new and os.path.exists(options.filename): + os.remove(options.filename) + + conn = connect(options.filename) + try: + conn.row_factory = Row + curs = conn.cursor() + + curs.execute("create table if not exists tweet_tweet (json);") + + try: + main(options.username, options.password, options.track, curs, options.debug, options.reconnects) + except KeyboardInterrupt: + print '\nGoodbye!' + finally: + conn.close()