# Raw field names observed in Twitter API payloads, kept as a reference for
# the relational schema (one column per top-level JSON key).
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
# just put it in a sqlite3 table

# Seconds the main loop waits on the stop event between liveness checks.
DEFAULT_TIMEOUT = 3
|
40 |
|
class Requesthandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler: answers every GET with an empty 200 response
    and suppresses the default per-request stderr logging."""

    def __init__(self, request, client_address, server):
        BaseHTTPRequestHandler.__init__(self, request, client_address, server)

    def do_GET(self):
        # Acknowledge the request; no headers beyond the status, no body.
        self.send_response(200)
        self.end_headers()

    def log_message(self, format, *args):  # @ReservedAssignment
        # Intentionally silent: drop BaseHTTPRequestHandler's stderr logging.
        pass
|
52 |
39 |
53 |
40 def set_logging(options): |
54 def set_logging(options): |
41 loggers = [] |
55 loggers = [] |
42 |
56 |
43 loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet'))) |
57 loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet'))) |
53 qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue) |
67 qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue) |
54 qlogger.propagate = 0 |
68 qlogger.propagate = 0 |
55 return qlogger |
69 return qlogger |
56 |
70 |
def get_auth(options, access_token):
    """Build the OAuth1 signer for the Twitter streaming API.

    options: parsed command line options carrying consumer_key/consumer_secret.
    access_token: (resource_owner_key, resource_owner_secret) pair.
    Returns a requests_oauthlib.OAuth1 object usable as requests' auth=.
    """
    consumer_key = options.consumer_key
    consumer_secret = options.consumer_secret
    auth = requests_oauthlib.OAuth1(client_key=consumer_key,
                                    client_secret=consumer_secret,
                                    resource_owner_key=access_token[0],
                                    resource_owner_secret=access_token[1],
                                    signature_type='auth_header')
    return auth
65 |
76 |
66 |
77 |
def add_process_event(event_type, args, session_maker):
    """Persist a ProcessEvent row recording a lifecycle event.

    event_type: event discriminator stored in ProcessEvent.type.
    args: optional payload, JSON-serialized; stored as NULL when None.
    session_maker: zero-arg callable returning a DB session.
    The session is always closed, even if the commit raises.
    """
    session = session_maker()
    try:
        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
        session.add(evt)
        session.commit()
    finally:
        session.close()
75 |
86 |
120 class SourceProcess(BaseProcess): |
132 class SourceProcess(BaseProcess): |
121 |
133 |
122 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
134 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
123 self.track = options.track |
135 self.track = options.track |
124 self.token_filename = options.token_filename |
136 self.token_filename = options.token_filename |
125 self.catchup = options.catchup |
|
126 self.timeout = options.timeout |
137 self.timeout = options.timeout |
127 self.stream = None |
138 self.stream = None |
128 super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
139 super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
129 |
140 |
130 def __source_stream_iter(self): |
141 def __source_stream_iter(self): |
131 |
142 |
132 self.logger = set_logging_process(self.options, self.logger_queue) |
|
133 self.logger.debug("SourceProcess : run ") |
143 self.logger.debug("SourceProcess : run ") |
134 |
144 |
|
145 self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token)) |
135 self.auth = get_auth(self.options, self.access_token) |
146 self.auth = get_auth(self.options, self.access_token) |
136 self.logger.debug("SourceProcess : auth set ") |
147 self.logger.debug("SourceProcess : auth set ") |
137 |
148 |
138 track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() |
149 track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() |
139 self.logger.debug("SourceProcess : track list " + track_list) |
150 self.logger.debug("SourceProcess : track list " + track_list) |
140 |
151 |
141 track_list = [k.strip() for k in track_list.split(',')] |
152 track_list = [k.strip() for k in track_list.split(',')] |
142 |
153 |
143 self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) |
154 self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth))) |
144 self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger) |
155 self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger) |
145 self.logger.debug("SourceProcess : after connecting to stream") |
156 self.logger.debug("SourceProcess : after connecting to stream") |
146 self.stream.muststop = lambda: self.stop_event.is_set() |
157 self.stream.muststop = lambda: self.stop_event.is_set() |
147 |
158 |
148 stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger) |
159 stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger) |
149 |
160 |
150 session = self.session_maker() |
161 session = self.session_maker() |
|
162 |
|
163 #import pydevd |
|
164 #pydevd.settrace(suspend=False) |
|
165 |
151 |
166 |
152 try: |
167 try: |
153 for tweet in stream_wrapper: |
168 for tweet in stream_wrapper: |
154 if not self.parent_is_alive(): |
169 if not self.parent_is_alive(): |
155 self.stop_event.set() |
170 self.stop_event.set() |
156 stop_thread.join(5) |
|
157 sys.exit() |
171 sys.exit() |
158 self.logger.debug("SourceProcess : tweet " + repr(tweet)) |
172 self.logger.debug("SourceProcess : tweet " + repr(tweet)) |
159 source = TweetSource(original_json=tweet) |
173 source = TweetSource(original_json=tweet) |
160 self.logger.debug("SourceProcess : source created") |
174 self.logger.debug("SourceProcess : source created") |
161 add_retries = 0 |
175 add_retries = 0 |
191 self.stop_event.set() |
205 self.stop_event.set() |
192 |
206 |
193 |
207 |
194 def do_run(self): |
208 def do_run(self): |
195 |
209 |
196 # import pydevd |
210 self.logger = set_logging_process(self.options, self.logger_queue) |
197 # pydevd.settrace(suspend=False) |
|
198 |
211 |
199 source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") |
212 source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") |
200 |
213 |
201 source_stream_iter_thread.start() |
214 source_stream_iter_thread.start() |
202 |
215 |
203 while not self.stop_event.is_set(): |
216 try: |
204 self.logger.debug("SourceProcess : In while after start") |
217 while not self.stop_event.is_set(): |
205 self.stop_event.wait(DEFAULT_TIMEOUT) |
218 self.logger.debug("SourceProcess : In while after start") |
206 if self.stop_event.is_set() and self.stream: |
219 self.stop_event.wait(DEFAULT_TIMEOUT) |
207 self.stream.close() |
220 except KeyboardInterrupt: |
208 elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: |
221 self.stop_event.set() |
209 self.stop_event.set() |
222 pass |
210 |
223 |
|
224 if self.stop_event.is_set() and self.stream: |
|
225 self.stream.close() |
|
226 elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: |
|
227 self.stop_event.set() |
|
228 |
211 self.logger.info("SourceProcess : join") |
229 self.logger.info("SourceProcess : join") |
212 source_stream_iter_thread.join(30) |
230 source_stream_iter_thread.join(30) |
213 |
231 |
214 |
232 |
215 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger): |
233 def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger): |
216 try: |
234 try: |
217 if not tweet.strip(): |
235 if not tweet.strip(): |
218 return |
236 return |
219 tweet_obj = anyjson.deserialize(tweet) |
237 tweet_obj = anyjson.deserialize(tweet) |
220 if 'text' not in tweet_obj: |
238 processor_klass = get_processor(tweet_obj) |
|
239 if not processor_klass: |
221 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) |
240 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) |
222 session.add(tweet_log) |
241 session.add(tweet_log) |
223 return |
242 return |
224 screen_name = "" |
243 processor = processor_klass(json_dict=tweet_obj, |
225 if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: |
244 json_txt=tweet, |
226 screen_name = tweet_obj['user']['screen_name'] |
245 source_id=source_id, |
227 logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) |
246 session=session, |
228 logger.debug(u"Process_tweet :" + repr(tweet)) |
247 consumer_token=consumer_token, |
229 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user) |
248 access_token=access_token, |
|
249 token_filename=token_filename, |
|
250 user_query_twitter=twitter_query_user, |
|
251 logger=logger) |
|
252 logger.info(processor.log_info()) |
|
253 logger.debug(u"Process_tweet :" + repr(tweet)) |
230 processor.process() |
254 processor.process() |
|
255 |
231 except ValueError as e: |
256 except ValueError as e: |
232 message = u"Value Error %s processing tweet %s" % (repr(e), tweet) |
257 message = u"Value Error %s processing tweet %s" % (repr(e), tweet) |
233 output = StringIO.StringIO() |
258 output = StringIO.StringIO() |
234 try: |
259 try: |
235 traceback.print_exc(file=output) |
260 traceback.print_exc(file=output) |
285 engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) |
312 engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) |
286 Session = scoped_session(Session) |
313 Session = scoped_session(Session) |
287 return Session, engine, metadata |
314 return Session, engine, metadata |
288 |
315 |
289 |
316 |
def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
    """Process tweet sources that were stored but never handled.

    A "leftover" is a TweetSource with no matching TweetLog row. When the
    backlog is large and ask_process_leftovers is set, the operator is asked
    before replaying it. Each processed source is committed individually.
    """
    # NOTE: '== None' is intentional - SQLAlchemy turns it into IS NULL.
    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
    sources_count = sources.count()

    if sources_count > 10 and ask_process_leftovers:
        resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
        if resp and resp.strip().lower() == "n":
            return
    logger.info("Process leftovers, %d tweets to process" % (sources_count))
    for src in sources:
        tweet_txt = src.original_json
        process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
        session.commit()
298 |
|
299 |
331 |
300 |
332 |
# get tweet sources that do not match any message
# select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
303 def process_log(logger_queues, stop_event): |
335 def process_log(logger_queues, stop_event): |
313 time.sleep(0.1) |
345 time.sleep(0.1) |
314 |
346 |
315 |
347 |
def get_options():
    """Parse command line options for the tweet recorder.

    Returns the argparse namespace (consumer key/secret are mandatory).
    """
    usage = "usage: %(prog)s [options]"

    parser = argparse.ArgumentParser(usage=usage)

    parser.add_argument("-f", "--file", dest="conn_str",
                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
    parser.add_argument("-T", "--track", dest="track",
                        help="Twitter track", metavar="TRACK")
    parser.add_argument("-k", "--key", dest="consumer_key",
                        help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
    parser.add_argument("-s", "--secret", dest="consumer_secret",
                        help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
    parser.add_argument("-n", "--new", dest="new", action="store_true",
                        help="new database", default=False)
    parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
                        help="launch daemon", default=False)
    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                        help="Token file name")
    parser.add_argument("-d", "--duration", dest="duration",
                        help="Duration of recording in seconds", metavar="DURATION", default=-1, type=int)
    parser.add_argument("-N", "--nb-process", dest="process_nb",
                        help="number of process.\nIf 0, only the leftovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
    parser.add_argument("--url", dest="url",
                        help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
    parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
                        help="Query twitter for users", default=False)
    parser.add_argument("--timeout", dest="timeout",
                        help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
    parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
                        help="ask process leftover", default=True)

    utils.set_logging_options(parser)

    return parser.parse_args()
355 |
385 |
356 |
386 |
357 def do_run(options, session_maker): |
387 def do_run(options, session_maker): |
358 |
388 |
359 stop_args = {} |
389 stop_args = {} |
360 |
390 |
361 access_token = None |
391 consumer_token = (options.consumer_key, options.consumer_secret) |
362 if not options.username or not options.password: |
392 access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename) |
363 access_token = utils.get_oauth_token(options.token_filename) |
393 |
364 |
394 |
365 session = session_maker() |
395 session = session_maker() |
366 try: |
396 try: |
367 process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) |
397 process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) |
368 session.commit() |
398 session.commit() |
369 finally: |
399 finally: |
370 session.rollback() |
400 session.rollback() |
371 session.close() |
401 session.close() |
372 |
402 |
390 conn.close() |
423 conn.close() |
391 |
424 |
392 process_engines = [] |
425 process_engines = [] |
393 logger_queues = [] |
426 logger_queues = [] |
394 |
427 |
395 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
428 SessionProcess, engine_process, _ = get_sessionmaker(conn_str) |
396 process_engines.append(engine_process) |
429 process_engines.append(engine_process) |
397 lqueue = mQueue(50) |
430 lqueue = mQueue(50) |
398 logger_queues.append(lqueue) |
431 logger_queues.append(lqueue) |
399 pid = os.getpid() |
432 pid = os.getpid() |
400 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
433 sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
401 |
434 |
402 tweet_processes = [] |
435 tweet_processes = [] |
403 |
436 |
404 for i in range(options.process_nb - 1): |
437 for i in range(options.process_nb - 1): |
405 SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) |
438 SessionProcess, engine_process, _ = get_sessionmaker(conn_str) |
406 process_engines.append(engine_process) |
439 process_engines.append(engine_process) |
407 lqueue = mQueue(50) |
440 lqueue = mQueue(50) |
408 logger_queues.append(lqueue) |
441 logger_queues.append(lqueue) |
409 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
442 cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) |
410 tweet_processes.append(cprocess) |
443 tweet_processes.append(cprocess) |
460 except: |
493 except: |
461 utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) |
494 utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) |
462 cprocess.terminate() |
495 cprocess.terminate() |
463 |
496 |
464 |
497 |
465 utils.get_logger().debug("Close queues") |
498 utils.get_logger().debug("Close queues") |
466 try: |
|
467 queue.close() |
|
468 for lqueue in logger_queues: |
|
469 lqueue.close() |
|
470 except exception as e: |
|
471 utils.get_logger().error("error when closing queues %s", repr(e)) |
|
472 # do nothing |
|
473 |
|
474 |
499 |
475 if options.process_nb > 1: |
500 if options.process_nb > 1: |
476 utils.get_logger().debug("Processing leftovers") |
501 utils.get_logger().debug("Processing leftovers") |
477 session = session_maker() |
502 session = session_maker() |
478 try: |
503 try: |
479 process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) |
504 process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) |
480 session.commit() |
505 session.commit() |
481 finally: |
506 finally: |
482 session.rollback() |
507 session.rollback() |
483 session.close() |
508 session.close() |
484 |
509 |
485 for pengine in process_engines: |
510 for pengine in process_engines: |
486 pengine.dispose() |
511 pengine.dispose() |
|
512 |
|
513 try: |
|
514 queue.close() |
|
515 for lqueue in logger_queues: |
|
516 lqueue.close() |
|
517 except Exception as e: |
|
518 utils.get_logger().error("error when closing queues %s", repr(e)) |
|
519 # do nothing |
487 |
520 |
488 return stop_args |
521 return stop_args |
489 |
522 |
490 |
523 |
491 def main(options, args): |
524 def main(options): |
492 |
525 |
493 global conn_str |
526 global conn_str |
494 |
527 |
495 conn_str = options.conn_str.strip() |
528 conn_str = options.conn_str.strip() |
496 if not re.match("^\w+://.+", conn_str): |
529 if not re.match("^\w+://.+", conn_str): |