def __init__(self, auth, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs):
    """Set up the reconnect bookkeeping, then initialise the base stream.

    auth, keywords, url and any extra keyword arguments are forwarded
    unchanged to the parent class constructor.

    reconnects -- stored as ``max_reconnects``.
    error_cb   -- stored as ``_error_cb``; presumably invoked when a
                  connection error occurs (TODO confirm against the
                  code that uses it).
    retry_wait -- stored as ``retry_wait``; presumably a delay between
                  reconnection attempts (TODO confirm unit and usage).
    """
    # Private state first: nothing has been retried yet.
    self._reconnects = 0
    self._error_cb = error_cb

    # Public tuning knobs.
    self.retry_wait = retry_wait
    self.max_reconnects = reconnects

    # Explicit two-argument super(): this file is Python 2 code.
    parent = super(ReconnectingTweetStream, self)
    parent.__init__(auth, keywords, url, **kwargs)
51 |
52 |
52 def next(self): |
53 def next(self): |
53 while True: |
54 while True: |
54 try: |
55 try: |
55 return super(ReconnectingTweetStream,self).next() |
56 return super(ReconnectingTweetStream, self).next() |
56 except tweetstream.ConnectionError, e: |
57 except tweetstream.ConnectionError, e: |
57 logging.debug("connection error :" + str(e)) |
58 logging.debug("connection error :" + str(e)) |
58 self._reconnects += 1 |
59 self._reconnects += 1 |
59 if self._reconnects > self.max_reconnects: |
60 if self._reconnects > self.max_reconnects: |
60 raise tweetstream.ConnectionError("Too many retries") |
61 raise tweetstream.ConnectionError("Too many retries") |
72 |
73 |
def process_tweet(tweet, session, debug, token_filename):
    """Log one incoming tweet and hand it to ``utils.TwitterProcessor``.

    tweet          -- mapping for the decoded tweet; ``tweet['text']`` is read
                      unconditionally, so a payload without that key raises
                      KeyError.
    session        -- passed through to the processor.
    debug          -- accepted for signature compatibility; not used here.
    token_filename -- passed through to the processor.
    """
    # The author name is optional: fall back to an empty string when the
    # payload carries no 'user' entry or the entry has no 'screen_name'.
    has_author = 'user' in tweet and 'screen_name' in tweet['user']
    screen_name = tweet['user']['screen_name'] if has_author else ""

    summary = "Process_tweet from %s : %s" % (screen_name, tweet['text'])
    logging.info(summary)

    dump = "Process_tweet :" + repr(tweet)
    logging.debug(dump)

    utils.TwitterProcessor(tweet, None, session, token_filename).process()
81 |
82 |
82 def main(username, password, track, session, debug, reconnects, token_filename): |
83 def main(username, password, track, session, debug, reconnects, token_filename, duration): |
83 |
84 |
84 #username = username or raw_input('Twitter username: ') |
85 #username = username or raw_input('Twitter username: ') |
85 #password = password or getpass('Twitter password: ') |
86 #password = password or getpass('Twitter password: ') |
86 |
87 |
87 track_list = track or raw_input('Keywords to track (comma seperated): ').strip() |
88 track_list = track or raw_input('Keywords to track (comma seperated): ').strip() |
93 consumer_key = models.CONSUMER_KEY |
94 consumer_key = models.CONSUMER_KEY |
94 consumer_secret = models.CONSUMER_SECRET |
95 consumer_secret = models.CONSUMER_SECRET |
95 auth = tweetstream.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
96 auth = tweetstream.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
96 auth.set_access_token(*(utils.get_oauth_token(token_filename))) |
97 auth.set_access_token(*(utils.get_oauth_token(token_filename))) |
97 |
98 |
|
99 if duration >= 0: |
|
100 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration) |
|
101 |
98 stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects) |
102 stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects) |
99 try: |
103 try: |
100 for tweet in stream: |
104 for tweet in stream: |
|
105 if duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
|
106 print "Stop recording after %d seconds." % (duration) |
|
107 break |
101 process_tweet(tweet, session, debug, token_filename) |
108 process_tweet(tweet, session, debug, token_filename) |
102 session.commit() |
109 session.commit() |
103 finally: |
110 finally: |
104 stream.close() |
111 stream.close() |
105 |
112 |
106 def get_options(): |
113 def get_options(): |
107 parser = OptionParser() |
114 parser = OptionParser() |
108 parser.add_option("-f", "--file", dest="filename", |
115 parser.add_option("-f", "--file", dest="filename", |
109 help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") |
116 help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") |
110 parser.add_option("-u", "--user", dest="username", |
117 parser.add_option("-u", "--user", dest="username", |
111 help="Twitter user", metavar="USER", default=None) |
118 help="Twitter user", metavar="USER", default=None) |
112 parser.add_option("-w", "--password", dest="password", |
119 parser.add_option("-w", "--password", dest="password", |
113 help="Twitter password", metavar="PASSWORD", default=None) |
120 help="Twitter password", metavar="PASSWORD", default=None) |
117 help="new database", default=False) |
124 help="new database", default=False) |
118 parser.add_option("-r", "--reconnects", dest="reconnects", |
125 parser.add_option("-r", "--reconnects", dest="reconnects", |
119 help="Reconnects", metavar="RECONNECTS", default=10, type='int') |
126 help="Reconnects", metavar="RECONNECTS", default=10, type='int') |
120 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
127 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
121 help="Token file name") |
128 help="Token file name") |
|
129 parser.add_option("-d", "--duration", dest="duration", |
|
130 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
|
131 |
122 |
132 |
123 utils.set_logging_options(parser) |
133 utils.set_logging_options(parser) |
124 |
134 |
125 return parser.parse_args() |
135 return parser.parse_args() |
126 |
136 |
137 print repr(options) |
147 print repr(options) |
138 |
148 |
139 if options.new and os.path.exists(options.filename): |
149 if options.new and os.path.exists(options.filename): |
140 os.remove(options.filename) |
150 os.remove(options.filename) |
141 |
151 |
142 engine, metadata = models.setup_database('sqlite:///'+options.filename, echo=(options.debug)) |
152 engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug)) |
143 Session = sessionmaker(bind=engine) |
153 Session = sessionmaker(bind=engine) |
144 session = Session() |
154 session = Session() |
145 |
155 |
146 try: |
156 try: |
147 try: |
157 try: |
148 main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename) |
158 main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration) |
149 except KeyboardInterrupt: |
159 except KeyboardInterrupt: |
150 print '\nGoodbye!' |
160 print '\nGoodbye!' |
151 session.commit() |
161 session.commit() |
152 finally: |
162 finally: |
153 session.close() |
163 session.close() |