1 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer |
|
2 from iri_tweet import models, utils |
|
3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent |
|
4 from iri_tweet.processor import get_processor |
|
5 from multiprocessing import Queue as mQueue, Process, Event |
|
6 from sqlalchemy.exc import OperationalError |
|
7 from sqlalchemy.orm import scoped_session |
|
8 import Queue |
|
9 import StringIO |
|
10 import anyjson |
|
11 import argparse |
1 import argparse |
12 import datetime |
2 import datetime |
13 import inspect |
3 import inspect |
14 import iri_tweet.stream |
4 import json |
15 import logging |
5 import logging |
16 import os |
6 import os |
|
7 import queue |
17 import re |
8 import re |
18 import requests_oauthlib |
|
19 import shutil |
9 import shutil |
20 import signal |
10 import signal |
21 import socket |
11 import socket |
22 import sqlalchemy.schema |
|
23 import sys |
12 import sys |
24 import thread |
|
25 import threading |
13 import threading |
26 import time |
14 import time |
27 import traceback |
15 import traceback |
28 import urllib2 |
16 import urllib |
29 socket._fileobject.default_bufsize = 0 |
17 from http.server import BaseHTTPRequestHandler, HTTPServer |
30 |
18 from io import StringIO |
|
19 from multiprocessing import Event, Process |
|
20 from multiprocessing import Queue as mQueue |
|
21 |
|
22 import requests_oauthlib |
|
23 import sqlalchemy.schema |
|
24 import twitter |
|
25 from sqlalchemy.exc import OperationalError |
|
26 from sqlalchemy.orm import scoped_session |
|
27 |
|
28 import _thread |
|
29 import iri_tweet.stream |
|
30 from iri_tweet import models, utils |
|
31 from iri_tweet.models import ProcessEvent, TweetLog, TweetSource |
|
32 from iri_tweet.processor import get_processor |
31 |
33 |
32 |
34 |
# Historical field list kept for reference (pre-entities API payload):
# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']

# Top-level keys expected in a streaming-API tweet payload.
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']

# Default socket timeout (seconds) for the streaming connection.
DEFAULT_TIMEOUT = 3
40 |
42 |
class Requesthandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler used only by the local workaround server.

    The main process spins up a throwaway HTTPServer and issues one
    request against it (see do_run) to work around a urllib /
    multiprocessing interaction; this handler just answers 200 and
    keeps the console quiet.
    """

    def do_GET(self):
        # Answer any GET with an empty 200 response.
        self.send_response(200)
        self.end_headers()

    def log_message(self, format, *args):  # @ReservedAssignment
        # Silence the default stderr access log.
        pass
52 |
def set_logging(options):
    """Configure the main-process loggers from command-line options.

    Configures the 'iri.tweet' and 'multiprocessing' loggers, plus the
    SQLAlchemy engine logger when debug level is 2 or more, and returns
    the list of configured loggers so the caller can preserve their
    handler streams (needed when daemonizing).
    """
    loggers = []

    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
    if options.debug >= 2:
        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
    # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    # NOTE(review): the tail of this function is missing from the garbled
    # source; `return loggers` is reconstructed from the __main__ caller,
    # which does `loggers = set_logging(options)` and iterates the result
    # — confirm against the original file.
    return loggers
def set_logging_process(options, queue):
    """Create the queue-backed logger used inside a worker process.

    Log records are pushed onto *queue* and drained by the main
    process's logging thread (see process_log). Propagation is disabled
    so records are not emitted twice.
    """
    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    qlogger.propagate = 0
    return qlogger
70 |
69 |
def get_auth(consumer_key, consumer_secret, token_key, token_secret):
    """Build the OAuth1 signer for the Twitter streaming connection.

    Returns a requests_oauthlib.OAuth1 object signing requests in the
    Authorization header, built from the application (consumer) and
    user (resource-owner) credential pairs.
    """
    return requests_oauthlib.OAuth1(client_key=consumer_key,
                                    client_secret=consumer_secret,
                                    resource_owner_key=token_key,
                                    resource_owner_secret=token_secret,
                                    signature_type='auth_header')
77 |
73 |
def add_process_event(event_type, args, session_maker):
    """Persist a ProcessEvent row recording a lifecycle event.

    *args* (any JSON-serializable object, or None) is stored as a JSON
    string. A fresh session is opened and always closed, and the insert
    is committed before returning.
    """
    session = session_maker()
    try:
        evt = ProcessEvent(args=None if args is None else json.dumps(args),
                           type=event_type)
        session.add(evt)
        session.commit()
    finally:
        session.close()
86 |
82 |
87 |
83 |
class BaseProcess(Process):
    """Common base for the source and consumer worker processes.

    Holds the shared wiring (DB session factory, work queue, options,
    Twitter credentials, stop event, logging queue) and records
    start/stop ProcessEvent rows around the subclass's do_run().
    """

    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
        self.parent_pid = parent_pid
        self.session_maker = session_maker
        self.queue = queue
        self.options = options
        self.logger_queue = logger_queue
        self.stop_event = stop_event
        self.twitter_auth = twitter_auth

        super(BaseProcess, self).__init__()

    #
    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
    #
    def parent_is_alive(self):
        """Return True if the parent process is still running."""
        # NOTE(review): the probe below is reconstructed — the middle of
        # this method is missing from the garbled source. The surviving
        # comments and the cited StackOverflow answer both use
        # os.kill(pid, 0) as an existence check — confirm.
        try:
            os.kill(self.parent_pid, 0)
        except OSError:
            # *beeep* oh no! The phone's disconnected!
            return False
        else:
            # *ring* Hi mom!
            return True

    def __get_process_event_args(self):
        # Payload stored with the start/stop ProcessEvent rows.
        return {'name': self.name, 'pid': self.pid, 'parent_pid': self.parent_pid, 'options': self.options.__dict__}

    def run(self):
        # Bracket the subclass work with start/stop events so the DB
        # keeps a record even when do_run raises.
        try:
            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
            self.do_run()
        finally:
            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)

    def do_run(self):
        # Subclasses implement the actual work loop.
        raise NotImplementedError()
129 |
124 |
130 |
125 |
131 |
126 |
class SourceProcess(BaseProcess):
    """Worker process that reads the Twitter filter stream.

    Connects to the streaming API with the configured track keywords
    and pushes raw tweet payloads onto the shared queue for the
    TweetProcess consumers.
    """

    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
        self.track = options.track
        self.timeout = options.timeout
        self.stream = None  # set once the filter stream is opened
        super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
140 |
134 |
141 def __source_stream_iter(self): |
135 def __source_stream_iter(self): |
142 |
136 |
143 self.logger.debug("SourceProcess : run ") |
137 self.logger.debug("SourceProcess : run ") |
144 |
138 |
145 self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token)) |
139 self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth)) |
146 self.auth = get_auth(self.options, self.access_token) |
140 self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret) |
147 self.logger.debug("SourceProcess : auth set ") |
141 self.logger.debug("SourceProcess : auth set ") |
148 |
|
149 track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() |
142 track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() |
150 self.logger.debug("SourceProcess : track list " + track_list) |
143 self.logger.debug("SourceProcess : track list " + track_list) |
151 |
144 |
152 track_list = [k.strip() for k in track_list.split(',')] |
145 track_list = [k.strip() for k in track_list.split(',')] |
153 |
146 |
154 self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth))) |
147 self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth))) |
155 self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger) |
148 self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger) |
156 self.logger.debug("SourceProcess : after connecting to stream") |
149 self.logger.debug("SourceProcess : after connecting to stream") |
157 self.stream.muststop = lambda: self.stop_event.is_set() |
150 self.stream.muststop = lambda: self.stop_event.is_set() |
158 |
151 |
159 stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger) |
152 stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger) |
160 |
153 |
161 session = self.session_maker() |
154 session = self.session_maker() |
162 |
155 |
163 #import pydevd |
156 #import pydevd |
164 #pydevd.settrace(suspend=False) |
157 #pydevd.settrace(suspend=False) |
165 |
158 |
166 |
159 |
167 try: |
160 try: |
168 for tweet in stream_wrapper: |
161 for tweet in stream_wrapper: |
169 if not self.parent_is_alive(): |
162 if not self.parent_is_alive(): |
170 self.stop_event.set() |
163 self.stop_event.set() |
171 sys.exit() |
164 sys.exit() |
228 self.logger_queue.cancel_join_thread() |
221 self.logger_queue.cancel_join_thread() |
229 self.logger.info("SourceProcess : join") |
222 self.logger.info("SourceProcess : join") |
230 source_stream_iter_thread.join(30) |
223 source_stream_iter_thread.join(30) |
231 |
224 |
232 |
225 |
def process_tweet(tweet, source_id, session, twitter_auth, twitter_query_user, logger):
    """Parse one raw tweet payload and run its processor.

    *tweet* is the raw JSON text as received from the stream,
    *source_id* the TweetSource row it came from. Blank payloads are
    ignored. Payloads with no matching processor are logged as
    NOT_TWEET. A ValueError (e.g. bad JSON) is recorded as NOT_TWEET
    with its traceback; any other exception rolls the session back and
    is recorded as ERROR. In both error paths the TweetLog row is
    committed, so this function never propagates processing errors.
    """
    try:
        if not tweet.strip():
            return
        tweet_obj = json.loads(tweet)
        processor_klass = get_processor(tweet_obj)
        if not processor_klass:
            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
            session.add(tweet_log)
            return
        processor = processor_klass(json_dict=tweet_obj,
                                    json_txt=tweet,
                                    source_id=source_id,
                                    session=session,
                                    twitter_auth=twitter_auth,
                                    user_query_twitter=twitter_query_user,
                                    logger=logger)
        logger.info(processor.log_info())
        logger.debug(u"Process_tweet :" + repr(tweet))
        processor.process()

    except ValueError as e:
        # Malformed payload: keep the source row but flag it NOT_TWEET.
        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
        output = StringIO()
        try:
            traceback.print_exc(file=output)
            error_stack = output.getvalue()
        finally:
            output.close()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()
    except Exception as e:
        # Any other failure: roll back partial work, then record ERROR.
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        logger.exception(message)
        output = StringIO()
        try:
            traceback.print_exc(file=output)
            error_stack = output.getvalue()
        finally:
            output.close()
        session.rollback()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()
280 |
271 |
281 |
272 |
282 |
273 |
class TweetProcess(BaseProcess):
    """Consumer process that drains the tweet queue into the database.

    Pulls (source_id, raw_json) pairs from the shared queue and runs
    process_tweet on each until the stop event is set.
    """

    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
        super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
        self.twitter_query_user = options.twitter_query_user
288 |
279 |
289 |
280 |
290 def do_run(self): |
281 def do_run(self): |
291 |
282 |
292 self.logger = set_logging_process(self.options, self.logger_queue) |
283 self.logger = set_logging_process(self.options, self.logger_queue) |
293 session = self.session_maker() |
284 session = self.session_maker() |
294 try: |
285 try: |
295 while not self.stop_event.is_set() and self.parent_is_alive(): |
286 while not self.stop_event.is_set() and self.parent_is_alive(): |
296 try: |
287 try: |
297 source_id, tweet_txt = self.queue.get(True, 3) |
288 source_id, tweet_txt = self.queue.get(True, 3) |
298 self.logger.debug("Processing source id " + repr(source_id)) |
289 self.logger.debug("Processing source id " + repr(source_id)) |
299 except Exception as e: |
290 except Exception as e: |
300 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
291 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
301 continue |
292 continue |
302 process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger) |
293 process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger) |
303 session.commit() |
294 session.commit() |
304 except KeyboardInterrupt: |
295 except KeyboardInterrupt: |
305 self.stop_event.set() |
296 self.stop_event.set() |
306 finally: |
297 finally: |
307 session.rollback() |
298 session.rollback() |
def get_sessionmaker(conn_str):
    """Create a scoped session factory for *conn_str*.

    Returns (Session, engine, metadata); the schema is not created
    here (create_all=False) — main() handles that for new databases.
    """
    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
    Session = scoped_session(Session)
    return Session, engine, metadata
315 |
306 |
316 |
307 |
def process_leftovers(session, twitter_auth, twitter_query_user, ask_process_leftovers, logger):
    """Process TweetSource rows that have no TweetLog yet.

    These are payloads that were stored but never processed (e.g. after
    a crash). When more than 10 are pending and *ask_process_leftovers*
    is true, the user is prompted and may skip by answering "n". Each
    processed tweet is committed individually.
    """
    # Sources with no matching TweetLog row are unprocessed.
    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
    sources_count = sources.count()

    if sources_count > 10 and ask_process_leftovers:
        resp = input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
        if resp and resp.strip().lower() == "n":
            return
    logger.info("Process leftovers, %d tweets to process" % (sources_count))
    for src in sources:
        tweet_txt = src.original_json
        process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger)
        session.commit()
331 |
322 |
332 |
323 |
def process_log(logger_queues, stop_event):
    """Drain worker log queues into the main process's loggers.

    Runs until *stop_event* is set, polling every queue in
    *logger_queues* and dispatching each record to the logger named in
    the record. Empty queues and transient IOErrors are ignored; the
    loop sleeps 100ms between polling rounds.
    """
    while not stop_event.is_set():
        for lqueue in logger_queues:
            try:
                record = lqueue.get_nowait()
                logging.getLogger(record.name).handle(record)
            except queue.Empty:
                continue
            except IOError:
                continue
        time.sleep(0.1)
344 |
335 |
345 |
336 |
346 def get_options(): |
337 def get_options(): |
347 |
338 |
348 usage = "usage: %(prog)s [options]" |
339 usage = "usage: %(prog)s [options]" |
349 |
340 |
350 parser = argparse.ArgumentParser(usage=usage) |
341 parser = argparse.ArgumentParser(usage=usage) |
383 |
374 |
384 |
375 |
385 def do_run(options, session_maker): |
376 def do_run(options, session_maker): |
386 |
377 |
387 stop_args = {} |
378 stop_args = {} |
388 |
379 |
389 consumer_token = (options.consumer_key, options.consumer_secret) |
380 |
390 access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename) |
381 access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename) |
391 |
382 twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret) |
392 |
383 |
393 session = session_maker() |
384 session = session_maker() |
394 try: |
385 try: |
395 process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) |
386 process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger()) |
396 session.commit() |
387 session.commit() |
397 finally: |
388 finally: |
398 session.rollback() |
389 session.rollback() |
399 session.close() |
390 session.close() |
400 |
391 |
401 if options.process_nb <= 0: |
392 if options.process_nb <= 0: |
402 utils.get_logger().debug("Leftovers processed. Exiting.") |
393 utils.get_logger().debug("Leftovers processed. Exiting.") |
403 return None |
394 return None |
404 |
395 |
405 queue = mQueue() |
396 queue = mQueue() |
406 stop_event = Event() |
397 stop_event = Event() |
407 |
398 |
408 # workaround for bug on using urllib2 and multiprocessing |
399 # workaround for bug on using urllib2 and multiprocessing |
409 httpd = HTTPServer(('127.0.0.1',0), Requesthandler) |
400 httpd = HTTPServer(('127.0.0.1',0), Requesthandler) |
410 thread.start_new_thread(httpd.handle_request, ()) |
401 _thread.start_new_thread(httpd.handle_request, ()) |
411 |
402 |
412 req = urllib2.Request('http://localhost:%d' % httpd.server_port) |
403 req = urllib.request.Request('http://localhost:%d' % httpd.server_port) |
413 conn = None |
404 conn = None |
414 try: |
405 try: |
415 conn = urllib2.urlopen(req) |
406 conn = urllib.request.urlopen(req) |
416 except: |
407 except: |
417 utils.get_logger().debug("could not open localhost") |
408 utils.get_logger().debug("could not open localhost") |
418 # donothing |
409 # donothing |
419 finally: |
410 finally: |
420 if conn is not None: |
411 if conn is not None: |
421 conn.close() |
412 conn.close() |
422 |
413 |
423 process_engines = [] |
414 process_engines = [] |
424 logger_queues = [] |
415 logger_queues = [] |
425 |
416 |
426 SessionProcess, engine_process, _ = get_sessionmaker(conn_str) |
417 SessionProcess, engine_process, _ = get_sessionmaker(conn_str) |
427 process_engines.append(engine_process) |
418 process_engines.append(engine_process) |
428 lqueue = mQueue(50) |
419 lqueue = mQueue(50) |
429 logger_queues.append(lqueue) |
420 logger_queues.append(lqueue) |
430 pid = os.getpid() |
421 pid = os.getpid() |
431 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
422 sprocess = SourceProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid) |
432 |
423 |
433 tweet_processes = [] |
424 tweet_processes = [] |
434 |
425 |
435 for i in range(options.process_nb - 1): |
426 for i in range(options.process_nb - 1): |
436 SessionProcess, engine_process, _ = get_sessionmaker(conn_str) |
427 SessionProcess, engine_process, _ = get_sessionmaker(conn_str) |
437 process_engines.append(engine_process) |
428 process_engines.append(engine_process) |
438 lqueue = mQueue(50) |
429 lqueue = mQueue(50) |
439 logger_queues.append(lqueue) |
430 logger_queues.append(lqueue) |
440 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
431 cprocess = TweetProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid) |
441 tweet_processes.append(cprocess) |
432 tweet_processes.append(cprocess) |
442 |
433 |
443 log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) |
434 log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) |
444 log_thread.daemon = True |
435 log_thread.daemon = True |
445 |
436 |
450 cprocess.start() |
441 cprocess.start() |
451 |
442 |
452 add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) |
443 add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) |
453 |
444 |
454 if options.duration >= 0: |
445 if options.duration >= 0: |
455 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
446 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
456 |
447 |
457 def interupt_handler(signum, frame): |
448 def interupt_handler(signum, frame): |
458 utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) |
449 utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) |
459 stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) |
450 stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) |
460 stop_event.set() |
451 stop_event.set() |
461 |
452 |
462 signal.signal(signal.SIGINT , interupt_handler) |
453 signal.signal(signal.SIGINT , interupt_handler) |
463 signal.signal(signal.SIGHUP , interupt_handler) |
454 signal.signal(signal.SIGHUP , interupt_handler) |
464 signal.signal(signal.SIGALRM, interupt_handler) |
455 signal.signal(signal.SIGALRM, interupt_handler) |
465 signal.signal(signal.SIGTERM, interupt_handler) |
456 signal.signal(signal.SIGTERM, interupt_handler) |
466 |
457 |
467 |
458 |
468 while not stop_event.is_set(): |
459 while not stop_event.is_set(): |
469 if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
460 if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
470 stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) |
461 stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) |
471 stop_event.set() |
462 stop_event.set() |
482 sprocess.join(10) |
473 sprocess.join(10) |
483 except: |
474 except: |
484 utils.get_logger().debug("Pb joining Source Process - terminating") |
475 utils.get_logger().debug("Pb joining Source Process - terminating") |
485 finally: |
476 finally: |
486 sprocess.terminate() |
477 sprocess.terminate() |
487 |
478 |
488 for i, cprocess in enumerate(tweet_processes): |
479 for i, cprocess in enumerate(tweet_processes): |
489 utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) |
480 utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) |
490 try: |
481 try: |
491 cprocess.join(3) |
482 cprocess.join(3) |
492 except: |
483 except: |
493 utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) |
484 utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) |
494 cprocess.terminate() |
485 cprocess.terminate() |
495 |
486 |
496 |
487 |
497 utils.get_logger().debug("Close queues") |
488 utils.get_logger().debug("Close queues") |
498 try: |
489 try: |
499 queue.close() |
490 queue.close() |
500 for lqueue in logger_queues: |
491 for lqueue in logger_queues: |
501 lqueue.close() |
492 lqueue.close() |
502 except Exception as e: |
493 except Exception as e: |
503 utils.get_logger().error("error when closing queues %s", repr(e)) |
494 utils.get_logger().error("error when closing queues %s", repr(e)) |
504 # do nothing |
495 # do nothing |
505 |
496 |
506 |
497 |
507 if options.process_nb > 1: |
498 if options.process_nb > 1: |
508 utils.get_logger().debug("Processing leftovers") |
499 utils.get_logger().debug("Processing leftovers") |
509 session = session_maker() |
500 session = session_maker() |
510 try: |
501 try: |
511 process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) |
502 process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger()) |
512 session.commit() |
503 session.commit() |
513 finally: |
504 finally: |
514 session.rollback() |
505 session.rollback() |
515 session.close() |
506 session.close() |
516 |
507 |
517 for pengine in process_engines: |
508 for pengine in process_engines: |
518 pengine.dispose() |
509 pengine.dispose() |
519 |
510 |
520 return stop_args |
511 return stop_args |
521 |
512 |
522 |
513 |
523 def main(options): |
514 def main(options): |
524 |
515 |
525 global conn_str |
516 global conn_str |
526 |
517 |
527 conn_str = options.conn_str.strip() |
518 conn_str = options.conn_str.strip() |
528 if not re.match("^\w+://.+", conn_str): |
519 if not re.match(r"^\w+://.+", conn_str): |
529 conn_str = 'sqlite:///' + options.conn_str |
520 conn_str = 'sqlite:///' + options.conn_str |
530 |
521 |
531 if conn_str.startswith("sqlite") and options.new: |
522 if conn_str.startswith("sqlite") and options.new: |
532 filepath = conn_str[conn_str.find(":///") + 4:] |
523 filepath = conn_str[conn_str.find(":///") + 4:] |
533 if os.path.exists(filepath): |
524 if os.path.exists(filepath): |
534 i = 1 |
525 i = 1 |
535 basename, extension = os.path.splitext(filepath) |
526 basename, extension = os.path.splitext(filepath) |
541 raise Exception("Unable to find new filename for " + filepath) |
532 raise Exception("Unable to find new filename for " + filepath) |
542 else: |
533 else: |
543 shutil.move(filepath, new_path) |
534 shutil.move(filepath, new_path) |
544 |
535 |
545 Session, engine, metadata = get_sessionmaker(conn_str) |
536 Session, engine, metadata = get_sessionmaker(conn_str) |
546 |
537 |
547 if options.new: |
538 if options.new: |
548 check_metadata = sqlalchemy.schema.MetaData(bind=engine) |
539 check_metadata = sqlalchemy.schema.MetaData(bind=engine) |
549 check_metadata.reflect() |
540 check_metadata.reflect() |
550 if len(check_metadata.sorted_tables) > 0: |
541 if len(check_metadata.sorted_tables) > 0: |
551 message = "Database %s not empty exiting" % conn_str |
542 message = "Database %s not empty exiting" % conn_str |
552 utils.get_logger().error(message) |
543 utils.get_logger().error(message) |
553 sys.exit(message) |
544 sys.exit(message) |
554 |
545 |
555 metadata.create_all(engine) |
546 metadata.create_all(engine) |
556 session = Session() |
547 session = Session() |
557 try: |
548 try: |
558 models.add_model_version(session) |
549 models.add_model_version(session) |
559 finally: |
550 finally: |
560 session.close() |
551 session.close() |
561 |
552 |
562 stop_args = {} |
553 stop_args = {} |
563 try: |
554 try: |
564 add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session) |
555 add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session) |
565 stop_args = do_run(options, Session) |
556 stop_args = do_run(options, Session) |
566 except Exception as e: |
557 except Exception as e: |
567 utils.get_logger().exception("Error in main thread") |
558 utils.get_logger().exception("Error in main thread") |
568 outfile = StringIO.StringIO() |
559 outfile = StringIO() |
569 try: |
560 try: |
570 traceback.print_exc(file=outfile) |
561 traceback.print_exc(file=outfile) |
571 stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()} |
562 stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()} |
572 finally: |
563 finally: |
573 outfile.close() |
564 outfile.close() |
574 raise |
565 raise |
575 finally: |
566 finally: |
576 add_process_event(event_type="shutdown", args=stop_args, session_maker=Session) |
567 add_process_event(event_type="shutdown", args=stop_args, session_maker=Session) |
577 |
568 |
578 utils.get_logger().debug("Done. Exiting. " + repr(stop_args)) |
569 utils.get_logger().debug("Done. Exiting. " + repr(stop_args)) |
579 |
570 |
580 |
571 |
581 |
572 |
if __name__ == '__main__':

    options = get_options()

    loggers = set_logging(options)

    utils.get_logger().debug("OPTIONS : " + repr(options))

    if options.daemon:
        # Daemon mode cannot prompt, so never ask about leftovers.
        options.ask_process_leftovers = False
        import daemon

        # Keep the already-open log handler streams alive across the
        # daemon fork (DaemonContext closes everything else).
        hdlr_preserve = []
        for logger in loggers:
            hdlr_preserve.extend([h.stream for h in logger.handlers])

        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
        with context:
            main(options)
    else:
        main(options)
|