from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
from iri_tweet.processor import get_processor
from multiprocessing import Queue as mQueue, Process, Event
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session
import Queue
import StringIO
import anyjson
import argparse
import datetime
import inspect
import iri_tweet.stream
import logging
import os
import re
import requests_oauthlib
import shutil
import signal
import socket
import sqlalchemy.schema
import sys
import thread
import threading
import time
import traceback
import urllib2

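# Unbuffered reads on socket file objects (CPython 2 internal attribute);
# presumably so the streaming connection is not held back by buffering.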
socket._fileobject.default_bufsize = 0


# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
# just put it in a sqlite3 table

DEFAULT_TIMEOUT = 3


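# Minimal handler for the throwaway local HTTP server used in do_run():
# answers any GET with an empty 200 and keeps request logging silent.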
class Requesthandler(BaseHTTPRequestHandler):

    def __init__(self, request, client_address, server):
        BaseHTTPRequestHandler.__init__(self, request, client_address, server)

    def do_GET(self):
        self.send_response(200)
        self.end_headers()

    def log_message(self, format, *args):  # @ReservedAssignment
        pass


def set_logging(options):
    loggers = []

    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
    if options.debug >= 2:
        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
    # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    # utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
    # utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
    return loggers


def set_logging_process(options, queue):
    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    qlogger.propagate = False
    return qlogger


def get_auth(options, access_token):
    consumer_key = options.consumer_key
    consumer_secret = options.consumer_secret
    auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret,
                                    resource_owner_key=access_token[0], resource_owner_secret=access_token[1],
                                    signature_type='auth_header')
    return auth


def add_process_event(event_type, args, session_maker):
    session = session_maker()
    try:
        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
        session.add(evt)
        session.commit()
    finally:
        session.close()


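# Shared plumbing for the worker processes: session factory, work queue,
# shared stop event, log queue, and start/stop bookkeeping recorded as
# ProcessEvent rows around do_run().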
class BaseProcess(Process):

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        self.parent_pid = parent_pid
        self.session_maker = session_maker
        self.queue = queue
        self.options = options
        self.logger_queue = logger_queue
        self.stop_event = stop_event
        self.consumer_token = (options.consumer_key, options.consumer_secret)
        self.access_token = access_token

        super(BaseProcess, self).__init__()

    #
    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
    #
    def parent_is_alive(self):
        try:
            # try to call Parent
            os.kill(self.parent_pid, 0)
        except OSError:
            # *beeep* oh no! The phone's disconnected!
            return False
        else:
            # *ring* Hi mom!
            return True

    def __get_process_event_args(self):
        return {'name': self.name, 'pid': self.pid, 'parent_pid': self.parent_pid, 'options': self.options.__dict__, 'access_token': self.access_token}

    def run(self):
        try:
            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
            self.do_run()
        finally:
            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)

    def do_run(self):
        raise NotImplementedError()


class SourceProcess(BaseProcess):

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        self.track = options.track
        self.token_filename = options.token_filename
        self.timeout = options.timeout
        self.stream = None
        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)

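    # Runs in a thread inside the source process: stores each raw tweet as a
    # TweetSource row (retrying on transient OperationalError, e.g. a locked
    # sqlite file) and hands (source_id, tweet) to the consumer queue.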
    def __source_stream_iter(self):

        self.logger.debug("SourceProcess : run")

        self.logger.debug("SourceProcess : get_auth auth with option %s and token %s" % (self.options, self.access_token))
        self.auth = get_auth(self.options, self.access_token)
        self.logger.debug("SourceProcess : auth set")
        track_list = self.track  # or raw_input('Keywords to track (comma separated): ').strip()
        self.logger.debug("SourceProcess : track list " + track_list)

        track_list = [k.strip() for k in track_list.split(',')]

        self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
        self.logger.debug("SourceProcess : after connecting to stream")
        self.stream.muststop = lambda: self.stop_event.is_set()

        stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)

        session = self.session_maker()

        #import pydevd
        #pydevd.settrace(suspend=False)

        try:
            for tweet in stream_wrapper:
                if not self.parent_is_alive():
                    self.stop_event.set()
                    sys.exit()
                self.logger.debug("SourceProcess : tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                self.logger.debug("SourceProcess : source created")
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            raise

                source_id = source.id
                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %ds" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
                session.commit()
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
            raise
        finally:
            session.rollback()
            session.close()
            self.stream.close()
            self.stream = None
            if not self.stop_event.is_set():
                self.stop_event.set()

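    # Supervise the reader thread: start it, idle on the stop event, and on
    # shutdown close the stream and join the thread.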
    def do_run(self):

        self.logger = set_logging_process(self.options, self.logger_queue)

        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter, name="SourceStreamIterThread")

        source_stream_iter_thread.start()

        try:
            while not self.stop_event.is_set():
                self.logger.debug("SourceProcess : In while after start")
                self.stop_event.wait(DEFAULT_TIMEOUT)
        except KeyboardInterrupt:
            self.stop_event.set()

        if self.stop_event.is_set() and self.stream:
            self.stream.close()
        elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
            self.stop_event.set()

        self.queue.cancel_join_thread()
        self.logger_queue.cancel_join_thread()
        self.logger.info("SourceProcess : join")
        source_stream_iter_thread.join(30)


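# Decode one raw tweet and dispatch it to the processor class chosen by
# get_processor(); failures are recorded as TweetLog rows rather than
# aborting the consumer.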
def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
    try:
        if not tweet.strip():
            return
        tweet_obj = anyjson.deserialize(tweet)
        processor_klass = get_processor(tweet_obj)
        if not processor_klass:
            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
            session.add(tweet_log)
            return
        processor = processor_klass(json_dict=tweet_obj,
                                    json_txt=tweet,
                                    source_id=source_id,
                                    session=session,
                                    consumer_token=consumer_token,
                                    access_token=access_token,
                                    token_filename=token_filename,
                                    user_query_twitter=twitter_query_user,
                                    logger=logger)
        logger.info(processor.log_info())
        logger.debug(u"Process_tweet :" + repr(tweet))
        processor.process()

    except ValueError as e:
        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
        output = StringIO.StringIO()
        try:
            traceback.print_exc(file=output)
            error_stack = output.getvalue()
        finally:
            output.close()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()
    except Exception as e:
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        logger.exception(message)
        output = StringIO.StringIO()
        try:
            traceback.print_exc(file=output)
            error_stack = output.getvalue()
        finally:
            output.close()
        session.rollback()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()


class TweetProcess(BaseProcess):

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
        self.twitter_query_user = options.twitter_query_user

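    # Consumer loop: pull (source_id, tweet) items off the shared queue until
    # the stop event is set or the parent process goes away.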
    def do_run(self):

        self.logger = set_logging_process(self.options, self.logger_queue)
        session = self.session_maker()
        try:
            while not self.stop_event.is_set() and self.parent_is_alive():
                try:
                    source_id, tweet_txt = self.queue.get(True, 3)
                    self.logger.debug("Processing source id " + repr(source_id))
                except Exception as e:
                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
                session.commit()
        except KeyboardInterrupt:
            self.stop_event.set()
        finally:
            session.rollback()
            session.close()


def get_sessionmaker(conn_str):
    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
    Session = scoped_session(Session)
    return Session, engine, metadata


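# Re-process TweetSource rows that never received a TweetLog entry, i.e.
# tweets stored by the source process but not yet handled by a consumer.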
def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):

    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
    sources_count = sources.count()

    if sources_count > 10 and ask_process_leftovers:
        resp = raw_input("Do you want to process leftovers (Y/n)? (%d tweets to process) " % sources_count)
        if resp and resp.strip().lower() == "n":
            return
    logger.info("Process leftovers, %d tweets to process" % (sources_count))
    for src in sources:
        tweet_txt = src.original_json
        process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
        session.commit()


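# Drain the per-process logging queues and replay the records through the
# handlers configured in the main process.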
def process_log(logger_queues, stop_event):
    while not stop_event.is_set():
        for lqueue in logger_queues:
            try:
                record = lqueue.get_nowait()
                logging.getLogger(record.name).handle(record)
            except Queue.Empty:
                continue
            except IOError:
                continue
        time.sleep(0.1)


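# Example invocation (script name hypothetical; -k and -s are required):
#   python recorder_stream.py -k CONSUMER_KEY -s CONSUMER_SECRET \
#       -T "tag1,tag2" -f tweets.db -d 3600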
def get_options():

    usage = "usage: %(prog)s [options]"

    parser = argparse.ArgumentParser(usage=usage)

    parser.add_argument("-f", "--file", dest="conn_str",
                        help="write tweets to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
    parser.add_argument("-T", "--track", dest="track",
                        help="Twitter track", metavar="TRACK")
    parser.add_argument("-k", "--key", dest="consumer_key",
                        help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
    parser.add_argument("-s", "--secret", dest="consumer_secret",
                        help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
    parser.add_argument("-n", "--new", dest="new", action="store_true",
                        help="new database", default=False)
    parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
                        help="launch daemon", default=False)
    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                        help="Token file name")
    parser.add_argument("-d", "--duration", dest="duration",
                        help="Duration of recording in seconds", metavar="DURATION", default=-1, type=int)
    parser.add_argument("-N", "--nb-process", dest="process_nb",
                        help="number of processes.\nIf 0, only the leftovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
    parser.add_argument("--url", dest="url",
                        help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
    parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
                        help="Query twitter for users", default=False)
    parser.add_argument("--timeout", dest="timeout",
                        help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
    parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
                        help="do not ask before processing leftovers", default=True)

    utils.set_logging_options(parser)

    return parser.parse_args()


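# Main orchestration: process leftovers, spawn the source process plus
# options.process_nb - 1 consumer processes, then supervise them until the
# duration elapses, a signal arrives, or the source process dies.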
def do_run(options, session_maker):

    stop_args = {}

    consumer_token = (options.consumer_key, options.consumer_secret)
    access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)

    session = session_maker()
    try:
        process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
        session.commit()
    finally:
        session.rollback()
        session.close()

    if options.process_nb <= 0:
        utils.get_logger().debug("Leftovers processed. Exiting.")
        return None

    queue = mQueue()
    stop_event = Event()

    # workaround for bug on using urllib2 and multiprocessing
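    # (the throwaway request below apparently warms up urllib2's global
    # opener state in the parent before the worker processes fork)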
    httpd = HTTPServer(('127.0.0.1', 0), Requesthandler)
    thread.start_new_thread(httpd.handle_request, ())

    req = urllib2.Request('http://localhost:%d' % httpd.server_port)
    conn = None
    try:
        conn = urllib2.urlopen(req)
    except:
        utils.get_logger().debug("could not open localhost")
        # do nothing
    finally:
        if conn is not None:
            conn.close()

    process_engines = []
    logger_queues = []

    SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
    process_engines.append(engine_process)
    lqueue = mQueue(50)
    logger_queues.append(lqueue)
    pid = os.getpid()
    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)

    tweet_processes = []

    for i in range(options.process_nb - 1):
        SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
        process_engines.append(engine_process)
        lqueue = mQueue(50)
        logger_queues.append(lqueue)
        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
        tweet_processes.append(cprocess)

    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
    log_thread.daemon = True

    log_thread.start()

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    add_process_event("pid", {'main': os.getpid(), 'source': (sprocess.name, sprocess.pid), 'consumers': dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)

    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

    def interrupt_handler(signum, frame):
        utils.get_logger().debug("shutdown requested " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
        stop_args.update({'message': 'interrupt', 'signum': signum, 'frameinfo': inspect.getframeinfo(frame, 9)})
        stop_event.set()

    signal.signal(signal.SIGINT, interrupt_handler)
    signal.signal(signal.SIGHUP, interrupt_handler)
    signal.signal(signal.SIGALRM, interrupt_handler)
    signal.signal(signal.SIGTERM, interrupt_handler)

    while not stop_event.is_set():
        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
            stop_args.update({'message': 'duration', 'duration': options.duration, 'end_ts': end_ts})
            stop_event.set()
            break
        if sprocess.is_alive():
            utils.get_logger().debug("Source process alive")
            time.sleep(1)
        else:
            stop_args.update({'message': 'Source process killed'})
            stop_event.set()
            break
    utils.get_logger().debug("Joining Source Process")
    try:
        sprocess.join(10)
    except:
        utils.get_logger().debug("Problem joining Source Process - terminating")
    finally:
        sprocess.terminate()

    for i, cprocess in enumerate(tweet_processes):
        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
        try:
            cprocess.join(3)
        except:
            utils.get_logger().debug("Problem joining consumer process Nb %d - terminating" % (i + 1))
            cprocess.terminate()

    utils.get_logger().debug("Close queues")
    try:
        queue.close()
        for lqueue in logger_queues:
            lqueue.close()
    except Exception as e:
        utils.get_logger().error("error when closing queues %s", repr(e))
        # do nothing

    if options.process_nb > 1:
        utils.get_logger().debug("Processing leftovers")
        session = session_maker()
        try:
            process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
            session.commit()
        finally:
            session.rollback()
            session.close()

    for pengine in process_engines:
        pengine.dispose()

    return stop_args


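# Entry point proper: normalize the connection string, rotate an existing
# sqlite file when --new is given, create the schema if needed, and record
# start/shutdown ProcessEvents around do_run().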
def main(options):

    global conn_str

    conn_str = options.conn_str.strip()
    if not re.match(r"^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + options.conn_str

    if conn_str.startswith("sqlite") and options.new:
        filepath = conn_str[conn_str.find(":///") + 4:]
        if os.path.exists(filepath):
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            else:
                shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)

    if options.new:
        check_metadata = sqlalchemy.schema.MetaData(bind=engine)
        check_metadata.reflect()
        if len(check_metadata.sorted_tables) > 0:
            message = "Database %s not empty, exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)

    metadata.create_all(engine)
    session = Session()
    try:
        models.add_model_version(session)
    finally:
        session.close()

    stop_args = {}
    try:
        add_process_event(event_type="start", args={'options': options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
        stop_args = do_run(options, Session)
    except Exception as e:
        utils.get_logger().exception("Error in main thread")
        outfile = StringIO.StringIO()
        try:
            traceback.print_exc(file=outfile)
            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace': outfile.getvalue()}
        finally:
            outfile.close()
        raise
    finally:
        add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)

    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))


if __name__ == '__main__':

    options = get_options()

    loggers = set_logging(options)

    utils.get_logger().debug("OPTIONS : " + repr(options))

    if options.daemon:
        options.ask_process_leftovers = False
        import daemon

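        # collect the handlers' open streams so daemonization does not close
        # the log files (files_preserve)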
        hdlr_preserve = []
        for logger in loggers:
            hdlr_preserve.extend([h.stream for h in logger.handlers])

        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
        with context:
            main(options)
    else:
        main(options)