# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1355140389 -3600 # Node ID 2497c7f38e0a4509c617d11c82bea408247a3168 # Parent 8f6a4d6dfe140ede97cb40d54ce9b2ec51a354e8 correct. remove mutex and clear diff -r 8f6a4d6dfe14 -r 2497c7f38e0a script/lib/iri_tweet/iri_tweet/stream.py --- a/script/lib/iri_tweet/iri_tweet/stream.py Mon Dec 10 11:00:57 2012 +0100 +++ b/script/lib/iri_tweet/iri_tweet/stream.py Mon Dec 10 12:53:09 2012 +0100 @@ -45,7 +45,7 @@ elif len(chunk) < chunk_size/2 and chunk_size < max_chunk_size: chunk_size = max(chunk_size/2,1) yield chunk - except SSLError as e: + except requests.exceptions.SSLError as e: if e.errno == 2: # Apparently this means there was nothing in the socket buf pass @@ -272,7 +272,7 @@ class FilterStream(BaseStream): - url = "https://stream.twitter.com/1/statuses/filter.json" + url = "https://stream.twitter.com/1.1/statuses/filter.json" def __init__(self, auth, follow=None, locations=None, track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096): diff -r 8f6a4d6dfe14 -r 2497c7f38e0a script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Mon Dec 10 11:00:57 2012 +0100 +++ b/script/stream/recorder_tweetstream.py Mon Dec 10 12:53:09 2012 +0100 @@ -184,12 +184,8 @@ finally: session.rollback() session.close() - with self.logger_queue.mutex: - self.logger_queue.clear() - self.logger_queue.close() - with self.queue.mutex: - self.queue.clear() - self.queue.close() + self.logger_queue.close() + self.queue.close() self.stream.close() self.stream = None if not self.stop_event.is_set(): @@ -205,14 +201,15 @@ source_stream_iter_thread.start() - while not self.stop_event.is_set(): + while not self.stop_event.is_set(): + self.logger.info("SourceProcess : In while after start") self.stop_event.wait(DEFAULT_TIMEOUT) if self.stop_event.is_set() and self.stream: self.stream.close() elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: - with self.stop_event.mutex: - self.stop_event.set() + self.stop_event.set() + self.logger.info("SourceProcess : join") source_stream_iter_thread.join(30) @@ -283,10 +280,6 @@ finally: session.rollback() self.stop_event.set() - with self.logger_queue.mutex: - self.logger_queue.clear() - with self.queue.mutex: - self.queue.clear() session.close()