script/stream/recorder_tweetstream.py
changeset 11 54d7f1486ac4
parent 9 bb44692e09ee
child 15 5d552b6a0e55
Legend: equal | deleted | inserted | replaced
10:eb885a117aa0 11:54d7f1486ac4
       
     1 from getpass import getpass
       
     2 from iri_tweet import models, utils
       
     3 from optparse import OptionParser
       
     4 from sqlalchemy.orm import sessionmaker
       
     5 from sqlite3 import *
       
     6 import logging
       
     7 import os
       
     8 import socket
       
     9 import sys
     1 import tweetstream
    10 import tweetstream
     2 from getpass import getpass
       
     3 import socket
       
     4 socket._fileobject.default_bufsize = 0
    11 socket._fileobject.default_bufsize = 0
     5 from sqlite3 import *
    12 
     6 from optparse import OptionParser
       
     7 import os
       
     8 
    13 
     9 
    14 
    10 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    15 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    11 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
    16 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
    12 #columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
    17 #columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
    61         # Don't listen to auth error, since we can't reasonably reconnect
    66         # Don't listen to auth error, since we can't reasonably reconnect
    62         # when we get one.
    67         # when we get one.
    63 
    68 
    64 
    69 
    65 
    70 
    66 def process_tweet(tweet, cursor, debug):
    71 def process_tweet(tweet, session, debug):
    67     print tweet
    72     
    68     cursor.execute("insert into tweet_tweet (json) values (:json);", {"json":unicode(tweet)});
    73     logging.debug("Process_tweet :" + repr(tweet))
       
    74     processor = utils.TwitterProcessor(tweet, None, session)
       
    75     processor.process()
    69 
    76 
    70 def main(username, password, track, curs, debug, reconnects):
    77 def main(username, password, track, session, debug, reconnects):
    71 
    78 
    72     username = username or raw_input('Twitter username: ')
    79     username = username or raw_input('Twitter username: ')
    73     password = password or getpass('Twitter password: ')
    80     password = password or getpass('Twitter password: ')
    74 
    81 
    75     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
    82     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
    76     track_list = [k for k in track_list.split(',')]
    83     track_list = [k for k in track_list.split(',')]
    77 
    84 
    78     stream = ReconnectingTweetStream(username, password, track_list, reconnects=reconnects)
    85     stream = ReconnectingTweetStream(username, password, track_list, reconnects=reconnects)
    79     try:
    86     try:
    80         for tweet in stream:
    87         for tweet in stream:
    81             process_tweet(tweet, curs, debug)
    88             process_tweet(tweet, session, debug)
       
    89             session.commit()
    82     finally:
    90     finally:
    83         stream.close()
    91         stream.close()
    84 
    92         
    85 if __name__ == '__main__':
    93 def get_options():
    86     
       
    87     parser = OptionParser()
    94     parser = OptionParser()
    88     parser.add_option("-f", "--file", dest="filename",  
    95     parser.add_option("-f", "--file", dest="filename",  
    89                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
    96                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
    90     parser.add_option("-u", "--user", dest="username",
    97     parser.add_option("-u", "--user", dest="username",
    91                       help="Twitter user", metavar="USER", default=None)
    98                       help="Twitter user", metavar="USER", default=None)
    93                       help="Twitter password", metavar="PASSWORD", default=None)
   100                       help="Twitter password", metavar="PASSWORD", default=None)
    94     parser.add_option("-t", "--track", dest="track",
   101     parser.add_option("-t", "--track", dest="track",
    95                       help="Twitter track", metavar="TRACK")
   102                       help="Twitter track", metavar="TRACK")
    96     parser.add_option("-n", "--new", dest="new", action="store_true",
   103     parser.add_option("-n", "--new", dest="new", action="store_true",
    97                       help="new database", default=False)
   104                       help="new database", default=False)
    98     parser.add_option("-d", "--debug", dest="debug", action="store_true",
       
    99                       help="debug", default=False)
       
   100     parser.add_option("-r", "--reconnects", dest="reconnects",
   105     parser.add_option("-r", "--reconnects", dest="reconnects",
   101                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
   106                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
       
   107     
       
   108     utils.set_logging_options(parser)
   102 
   109 
       
   110     return parser.parse_args()
       
   111     
   103 
   112 
   104     (options, args) = parser.parse_args()
   113 if __name__ == '__main__':
   105     
   114     
       
   115 
       
   116     (options, args) = get_options()
       
   117     
       
   118     utils.set_logging(options)
       
   119         
   106     if options.debug:
   120     if options.debug:
   107         print "OPTIONS : "
   121         print "OPTIONS : "
   108         print repr(options)
   122         print repr(options)
   109     
   123     
   110     if options.new and os.path.exists(options.filename):
   124     if options.new and os.path.exists(options.filename):
   111         os.remove(options.filename)
   125         os.remove(options.filename)
   112     
   126     
   113     conn = connect(options.filename)
   127     engine, metadata = models.setup_database('sqlite:///'+options.filename, echo=(options.debug))
       
   128     Session = sessionmaker(bind=engine)
       
   129     session = Session()
       
   130 
   114     try:
   131     try:
   115         conn.row_factory = Row
       
   116         curs = conn.cursor()
       
   117     
       
   118         curs.execute("create table if not exists tweet_tweet (json);")
       
   119     
       
   120         try:
   132         try:
   121             main(options.username, options.password, options.track, curs, options.debug, options.reconnects)
   133             main(options.username, options.password, options.track, session, options.debug, options.reconnects)
   122         except KeyboardInterrupt:
   134         except KeyboardInterrupt:
   123             print '\nGoodbye!'
   135             print '\nGoodbye!'
       
   136         session.commit()
   124     finally:
   137     finally:
   125         conn.close()
   138         session.close()