1 from getpass import getpass |
1 from getpass import getpass |
2 from iri_tweet import models, utils |
2 from iri_tweet import models, utils |
3 from iri_tweet.models import TweetSource, TweetLog |
3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent |
4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, |
4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, |
5 get_logger) |
5 get_logger) |
6 from optparse import OptionParser |
6 from optparse import OptionParser |
7 from sqlalchemy.exc import OperationalError |
7 from sqlalchemy.exc import OperationalError |
8 from sqlalchemy.orm import scoped_session, sessionmaker |
8 from sqlalchemy.orm import scoped_session, sessionmaker |
9 import Queue |
9 import Queue |
10 import StringIO |
10 import StringIO |
11 import anyjson |
11 import anyjson |
12 import datetime |
12 import datetime |
|
13 import inspect |
13 import logging |
14 import logging |
14 import os |
15 import os |
15 import re |
16 import re |
16 import shutil |
17 import shutil |
17 import signal |
18 import signal |
109 time.sleep(self.retry_wait) |
110 time.sleep(self.retry_wait) |
110 # Don't listen to auth error, since we can't reasonably reconnect |
111 # Don't listen to auth error, since we can't reasonably reconnect |
111 # when we get one. |
112 # when we get one. |
112 |
113 |
113 |
114 |
114 |
115 class BaseProcess(Process): |
115 |
116 |
116 class SourceProcess(Process): |
117 def __init__(self, parent_pid): |
117 |
118 self.parent_pid = parent_pid |
118 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): |
119 super(BaseProcess, self).__init__() |
|
120 |
|
121 # |
|
122 # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids |
|
123 # |
|
def parent_is_alive(self):
    """Report whether the process identified by ``self.parent_pid`` still exists.

    The pid is probed with ``os.kill(pid, 0)``: signal 0 delivers nothing,
    it only runs the existence and permission checks.

    Returns:
        True if the probe succeeds, or if it is refused with EPERM (the
        process exists but we are not allowed to signal it); False for any
        other OSError.
    """
    # Local import so this method does not depend on the file's import header.
    import errno

    try:
        # try to call Parent
        os.kill(self.parent_pid, 0)
    except OSError as e:
        if e.errno == errno.EPERM:
            # Permission denied still proves that the pid is in use.
            return True
        # *beeep* oh no! The phone's disconnected!
        return False
    else:
        # *ring* Hi mom!
        return True
|
134 |
|
135 |
|
136 class SourceProcess(BaseProcess): |
|
137 |
|
138 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
119 self.session_maker = session_maker |
139 self.session_maker = session_maker |
120 self.queue = queue |
140 self.queue = queue |
121 self.track = options.track |
141 self.track = options.track |
122 self.reconnects = options.reconnects |
142 self.reconnects = options.reconnects |
123 self.token_filename = options.token_filename |
143 self.token_filename = options.token_filename |
124 self.stop_event = stop_event |
144 self.stop_event = stop_event |
125 self.options = options |
145 self.options = options |
126 self.access_token = access_token |
146 self.access_token = access_token |
127 self.logger_queue = logger_queue |
147 self.logger_queue = logger_queue |
128 super(SourceProcess, self).__init__() |
148 super(SourceProcess, self).__init__(parent_pid) |
129 |
149 |
130 def run(self): |
150 def run(self): |
131 |
151 |
132 #import pydevd |
152 #import pydevd |
133 #pydevd.settrace(suspend=False) |
153 #pydevd.settrace(suspend=False) |
134 |
154 |
146 |
166 |
147 session = self.session_maker() |
167 session = self.session_maker() |
148 |
168 |
149 try: |
169 try: |
150 for tweet in stream: |
170 for tweet in stream: |
151 self.logger.debug("tweet " + repr(tweet)) |
171 if not self.parent_is_alive(): |
|
172 sys.exit() |
|
173 self.logger.debug("SourceProcess : tweet " + repr(tweet)) |
152 source = TweetSource(original_json=tweet) |
174 source = TweetSource(original_json=tweet) |
153 self.logger.debug("source created") |
175 self.logger.debug("SourceProcess : source created") |
154 add_retries = 0 |
176 add_retries = 0 |
155 while add_retries < 10: |
177 while add_retries < 10: |
156 try: |
178 try: |
157 add_retries += 1 |
179 add_retries += 1 |
158 session.add(source) |
180 session.add(source) |
159 session.flush() |
181 session.flush() |
160 break |
182 break |
161 except OperationalError as e: |
183 except OperationalError as e: |
162 session.rollback() |
184 session.rollback() |
163 self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries)) |
185 self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) |
164 if add_retries == 10: |
186 if add_retries == 10: |
165 raise e |
187 raise e |
166 |
188 |
167 source_id = source.id |
189 source_id = source.id |
168 self.logger.debug("before queue + source id " + repr(source_id)) |
190 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) |
169 self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) |
191 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) |
170 session.commit() |
192 session.commit() |
171 self.queue.put((source_id, tweet), False) |
193 self.queue.put((source_id, tweet), False) |
172 |
194 |
173 except Exception as e: |
195 except Exception as e: |
174 self.logger.error("Error when processing tweet " + repr(e)) |
196 self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) |
175 finally: |
197 finally: |
176 session.rollback() |
198 session.rollback() |
177 stream.close() |
199 stream.close() |
178 session.close() |
200 session.close() |
179 self.queue.close() |
201 self.queue.close() |
206 session.add(tweet_log) |
228 session.add(tweet_log) |
207 session.commit() |
229 session.commit() |
208 |
230 |
209 |
231 |
210 |
232 |
211 class TweetProcess(Process): |
233 class TweetProcess(BaseProcess): |
212 |
234 |
213 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): |
235 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
214 self.session_maker = session_maker |
236 self.session_maker = session_maker |
215 self.queue = queue |
237 self.queue = queue |
216 self.stop_event = stop_event |
238 self.stop_event = stop_event |
217 self.options = options |
239 self.options = options |
218 self.access_token = access_token |
240 self.access_token = access_token |
219 self.logger_queue = logger_queue |
241 self.logger_queue = logger_queue |
220 super(TweetProcess, self).__init__() |
242 super(TweetProcess, self).__init__(parent_pid) |
221 |
243 |
222 |
244 |
223 def run(self): |
245 def run(self): |
224 |
246 |
225 self.logger = set_logging_process(self.options, self.logger_queue) |
247 self.logger = set_logging_process(self.options, self.logger_queue) |
226 session = self.session_maker() |
248 session = self.session_maker() |
227 try: |
249 try: |
228 while not self.stop_event.is_set(): |
250 while not self.stop_event.is_set() and self.parent_is_alive(): |
229 try: |
251 try: |
230 source_id, tweet_txt = queue.get(True, 3) |
252 source_id, tweet_txt = queue.get(True, 3) |
231 self.logger.debug("Processing source id " + repr(source_id)) |
253 self.logger.debug("Processing source id " + repr(source_id)) |
232 except Exception as e: |
254 except Exception as e: |
233 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
255 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
271 continue |
293 continue |
272 time.sleep(0.1) |
294 time.sleep(0.1) |
273 |
295 |
274 |
296 |
275 def get_options(): |
297 def get_options(): |
276 parser = OptionParser() |
298 |
|
299 usage = "usage: %prog [options]" |
|
300 |
|
301 parser = OptionParser(usage=usage) |
|
302 |
277 parser.add_option("-f", "--file", dest="conn_str", |
303 parser.add_option("-f", "--file", dest="conn_str", |
278 help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") |
304 help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") |
279 parser.add_option("-u", "--user", dest="username", |
305 parser.add_option("-u", "--user", dest="username", |
280 help="Twitter user", metavar="USER", default=None) |
306 help="Twitter user", metavar="USER", default=None) |
281 parser.add_option("-w", "--password", dest="password", |
307 parser.add_option("-w", "--password", dest="password", |
291 parser.add_option("-d", "--duration", dest="duration", |
317 parser.add_option("-d", "--duration", dest="duration", |
292 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
318 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
293 parser.add_option("-N", "--nb-process", dest="process_nb", |
319 parser.add_option("-N", "--nb-process", dest="process_nb", |
294 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
320 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
295 |
321 |
296 |
|
297 |
|
298 utils.set_logging_options(parser) |
322 utils.set_logging_options(parser) |
299 |
323 |
300 return parser.parse_args() |
324 return parser.parse_args() |
301 |
325 |
|
def add_process_event(type, args, session_maker):
    """Store a single ProcessEvent row using a session from *session_maker*.

    type          -- value passed as the ``type`` keyword of ProcessEvent.
    args          -- payload for the event; passed through
                     ``anyjson.serialize`` unless it is None, in which case
                     None is stored as-is.
    session_maker -- callable returning a session object; the session is
                     always closed before this function returns, whether or
                     not the add/commit succeeded.
    """
    db_session = session_maker()
    try:
        if args is None:
            serialized_args = None
        else:
            serialized_args = anyjson.serialize(args)
        event = ProcessEvent(args=serialized_args, type=type)
        db_session.add(event)
        db_session.commit()
    finally:
        db_session.close()
|
334 |
302 |
335 |
303 if __name__ == '__main__': |
336 if __name__ == '__main__': |
304 |
337 |
|
338 stop_args = {} |
305 (options, args) = get_options() |
339 (options, args) = get_options() |
306 |
340 |
307 set_logging(options) |
341 set_logging(options) |
308 |
342 |
309 if options.debug: |
343 utils.get_logger().debug("OPTIONS : " + repr(options)) |
310 print "OPTIONS : " |
|
311 print repr(options) |
|
312 |
|
313 |
344 |
314 conn_str = options.conn_str.strip() |
345 conn_str = options.conn_str.strip() |
315 if not re.match("^\w+://.+", conn_str): |
346 if not re.match("^\w+://.+", conn_str): |
316 conn_str = 'sqlite:///' + options.conn_str |
347 conn_str = 'sqlite:///' + options.conn_str |
317 |
348 |
318 if conn_str.startswith("sqlite") and options.new: |
349 if conn_str.startswith("sqlite") and options.new: |
319 filepath = conn_str[conn_str.find(":///")+4:] |
350 filepath = conn_str[conn_str.find(":///") + 4:] |
320 if os.path.exists(filepath): |
351 if os.path.exists(filepath): |
321 i = 1 |
352 i = 1 |
322 basename, extension = os.path.splitext(filepath) |
353 basename, extension = os.path.splitext(filepath) |
323 new_path = '%s.%d%s' % (basename, i, extension) |
354 new_path = '%s.%d%s' % (basename, i, extension) |
324 while i < 1000000 and os.path.exists(new_path): |
355 while i < 1000000 and os.path.exists(new_path): |
376 |
410 |
377 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
411 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
378 process_engines.append(engine_process) |
412 process_engines.append(engine_process) |
379 lqueue = mQueue(1) |
413 lqueue = mQueue(1) |
380 logger_queues.append(lqueue) |
414 logger_queues.append(lqueue) |
381 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) |
415 pid = os.getpid() |
|
416 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
382 |
417 |
383 tweet_processes = [] |
418 tweet_processes = [] |
384 |
419 |
385 for i in range(options.process_nb - 1): |
420 for i in range(options.process_nb - 1): |
386 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
421 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
387 process_engines.append(engine_process) |
422 process_engines.append(engine_process) |
388 lqueue = mQueue(1) |
423 lqueue = mQueue(1) |
389 logger_queues.append(lqueue) |
424 logger_queues.append(lqueue) |
390 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) |
425 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
391 tweet_processes.append(cprocess) |
426 tweet_processes.append(cprocess) |
392 |
427 |
393 def interupt_handler(signum, frame): |
428 def interupt_handler(signum, frame): |
394 utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(frame)) |
429 global stop_args |
|
430 utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) |
|
431 stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)} |
395 stop_event.set() |
432 stop_event.set() |
396 |
433 |
397 signal.signal(signal.SIGINT , interupt_handler) |
434 signal.signal(signal.SIGINT , interupt_handler) |
398 signal.signal(signal.SIGHUP , interupt_handler) |
435 signal.signal(signal.SIGHUP , interupt_handler) |
399 signal.signal(signal.SIGALRM, interupt_handler) |
436 signal.signal(signal.SIGALRM, interupt_handler) |
400 signal.signal(signal.SIGTERM, interupt_handler) |
437 signal.signal(signal.SIGTERM, interupt_handler) |
401 |
438 |
402 log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,)) |
439 log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) |
403 log_thread.daemon = True |
440 log_thread.daemon = True |
|
441 |
|
442 log_thread.start() |
404 |
443 |
405 sprocess.start() |
444 sprocess.start() |
406 for cprocess in tweet_processes: |
445 for cprocess in tweet_processes: |
407 cprocess.start() |
446 cprocess.start() |
408 |
447 |
409 log_thread.start() |
448 add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session) |
410 |
449 |
411 if options.duration >= 0: |
450 if options.duration >= 0: |
412 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
451 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
413 |
452 |
414 |
453 |