58 if options.username and options.password: |
58 if options.username and options.password: |
59 auth = tweepy.auth.BasicAuthHandler(options.username, options.password) |
59 auth = tweepy.auth.BasicAuthHandler(options.username, options.password) |
60 else: |
60 else: |
61 consumer_key = models.CONSUMER_KEY |
61 consumer_key = models.CONSUMER_KEY |
62 consumer_secret = models.CONSUMER_SECRET |
62 consumer_secret = models.CONSUMER_SECRET |
63 auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
63 auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True) |
64 auth.set_access_token(*access_token) |
64 auth.set_access_token(*access_token) |
65 return auth |
65 return auth |
66 |
66 |
67 |
67 |
68 def add_process_event(type, args, session_maker): |
68 def add_process_event(type, args, session_maker): |
140 self.logger.debug("SourceProcess : track list " + track_list) |
140 self.logger.debug("SourceProcess : track list " + track_list) |
141 |
141 |
142 track_list = [k.strip() for k in track_list.split(',')] |
142 track_list = [k.strip() for k in track_list.split(',')] |
143 |
143 |
144 self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) |
144 self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) |
145 self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1) |
145 self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger) |
146 self.logger.debug("SourceProcess : after connecting to stream") |
146 self.logger.debug("SourceProcess : after connecting to stream") |
147 self.stream.muststop = lambda: self.stop_event.is_set() |
147 self.stream.muststop = lambda: self.stop_event.is_set() |
148 |
148 |
149 stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger) |
149 stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger) |
150 |
150 |
200 source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") |
200 source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") |
201 |
201 |
202 source_stream_iter_thread.start() |
202 source_stream_iter_thread.start() |
203 |
203 |
204 while not self.stop_event.is_set(): |
204 while not self.stop_event.is_set(): |
205 self.logger.info("SourceProcess : In while after start") |
205 self.logger.debug("SourceProcess : In while after start") |
206 self.stop_event.wait(DEFAULT_TIMEOUT) |
206 self.stop_event.wait(DEFAULT_TIMEOUT) |
207 if self.stop_event.is_set() and self.stream: |
207 if self.stop_event.is_set() and self.stream: |
208 self.stream.close() |
208 self.stream.close() |
209 elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: |
209 elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: |
210 self.stop_event.set() |
210 self.stop_event.set() |
277 continue |
277 continue |
278 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger) |
278 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger) |
279 session.commit() |
279 session.commit() |
280 finally: |
280 finally: |
281 session.rollback() |
281 session.rollback() |
282 self.stop_event.set() |
|
283 session.close() |
282 session.close() |
284 |
283 |
285 |
284 |
286 def get_sessionmaker(conn_str): |
285 def get_sessionmaker(conn_str): |
287 engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) |
286 engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) |
409 lqueue = mQueue(50) |
408 lqueue = mQueue(50) |
410 logger_queues.append(lqueue) |
409 logger_queues.append(lqueue) |
411 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
410 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
412 tweet_processes.append(cprocess) |
411 tweet_processes.append(cprocess) |
413 |
412 |
|
413 log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) |
|
414 log_thread.daemon = True |
|
415 |
|
416 log_thread.start() |
|
417 |
|
418 sprocess.start() |
|
419 for cprocess in tweet_processes: |
|
420 cprocess.start() |
|
421 |
|
422 add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) |
|
423 |
|
424 if options.duration >= 0: |
|
425 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
|
426 |
414 def interupt_handler(signum, frame): |
427 def interupt_handler(signum, frame): |
415 utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) |
428 utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) |
416 stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) |
429 stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) |
417 stop_event.set() |
430 stop_event.set() |
418 |
431 |
419 signal.signal(signal.SIGINT , interupt_handler) |
432 signal.signal(signal.SIGINT , interupt_handler) |
420 signal.signal(signal.SIGHUP , interupt_handler) |
433 signal.signal(signal.SIGHUP , interupt_handler) |
421 signal.signal(signal.SIGALRM, interupt_handler) |
434 signal.signal(signal.SIGALRM, interupt_handler) |
422 signal.signal(signal.SIGTERM, interupt_handler) |
435 signal.signal(signal.SIGTERM, interupt_handler) |
423 |
|
424 log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) |
|
425 log_thread.daemon = True |
|
426 |
|
427 log_thread.start() |
|
428 |
|
429 sprocess.start() |
|
430 for cprocess in tweet_processes: |
|
431 cprocess.start() |
|
432 |
|
433 add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) |
|
434 |
|
435 if options.duration >= 0: |
|
436 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
|
437 |
436 |
438 |
437 |
439 while not stop_event.is_set(): |
438 while not stop_event.is_set(): |
440 if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
439 if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
441 stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) |
440 stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) |
442 stop_event.set() |
441 stop_event.set() |
443 break |
442 break |
444 if sprocess.is_alive(): |
443 if sprocess.is_alive(): |
|
444 utils.get_logger().debug("Source process alive") |
445 time.sleep(1) |
445 time.sleep(1) |
446 else: |
446 else: |
447 stop_args.update({'message': 'Source process killed'}) |
447 stop_args.update({'message': 'Source process killed'}) |
448 stop_event.set() |
448 stop_event.set() |
449 break |
449 break |