70 # Don't listen to auth error, since we can't reasonably reconnect |
77 # Don't listen to auth error, since we can't reasonably reconnect |
71 # when we get one. |
78 # when we get one. |
72 |
79 |
73 |
80 |
74 |
81 |
75 def process_tweet(tweet, session, debug, token_filename): |
class SourceProcess(Process):
    """Producer process: read the tweet stream and queue every raw tweet.

    For each tweet yielded by the stream a ``TweetSource`` row holding the
    original JSON text is stored, then ``(source_id, tweet)`` is put on the
    queue for the consumer processes.
    """

    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
        """Store the collaborators; nothing is opened before ``run``.

        session_maker  -- callable returning a new database session
        queue          -- queue receiving ``(source_id, tweet)`` tuples
        auth           -- authentication handler given to the stream
        track          -- comma separated keywords to track
        debug          -- debug level (stored, not used by this class)
        reconnects     -- forwarded as ``reconnects`` to the stream
        token_filename -- OAuth token file name (stored, not used by this class)
        stop_event     -- event; once set, the stream is asked to stop
        """
        super(SourceProcess, self).__init__()
        self.session_maker = session_maker
        self.queue = queue
        self.auth = auth
        self.track = track
        self.debug = debug
        self.reconnects = reconnects
        self.token_filename = token_filename
        self.stop_event = stop_event

    def run(self):
        """Consume the stream until it ends or ``stop_event`` is set.

        Any error is logged and the pending transaction rolled back; the
        stream and the session are always closed.
        """
        track_list = self.track.split(',')

        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
        # The stream polls this callable to know when it must stop.
        stream.muststop = self.stop_event.is_set

        session = self.session_maker()

        try:
            for tweet in stream:
                source = TweetSource(original_json=tweet)
                session.add(source)
                session.flush()  # assigns source.id
                source_id = source.id
                # Commit before publishing the id: consumers run in other
                # processes with their own sessions and must be able to see
                # the row they are handed.
                session.commit()
                # Use the queue given to this process, not a module global.
                self.queue.put((source_id, tweet), False)
                logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
        except Exception:
            # Still best effort (the process ends quietly), but leave a
            # trace instead of hiding the failure.
            logging.exception("SourceProcess: error while reading the tweet stream")
            session.rollback()
        finally:
            stream.close()
            session.close()
|
124 |
|
125 |
|
class TweetProcess(Process):
    """Consumer process: take queued raw tweets and process them."""

    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
        """Store the collaborators; the session is only created in ``run``.

        session_maker  -- callable returning a new database session
        queue          -- joinable queue yielding ``(source_id, tweet_text)``
        debug          -- debug level (stored, not used by this class)
        token_filename -- OAuth token file name forwarded to ``process_tweet``
        stop_event     -- event signalling that the process must finish
        """
        super(TweetProcess, self).__init__()
        self.session_maker = session_maker
        self.queue = queue
        self.debug = debug
        self.stop_event = stop_event
        self.token_filename = token_filename

    def run(self):
        """Process queued tweets until a stop is requested and the queue is empty.

        Every item taken from the queue is acknowledged with ``task_done()``
        even when its processing fails, so a ``join()`` on the queue cannot
        block on an item that was already taken.
        """
        # Local import: the timeout exception lives in ``Queue`` (Python 2)
        # or ``queue`` (Python 3).
        try:
            from Queue import Empty
        except ImportError:
            from queue import Empty

        session = self.session_maker()
        try:
            while True:
                stopping = self.stop_event.is_set()
                try:
                    # Use the queue given to this process, not a module
                    # global. Once a stop is requested only a short wait is
                    # needed to drain what is left.
                    source_id, tweet_txt = self.queue.get(True, 1 if stopping else 30)
                except Empty:
                    if stopping:
                        break
                    continue
                try:
                    process_tweet(tweet_txt, source_id, session, self.token_filename)
                    session.commit()
                finally:
                    self.queue.task_done()
        except BaseException:
            session.rollback()
            raise
        finally:
            session.close()


def process_tweet(tweet, source_id, session, token_filename=None):
    """Decode one raw tweet and run it through ``utils.TwitterProcessor``.

    tweet          -- tweet as JSON text
    source_id      -- id of the stored source row for this tweet
    session        -- database session used by the processor
    token_filename -- OAuth token file name forwarded to the processor

    Errors are not propagated: they are logged, the session is rolled back
    and a ``TweetLog`` entry with status ERROR is added to the session. The
    caller is expected to commit.
    """
    try:
        tweet_obj = anyjson.deserialize(tweet)
        screen_name = ""
        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
            screen_name = tweet_obj['user']['screen_name']
        logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
        logging.debug(u"Process_tweet :" + repr(tweet))
        # token_filename is a parameter: there is no ``self`` in this function.
        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
        processor.process()
    except Exception as e:
        # "%s", not "%e": the latter is a float conversion and raised a
        # TypeError from inside this handler.
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        logging.error(message)
        # format_exc() captures the real stack of the active exception.
        error_stack = traceback.format_exc()
        # Discard whatever the failed processing left pending.
        session.rollback()
        # NOTE(review): assumes TweetLog needs no other mandatory column - confirm.
        tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(tweet_log)
|
173 |
|
174 |
|
175 |
|
176 #def main_source(username, password, track, session, debug, reconnects, token_filename, duration): |
85 |
177 |
86 #username = username or raw_input('Twitter username: ') |
178 #username = username or raw_input('Twitter username: ') |
87 #password = password or getpass('Twitter password: ') |
179 #password = password or getpass('Twitter password: ') |
88 |
180 |
89 track_list = track or raw_input('Keywords to track (comma seperated): ').strip() |
181 # track_list = track or raw_input('Keywords to track (comma seperated): ').strip() |
90 track_list = [k for k in track_list.split(',')] |
182 # track_list = [k for k in track_list.split(',')] |
91 |
183 |
92 if username and password: |
184 # if username and password: |
93 auth = tweepy.auth.BasicAuthHandler(username, password) |
185 # auth = tweepy.auth.BasicAuthHandler(username, password) |
94 else: |
186 # else: |
95 consumer_key = models.CONSUMER_KEY |
187 # consumer_key = models.CONSUMER_KEY |
96 consumer_secret = models.CONSUMER_SECRET |
188 # consumer_secret = models.CONSUMER_SECRET |
97 auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
189 # auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
98 auth.set_access_token(*(utils.get_oauth_token(token_filename))) |
190 # auth.set_access_token(*(utils.get_oauth_token(token_filename))) |
99 |
191 |
100 if duration >= 0: |
192 # if duration >= 0: |
101 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration) |
193 # end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration) |
102 |
194 |
103 stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects) |
195 # stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True) |
104 try: |
196 # try: |
105 for tweet in stream: |
197 # for tweet in stream: |
106 if duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
198 # source = TweetSource(original_json=tweet) |
107 print "Stop recording after %d seconds." % (duration) |
199 # session.add(source) |
108 break |
200 # session.flush() |
109 process_tweet(tweet, session, debug, token_filename) |
201 # source_id = source.id |
110 logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) |
202 # process_tweet(tweet, source_id, session, debug, token_filename) |
111 session.commit() |
203 # logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) |
112 finally: |
204 # session.commit() |
113 stream.close() |
205 # if duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
|
206 # print "Stop recording after %d seconds." % (duration) |
|
207 # break |
|
208 # finally: |
|
209 # stream.close() |
114 |
210 |
115 def get_options(): |
211 def get_options(): |
116 parser = OptionParser() |
212 parser = OptionParser() |
117 parser.add_option("-f", "--file", dest="filename", |
213 parser.add_option("-f", "--file", dest="filename", |
118 help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") |
214 help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") |
128 help="Reconnects", metavar="RECONNECTS", default=10, type='int') |
224 help="Reconnects", metavar="RECONNECTS", default=10, type='int') |
129 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
225 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
130 help="Token file name") |
226 help="Token file name") |
131 parser.add_option("-d", "--duration", dest="duration", |
227 parser.add_option("-d", "--duration", dest="duration", |
132 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
228 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
|
229 parser.add_option("-N", "--consumer", dest="consumer_nb", |
|
230 help="number of consumer", metavar="CONSUMER", default=1, type='int') |
|
231 |
133 |
232 |
134 |
233 |
135 utils.set_logging_options(parser) |
234 utils.set_logging_options(parser) |
136 |
235 |
137 return parser.parse_args() |
236 return parser.parse_args() |
138 |
237 |
139 |
238 |
if __name__ == '__main__':
    # Script entry point: parse the options, prepare the database, then run
    # one producer (SourceProcess) and ``options.consumer_nb`` consumers
    # (TweetProcess) until SIGINT, the end of the requested duration, or the
    # end of the producer.

    (options, args) = get_options()

    utils.set_logging(options)

    if options.debug:
        print("OPTIONS : ")
        print(repr(options))

    if options.new and os.path.exists(options.filename):
        # Keep the previous database: move it to the first free
        # "<basename>.<i><extension>" name. ``extension`` already starts
        # with its dot, and the placeholder must be "%s" (it was "$s").
        basename, extension = os.path.splitext(options.filename)
        i = 1
        new_path = '%s.%d%s' % (basename, i, extension)
        while i < 1000000 and os.path.exists(new_path):
            i += 1
            new_path = '%s.%d%s' % (basename, i, extension)
        if i >= 1000000:
            raise Exception("Unable to find new filename for " + options.filename)
        else:
            shutil.move(options.filename, new_path)

    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
    Session = sessionmaker(bind=engine)
    queue = JoinableQueue()
    stop_event = Event()

    # Basic authentication when credentials are given, OAuth otherwise.
    if options.username and options.password:
        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
    else:
        consumer_key = models.CONSUMER_KEY
        consumer_secret = models.CONSUMER_SECRET
        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))

    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)

    tweet_processes = []

    for _ in range(options.consumer_nb):
        cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
        tweet_processes.append(cprocess)

    def interupt_handler(signum, frame):
        # Ctrl-C only requests the stop; the processes end by themselves.
        stop_event.set()

    signal.signal(signal.SIGINT, interupt_handler)

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

    while not stop_event.is_set():
        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
            break
        if sprocess.is_alive():
            time.sleep(0.1)
        else:
            break

    # Always signal the stop: when the producer ended on its own the event
    # was never set and the consumers would keep running, so the joins below
    # would never return.
    stop_event.set()

    sprocess.join()
    queue.join()
    for cprocess in tweet_processes:
        cprocess.join()