| changeset 693 | 2ef837069108 |
| parent 528 | 7fb5a7b0d35c |
| child 737 | 8f6a4d6dfe14 |
| 692:51072e5e6ea9 | 693:2ef837069108 |
|---|---|
21 import sys |
21 import sys |
22 import threading |
22 import threading |
23 import time |
23 import time |
24 import traceback |
24 import traceback |
25 import tweepy.auth |
25 import tweepy.auth |
26 import tweetstream |
26 import iri_tweet.stream as tweetstream |
27 import urllib2 |
27 import urllib2 |
28 socket._fileobject.default_bufsize = 0 |
28 socket._fileobject.default_bufsize = 0 |
29 |
29 |
30 |
30 |
31 |
31 |
33 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source'] |
33 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source'] |
34 #columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following'] |
34 #columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following'] |
35 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count'] |
35 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count'] |
36 #just put it in a sqlite3 tqble |
36 #just put it in a sqlite3 tqble |
37 |
37 |
38 DEFAULT_TIMEOUT = 5 |
|
38 |
39 |
39 def set_logging(options): |
40 def set_logging(options): |
40 loggers = [] |
41 loggers = [] |
41 |
42 |
42 loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet'))) |
43 loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet'))) |
122 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
123 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
123 self.track = options.track |
124 self.track = options.track |
124 self.token_filename = options.token_filename |
125 self.token_filename = options.token_filename |
125 self.catchup = options.catchup |
126 self.catchup = options.catchup |
126 self.timeout = options.timeout |
127 self.timeout = options.timeout |
128 self.stream = None |
|
127 super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
129 super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
128 |
130 |
129 def do_run(self): |
131 def __source_stream_iter(self): |
130 |
132 |
131 #import pydevd |
|
132 #pydevd.settrace(suspend=True) |
|
133 |
|
134 self.logger = set_logging_process(self.options, self.logger_queue) |
133 self.logger = set_logging_process(self.options, self.logger_queue) |
134 self.logger.debug("SourceProcess : run ") |
|
135 |
|
135 self.auth = get_auth(self.options, self.access_token) |
136 self.auth = get_auth(self.options, self.access_token) |
136 |
137 self.logger.debug("SourceProcess : auth set ") |
137 self.logger.debug("SourceProcess : run ") |
138 |
138 track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() |
139 track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() |
139 self.logger.debug("SourceProcess : track list " + track_list) |
140 self.logger.debug("SourceProcess : track list " + track_list) |
140 |
141 |
141 track_list = [k.strip() for k in track_list.split(',')] |
142 track_list = [k.strip() for k in track_list.split(',')] |
142 |
143 |
143 self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) |
144 self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) |
144 stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout) |
145 self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1) |
145 self.logger.debug("SourceProcess : after connecting to stream") |
146 self.logger.debug("SourceProcess : after connecting to stream") |
146 stream.muststop = lambda: self.stop_event.is_set() |
147 self.stream.muststop = lambda: self.stop_event.is_set() |
148 |
|
149 stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger) |
|
147 |
150 |
148 session = self.session_maker() |
151 session = self.session_maker() |
149 |
152 |
150 try: |
153 try: |
151 for tweet in stream: |
154 for tweet in stream_wrapper: |
152 if not self.parent_is_alive(): |
155 if not self.parent_is_alive(): |
156 self.stop_event.set() |
|
157 stop_thread.join(5) |
|
153 sys.exit() |
158 sys.exit() |
154 self.logger.debug("SourceProcess : tweet " + repr(tweet)) |
159 self.logger.debug("SourceProcess : tweet " + repr(tweet)) |
155 source = TweetSource(original_json=tweet) |
160 source = TweetSource(original_json=tweet) |
156 self.logger.debug("SourceProcess : source created") |
161 self.logger.debug("SourceProcess : source created") |
157 add_retries = 0 |
162 add_retries = 0 |
163 break |
168 break |
164 except OperationalError as e: |
169 except OperationalError as e: |
165 session.rollback() |
170 session.rollback() |
166 self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) |
171 self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) |
167 if add_retries == 10: |
172 if add_retries == 10: |
168 raise e |
173 raise |
169 |
174 |
170 source_id = source.id |
175 source_id = source.id |
171 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) |
176 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) |
172 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) |
177 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime))) |
173 session.commit() |
178 session.commit() |
174 self.queue.put((source_id, tweet), False) |
179 self.queue.put((source_id, tweet), False) |
175 |
180 |
176 except Exception as e: |
181 except Exception as e: |
177 self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) |
182 self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) |
183 raise |
|
178 finally: |
184 finally: |
179 session.rollback() |
185 session.rollback() |
180 stream.close() |
|
181 session.close() |
186 session.close() |
182 self.queue.close() |
187 with self.logger_queue.mutex: |
183 self.stop_event.set() |
188 self.logger_queue.clear() |
189 self.logger_queue.close() |
|
190 with self.queue.mutex: |
|
191 self.queue.clear() |
|
192 self.queue.close() |
|
193 self.stream.close() |
|
194 self.stream = None |
|
195 if not self.stop_event.is_set(): |
|
196 self.stop_event.set() |
|
197 |
|
198 |
|
199 def do_run(self): |
|
200 |
|
201 import pydevd |
|
202 pydevd.settrace(suspend=False) |
|
203 |
|
204 source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") |
|
205 |
|
206 source_stream_iter_thread.start() |
|
207 |
|
208 while not self.stop_event.is_set(): |
|
209 self.stop_event.wait(DEFAULT_TIMEOUT) |
|
210 if self.stop_event.is_set() and self.stream: |
|
211 self.stream.close() |
|
212 elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: |
|
213 with self.stop_event.mutex: |
|
214 self.stop_event.set() |
|
215 |
|
216 source_stream_iter_thread.join(30) |
|
184 |
217 |
185 |
218 |
186 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger): |
219 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger): |
187 try: |
220 try: |
221 if not tweet.strip(): |
|
222 return |
|
188 tweet_obj = anyjson.deserialize(tweet) |
223 tweet_obj = anyjson.deserialize(tweet) |
189 if 'text' not in tweet_obj: |
224 if 'text' not in tweet_obj: |
190 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) |
225 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) |
191 session.add(tweet_log) |
226 session.add(tweet_log) |
192 return |
227 return |
195 screen_name = tweet_obj['user']['screen_name'] |
230 screen_name = tweet_obj['user']['screen_name'] |
196 logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) |
231 logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) |
197 logger.debug(u"Process_tweet :" + repr(tweet)) |
232 logger.debug(u"Process_tweet :" + repr(tweet)) |
198 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user) |
233 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user) |
199 processor.process() |
234 processor.process() |
235 except ValueError as e: |
|
236 message = u"Value Error %s processing tweet %s" % (repr(e), tweet) |
|
237 output = StringIO.StringIO() |
|
238 try: |
|
239 traceback.print_exc(file=output) |
|
240 error_stack = output.getvalue() |
|
241 finally: |
|
242 output.close() |
|
243 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack) |
|
244 session.add(tweet_log) |
|
245 session.commit() |
|
200 except Exception as e: |
246 except Exception as e: |
201 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
247 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
202 logger.exception(message) |
248 logger.exception(message) |
203 output = StringIO.StringIO() |
249 output = StringIO.StringIO() |
204 try: |
250 try: |
235 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger) |
281 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger) |
236 session.commit() |
282 session.commit() |
237 finally: |
283 finally: |
238 session.rollback() |
284 session.rollback() |
239 self.stop_event.set() |
285 self.stop_event.set() |
286 with self.logger_queue.mutex: |
|
287 self.logger_queue.clear() |
|
288 with self.queue.mutex: |
|
289 self.queue.clear() |
|
240 session.close() |
290 session.close() |
241 |
291 |
242 |
292 |
243 def get_sessionmaker(conn_str): |
293 def get_sessionmaker(conn_str): |
244 engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) |
294 engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) |
351 process_engines = [] |
401 process_engines = [] |
352 logger_queues = [] |
402 logger_queues = [] |
353 |
403 |
354 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
404 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
355 process_engines.append(engine_process) |
405 process_engines.append(engine_process) |
356 lqueue = mQueue(1) |
406 lqueue = mQueue(50) |
357 logger_queues.append(lqueue) |
407 logger_queues.append(lqueue) |
358 pid = os.getpid() |
408 pid = os.getpid() |
359 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
409 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
360 |
410 |
361 tweet_processes = [] |
411 tweet_processes = [] |
362 |
412 |
363 for i in range(options.process_nb - 1): |
413 for i in range(options.process_nb - 1): |
364 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
414 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
365 process_engines.append(engine_process) |
415 process_engines.append(engine_process) |
366 lqueue = mQueue(1) |
416 lqueue = mQueue(50) |
367 logger_queues.append(lqueue) |
417 logger_queues.append(lqueue) |
368 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
418 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
369 tweet_processes.append(cprocess) |
419 tweet_processes.append(cprocess) |
370 |
420 |
371 def interupt_handler(signum, frame): |
421 def interupt_handler(signum, frame): |