After updating to requests 1.0.2, do some cleanup: remove tweetstream and tweepy
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
date   Tue, 18 Dec 2012 12:26:05 +0100
changeset 883 8ae3d91ea4ae
parent 882 b46cfa1d188b
child 884 07f1c6854df9
script/lib/iri_tweet/iri_tweet/__init__.py
script/lib/iri_tweet/iri_tweet/stream.py
script/lib/iri_tweet/iri_tweet/utils.py
script/lib/tweetstream/CHANGELOG
script/lib/tweetstream/LICENSE
script/lib/tweetstream/README
script/lib/tweetstream/conftest.py
script/lib/tweetstream/servercontext.py
script/lib/tweetstream/setup.py
script/lib/tweetstream/tests/test_tweetstream.py
script/lib/tweetstream/tox.ini
script/lib/tweetstream/tweetstream/__init__.py
script/lib/tweetstream/tweetstream/deprecated.py
script/lib/tweetstream/tweetstream/streamclasses.py
script/stream/recorder_stream.py
script/stream/recorder_tweetstream.py
script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz
script/virtualenv/res/blessings-1.5.tar.gz
script/virtualenv/res/lib/lib_create_env.py
script/virtualenv/res/requests-0.13.1.tar.gz
script/virtualenv/res/src/tweepy-1.12.tar.gz
script/virtualenv/res/twitter-1.9.0.tar.gz
script/virtualenv/script/res/res_create_env.py
--- a/script/lib/iri_tweet/iri_tweet/__init__.py	Tue Dec 18 10:40:15 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py	Tue Dec 18 12:26:05 2012 +0100
@@ -33,7 +33,7 @@
 
 
 class IRITweetError(Exception):
-    """Base class for all tweetstream errors"""
+    """Base class for all IRITweet errors"""
     pass
 
 
--- a/script/lib/iri_tweet/iri_tweet/stream.py	Tue Dec 18 10:40:15 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/stream.py	Tue Dec 18 12:26:05 2012 +0100
@@ -68,7 +68,7 @@
 
     """A network connection to Twitters streaming API
 
-    :param auth: tweepy auth object.
+    :param auth: requests auth object.
     :keyword catchup: Number of tweets from the past to get before switching to
       live stream.
     :keyword raw: If True, return each tweet's raw data direct from the socket,
@@ -168,14 +168,11 @@
         if self._catchup_count:
             postdata["count"] = self._catchup_count
         
-        if self._auth:
-            self._auth.apply_auth(self.url, "POST", headers, postdata)
-
         if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url))
         if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers))
         if self._logger : self._logger.debug("BaseStream init connection data " + repr(postdata))
         
-        self._resp = requests.post(self.url, headers=headers, data=postdata, prefetch=False)
+        self._resp = requests.post(self.url, auth=self._auth, headers=headers, data=postdata, stream=True)
         if self._logger : self._logger.debug("BaseStream init connection " + repr(self._resp))
         
         self._resp.raise_for_status()
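The hunk above tracks two requests 1.0 API changes: prefetch=False became
stream=True, and the auth object is now handed directly to requests.post
instead of being applied to the headers by hand. A minimal sketch of the new
calling convention (basic auth stands in here for the recorder's real OAuth
setup):

    import requests
    from requests.auth import HTTPBasicAuth

    resp = requests.post("https://stream.twitter.com/1/statuses/filter.json",
                         auth=HTTPBasicAuth("user", "secret"),  # applied by requests itself
                         data={"track": "python"},
                         stream=True)  # replaces prefetch=False: defer the body download
    resp.raise_for_status()
    for line in resp.iter_lines(chunk_size=1):  # consume the stream incrementally
        if line:  # skip keepalive newlines
            print line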
--- a/script/lib/iri_tweet/iri_tweet/utils.py	Tue Dec 18 10:40:15 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/utils.py	Tue Dec 18 12:26:05 2012 +0100
@@ -1,7 +1,7 @@
-from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, 
-    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, 
-    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, 
-    Media, EntityMedia, Entity, EntityType)
+from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, #@UnresolvedImport
+    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,#@UnresolvedImport
+    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, #@UnresolvedImport 
+    Media, EntityMedia, Entity, EntityType) #@UnresolvedImport
 from sqlalchemy.sql import select, or_ #@UnresolvedImport
 import Queue #@UnresolvedImport
 import anyjson #@UnresolvedImport
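The markers added above have no runtime effect: #@UnresolvedImport is an
Eclipse/PyDev annotation that silences the IDE's unresolved-import warning for
that one line. For instance:

    # Read by PyDev's static analyzer only; Python sees an ordinary comment.
    from models import Tweet  #@UnresolvedImport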
--- a/script/lib/tweetstream/CHANGELOG	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,68 +0,0 @@
-0.1
-
- - Initial version
-
-0.2
-
- - Improved error handling
- - Added AuthenticationError and ConnectionError exceptions
- - Added ReconnectingTweetStream class that supports automatically
-   reconnecting if the connection is dropped
-
-0.3
-
- - Fixed bugs in authentication
- - Added TrackStream and FollowStream classes
- - Added list of endpoint names, and made them legal values for the url arg
-
-0.3.1
-
- - Added lots of tests
- - Added proper handling of keepalive newlines
- - Improved handling of closing streams
- - Added missing anyjson dependency to setup
- - Fixed bug where newlines and malformed content were counted as a tweet
-
-0.3.2
-
- - This release was skipped over, due to maintainer brainfart.
-
-0.3.3
-
- - Fixed setup.py so it won't attempt to load modules that aren't installed
-   yet. Fixes installation issue.
-
-0.3.4
-
- - Updated to latest twitter streaming urls
- - Fixed a bug where we tried to call a method on None
-
-0.3.5
-
- - Removed a spurious print statement left over from debugging
- - Introduced common base class for all tweetstream exceptions
- - Make sure we raise a sensible error on 404. Include url in desc of that error
-
-0.3.6
-
- - Added LocationStream class for filtering on location bounding boxes.
-
-1.0.0
-
- - Changed API to match latest twitter endpoints. This adds SampleStream and
-   FilterStream and deprecates TweetStream, FollowStream, LocationStream,
-   TrackStream and ReconnectingTweetStream.
-
-1.1.0
-
- - Fixed issues #2 and #12, related to low volume streams not yielding tweets
-   until a relatively large buffer was filled. This meant that tweets would
-   arrive in bunches, not immediately.
- - Switched to HTTPS urls for streams. Twitter will switch off HTTP streams
-   on 29 Sept 2011.
- - Added support for Python 3
-
-1.1.1
-
- - Fixed issue #16. Odd case where python_version_tuple was returning ints
-   rather than the strings the docs promise. Make sure we always cast to int.
--- a/script/lib/tweetstream/LICENSE	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,27 +0,0 @@
-Copyright (c) 2009, Rune Halvorsen
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-    * Redistributions of source code must retain the above copyright notice,
-      this list of conditions and the following disclaimer.
-    * Redistributions in binary form must reproduce the above copyright
-      notice, this list of conditions and the following disclaimer in the
-      documentation and/or other materials provided with the distribution.
-
-Neither the name of Rune Halvorsen nor the names of its contributors may be
-used to endorse or promote products derived from this software without
-specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
-BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-POSSIBILITY OF SUCH DAMAGE.
--- a/script/lib/tweetstream/README	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,110 +0,0 @@
-.. -*- restructuredtext -*-
-
-##########################################
-tweetstream - Simple twitter streaming API
-##########################################
-
-Introduction
-------------
-
-tweetstream provides two classes, SampleStream and FilterStream, that can be
-used to get tweets from Twitter's streaming API. An instance of one of the
-classes can be used as an iterator. In addition to fetching tweets, the 
-object keeps track of the number of tweets collected and the rate at which
-tweets are received.
-
-SampleStream delivers a sample of all tweets. FilterStream delivers
-tweets that match one or more criteria. Note that it's not possible
-to get all tweets without access to the "firehose" stream, which
-is not currently available to the public.
-
-Twitter's documentation about the streaming API can be found here:
-http://dev.twitter.com/pages/streaming_api_methods .
-
-**Note** that the API is blocking. If for some reason data is not immediately
-available, calls will block until enough data is available to yield a tweet.
-
-Examples
---------
-
-Printing incoming tweets:
-
->>> stream = tweetstream.SampleStream("username", "password")
->>> for tweet in stream:
-...     print tweet
-
-
-The stream object can also be used as a context, as in this example that
-prints the author for each tweet as well as the tweet count and rate:
-
->>> with tweetstream.SampleStream("username", "password") as stream:
-...     for tweet in stream:
-...         print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % (
-...                 tweet["user"]["screen_name"], stream.count, stream.rate )
-
-
-Stream objects can raise ConnectionError or AuthenticationError exceptions:
-
->>> try:
-...     with tweetstream.TweetStream("username", "password") as stream:
-...         for tweet in stream:
-...             print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % (
-...                     tweet["user"]["screen_name"], stream.count, stream.rate )
-... except tweetstream.ConnectionError, e:
-...     print "Disconnected from twitter. Reason:", e.reason
-
-To get tweets that match specific criteria, use the FilterStream. FilterStreams
-take three keyword arguments: "locations", "follow" and "track".
-
-Locations are a list of bounding boxes in which geotagged tweets should originate.
-The argument should be an iterable of longitude/latitude pairs.
-
-Track specifies keywords to track. The argument should be an iterable of
-strings.
-
-Follow returns statuses that reference given users. The argument should be an
-iterable of twitter user IDs. The IDs are userid ints, not the screen names.
-
->>> words = ["opera", "firefox", "safari"]
->>> people = [123,124,125]
->>> locations = ["-122.75,36.8", "-121.75,37.8"]
->>> with tweetstream.FilterStream("username", "password", track=words,
-...                               follow=people, locations=locations) as stream:
-...     for tweet in stream:
-...         print "Got interesting tweet:", tweet
-
-
-Deprecated classes
-------------------
-
-tweetstream used to contain the classes TweetStream, FollowStream, TrackStream,
-LocationStream and ReconnectingTweetStream. These were deprecated when twitter
-changed its API end points. The same functionality is now available in
-SampleStream and FilterStream. The deprecated methods will emit a warning when
-used, but will remain functional for a while longer.
-
-
-Changelog
----------
-
-See the CHANGELOG file
-
-Contact
--------
-
-The author is Rune Halvorsen <runefh@gmail.com>. The project resides at
-http://bitbucket.org/runeh/tweetstream . If you find bugs, or have feature
-requests, please report them in the project site issue tracker. Patches are
-also very welcome.
-
-Contributors
-------------
-
-- Rune Halvorsen
-- Christopher Schierkolk
-
-License
--------
-
-This software is licensed under the ``New BSD License``. See the ``LICENSE``
-file in the top distribution directory for the full license text.
--- a/script/lib/tweetstream/conftest.py	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,10 +0,0 @@
-# content of conftest.py
-
-import pytest
-def pytest_addoption(parser):
-    parser.addoption("--runslow", action="store_true",
-        help="run slow tests")
-
-def pytest_runtest_setup(item):
-    if 'slow' in item.keywords and not item.config.getvalue("runslow"):
-        pytest.skip("need --runslow option to run")
\ No newline at end of file
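The deleted hook above skips any test carrying the slow keyword unless
--runslow is passed; the deleted test module further down opts in via
slow = pytest.mark.slow. A hypothetical test using the marker:

    import pytest

    @pytest.mark.slow
    def test_expensive_path():
        # always collected, but skipped unless invoked as: py.test --runslow
        assert True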
--- a/script/lib/tweetstream/servercontext.py	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,221 +0,0 @@
-import threading
-import contextlib
-import time
-import os
-import socket
-import random
-from functools import partial
-from inspect import isclass
-from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from SimpleHTTPServer import SimpleHTTPRequestHandler
-from SocketServer import BaseRequestHandler
-
-
-class ServerError(Exception):
-    pass
-
-
-class ServerContext(object):
-    """Context object with information about a running test server."""
-
-    def __init__(self, address, port):
-        self.address = address or "localhost"
-        self.port = port
-
-    @property
-    def baseurl(self):
-        return "http://%s:%s" % (self.address, self.port)
-
-    def __str__(self):
-        return "<ServerContext %s >" % self.baseurl
-
-    __repr__ = __str__
-
-
-class _SilentSimpleHTTPRequestHandler(SimpleHTTPRequestHandler):
-
-    def __init__(self, *args, **kwargs):
-        self.logging = kwargs.get("logging", False)
-        SimpleHTTPRequestHandler.__init__(self, *args, **kwargs)
-
-    def log_message(self, *args, **kwargs):
-        if self.logging:
-            SimpleHTTPRequestHandler.log_message(self, *args, **kwargs)
-
-
-class _TestHandler(BaseHTTPRequestHandler):
-    """RequestHandler class that handles requests that use a custom handler
-    callable."""
-
-    def __init__(self, handler, methods, *args, **kwargs):
-        self._handler = handler
-        self._methods = methods
-        self._response_sent = False
-        self._headers_sent = False
-        self.logging = kwargs.get("logging", False)
-        BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
-
-    def log_message(self, *args, **kwargs):
-        if self.logging:
-            BaseHTTPRequestHandler.log_message(self, *args, **kwargs)
-
-    def send_response(self, *args, **kwargs):
-        self._response_sent = True
-        BaseHTTPRequestHandler.send_response(self, *args, **kwargs)
-
-    def end_headers(self, *args, **kwargs):
-        self._headers_sent = True
-        BaseHTTPRequestHandler.end_headers(self, *args, **kwargs)
-
-    def _do_whatever(self):
-        """Called in place of do_METHOD"""
-        data = self._handler(self)
-
-        if hasattr(data, "next"):
-            # assume it's something supporting generator protocol
-            self._handle_with_iterator(data)
-        else:
-            # Nothing more to do then.
-            pass
-
-
-    def __getattr__(self, name):
-        if name.startswith("do_") and name[3:].lower() in self._methods:
-            return self._do_whatever
-        else:
-            # fixme instance or class?
-            raise AttributeError(name)
-
-    def _handle_with_iterator(self, iterator):
-        self.connection.settimeout(0.1)
-        for data in iterator:
-            if not self.server.server_thread.running:
-                return
-
-            if not self._response_sent:
-                self.send_response(200)
-            if not self._headers_sent:
-                self.end_headers()
-
-            self.wfile.write(data)
-            # flush immediately. We may want to do trickling writes
-            # or something else that requires bypassing normal caching
-            self.wfile.flush()
-
-class _TestServerThread(threading.Thread):
-    """Thread class for a running test server"""
-
-    def __init__(self, handler, methods, cwd, port, address):
-        threading.Thread.__init__(self)
-        self.startup_finished = threading.Event()
-        self._methods = methods
-        self._cwd = cwd
-        self._orig_cwd = None
-        self._handler = self._wrap_handler(handler, methods)
-        self._setup()
-        self.running = True
-        self.serverloc = (address, port)
-        self.error = None
-
-    def _wrap_handler(self, handler, methods):
-        if isclass(handler) and issubclass(handler, BaseRequestHandler):
-            return handler # It's OK. user passed in a proper handler
-        elif callable(handler):
-            return partial(_TestHandler, handler, methods)
-            # it's a callable, so wrap in a req handler
-        else:
-            raise ServerError("handler must be callable or RequestHandler")
-
-    def _setup(self):
-        if self._cwd != "./":
-            self._orig_cwd = os.getcwd()
-            os.chdir(self._cwd)
-
-    def _init_server(self):
-        """Hooks up the server socket"""
-        try:
-            if self.serverloc[1] == "random":
-                retries = 10 # try getting an available port max this many times
-                while True:
-                    try:
-                        self.serverloc = (self.serverloc[0],
-                                          random.randint(1025, 49151))
-                        self._server = HTTPServer(self.serverloc, self._handler)
-                    except socket.error:
-                        retries -= 1
-                        if not retries: # not able to get a port.
-                            raise
-                    else:
-                        break
-            else: # use specific port. this might throw, that's expected
-                self._server = HTTPServer(self.serverloc, self._handler)
-        except socket.error, e:
-            self.running = False
-            self.error = e
-            # set this here, since we'll never enter the serve loop where
-            # it is usually set:
-            self.startup_finished.set()
-            return
-
-        self._server.allow_reuse_address = True # lots of tests, same port
-        self._server.timeout = 0.1
-        self._server.server_thread = self
-
-
-    def run(self):
-        self._init_server()
-
-        while self.running:
-            self._server.handle_request() # blocks for self.timeout secs
-            # First time this falls through, signal the parent thread that
-            # the server is ready for incoming connections
-            if not self.startup_finished.is_set():
-                self.startup_finished.set()
-
-        self._cleanup()
-
-    def stop(self):
-        """Stop the server and attempt to make the thread terminate.
-        This happens async but the calling code can check periodically
-        the isRunning flag on the thread object.
-        """
-        # actual stopping happens in the run method
-        self.running = False
-
-    def _cleanup(self):
-        """Do some rudimentary cleanup."""
-        if self._orig_cwd:
-            os.chdir(self._orig_cwd)
-
-
-@contextlib.contextmanager
-def test_server(handler=_SilentSimpleHTTPRequestHandler, port=8514,
-                address="", methods=("get", "head"), cwd="./"):
-    """Context that makes available a web server in a separate thread"""
-    thread = _TestServerThread(handler=handler, methods=methods, cwd=cwd,
-                               port=port, address=address)
-    thread.start()
-
-    # fixme: should this be daemonized? If it isn't it will block the entire
-    # app, but that should never happen anyway.
-    thread.startup_finished.wait()
-
-    if thread.error: # startup failed! Bail, throw whatever the server did
-        raise thread.error
-
-    exc = None
-    try:
-        yield ServerContext(*thread.serverloc)
-    except Exception, exc:
-        pass
-    thread.stop()
-    thread.join(5) # giving it a lot of leeway. should never happen
-
-    if exc:
-        raise exc
-
-    # fixme: this takes second priority after the internal exception but would
-    # still be nice to signal back to calling code.
-
-    if thread.isAlive():
-        raise Warning("Test server could not be stopped")
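A minimal usage sketch for the test_server context manager being deleted above,
assuming the module is importable as servercontext: a plain callable may yield
chunks, and the wrapping request handler writes and flushes each one
immediately:

    import urllib2
    from servercontext import test_server

    def handler(request):
        # each yielded chunk is written and flushed as soon as it is produced
        yield "hello "
        yield "world"

    with test_server(handler=handler, methods=("get",), port="random") as ctx:
        print urllib2.urlopen(ctx.baseurl).read()  # -> hello world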
--- a/script/lib/tweetstream/setup.py	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,82 +0,0 @@
-#@PydevCodeAnalysisIgnore
-import sys
-import os
-
-extra = {}
-if sys.version_info >= (3, 0):
-    extra.update(use_2to3=True)
-
-
-try:
-    from setuptools import setup, find_packages
-except ImportError:
-    from distutils.core import setup, find_packages
-
-
-# -*- Distribution Meta -*-
-import re
-re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)')
-re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)')
-re_doc = re.compile(r'^"""(.+?)"""', re.M|re.S)
-rq = lambda s: s.strip("\"'")
-
-
-def add_default(m):
-    attr_name, attr_value = m.groups()
-    return ((attr_name, rq(attr_value)), )
-
-
-def add_version(m):
-    v = list(map(rq, m.groups()[0].split(", ")))
-    return (("VERSION", ".".join(v[0:3]) + "".join(v[3:])), )
-
-
-def add_doc(m):
-    return (("doc", m.groups()[0].replace("\n", " ")), )
-
-pats = {re_meta: add_default,
-        re_vers: add_version}
-here = os.path.abspath(os.path.dirname(__file__))
-meta_fh = open(os.path.join(here, "tweetstream/__init__.py"))
-try:
-    meta = {}
-    acc = []
-    for line in meta_fh:
-        if line.strip() == '# -eof meta-':
-            break
-        acc.append(line)
-        for pattern, handler in pats.items():
-            m = pattern.match(line.strip())
-            if m:
-                meta.update(handler(m))
-    m = re_doc.match("".join(acc).strip())
-    if m:
-        meta.update(add_doc(m))
-finally:
-    meta_fh.close()
-
-
-setup(name='tweetstream',
-    version=meta["VERSION"],
-    description=meta["doc"],
-    long_description=open("README").read(),
-    classifiers=[
-        'License :: OSI Approved :: BSD License',
-        'Intended Audience :: Developers',
-        'Programming Language :: Python :: 2.6',
-        'Programming Language :: Python :: 2.7',
-        'Programming Language :: Python :: 3',
-        'Programming Language :: Python :: 3.1',
-    ],
-    keywords='twitter',
-    author=meta["author"],
-    author_email=meta["contact"],
-    url=meta["homepage"],
-    license='BSD',
-    packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
-    include_package_data=True,
-    zip_safe=False,
-    platforms=["any"],
-    install_requires=['anyjson'],
-    **extra
-)
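To make the metadata extraction above concrete: re_vers plus add_version turn
the VERSION tuple from tweetstream/__init__.py (shown further down) into the
version string passed to setup():

    import re

    re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)')
    m = re_vers.match("VERSION = (1, 1, 1, 'iri')")
    v = [s.strip("\"'") for s in m.groups()[0].split(", ")]
    print ".".join(v[0:3]) + "".join(v[3:])  # -> 1.1.1iri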
--- a/script/lib/tweetstream/tests/test_tweetstream.py	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,188 +0,0 @@
-import contextlib
-import threading
-import time
-
-from tweetstream import TweetStream, FollowStream, TrackStream, LocationStream
-from tweetstream import ConnectionError, AuthenticationError, SampleStream, FilterStream
-from tweepy.auth import BasicAuthHandler   
-
-import pytest
-from pytest import raises
-slow = pytest.mark.slow
-
-from servercontext import test_server
-
-single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}""" + "\r"
-
-
-def parameterized(funcarglist):
-    def wrapper(function):
-        function.funcarglist = funcarglist
-        return function
-    return wrapper
-
-def pytest_generate_tests(metafunc):
-    for funcargs in getattr(metafunc.function, 'funcarglist', ()):
-        metafunc.addcall(funcargs=funcargs)
-
-
-streamtypes = [
-    dict(cls=TweetStream, args=[], kwargs=dict()),
-    dict(cls=SampleStream, args=[], kwargs=dict()),
-    dict(cls=FilterStream, args=[], kwargs=dict(track=("test",))),
-    dict(cls=FollowStream, args=[[1, 2, 3]], kwargs=dict()),
-    dict(cls=TrackStream, args=["opera"], kwargs=dict()),
-    dict(cls=LocationStream, args=["123,4321"], kwargs=dict())
-]
-
-
-@parameterized(streamtypes)
-def test_bad_auth(cls, args, kwargs):
-    """Test that the proper exception is raised when the user could not be
-    authenticated"""
-    def auth_denied(request):
-        request.send_error(401)
-
-    with raises(AuthenticationError):
-        with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server:
-            auth = BasicAuthHandler("user", "passwd")
-            stream = cls(auth, *args, url=server.baseurl)
-            for e in stream: pass
-
-
-@parameterized(streamtypes)
-def test_404_url(cls, args, kwargs):
-    """Test that the proper exception is raised when the stream URL can't be
-    found"""
-    def not_found(request):
-        request.send_error(404)
-
-    with raises(ConnectionError):
-        with test_server(handler=not_found, methods=("post", "get"), port="random") as server:
-            auth = BasicAuthHandler("user", "passwd")
-            stream = cls(auth, *args, url=server.baseurl)
-            for e in stream: pass
-
-
-@parameterized(streamtypes)
-def test_bad_content(cls, args, kwargs):
-    """Test error handling if we are given invalid data"""
-    def bad_content(request):
-        for n in xrange(10):
-            # what json we pass doesn't matter. It's not verifying the
-            # structure, only checking that it's parsable
-            yield "[1,2,3]\r"
-        yield "[1,2, I need no stinking close brace\r"
-        yield "[1,2,3]\r"
-
-
-    with raises(ConnectionError):
-        with test_server(handler=bad_content, methods=("post", "get"), port="random") as server:
-            auth = BasicAuthHandler("user", "passwd")
-            stream = cls(auth, *args, url=server.baseurl)
-            for tweet in stream:
-                pass
-
-
-@parameterized(streamtypes)
-def test_closed_connection(cls, args, kwargs):
-    """Test error handling if server unexpectedly closes connection"""
-    cnt = 1000
-    def bad_content(request):
-        for n in xrange(cnt):
-            # what json we pass doesn't matter. It's not verifying the
-            # structure, only checking that it's parsable
-            yield "[1,2,3]\r"
-
-    with raises(ConnectionError):
-        with test_server(handler=bad_content, methods=("post", "get"), port="random") as server:
-            auth = BasicAuthHandler("foo", "bar")
-            stream = cls(auth, *args, url=server.baseurl)
-            for tweet in stream:
-                pass
-
-
-@parameterized(streamtypes)
-def test_bad_host(cls, args, kwargs):
-    """Test behaviour if we can't connect to the host"""
-    with raises(ConnectionError):
-        stream = cls("username", "passwd", *args, url="http://wedfwecfghhreewerewads.foo")
-        stream.next()
-
-
-@parameterized(streamtypes)
-def smoke_test_receive_tweets(cls, args, kwargs):
-    """Receive 100k tweets and disconnect (slow)"""
-    total = 100000
-
-    def tweetsource(request):
-        while True:
-            yield single_tweet + "\n"
-
-    with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server:
-        auth = BasicAuthHandler("foo", "bar")
-        stream = cls(auth, *args, url=server.baseurl)
-        for tweet in stream:
-            if stream.count == total:
-                break
-
-
-@parameterized(streamtypes)
-def test_keepalive(cls, args, kwargs):
-    """Make sure we behave sanely when there are keepalive newlines in the
-    data received from twitter"""
-    def tweetsource(request):
-        yield single_tweet+"\n"
-        yield "\n"
-        yield "\n"
-        yield single_tweet+"\n"
-        yield "\n"
-        yield "\n"
-        yield "\n"
-        yield "\n"
-        yield "\n"
-        yield "\n"
-        yield "\n"
-        yield single_tweet+"\n"
-        yield "\n"
-
-
-    with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server:
-        auth = BasicAuthHandler("foo", "bar")
-        stream = cls(auth, *args, url=server.baseurl)
-        try:
-            for tweet in stream:
-                pass
-        except ConnectionError:
-            assert stream.count == 3, "Got %s, wanted 3" % stream.count
-        else:
-            assert False, "Didn't handle keepalive"
-
-
-@slow
-@parameterized(streamtypes)
-def test_buffering(cls, args, kwargs):
-    """Test if buffering stops data from being returned immediately.
-    If there is some buffering in play that might mean data is only returned
-    from the generator when the buffer is full. If buffer is bigger than a
-    tweet, this will happen. Default buffer size in the part of socket lib
-    that enables readline is 8k. Max tweet length is around 3k."""
-
-    def tweetsource(request):
-        yield single_tweet+"\n"
-        time.sleep(2)
-        # need to yield a bunch here so we're sure we'll return from the
-        # blocking call in case the buffering bug is present.
-        for n in xrange(100):
-            yield single_tweet+"\n"
-
-
-    with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server:
-        auth = BasicAuthHandler("foo", "bar")
-        stream = cls(auth, *args, url=server.baseurl)
-        start = time.time()
-        stream.next()
-        first = time.time()
-        diff = first - start
-        assert diff < 1, "Getting first tweet took more than a second!"
-
--- a/script/lib/tweetstream/tox.ini	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,16 +0,0 @@
-[tox]
-envlist = py25,py26,py27,py30,py31,py32
-
-[testenv]
-deps=pytest
-sitepackages=False
-commands=py.test --runslow
-
-[testenv:py30]
-changedir = .tox
-
-[testenv:py31]
-changedir = .tox
-
-[testenv:py32]
-changedir = .tox
\ No newline at end of file
--- a/script/lib/tweetstream/tweetstream/__init__.py	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,45 +0,0 @@
-"""Simple access to Twitter's streaming API"""
-
-VERSION = (1, 1, 1, 'iri')
-__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
-__author__ = "Rune Halvorsen"
-__contact__ = "runefh@gmail.com"
-__homepage__ = "http://bitbucket.org/runeh/tweetstream/"
-__docformat__ = "restructuredtext"
-
-# -eof meta-
-
-
-"""
- .. data:: USER_AGENT
-
-     The default user agent string for stream objects
-"""
-
-USER_AGENT = "TweetStream %s" % __version__
-
-
-class TweetStreamError(Exception):
-    """Base class for all tweetstream errors"""
-    pass
-
-
-class AuthenticationError(TweetStreamError):
-    """Exception raised if the username/password is not accepted"""
-    pass
-
-
-class ConnectionError(TweetStreamError):
-    """Raised when there are network problems. This means when there are
-    dns errors, network errors, twitter issues"""
-
-    def __init__(self, reason, details=None):
-        self.reason = reason
-        self.details = details
-
-    def __str__(self):
-        return '<ConnectionError %s>' % self.reason
-
-
-from .streamclasses import SampleStream, FilterStream
-from .deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream
--- a/script/lib/tweetstream/tweetstream/deprecated.py	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,82 +0,0 @@
-from .streamclasses import FilterStream, SampleStream, ConnectionError
-import time
-
-class DeprecatedStream(FilterStream):
-    def __init__(self, *args, **kwargs):
-        import warnings
-        warnings.warn("%s is deprecated. Use FilterStream instead" % self.__class__.__name__, DeprecationWarning)
-        super(DeprecatedStream, self).__init__(*args, **kwargs)
-
-
-class FollowStream(DeprecatedStream):
-    def __init__(self, auth, follow, catchup=None, url=None):
-        super(FollowStream, self).__init__(auth, follow=follow, catchup=catchup, url=url)
-
-
-class TrackStream(DeprecatedStream):
-    def __init__(self, auth, track, catchup=None, url=None, slow=False):
-        super(TrackStream, self).__init__(auth, track=track, catchup=catchup, url=url)
-
-
-class LocationStream(DeprecatedStream):
-    def __init__(self, auth, locations, catchup=None, url=None, slow=False):
-        super(LocationStream, self).__init__(auth, locations=locations, catchup=catchup, url=url)
-
-
-class TweetStream(SampleStream):
-    def __init__(self, *args, **kwargs):
-        import warnings
-        warnings.warn("%s is deprecated. Use SampleStream instead" % self.__class__.__name__, DeprecationWarning)
-        SampleStream.__init__(self, *args, **kwargs)
-
-
-class ReconnectingTweetStream(TweetStream):
-    """TweetStream class that automatically tries to reconnect if the
-    connection goes down. Reconnecting, and waiting for reconnecting, is
-    blocking.
-
-    :param username: See :TweetStream:
-
-    :param password: See :TweetStream:
-
-    :keyword url: See :TweetStream:
-
-    :keyword reconnects: Number of reconnects before a ConnectionError is
-        raised. Default is 3
-
-    :error_cb: Optional callable that will be called just before trying to
-        reconnect. The callback will be called with a single argument, the
-        exception that caused the reconnect attempt. Default is None
-
-    :retry_wait: Time to wait before reconnecting in seconds. Default is 5
-
-    """
-
-    def __init__(self, auth, url="sample",
-                 reconnects=3, error_cb=None, retry_wait=5):
-        self.max_reconnects = reconnects
-        self.retry_wait = retry_wait
-        self._reconnects = 0
-        self._error_cb = error_cb
-        TweetStream.__init__(self, auth, url=url)
-
-    def next(self):
-        while True:
-            try:
-                return TweetStream.next(self)
-            except ConnectionError, e:
-                self._reconnects += 1
-                if self._reconnects > self.max_reconnects:
-                    raise ConnectionError("Too many retries")
-
-                # Note: error_cb is not called on the last error since we
-                # raise a ConnectionError instead
-                if  callable(self._error_cb):
-                    self._error_cb(e)
-
-                time.sleep(self.retry_wait)
-        # Don't listen to auth error, since we can't reasonably reconnect
-        # when we get one.
-
-
-
--- a/script/lib/tweetstream/tweetstream/streamclasses.py	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,256 +0,0 @@
-import time
-import urllib
-import urllib2
-import socket
-from platform import python_version_tuple
-import anyjson
-
-from . import AuthenticationError, ConnectionError, USER_AGENT
-
-class BaseStream(object):
-    """A network connection to Twitters streaming API
-
-    :param auth: tweepy auth object.
-    :keyword catchup: Number of tweets from the past to get before switching to
-      live stream.
-    :keyword raw: If True, return each tweet's raw data direct from the socket,
-      without UTF8 decoding or parsing, rather than a parsed object. The
-      default is False.
-    :keyword timeout: If non-None, set a timeout in seconds on the receiving
-      socket. Certain types of network problems (e.g., disconnecting a VPN)
-      can cause the connection to hang, leading to indefinite blocking that
-      requires kill -9 to resolve. Setting a timeout leads to an orderly
-      shutdown in these cases. The default is None (i.e., no timeout).
-    :keyword url: Endpoint URL for the object. Note: you should not
-      need to edit this. It's present to make testing easier.
-
-    .. attribute:: connected
-
-        True if the object is currently connected to the stream.
-
-    .. attribute:: url
-
-        The URL to which the object is connected
-
-    .. attribute:: starttime
-
-        The timestamp, in seconds since the epoch, the object connected to the
-        streaming api.
-
-    .. attribute:: count
-
-        The number of tweets that have been returned by the object.
-
-    .. attribute:: rate
-
-        The rate at which tweets have been returned from the object as a
-        float. see also :attr: `rate_period`.
-
-    .. attribute:: rate_period
-
-        The amount of time to sample tweets to calculate tweet rate. By
-        default 10 seconds. Changes to this attribute will not be reflected
-        until the next time the rate is calculated. The rate of tweets varies
-        with time of day etc. so it's useful to set this to something
-        sensible.
-
-    .. attribute:: user_agent
-
-        User agent string that will be included in the request. NOTE: This can
-        not be changed after the connection has been made. This property must
-        thus be set before accessing the iterator. The default is set in
-        :attr: `USER_AGENT`.
-    """
-
-    def __init__(self, auth,
-                 catchup=None, raw=False, timeout=None, url=None):
-        self._conn = None
-        self._rate_ts = None
-        self._rate_cnt = 0
-        self._auth = auth
-        self._catchup_count = catchup
-        self._raw_mode = raw
-        self._timeout = timeout
-        self._iter = self.__iter__()
-
-        self.rate_period = 10  # in seconds
-        self.connected = False
-        self.starttime = None
-        self.count = 0
-        self.rate = 0
-        self.user_agent = USER_AGENT
-        if url: self.url = url
-        
-        self.muststop = False
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *params):
-        self.close()
-        return False
-
-    def _init_conn(self):
-        """Open the connection to the twitter server"""
-        headers = {'User-Agent': self.user_agent}
-
-        postdata = self._get_post_data() or {}
-        if self._catchup_count:
-            postdata["count"] = self._catchup_count
-
-        poststring = urllib.urlencode(postdata) if postdata else None
-        
-        if self._auth:
-            self._auth.apply_auth(self.url, "POST", headers, postdata)
-                
-        req = urllib2.Request(self.url, poststring, headers)
-
-        try:
-            self._conn = urllib2.urlopen(req, timeout=self._timeout)
-
-        except urllib2.HTTPError, exception:
-            if exception.code == 401:
-                raise AuthenticationError("Access denied")
-            elif exception.code == 404:
-                raise ConnectionError("URL not found: %s" % self.url)
-            else:  # re raise. No idea what would cause this, so want to know
-                raise
-        except urllib2.URLError, exception:
-            raise ConnectionError(exception.reason)
-
-        # This is horrible. This line grabs the raw socket (actually an ssl
-        # wrapped socket) from the guts of urllib2/httplib. We want the raw
-        # socket so we can bypass the buffering that those libs provide.
-        # The buffering is reasonable when dealing with connections that
-        # try to finish as soon as possible. With Twitter's never ending
-        # connections, it causes a bug where we would not deliver tweets
-        # until the buffer was full. That's problematic for very low volume
-        # filterstreams, since you might not see a tweet for minutes or hours
-        # after they occurred while the buffer fills.
-        #
-        # Oh, and the innards of the http libs are different things in
-        # py2 and 3, so need to deal with that. py3 libs do more of what I
-        # want by default, but I won't do more special casing for it than
-        # necessary.
-
-        major, _, _ = python_version_tuple()
-        # The cast is needed because apparently some versions return strings
-        # and some return ints.
-        # On my ubuntu with stock 2.6 I get strings, which match the docs.
-        # Someone reported the issue on 2.6.1 on macos, but that was
-        # manually built, not the bundled one. Anyway, cast for safety.
-        major = int(major)
-        if major == 2:
-            self._socket = self._conn.fp._sock.fp._sock
-        else:
-            self._socket = self._conn.fp.raw
-            # our code that reads from the socket expects a method called recv.
-            # py3 socket.SocketIO uses the name read, so alias it.
-            self._socket.recv = self._socket.read
-
-        self.connected = True
-        if not self.starttime:
-            self.starttime = time.time()
-        if not self._rate_ts:
-            self._rate_ts = time.time()
-
-    def _get_post_data(self):
-        """Subclasses that need to add post data to the request can override
-        this method and return post data. The data should be in the format
-        returned by urllib.urlencode."""
-        return None
-
-    def __muststop(self):
-        if callable(self.muststop):
-            return self.muststop()
-        else:
-            return self.muststop
-    
-    def _update_rate(self):
-        rate_time = time.time() - self._rate_ts
-        if not self._rate_ts or rate_time > self.rate_period:
-            self.rate = self._rate_cnt / rate_time
-            self._rate_cnt = 0
-            self._rate_ts = time.time()
-
-    def __iter__(self):
-        buf = b""
-        while True:
-            try:
-                if self.__muststop():
-                    raise StopIteration()
-                
-                if not self.connected:
-                    self._init_conn()
-
-                buf += self._socket.recv(8192)
-                if buf == b"":  # something is wrong
-                    self.close()
-                    raise ConnectionError("Got entry of length 0. Disconnected")
-                elif buf.isspace():
-                    buf = b""
-                elif b"\r" not in buf: # not enough data yet. Loop around
-                    continue
-
-                lines = buf.split(b"\r")
-                buf = lines[-1]
-                lines = lines[:-1]
-
-                for line in lines:
-                    if (self._raw_mode):
-                        tweet = line
-                    else:
-                        line = line.decode("utf8")
-                        try:
-                            tweet = anyjson.deserialize(line)
-                        except ValueError, e:
-                            self.close()
-                            raise ConnectionError("Got invalid data from twitter", details=line)
-
-                    if 'text' in tweet:
-                        self.count += 1
-                        self._rate_cnt += 1
-                    self._update_rate()
-                    yield tweet
-
-
-            except socket.error, e:
-                self.close()
-                raise ConnectionError("Server disconnected")
-
-
-    def next(self):
-        """Return the next available tweet. This call is blocking!"""
-        return self._iter.next()
-
-
-    def close(self):
-        """
-        Close the connection to the streaming server.
-        """
-        self.connected = False
-        if self._conn:
-            self._conn.close()
-
-
-class SampleStream(BaseStream):
-    url = "https://stream.twitter.com/1/statuses/sample.json"
-
-
-class FilterStream(BaseStream):
-    url = "https://stream.twitter.com/1/statuses/filter.json"
-
-    def __init__(self, auth, follow=None, locations=None,
-                 track=None, catchup=None, url=None, raw=False, timeout=None):
-        self._follow = follow
-        self._locations = locations
-        self._track = track
-        # remove follow, locations, track
-        BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout)
-
-    def _get_post_data(self):
-        postdata = {}
-        if self._follow: postdata["follow"] = ",".join([str(e) for e in self._follow])
-        if self._locations: postdata["locations"] = ",".join(self._locations)
-        if self._track: postdata["track"] = ",".join(self._track)
-        return postdata
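BaseStream.__iter__ above relies on a small buffering idiom: accumulate raw
bytes, split on the \r delimiter, and carry the trailing (possibly partial)
fragment over to the next read. In isolation:

    buf = b""
    for chunk in (b'{"a": 1}\r{"b"', b': 2}\r'):  # two simulated socket reads
        buf += chunk
        lines = buf.split(b"\r")
        buf = lines.pop()  # a partial trailing fragment stays in the buffer
        for line in lines:
            if line.strip():  # keepalive newlines are dropped
                print line  # -> {"a": 1}, then {"b": 2}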
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/stream/recorder_stream.py	Tue Dec 18 12:26:05 2012 +0100
@@ -0,0 +1,570 @@
+from getpass import getpass
+from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
+    get_logger)
+from optparse import OptionParser
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+import Queue
+import StringIO
+import anyjson
+import datetime
+import inspect
+import iri_tweet.stream
+import logging
+import os
+import re
+import requests.auth
+import shutil
+import signal
+import socket
+import sqlalchemy.schema
+import sys
+import threading
+import time
+import traceback
+import urllib2
+socket._fileobject.default_bufsize = 0
+
+
+
+# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
+columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
+# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
+columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
+# just put it in a sqlite3 table
+
+DEFAULT_TIMEOUT = 5
+
+def set_logging(options):
+    loggers = []
+    
+    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
+    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
+    if options.debug >= 2:
+        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+    return loggers
+
+def set_logging_process(options, queue):
+    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
+    qlogger.propagate = 0
+    return qlogger
+
+def get_auth(options, access_token):
+    if options.username and options.password:
+        auth = requests.auth.HTTPBasicAuth(options.username, options.password)
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = requests.auth.OAuth1(access_token[0], access_token[1], consumer_key, consumer_secret, signature_type='auth_header')
+    return auth
+
+
+def add_process_event(type, args, session_maker):
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
+
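+# Pipeline overview: SourceProcess reads the Twitter stream and puts
+# (source_id, raw_tweet) pairs on a multiprocessing queue; TweetProcess
+# workers consume that queue and parse/store each tweet, so slow
+# postprocessing never blocks the network read.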
+class BaseProcess(Process):
+
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        self.parent_pid = parent_pid
+        self.session_maker = session_maker
+        self.queue = queue
+        self.options = options
+        self.logger_queue = logger_queue
+        self.stop_event = stop_event
+        self.access_token = access_token
+
+        super(BaseProcess, self).__init__()
+
+    #
+    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
+    #
+    def parent_is_alive(self):
+        try:
+            # try to call Parent
+            os.kill(self.parent_pid, 0)
+        except OSError:
+            # *beeep* oh no! The phone's disconnected!
+            return False
+        else:
+            # *ring* Hi mom!
+            return True
+    
+
+    def __get_process_event_args(self):
+        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+
+    def run(self):
+        try:
+            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+            self.do_run()
+        finally:
+            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
+        
+    def do_run(self):
+        raise NotImplementedError()
+
+
+
+class SourceProcess(BaseProcess):
+    
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        self.track = options.track
+        self.token_filename = options.token_filename
+        self.catchup = options.catchup
+        self.timeout = options.timeout
+        self.stream = None
+        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+                    
+    def __source_stream_iter(self):
+        
+        self.logger = set_logging_process(self.options, self.logger_queue)
+        self.logger.debug("SourceProcess : run ")
+        
+        self.auth = get_auth(self.options, self.access_token) 
+        self.logger.debug("SourceProcess : auth set ")
+        
+        track_list = self.track  # or raw_input('Keywords to track (comma separated): ').strip()
+        self.logger.debug("SourceProcess : track list " + track_list)
+        
+        track_list = [k.strip() for k in track_list.split(',')]
+
+        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
+        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
+        self.logger.debug("SourceProcess : after connecting to stream")
+        self.stream.muststop = lambda: self.stop_event.is_set()        
+        
+        stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
+        
+        session = self.session_maker()
+        
+        try:
+            for tweet in stream_wrapper:
+                if not self.parent_is_alive():
+                    self.stop_event.set()
+                    stop_thread.join(5)
+                    sys.exit()
+                self.logger.debug("SourceProcess : tweet " + repr(tweet))
+                source = TweetSource(original_json=tweet)
+                self.logger.debug("SourceProcess : source created")
+                add_retries = 0
+                while add_retries < 10:
+                    try:
+                        add_retries += 1
+                        session.add(source)
+                        session.flush()
+                        break
+                    except OperationalError as e:
+                        session.rollback()
+                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries == 10:
+                            raise
+                     
+                source_id = source.id
+                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
+                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
+                session.commit()
+                self.queue.put((source_id, tweet), False)
+
+        except Exception as e:
+            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
+            raise
+        finally:
+            session.rollback()
+            session.close()
+            self.logger_queue.close()
+            self.queue.close()
+            self.stream.close()
+            self.stream = None
+            if not self.stop_event.is_set():
+                self.stop_event.set()
+
+
+    def do_run(self):
+        
+        # import pydevd
+        # pydevd.settrace(suspend=False)
+        
+        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
+        
+        source_stream_iter_thread.start()
+        
+        while not self.stop_event.is_set():
+            self.logger.debug("SourceProcess : In while after start")
+            self.stop_event.wait(DEFAULT_TIMEOUT)
+            if self.stop_event.is_set() and self.stream:
+                self.stream.close()
+            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+                self.stop_event.set()
+
+        self.logger.info("SourceProcess : join")
+        source_stream_iter_thread.join(30)
+
+
+def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
+    try:
+        if not tweet.strip():
+            return
+        tweet_obj = anyjson.deserialize(tweet)
+        if 'text' not in tweet_obj:
+            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+            session.add(tweet_log)
+            return
+        screen_name = ""
+        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+            screen_name = tweet_obj['user']['screen_name']
+        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        logger.debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
+        processor.process()
+    except ValueError as e:
+        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
+        output = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()        
+    except Exception as e:
+        message = u"Error %s processing tweet %s" % (repr(e), tweet)
+        logger.exception(message)
+        output = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
+        session.rollback()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+
+    
+        
+class TweetProcess(BaseProcess):
+    
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+        self.twitter_query_user = options.twitter_query_user
+
+
+    def do_run(self):
+        
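+        # Consumer loop: block up to 3 seconds on the shared queue, then
+        # persist the tweet; a timeout simply re-checks the stop conditions.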
+        self.logger = set_logging_process(self.options, self.logger_queue)
+        session = self.session_maker()
+        try:
+            while not self.stop_event.is_set() and self.parent_is_alive():
+                try:
+                    source_id, tweet_txt = self.queue.get(True, 3)
+                    self.logger.debug("Processing source id " + repr(source_id))
+                except Exception as e:
+                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
+                    continue
+                process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
+                session.commit()
+        finally:
+            session.rollback()
+            session.close()
+
+
+def get_sessionmaker(conn_str):
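+    # Build a scoped_session factory (plus its engine and metadata) so that
+    # each process owns an independent database connection.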
+    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
+    Session = scoped_session(Session)
+    return Session, engine, metadata
+
+            
+def process_leftovers(session, access_token, twitter_query_user, logger):
+    # get tweet sources that do not match any log entry:
+    # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+
+    for src in sources:
+        tweet_txt = src.original_json
+        process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
+        session.commit()
+
+
+def process_log(logger_queues, stop_event):
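+    # Drain the workers' log queues and re-emit each record through the
+    # parent process's logging handlers.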
+    while not stop_event.is_set():
+        for lqueue in logger_queues:
+            try:
+                record = lqueue.get_nowait()
+                logging.getLogger(record.name).handle(record)
+            except Queue.Empty:
+                continue
+            except IOError:
+                continue
+        time.sleep(0.1)
+
+        
+def get_options():
+
+    usage = "usage: %prog [options]"
+
+    parser = OptionParser(usage=usage)
+
+    parser.add_option("-f", "--file", dest="conn_str",
+                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
+    parser.add_option("-u", "--user", dest="username",
+                      help="Twitter user", metavar="USER", default=None)
+    parser.add_option("-w", "--password", dest="password",
+                      help="Twitter password", metavar="PASSWORD", default=None)
+    parser.add_option("-T", "--track", dest="track",
+                      help="Twitter track", metavar="TRACK")
+    parser.add_option("-n", "--new", dest="new", action="store_true",
+                      help="new database", default=False)
+    parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
+                      help="launch daemon", default=False)
+    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+                      help="Token file name")
+    parser.add_option("-d", "--duration", dest="duration",
+                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
+    parser.add_option("-N", "--nb-process", dest="process_nb",
+                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
+    parser.add_option("--url", dest="url",
+                      help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
+    parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
+                      help="Query twitter for users", default=False, metavar="QUERY_USER")
+    parser.add_option("--catchup", dest="catchup",
+                      help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
+    parser.add_option("--timeout", dest="timeout",
+                      help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
+
+    utils.set_logging_options(parser)
+
+    return parser.parse_args()
+
+
+def do_run(options, session_maker):
+
+    stop_args = {}
+
+    access_token = None
+    if not options.username or not options.password:
+        access_token = utils.get_oauth_token(options.token_filename)
+    
+    session = session_maker()
+    try:
+        process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+        session.commit()
+    finally:
+        session.rollback()
+        session.close()
+    
+    if options.process_nb <= 0:
+        utils.get_logger().debug("Leftovers processed. Exiting.")
+        return None
+
+    queue = mQueue()
+    stop_event = Event()
+    
+    # workaround for bug on using urllib2 and multiprocessing
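+    # (urllib2 builds its global opener lazily; doing one request here, before
+    # the worker processes are forked, presumably avoids that initialization
+    # happening concurrently in the children.)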
+    req = urllib2.Request('http://localhost')
+    conn = None
+    try:
+        conn = urllib2.urlopen(req)
+    except Exception:
+        utils.get_logger().debug("could not open localhost")
+        # do nothing
+    finally:
+        if conn is not None:
+            conn.close()
+    
+    process_engines = []
+    logger_queues = []
+    
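+    # One SourceProcess feeds raw tweets into `queue`; options.process_nb - 1
+    # TweetProcess consumers drain it. Each worker gets its own engine and a
+    # bounded queue of 50 records for shipping log entries back to the parent.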
+    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+    process_engines.append(engine_process)
+    lqueue = mQueue(50)
+    logger_queues.append(lqueue)
+    pid = os.getpid()
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
+    
+    tweet_processes = []
+    
+    for i in range(options.process_nb - 1):
+        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+        process_engines.append(engine_process)
+        lqueue = mQueue(50)
+        logger_queues.append(lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+        tweet_processes.append(cprocess)
+
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
+    log_thread.daemon = True
+
+    log_thread.start()
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
+    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+
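+    # Translate SIGINT/SIGHUP/SIGALRM/SIGTERM into a stop_event set, keeping
+    # the frame info for the shutdown ProcessEvent.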
+    def interrupt_handler(signum, frame):
+        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interrupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+        stop_event.set()
+
+    signal.signal(signal.SIGINT, interrupt_handler)
+    signal.signal(signal.SIGHUP, interrupt_handler)
+    signal.signal(signal.SIGALRM, interrupt_handler)
+    signal.signal(signal.SIGTERM, interrupt_handler)
+    
+
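+    # Main watch loop: a reached duration or a dead source process triggers
+    # the shutdown sequence.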
+    while not stop_event.is_set():
+        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
+            stop_event.set()
+            break
+        if sprocess.is_alive():
+            utils.get_logger().debug("Source process alive")
+            time.sleep(1)
+        else:
+            stop_args.update({'message': 'Source process killed'})
+            stop_event.set()
+            break
+    utils.get_logger().debug("Joining Source Process")
+    try:
+        sprocess.join(10)
+    except:
+        utils.get_logger().debug("Pb joining Source Process - terminating")
+        sprocess.terminate()
+        
+    for i, cprocess in enumerate(tweet_processes):
+        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+        cprocess.join(3)
+        if cprocess.is_alive():
+            utils.get_logger().debug("Consumer process Nb %d still alive after join timeout - terminating" % (i + 1))
+            cprocess.terminate()
+
+    
+    utils.get_logger().debug("Close queues")
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        # ignore: shutdown proceeds even if a queue is already closed
+        
+    
+    if options.process_nb > 1:
+        utils.get_logger().debug("Processing leftovers")
+        session = session_maker()
+        try:
+            process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+            session.commit()
+        finally:
+            session.rollback()
+            session.close()
+
+    for pengine in process_engines:
+        pengine.dispose()
+
+    return stop_args
+
+
+def main(options, args):
+    
+    global conn_str
+    
+    conn_str = options.conn_str.strip()
+    if not re.match(r"^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + options.conn_str
+        
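+    # With --new on sqlite, move any existing database file aside to
+    # <basename>.<n><ext> rather than overwriting it.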
+    if conn_str.startswith("sqlite") and options.new:
+        filepath = conn_str[conn_str.find(":///") + 4:]
+        if os.path.exists(filepath):
+            i = 1
+            basename, extension = os.path.splitext(filepath)
+            new_path = '%s.%d%s' % (basename, i, extension)
+            while i < 1000000 and os.path.exists(new_path):
+                i += 1
+                new_path = '%s.%d%s' % (basename, i, extension)
+            if i >= 1000000:
+                raise Exception("Unable to find new filename for " + filepath)
+            else:
+                shutil.move(filepath, new_path)
+
+    Session, engine, metadata = get_sessionmaker(conn_str)
+    
+    if options.new:
+        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+        if len(check_metadata.sorted_tables) > 0:
+            message = "Database %s not empty exiting" % conn_str
+            utils.get_logger().error(message)
+            sys.exit(message)
+    
+    metadata.create_all(engine)
+    session = Session()
+    try:
+        models.add_model_version(session)
+    finally:
+        session.close()
+    
+    stop_args = {}
+    try:
+        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+        stop_args = do_run(options, Session)
+    except Exception as e:
+        utils.get_logger().exception("Error in main thread")        
+        outfile = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=outfile)
+            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+        finally:
+            outfile.close()
+        raise
+    finally:    
+        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
+
+    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
+
+
+
+if __name__ == '__main__':
+
+    (options, args) = get_options()
+    
+    loggers = set_logging(options)
+    
+    utils.get_logger().debug("OPTIONS : " + repr(options))
+    
+    if options.daemon:
+        import daemon
+        import lockfile
+        
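+        # keep the log handlers' underlying streams open across daemonization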
+        hdlr_preserve = []
+        for logger in loggers:
+            hdlr_preserve.extend([h.stream for h in logger.handlers])
+            
+        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
+        with context:
+            main(options, args)
+    else:
+        main(options, args)
+
--- a/script/stream/recorder_tweetstream.py	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,571 +0,0 @@
-from getpass import getpass
-from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
-    get_logger)
-from optparse import OptionParser
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session
-import Queue
-import StringIO
-import anyjson
-import datetime
-import inspect
-import logging
-import os
-import re
-import shutil
-import signal
-import socket
-import sqlalchemy.schema
-import sys
-import threading
-import time
-import traceback
-import tweepy.auth
-import iri_tweet.stream as tweetstream
-import urllib2
-socket._fileobject.default_bufsize = 0
-
-
-
-#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
-columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
-#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
-columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
-#just put it in a sqlite3 tqble
-
-DEFAULT_TIMEOUT = 5
-
-def set_logging(options):
-    loggers = []
-    
-    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
-    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
-    if options.debug >= 2:
-        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
-    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
-    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
-    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
-    return loggers
-
-def set_logging_process(options, queue):
-    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
-    qlogger.propagate = 0
-    return qlogger
-
-def get_auth(options, access_token):
-    if options.username and options.password:
-        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True)
-        auth.set_access_token(*access_token)
-    return auth
-
-
-def add_process_event(type, args, session_maker):
-    session = session_maker()
-    try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
-        session.add(evt)
-        session.commit()
-    finally:
-        session.close()
-
-
-class BaseProcess(Process):
-
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.parent_pid = parent_pid
-        self.session_maker = session_maker
-        self.queue = queue
-        self.options = options
-        self.logger_queue = logger_queue
-        self.stop_event = stop_event
-        self.access_token = access_token
-
-        super(BaseProcess, self).__init__()
-
-    #
-    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
-    #
-    def parent_is_alive(self):
-        try:
-            # try to call Parent
-            os.kill(self.parent_pid, 0)
-        except OSError:
-            # *beeep* oh no! The phone's disconnected!
-            return False
-        else:
-            # *ring* Hi mom!
-            return True
-    
-
-    def __get_process_event_args(self):
-        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
-
-    def run(self):
-        try:
-            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
-            self.do_run()
-        finally:
-            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
-        
-    def do_run(self):
-        raise NotImplementedError()
-
-
-
-class SourceProcess(BaseProcess):
-    
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.track = options.track
-        self.token_filename = options.token_filename
-        self.catchup = options.catchup
-        self.timeout = options.timeout
-        self.stream = None
-        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-                    
-    def __source_stream_iter(self):
-        
-        self.logger = set_logging_process(self.options, self.logger_queue)
-        self.logger.debug("SourceProcess : run ")
-        
-        self.auth = get_auth(self.options, self.access_token) 
-        self.logger.debug("SourceProcess : auth set ")
-        
-        track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
-        self.logger.debug("SourceProcess : track list " + track_list)
-        
-        track_list = [k.strip() for k in track_list.split(',')]
-
-        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
-        self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
-        self.logger.debug("SourceProcess : after connecting to stream")
-        self.stream.muststop = lambda: self.stop_event.is_set()        
-        
-        stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
-        
-        session = self.session_maker()
-        
-        try:
-            for tweet in stream_wrapper:
-                if not self.parent_is_alive():
-                    self.stop_event.set()
-                    stop_thread.join(5)
-                    sys.exit()
-                self.logger.debug("SourceProcess : tweet " + repr(tweet))
-                source = TweetSource(original_json=tweet)
-                self.logger.debug("SourceProcess : source created")
-                add_retries = 0
-                while add_retries < 10:
-                    try:
-                        add_retries += 1
-                        session.add(source)
-                        session.flush()
-                        break
-                    except OperationalError as e:
-                        session.rollback()
-                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
-                        if add_retries == 10:
-                            raise
-                     
-                source_id = source.id
-                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
-                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
-                session.commit()
-                self.queue.put((source_id, tweet), False)
-
-        except Exception as e:
-            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
-            raise
-        finally:
-            session.rollback()
-            session.close()
-            self.logger_queue.close()
-            self.queue.close()
-            self.stream.close()
-            self.stream = None
-            if not self.stop_event.is_set():
-                self.stop_event.set()
-
-
-    def do_run(self):
-        
-        #import pydevd
-        #pydevd.settrace(suspend=False)
-        
-        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
-        
-        source_stream_iter_thread.start()
-        
-        while not self.stop_event.is_set():
-            self.logger.debug("SourceProcess : In while after start")
-            self.stop_event.wait(DEFAULT_TIMEOUT)
-            if self.stop_event.is_set() and self.stream:
-                self.stream.close()
-            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
-                self.stop_event.set()
-
-        self.logger.info("SourceProcess : join")
-        source_stream_iter_thread.join(30)
-
-
-def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
-    try:
-        if not tweet.strip():
-            return
-        tweet_obj = anyjson.deserialize(tweet)
-        if 'text' not in tweet_obj:
-            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
-            session.add(tweet_log)
-            return
-        screen_name = ""
-        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
-            screen_name = tweet_obj['user']['screen_name']
-        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        logger.debug(u"Process_tweet :" + repr(tweet))
-        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
-        processor.process()
-    except ValueError as e:
-        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
-        output = StringIO.StringIO()
-        try:
-            traceback.print_exc(file=output)
-            error_stack = output.getvalue()
-        finally:
-            output.close()
-        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
-        session.add(tweet_log)
-        session.commit()        
-    except Exception as e:
-        message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        logger.exception(message)
-        output = StringIO.StringIO()
-        try:
-            traceback.print_exc(file=output)
-            error_stack = output.getvalue()
-        finally:
-            output.close()
-        session.rollback()
-        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
-        session.add(tweet_log)
-        session.commit()
-
-    
-        
-class TweetProcess(BaseProcess):
-    
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-        self.twitter_query_user = options.twitter_query_user
-
-
-    def do_run(self):
-        
-        self.logger = set_logging_process(self.options, self.logger_queue)
-        session = self.session_maker()
-        try:
-            while not self.stop_event.is_set() and self.parent_is_alive():
-                try:
-                    source_id, tweet_txt = self.queue.get(True, 3)
-                    self.logger.debug("Processing source id " + repr(source_id))
-                except Exception as e:
-                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
-                    continue
-                process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
-                session.commit()
-        finally:
-            session.rollback()
-            session.close()
-
-
-def get_sessionmaker(conn_str):
-    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
-    Session = scoped_session(Session)
-    return Session, engine, metadata
-
-            
-def process_leftovers(session, access_token, twitter_query_user, logger):
-    
-    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
-    
-    for src in sources:
-        tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
-        session.commit()
-
-        
-    
-    #get tweet source that do not match any message
-    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
-def process_log(logger_queues, stop_event):
-    while not stop_event.is_set():
-        for lqueue in logger_queues:
-            try:
-                record = lqueue.get_nowait()
-                logging.getLogger(record.name).handle(record)
-            except Queue.Empty:
-                continue
-            except IOError:
-                continue
-        time.sleep(0.1)
-
-        
-def get_options():
-
-    usage = "usage: %prog [options]"
-
-    parser = OptionParser(usage=usage)
-
-    parser.add_option("-f", "--file", dest="conn_str",
-                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
-    parser.add_option("-u", "--user", dest="username",
-                      help="Twitter user", metavar="USER", default=None)
-    parser.add_option("-w", "--password", dest="password",
-                      help="Twitter password", metavar="PASSWORD", default=None)
-    parser.add_option("-T", "--track", dest="track",
-                      help="Twitter track", metavar="TRACK")
-    parser.add_option("-n", "--new", dest="new", action="store_true",
-                      help="new database", default=False)
-    parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
-                      help="launch daemon", default=False)
-    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                      help="Token file name")
-    parser.add_option("-d", "--duration", dest="duration",
-                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
-    parser.add_option("-N", "--nb-process", dest="process_nb",
-                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
-    parser.add_option("--url", dest="url",
-                      help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
-    parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
-                      help="Query twitter for users", default=False, metavar="QUERY_USER")
-    parser.add_option("--catchup", dest="catchup",
-                      help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
-    parser.add_option("--timeout", dest="timeout",
-                      help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
-    
-
-
-
-    utils.set_logging_options(parser)
-
-    return parser.parse_args()
-
-
-def do_run(options, session_maker):
-
-    stop_args = {}
-
-    access_token = None
-    if not options.username or not options.password:
-        access_token = utils.get_oauth_token(options.token_filename)
-    
-    session = session_maker()
-    try:
-        process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
-        session.commit()
-    finally:
-        session.rollback()
-        session.close()
-    
-    if options.process_nb <= 0:
-        utils.get_logger().debug("Leftovers processed. Exiting.")
-        return None
-
-    queue = mQueue()
-    stop_event = Event()
-    
-    #workaround for bug on using urllib2 and multiprocessing
-    req = urllib2.Request('http://localhost')
-    conn = None
-    try:
-        conn = urllib2.urlopen(req)
-    except:
-        utils.get_logger().debug("could not open localhost")
-        #donothing
-    finally:
-        if conn is not None:
-            conn.close()
-    
-    process_engines = []
-    logger_queues = []
-    
-    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-    process_engines.append(engine_process)
-    lqueue = mQueue(50)
-    logger_queues.append(lqueue)
-    pid = os.getpid()
-    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
-    
-    tweet_processes = []
-    
-    for i in range(options.process_nb - 1):
-        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-        process_engines.append(engine_process)
-        lqueue = mQueue(50)
-        logger_queues.append(lqueue)
-        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-        tweet_processes.append(cprocess)
-
-    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
-    log_thread.daemon = True
-
-    log_thread.start()
-
-    sprocess.start()
-    for cprocess in tweet_processes:
-        cprocess.start()
-
-    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
-
-    if options.duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
-
-    def interupt_handler(signum, frame):
-        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
-        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
-        stop_event.set()
-        
-    signal.signal(signal.SIGINT , interupt_handler)
-    signal.signal(signal.SIGHUP , interupt_handler)
-    signal.signal(signal.SIGALRM, interupt_handler)
-    signal.signal(signal.SIGTERM, interupt_handler)
-    
-
-    while not stop_event.is_set():
-        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
-            stop_event.set()
-            break
-        if sprocess.is_alive():
-            utils.get_logger().debug("Source process alive")
-            time.sleep(1)
-        else:
-            stop_args.update({'message': 'Source process killed'})
-            stop_event.set()
-            break
-    utils.get_logger().debug("Joining Source Process")
-    try:
-        sprocess.join(10)
-    except:
-        utils.get_logger().debug("Pb joining Source Process - terminating")
-        sprocess.terminate()
-        
-    for i, cprocess in enumerate(tweet_processes):
-        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
-        try:
-            cprocess.join(3)
-        except:
-            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
-            cprocess.terminate()
-
-    
-    utils.get_logger().debug("Close queues")
-    try:
-        queue.close()
-        for lqueue in logger_queues:
-            lqueue.close()
-    except exception as e:
-        utils.get_logger().error("error when closing queues %s", repr(e))
-        #do nothing
-        
-    
-    if options.process_nb > 1:
-        utils.get_logger().debug("Processing leftovers")
-        session = session_maker()
-        try:
-            process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
-            session.commit()
-        finally:
-            session.rollback()
-            session.close()
-
-    for pengine in process_engines:
-        pengine.dispose()
-
-    return stop_args
-
-
-def main(options, args):
-    
-    global conn_str
-    
-    conn_str = options.conn_str.strip()
-    if not re.match("^\w+://.+", conn_str):
-        conn_str = 'sqlite:///' + options.conn_str
-        
-    if conn_str.startswith("sqlite") and options.new:
-        filepath = conn_str[conn_str.find(":///") + 4:]
-        if os.path.exists(filepath):
-            i = 1
-            basename, extension = os.path.splitext(filepath)
-            new_path = '%s.%d%s' % (basename, i, extension)
-            while i < 1000000 and os.path.exists(new_path):
-                i += 1
-                new_path = '%s.%d%s' % (basename, i, extension)
-            if i >= 1000000:
-                raise Exception("Unable to find new filename for " + filepath)
-            else:
-                shutil.move(filepath, new_path)
-
-    Session, engine, metadata = get_sessionmaker(conn_str)
-    
-    if options.new:
-        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
-        if len(check_metadata.sorted_tables) > 0:
-            message = "Database %s not empty exiting" % conn_str
-            utils.get_logger().error(message)
-            sys.exit(message)
-    
-    metadata.create_all(engine)
-    session = Session()
-    try:
-        models.add_model_version(session)
-    finally:
-        session.close()
-    
-    stop_args = {}
-    try:
-        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
-        stop_args = do_run(options, Session)
-    except Exception as e:
-        utils.get_logger().exception("Error in main thread")        
-        outfile = StringIO.StringIO()
-        try:
-            traceback.print_exc(file=outfile)
-            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
-        finally:
-            outfile.close()
-        raise
-    finally:    
-        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
-
-    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
-
-
-
-if __name__ == '__main__':
-
-    (options, args) = get_options()
-    
-    loggers = set_logging(options)
-    
-    utils.get_logger().debug("OPTIONS : " + repr(options))
-    
-    if options.daemon:
-        import daemon
-        import lockfile
-        
-        hdlr_preserve = []
-        for logger in loggers:
-            hdlr_preserve.extend([h.stream for h in logger.handlers])
-            
-        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
-        with context:
-            main(options, args)
-    else:
-        main(options, args)
-
Binary file script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz has changed
Binary file script/virtualenv/res/blessings-1.5.tar.gz has changed
--- a/script/virtualenv/res/lib/lib_create_env.py	Tue Dec 18 10:40:15 2012 +0100
+++ b/script/virtualenv/res/lib/lib_create_env.py	Tue Dec 18 12:26:05 2012 +0100
@@ -26,7 +26,6 @@
     'PYTZ': {'setup': 'pytz', 'url':'http://pypi.python.org/packages/source/p/pytz/pytz-2012h.tar.bz2', 'local':"pytz-2012h.tar.bz2", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'SIMPLEJSON': {'setup': 'simplejson', 'url':'http://pypi.python.org/packages/source/s/simplejson/simplejson-2.6.2.tar.gz', 'local':"simplejson-2.6.2.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'SQLALCHEMY': {'setup': 'sqlalchemy', 'url':'http://downloads.sourceforge.net/project/sqlalchemy/sqlalchemy/0.8.0b1/SQLAlchemy-0.8.0b1.tar.gz?r=http%3A%2F%2Fwww.sqlalchemy.org%2Fdownload.html&ts=1355091775&use_mirror=ignum', 'local':"SQLAlchemy-0.8.0b1.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
-    'TWEEPY': {'setup': 'tweepy', 'url':'https://github.com/tweepy/tweepy/archive/1.12.tar.gz', 'local':"tweepy-1.12.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'TWITTER': {'setup': 'twitter', 'url':'http://pypi.python.org/packages/source/t/twitter/twitter-1.9.0.tar.gz', 'local':"twitter-1.9.0.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'TWITTER-TEXT': {'setup': 'twitter-text', 'url':'https://github.com/dryan/twitter-text-py/archive/master.tar.gz', 'local':"twitter-text-1.0.4.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'REQUESTS': {'setup': 'requests', 'url':'https://github.com/kennethreitz/requests/archive/v1.0.2.tar.gz', 'local':'requests-v1.0.2.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}},
Binary file script/virtualenv/res/requests-0.13.1.tar.gz has changed
Binary file script/virtualenv/res/src/tweepy-1.12.tar.gz has changed
Binary file script/virtualenv/res/twitter-1.9.0.tar.gz has changed
--- a/script/virtualenv/script/res/res_create_env.py	Tue Dec 18 10:40:15 2012 +0100
+++ b/script/virtualenv/script/res/res_create_env.py	Tue Dec 18 12:26:05 2012 +0100
@@ -17,7 +17,6 @@
     'DATEUTIL',
     'SIMPLEJSON',
     'SQLALCHEMY',
-    'TWEEPY',
     'TWITTER',
     'TWITTER-TEXT',
     'REQUESTS',