correct. remove mutex and clear
--- a/script/lib/iri_tweet/iri_tweet/stream.py Mon Dec 10 11:00:57 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/stream.py Mon Dec 10 12:53:09 2012 +0100
@@ -45,7 +45,7 @@
elif len(chunk) < chunk_size/2 and chunk_size < max_chunk_size:
chunk_size = max(chunk_size/2,1)
yield chunk
- except SSLError as e:
+ except requests.exceptions.SSLError as e:
if e.errno == 2:
# Apparently this means there was nothing in the socket buf
pass
@@ -272,7 +272,7 @@
class FilterStream(BaseStream):
- url = "https://stream.twitter.com/1/statuses/filter.json"
+ url = "https://stream.twitter.com/1.1/statuses/filter.json"
def __init__(self, auth, follow=None, locations=None,
track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096):
--- a/script/stream/recorder_tweetstream.py Mon Dec 10 11:00:57 2012 +0100
+++ b/script/stream/recorder_tweetstream.py Mon Dec 10 12:53:09 2012 +0100
@@ -184,12 +184,8 @@
finally:
session.rollback()
session.close()
- with self.logger_queue.mutex:
- self.logger_queue.clear()
- self.logger_queue.close()
- with self.queue.mutex:
- self.queue.clear()
- self.queue.close()
+ self.logger_queue.close()
+ self.queue.close()
self.stream.close()
self.stream = None
if not self.stop_event.is_set():
@@ -205,14 +201,15 @@
source_stream_iter_thread.start()
- while not self.stop_event.is_set():
+ while not self.stop_event.is_set():
+ self.logger.info("SourceProcess : In while after start")
self.stop_event.wait(DEFAULT_TIMEOUT)
if self.stop_event.is_set() and self.stream:
self.stream.close()
elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
- with self.stop_event.mutex:
- self.stop_event.set()
+ self.stop_event.set()
+ self.logger.info("SourceProcess : join")
source_stream_iter_thread.join(30)
@@ -283,10 +280,6 @@
finally:
session.rollback()
self.stop_event.set()
- with self.logger_queue.mutex:
- self.logger_queue.clear()
- with self.queue.mutex:
- self.queue.clear()
session.close()