1 from getpass import getpass |
1 from getpass import getpass |
2 from iri_tweet import models, utils |
2 from iri_tweet import models, utils |
3 from iri_tweet.models import TweetSource, TweetLog |
3 from iri_tweet.models import TweetSource, TweetLog |
4 from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger |
4 from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger |
5 from optparse import OptionParser |
5 from optparse import OptionParser |
6 from sqlalchemy.orm import sessionmaker |
6 from sqlalchemy.exc import OperationalError |
|
7 from sqlalchemy.orm import scoped_session, sessionmaker |
7 import StringIO |
8 import StringIO |
8 import logging |
|
9 import anyjson |
9 import anyjson |
10 import datetime |
10 import datetime |
|
11 import logging |
11 import os |
12 import os |
|
13 import re |
12 import shutil |
14 import shutil |
13 import signal |
15 import signal |
14 import socket |
16 import socket |
|
17 import sqlalchemy.schema |
15 import sys |
18 import sys |
16 import time |
19 import time |
17 import traceback |
20 import traceback |
18 import tweepy.auth |
21 import tweepy.auth |
19 import tweetstream |
22 import tweetstream |
20 from iri_tweet.utils import logger |
23 import urllib2 |
21 from sqlalchemy.exc import OperationalError |
24 #from iri_tweet.utils import get_logger |
# Disable the default read buffering on socket file objects.
# NOTE(review): presumably so each tweet on the long-lived streaming HTTP
# connection is delivered to the iterator as soon as it arrives instead of
# sitting in a read buffer — confirm against the tweetstream implementation.
socket._fileobject.default_bufsize = 0
23 |
26 |
24 |
27 |
25 |
28 |
# Field names expected in the JSON payloads delivered by the Twitter
# streaming API. The commented-out lists below are kept as a record of the
# previous (smaller) API payloads.
#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
# Top-level keys of a tweet object.
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
# Keys of the embedded 'user' object of a tweet.
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
# just put it in a sqlite3 table
|
34 |
|
35 |
|
def set_logging(options):
    """Apply the command-line logging options to every logger this script uses.

    Delegates to utils.set_logging for the 'iri_tweet' and 'multiprocessing'
    loggers; SQL statement logging ('sqlalchemy.engine') is enabled only at
    debug verbosity 2 or more.
    """
    for logger_name in ('iri_tweet', 'multiprocessing'):
        utils.set_logging(options, logging.getLogger(logger_name))
    if options.debug >= 2:
        utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
|
44 |
|
def get_auth(options, access_token):
    """Build a tweepy authentication handler.

    Uses basic auth when both a username and a password were supplied on the
    command line; otherwise falls back to OAuth with the application's
    consumer key/secret and the given (key, secret) access token pair.
    """
    if options.username and options.password:
        return tweepy.auth.BasicAuthHandler(options.username, options.password)
    handler = tweepy.auth.OAuthHandler(models.CONSUMER_KEY,
                                       models.CONSUMER_SECRET,
                                       secure=False)
    handler.set_access_token(*access_token)
    return handler
31 |
54 |
32 |
55 |
33 class ReconnectingTweetStream(tweetstream.FilterStream): |
56 class ReconnectingTweetStream(tweetstream.FilterStream): |
34 """TweetStream class that automatically tries to reconnect if the |
57 """TweetStream class that automatically tries to reconnect if the |
35 connecting goes down. Reconnecting, and waiting for reconnecting, is |
58 connecting goes down. Reconnecting, and waiting for reconnecting, is |
78 # Don't listen to auth error, since we can't reasonably reconnect |
102 # Don't listen to auth error, since we can't reasonably reconnect |
79 # when we get one. |
103 # when we get one. |
80 |
104 |
81 |
105 |
82 |
106 |
|
107 |
class SourceProcess(Process):
    """Producer process: reads the Twitter filter stream and stores each raw
    tweet as a TweetSource row, then hands (source_id, tweet_json) to the
    consumer queue for post-processing."""

    def __init__(self, session_maker, queue, options, access_token, stop_event):
        # session_maker: factory producing SQLAlchemy sessions (one per run()).
        self.session_maker = session_maker
        # queue: shared queue feeding TweetProcess consumers.
        self.queue = queue
        self.track = options.track
        self.reconnects = options.reconnects
        self.token_filename = options.token_filename
        # stop_event: shared Event; set by anyone to request a global shutdown.
        self.stop_event = stop_event
        self.options = options
        self.access_token = access_token
        super(SourceProcess, self).__init__()

    def run(self):
        #import pydevd
        #pydevd.settrace(suspend=False)

        # Logging and auth are (re)configured here because run() executes in
        # the child process, not in the parent that constructed the object.
        set_logging(self.options)
        self.auth = get_auth(self.options, self.access_token)

        utils.get_logger().debug("SourceProcess : run")
        track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
        track_list = [k for k in track_list.split(',')]

        utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
        utils.get_logger().debug("SourceProcess : after connecting to stream")
        # Let the stream poll the shared stop event so it can terminate itself.
        stream.muststop = lambda: self.stop_event.is_set()

        session = self.session_maker()

        try:
            for tweet in stream:
                utils.get_logger().debug("tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                utils.get_logger().debug("source created")
                # Retry the insert a bounded number of times: on SQLite the
                # database can be transiently locked by a consumer process.
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        # flush() assigns source.id without committing yet.
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            # Give up after the last retry.
                            raise e

                source_id = source.id
                utils.get_logger().debug("before queue + source id " + repr(source_id))
                utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                # Commit before enqueueing so consumers can see the row.
                session.commit()
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            utils.get_logger().error("Error when processing tweet " + repr(e))
        finally:
            # Roll back any uncommitted work, release all resources, and ask
            # every other process to stop as well.
            session.rollback()
            stream.close()
            session.close()
            self.queue.close()
            self.stop_event.set()
145 |
171 |
146 |
172 |
147 def process_tweet(tweet, source_id, session, token_filename): |
173 def process_tweet(tweet, source_id, session, access_token): |
148 try: |
174 try: |
149 tweet_obj = anyjson.deserialize(tweet) |
175 tweet_obj = anyjson.deserialize(tweet) |
150 screen_name = "" |
176 screen_name = "" |
151 if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: |
177 if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: |
152 screen_name = tweet_obj['user']['screen_name'] |
178 screen_name = tweet_obj['user']['screen_name'] |
153 get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) |
179 utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) |
154 get_logger().debug(u"Process_tweet :" + repr(tweet)) |
180 utils.get_logger().debug(u"Process_tweet :" + repr(tweet)) |
155 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename) |
181 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) |
156 processor.process() |
182 processor.process() |
157 except Exception as e: |
183 except Exception as e: |
158 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
184 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
159 get_logger().error(message) |
185 utils.get_logger().error(message) |
160 output = StringIO.StringIO() |
186 output = StringIO.StringIO() |
161 traceback.print_exception(Exception, e, None, None, output) |
187 traceback.print_exception(Exception, e, None, None, output) |
162 error_stack = output.getvalue() |
188 error_stack = output.getvalue() |
163 output.close() |
189 output.close() |
164 session.rollback() |
190 session.rollback() |
168 |
194 |
169 |
195 |
170 |
196 |
class TweetProcess(Process):
    """Consumer process: pulls (source_id, tweet_json) pairs off the shared
    queue and post-processes each one into the database via process_tweet,
    until the shared stop event is set."""

    def __init__(self, session_maker, queue, options, access_token, stop_event):
        # session_maker: factory producing SQLAlchemy sessions (one per run()).
        self.session_maker = session_maker
        # queue: shared queue filled by SourceProcess.
        self.queue = queue
        # stop_event: shared Event; set by anyone to request a global shutdown.
        self.stop_event = stop_event
        self.options = options
        self.access_token = access_token
        super(TweetProcess, self).__init__()

    def run(self):
        # Logging is (re)configured here because run() executes in the child
        # process, not in the parent that constructed the object.
        set_logging(self.options)
        session = self.session_maker()
        try:
            while not self.stop_event.is_set():
                try:
                    # BUG FIX: previously read the module-global `queue`
                    # instead of self.queue — that only worked by accident
                    # under fork-based multiprocessing and breaks under the
                    # spawn start method. The short timeout lets the loop
                    # re-check the stop event regularly.
                    source_id, tweet_txt = self.queue.get(True, 3)
                    utils.get_logger().debug("Processing source id " + repr(source_id))
                except Exception as e:
                    # Timeout (queue empty) or transient queue error: just
                    # poll the stop event again.
                    utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.access_token)
                session.commit()
        finally:
            # Roll back any uncommitted work and propagate shutdown.
            session.rollback()
            self.stop_event.set()
            session.close()
|
226 |
|
227 |
|
def get_sessionmaker(conn_str):
    """Create a scoped session factory bound to the given connection string.

    The database schema is NOT created here (create_all=False).
    Returns a (Session, engine, metadata) tuple.
    """
    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
    session_factory = sessionmaker(bind=engine, autocommit=False)
    return scoped_session(session_factory), engine, metadata
|
232 |
201 |
233 |
def process_leftovers(session, access_token):
    """Re-process every stored TweetSource that has no TweetLog entry yet.

    A source without a matching log row was saved by the producer but never
    post-processed (e.g. the consumer crashed); each one is run through
    process_tweet and committed individually.
    """
    unprocessed = (session.query(TweetSource)
                   .outerjoin(TweetLog)
                   .filter(TweetLog.id == None))

    for src in unprocessed:
        process_tweet(src.original_json, src.id, session, access_token)
        session.commit()
209 |
242 |
210 |
243 |
211 |
244 |
212 #get tweet source that do not match any message |
245 #get tweet source that do not match any message |
213 #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; |
246 #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; |
214 |
247 |
215 |
248 |
216 def get_options(): |
249 def get_options(): |
217 parser = OptionParser() |
250 parser = OptionParser() |
218 parser.add_option("-f", "--file", dest="filename", |
251 parser.add_option("-f", "--file", dest="conn_str", |
219 help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") |
252 help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") |
220 parser.add_option("-u", "--user", dest="username", |
253 parser.add_option("-u", "--user", dest="username", |
221 help="Twitter user", metavar="USER", default=None) |
254 help="Twitter user", metavar="USER", default=None) |
222 parser.add_option("-w", "--password", dest="password", |
255 parser.add_option("-w", "--password", dest="password", |
223 help="Twitter password", metavar="PASSWORD", default=None) |
256 help="Twitter password", metavar="PASSWORD", default=None) |
224 parser.add_option("-T", "--track", dest="track", |
257 parser.add_option("-T", "--track", dest="track", |
229 help="Reconnects", metavar="RECONNECTS", default=10, type='int') |
262 help="Reconnects", metavar="RECONNECTS", default=10, type='int') |
230 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
263 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
231 help="Token file name") |
264 help="Token file name") |
232 parser.add_option("-d", "--duration", dest="duration", |
265 parser.add_option("-d", "--duration", dest="duration", |
233 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
266 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
234 parser.add_option("-N", "--consumer", dest="consumer_nb", |
267 parser.add_option("-N", "--nb-process", dest="process_nb", |
235 help="number of consumer", metavar="CONSUMER", default=1, type='int') |
268 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
236 |
269 |
237 |
270 |
238 |
271 |
239 utils.set_logging_options(parser) |
272 utils.set_logging_options(parser) |
240 |
273 |
243 |
276 |
244 if __name__ == '__main__': |
277 if __name__ == '__main__': |
245 |
278 |
246 (options, args) = get_options() |
279 (options, args) = get_options() |
247 |
280 |
248 utils.set_logging(options, get_logger()) |
281 set_logging(options) |
249 |
282 |
250 if options.debug: |
283 if options.debug: |
251 print "OPTIONS : " |
284 print "OPTIONS : " |
252 print repr(options) |
285 print repr(options) |
253 |
286 |
254 if options.new and os.path.exists(options.filename): |
287 |
255 i = 1 |
288 conn_str = options.conn_str.strip() |
256 basename, extension = os.path.splitext(options.filename) |
289 if not re.match("^\w+://.+", conn_str): |
257 new_path = '%s.%d%s' % (basename, i, extension) |
290 conn_str = 'sqlite://' + options.conn_str |
258 while i < 1000000 and os.path.exists(new_path): |
291 |
259 i += 1 |
292 if conn_str.startswith("sqlite") and options.new: |
|
293 filepath = conn_str[conn_str.find("://"):] |
|
294 if os.path.exists(filepath): |
|
295 i = 1 |
|
296 basename, extension = os.path.splitext(filepath) |
260 new_path = '%s.%d%s' % (basename, i, extension) |
297 new_path = '%s.%d%s' % (basename, i, extension) |
261 if i >= 1000000: |
298 while i < 1000000 and os.path.exists(new_path): |
262 raise Exception("Unable to find new filename for " + options.filename) |
299 i += 1 |
263 else: |
300 new_path = '%s.%d%s' % (basename, i, extension) |
264 shutil.move(options.filename, new_path) |
301 if i >= 1000000: |
265 |
302 raise Exception("Unable to find new filename for " + filepath) |
266 |
303 else: |
|
304 shutil.move(filepath, new_path) |
|
305 |
|
306 Session, engine, metadata = get_sessionmaker(conn_str) |
|
307 |
|
308 if options.new: |
|
309 check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True) |
|
310 if len(check_metadata.sorted_tables) > 0: |
|
311 message = "Database %s not empty exiting" % conn_str |
|
312 utils.get_logger().error(message) |
|
313 sys.exit(message) |
|
314 |
|
315 metadata.create_all(engine) |
|
316 |
|
317 access_token = None |
|
318 if not options.username or not options.password: |
|
319 access_token = utils.get_oauth_token(options.token_filename) |
|
320 |
|
321 session = Session() |
|
322 try: |
|
323 process_leftovers(session, access_token) |
|
324 session.commit() |
|
325 finally: |
|
326 session.rollback() |
|
327 session.close() |
|
328 |
|
329 if options.process_nb <= 0: |
|
330 utils.get_logger().debug("Leftovers processed. Exiting.") |
|
331 sys.exit() |
|
332 |
267 queue = JoinableQueue() |
333 queue = JoinableQueue() |
268 stop_event = Event() |
334 stop_event = Event() |
269 |
335 |
270 if options.username and options.password: |
336 #workaround for bug on using urllib2 and multiprocessing |
271 auth = tweepy.auth.BasicAuthHandler(options.username, options.password) |
337 req = urllib2.Request('http://localhost') |
272 else: |
338 conn = None |
273 consumer_key = models.CONSUMER_KEY |
339 try: |
274 consumer_secret = models.CONSUMER_SECRET |
340 conn = urllib2.urlopen(req) |
275 auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
341 except: |
276 auth.set_access_token(*(utils.get_oauth_token(options.token_filename))) |
342 pass |
277 |
343 #donothing |
278 |
344 finally: |
279 engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) |
345 if conn is not None: |
280 Session = sessionmaker(bind=engine) |
346 conn.close() |
281 |
347 |
282 session = Session() |
348 |
283 process_leftovers(session, options.token_filename) |
349 sprocess = SourceProcess(Session, queue, options, access_token, stop_event) |
284 session.commit() |
|
285 session.close() |
|
286 |
|
287 sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event) |
|
288 |
350 |
289 tweet_processes = [] |
351 tweet_processes = [] |
290 |
352 |
291 for i in range(options.consumer_nb): |
353 for i in range(options.process_nb - 1): |
292 engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) |
354 Session, engine, metadata = get_sessionmaker(conn_str) |
293 Session = sessionmaker(bind=engine) |
355 cprocess = TweetProcess(Session, queue, options, access_token, stop_event) |
294 cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event) |
|
295 tweet_processes.append(cprocess) |
356 tweet_processes.append(cprocess) |
296 |
357 |
297 def interupt_handler(signum, frame): |
358 def interupt_handler(signum, frame): |
298 stop_event.set() |
359 stop_event.set() |
299 |
360 |