from getpass import getpass
from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
from optparse import OptionParser
# NOTE(review): wildcard import kept from the original file; sqlalchemy's
# OperationalError imported below intentionally shadows sqlite3's.
from sqlite3 import *
import StringIO
import datetime
import logging
import os
import shutil
import signal
import socket
import sys
import time
import traceback

from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import sessionmaker
import anyjson
import tweepy.auth
import tweetstream

from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog
from iri_tweet.utils import logger
# Disable read buffering on sockets so tweets from the long-lived streaming
# connection are delivered as soon as they arrive instead of waiting for a
# buffer to fill.  (Python 2 only: _fileobject was removed in Python 3.)
socket._fileobject.default_bufsize = 0
#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']

# Don't listen to auth error, since we can't reasonably reconnect
# when we get one.
class SourceProcess(Process):
    """Producer process for the tweet recorder.

    Connects to the Twitter streaming API, stores each raw tweet payload as a
    TweetSource row, and pushes ``(source_id, tweet)`` onto the shared queue
    for the TweetProcess consumers.  Sets ``stop_event`` on exit so the
    consumers shut down too.
    """

    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
        # session_maker: sqlalchemy sessionmaker bound to the recording db
        # queue: multiprocessing queue shared with the consumer processes
        # auth: tweepy auth handler (basic or oauth)
        # track: comma separated list of keywords to track
        # reconnects: max reconnection attempts for the stream
        # stop_event: multiprocessing.Event used for cross-process shutdown
        self.session_maker = session_maker
        self.queue = queue
        self.auth = auth
        self.track = track
        self.debug = debug
        self.reconnects = reconnects
        self.token_filename = token_filename
        self.stop_event = stop_event
        super(SourceProcess, self).__init__()

    def run(self):
        get_logger().debug("SourceProcess : run")
        track_list = self.track  # or raw_input('Keywords to track (comma seperated): ').strip()
        track_list = [k for k in track_list.split(',')]

        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
        get_logger().debug("SourceProcess : after connecting to stream")
        # let the stream poll the shared stop event so it can terminate early
        stream.muststop = lambda: self.stop_event.is_set()

        session = self.session_maker()

        try:
            for tweet in stream:
                get_logger().debug("tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                get_logger().debug("source created")
                # Retry the insert a few times: concurrent access to the
                # sqlite file can raise a transient OperationalError.
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            raise e

                source_id = source.id
                get_logger().debug("before queue + source id " + repr(source_id))
                # non-blocking put: consumers are expected to keep up
                self.queue.put((source_id, tweet), False)
                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                session.commit()
        except Exception as e:
            get_logger().error("Error when processing tweet " + repr(e))
        finally:
            session.rollback()
            stream.close()
            session.close()
            self.queue.close()
            # signal the consumer processes that no more tweets are coming
            self.stop_event.set()
|
def process_tweet(tweet, source_id, session, token_filename):
    """Parse one raw tweet payload and run it through TwitterProcessor.

    tweet          -- raw JSON text of the tweet
    source_id      -- id of the TweetSource row holding the raw payload
    session        -- sqlalchemy session used for persistence
    token_filename -- path of the oauth token file forwarded to the processor

    Never raises: on any error the session is rolled back and a TweetLog row
    with status ERROR, the message and the traceback is committed instead.
    """
    try:
        tweet_obj = anyjson.deserialize(tweet)
        screen_name = ""
        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
            screen_name = tweet_obj['user']['screen_name']
        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
        get_logger().debug(u"Process_tweet :" + repr(tweet))
        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
        processor.process()
    except Exception as e:
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        get_logger().error(message)
        # capture the formatted traceback for the error log row
        output = StringIO.StringIO()
        traceback.print_exception(Exception, e, None, None, output)
        error_stack = output.getvalue()
        output.close()
        session.rollback()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()
|
class TweetProcess(Process):
    """Consumer process: pulls ``(source_id, tweet)`` pairs off the shared
    queue and runs them through process_tweet until ``stop_event`` is set.
    """

    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
        self.session_maker = session_maker
        self.queue = queue
        self.debug = debug
        self.stop_event = stop_event
        self.token_filename = token_filename
        super(TweetProcess, self).__init__()

    def run(self):
        session = self.session_maker()
        try:
            while not self.stop_event.is_set():
                try:
                    # BUGFIX: was the module-global `queue`, which only works
                    # by accident with fork-based multiprocessing; use the
                    # queue handed to this process instead.
                    # Timeout so the stop_event is re-checked every 10s.
                    source_id, tweet_txt = self.queue.get(True, 10)
                    get_logger().debug("Processing source id " + repr(source_id))
                except Exception as e:
                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.token_filename)
                session.commit()
        finally:
            session.rollback()
            # propagate shutdown to the sibling processes
            self.stop_event.set()
            session.close()
|
def process_leftovers(session, token_filename):
    """Process every stored tweet source that has no TweetLog entry yet.

    Equivalent SQL for the query below:
      select * from tweet_tweet_source ts
        left join tweet_tweet_log tl on ts.id = tl.tweet_source_id
        where tl.id isnull;
    """
    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)

    for src in sources:
        tweet_txt = src.original_json
        process_tweet(tweet_txt, src.id, session, token_filename)
114 |
215 |
115 def get_options(): |
216 def get_options(): |
116 parser = OptionParser() |
217 parser = OptionParser() |
117 parser.add_option("-f", "--file", dest="filename", |
218 parser.add_option("-f", "--file", dest="filename", |
118 help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") |
219 help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") |
128 help="Reconnects", metavar="RECONNECTS", default=10, type='int') |
229 help="Reconnects", metavar="RECONNECTS", default=10, type='int') |
129 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
230 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
130 help="Token file name") |
231 help="Token file name") |
131 parser.add_option("-d", "--duration", dest="duration", |
232 parser.add_option("-d", "--duration", dest="duration", |
132 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
233 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
|
234 parser.add_option("-N", "--consumer", dest="consumer_nb", |
|
235 help="number of consumer", metavar="CONSUMER", default=1, type='int') |
|
236 |
133 |
237 |
134 |
238 |
135 utils.set_logging_options(parser) |
239 utils.set_logging_options(parser) |
136 |
240 |
137 return parser.parse_args() |
241 return parser.parse_args() |
138 |
242 |
139 |
243 |
140 if __name__ == '__main__': |
244 if __name__ == '__main__': |
141 |
245 |
142 |
|
143 (options, args) = get_options() |
246 (options, args) = get_options() |
144 |
247 |
145 utils.set_logging(options) |
248 utils.set_logging(options, get_logger()) |
146 |
249 |
147 if options.debug: |
250 if options.debug: |
148 print "OPTIONS : " |
251 print "OPTIONS : " |
149 print repr(options) |
252 print repr(options) |
150 |
253 |
151 if options.new and os.path.exists(options.filename): |
254 if options.new and os.path.exists(options.filename): |
152 os.remove(options.filename) |
255 i = 1 |
153 |
256 basename, extension = os.path.splitext(options.filename) |
154 engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2)) |
257 new_path = '%s.%d%s' % (basename, i, extension) |
|
258 while i < 1000000 and os.path.exists(new_path): |
|
259 i += 1 |
|
260 new_path = '%s.%d%s' % (basename, i, extension) |
|
261 if i >= 1000000: |
|
262 raise Exception("Unable to find new filename for " + options.filename) |
|
263 else: |
|
264 shutil.move(options.filename, new_path) |
|
265 |
|
266 |
|
267 queue = JoinableQueue() |
|
268 stop_event = Event() |
|
269 |
|
270 if options.username and options.password: |
|
271 auth = tweepy.auth.BasicAuthHandler(options.username, options.password) |
|
272 else: |
|
273 consumer_key = models.CONSUMER_KEY |
|
274 consumer_secret = models.CONSUMER_SECRET |
|
275 auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
|
276 auth.set_access_token(*(utils.get_oauth_token(options.token_filename))) |
|
277 |
|
278 |
|
279 engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) |
155 Session = sessionmaker(bind=engine) |
280 Session = sessionmaker(bind=engine) |
|
281 |
156 session = Session() |
282 session = Session() |
157 |
283 process_leftovers(session, options.token_filename) |
158 try: |
284 session.commit() |
159 try: |
285 session.close() |
160 main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration) |
286 |
161 except KeyboardInterrupt: |
287 sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event) |
162 print '\nGoodbye!' |
288 |
163 session.commit() |
289 tweet_processes = [] |
164 finally: |
290 |
165 session.close() |
291 for i in range(options.consumer_nb): |
|
292 engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) |
|
293 Session = sessionmaker(bind=engine) |
|
294 cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event) |
|
295 tweet_processes.append(cprocess) |
|
296 |
|
297 def interupt_handler(signum, frame): |
|
298 stop_event.set() |
|
299 |
|
300 signal.signal(signal.SIGINT, interupt_handler) |
|
301 |
|
302 sprocess.start() |
|
303 for cprocess in tweet_processes: |
|
304 cprocess.start() |
|
305 |
|
306 if options.duration >= 0: |
|
307 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
|
308 |
|
309 while not stop_event.is_set(): |
|
310 if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
|
311 stop_event.set() |
|
312 break |
|
313 if sprocess.is_alive(): |
|
314 time.sleep(0.1) |
|
315 else: |
|
316 break |
|
317 |
|
318 get_logger().debug("Joining Source Process") |
|
319 sprocess.join() |
|
320 get_logger().debug("Joining Queue") |
|
321 #queue.join() |
|
322 for i,cprocess in enumerate(tweet_processes): |
|
323 get_logger().debug("Joining consumer process Nb %d" % (i+1)) |
|
324 cprocess.join() |
|
325 |
|
326 get_logger().debug("Processing leftovers") |
|
327 session = Session() |
|
328 process_leftovers(session, options.token_filename) |
|
329 session.commit() |
|
330 session.close() |
|
331 |
|
332 get_logger().debug("Done. Exiting.") |
|
333 |