# HG changeset patch
# User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
# Date 1355829965 -3600
# Node ID 8ae3d91ea4ae60fd3c92abd7ad9318dafb1c304d
# Parent  b46cfa1d188bfb1effa23ea358c69a2b70bc3bab
after update to requests 1.0.2, do some cleaning: remove tweetstream and tweepy

diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/iri_tweet/iri_tweet/__init__.py
--- a/script/lib/iri_tweet/iri_tweet/__init__.py	Tue Dec 18 10:40:15 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py	Tue Dec 18 12:26:05 2012 +0100
@@ -33,7 +33,7 @@
 
 class IRITweetError(Exception):
-    """Base class for all tweetstream errors"""
+    """Base class for all IRITweet errors"""
     pass
 
diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/iri_tweet/iri_tweet/stream.py
--- a/script/lib/iri_tweet/iri_tweet/stream.py	Tue Dec 18 10:40:15 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/stream.py	Tue Dec 18 12:26:05 2012 +0100
@@ -68,7 +68,7 @@
 
     """A network connection to Twitters streaming API
 
-    :param auth: tweepy auth object.
+    :param auth: requests auth object.
     :keyword catchup: Number of tweets from the past to get before switching to
       live stream.
     :keyword raw: If True, return each tweet's raw data direct from the socket,
@@ -168,14 +168,11 @@
         if self._catchup_count:
             postdata["count"] = self._catchup_count
 
-        if self._auth:
-            self._auth.apply_auth(self.url, "POST", headers, postdata)
-
         if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url))
         if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers))
         if self._logger : self._logger.debug("BaseStream init connection data " + repr(postdata))
-        self._resp = requests.post(self.url, headers=headers, data=postdata, prefetch=False)
+        self._resp = requests.post(self.url, auth=self._auth, headers=headers, data=postdata, stream=True)
         if self._logger : self._logger.debug("BaseStream init connection " + repr(self._resp))
 
         self._resp.raise_for_status()

diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/iri_tweet/iri_tweet/utils.py
--- a/script/lib/iri_tweet/iri_tweet/utils.py	Tue Dec 18 10:40:15 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/utils.py	Tue Dec 18 12:26:05 2012 +0100
@@ -1,7 +1,7 @@
-from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
-    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
-    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType,
-    Media, EntityMedia, Entity, EntityType)
+from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, #@UnresolvedImport
+    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,#@UnresolvedImport
+    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, #@UnresolvedImport
+    Media, EntityMedia, Entity, EntityType) #@UnresolvedImport
 from sqlalchemy.sql import select, or_ #@UnresolvedImport
 import Queue #@UnresolvedImport
 import anyjson #@UnresolvedImport

diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/CHANGELOG
--- a/script/lib/tweetstream/CHANGELOG	Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,68 +0,0 @@
-0.1
-
- - Initial version
-
-0.2
-
- - Improved error handling
- - Added AuthenticationError and ConnectionError exceptions
- - Added ReconnectingTweetStream class that supports automatically
-   reconnecting if the connection is dropped
-
-0.3
-
- - Fixed bugs in authtentication
- - Added TrackStream and FollowStream classes
- - Added list of endpoint names, and made them legal values for
the url arg - -0.3.1 - - - Added lots of tests - - Added proper handling of keepalive newlines - - Improved handling of closing streams - - Added missing anyjson dependency to setup - - Fixed bug where newlines and malformed content were counted as a tweet - -0.3.2 - - - This release was skipped over, due to maintainer brainfart. - -0.3.3 - - - Fixed setup.py so it wont attempt to load modules that aren't installed - yet. Fixes installation issue. - -0.3.4 - - - Updated to latest twitter streaming urls - - Fixed a bug where we tried to call a method on None - -0.3.5 - - - Removed a spurious print statement left over from debugging - - Introduced common base class for all tweetstream exceptions - - Make sure we raise a sensible error on 404. Include url in desc of that error - -0.3.6 - - - Added LocationStream class for filtering on location bounding boxes. - -1.0.0 - - - Changed API to match latest twitter endpoints. This adds SampleStream and - FilterStream and deprecates TweetStream, FollowStream, LocationStream, - TrackStream and ReconnectingTweetStream. - -1.1.0 - - - Fixed issues #2 and #12, related to low volume streams not yielding tweets - until a relatively large buffer was filled. This meant that tweets would - arrive in bunches, not immediatly. - - Switched to HTTPS urls for streams. Twitter will switch off HTTP streams - on 29. sept. 2011. - - Added support for Python 3 - -1.1.1 - - - Fixed issue #16. Odd case where python_version_tuple was returning ints - rather than the strings the docs promis. Make sure we always cast to int. diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/LICENSE --- a/script/lib/tweetstream/LICENSE Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,27 +0,0 @@ -Copyright (c) 2009, Rune Halvorsen -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - - * Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - -Neither the name of Rune Halvorsen nor the names of its contributors may be -used to endorse or promote products derived from this software without -specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS -BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/README --- a/script/lib/tweetstream/README Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,110 +0,0 @@ -.. 
-*- restructuredtext -*- - -########################################## -tweetstream - Simple twitter streaming API -########################################## - -Introduction ------------- - -tweetstream provides two classes, SampleStream and FollowStream, that can be -used to get tweets from Twitter's streaming API. An instance of one of the -classes can be used as an iterator. In addition to fetching tweets, the -object keeps track of the number of tweets collected and the rate at which -tweets are received. - -SampleStream delivers a sample of all tweets. FilterStream delivers -tweets that match one or more criteria. Note that it's not possible -to get all tweets without access to the "firehose" stream, which -is not currently avaliable to the public. - -Twitter's documentation about the streaming API can be found here: -http://dev.twitter.com/pages/streaming_api_methods . - -**Note** that the API is blocking. If for some reason data is not immediatly -available, calls will block until enough data is available to yield a tweet. - -Examples --------- - -Printing incoming tweets: - ->>> stream = tweetstream.SampleStream("username", "password") ->>> for tweet in stream: -... print tweet - - -The stream object can also be used as a context, as in this example that -prints the author for each tweet as well as the tweet count and rate: - ->>> with tweetstream.SampleStream("username", "password") as stream -... for tweet in stream: -... print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % ( -... tweet["user"]["screen_name"], stream.count, stream.rate ) - - -Stream objects can raise ConnectionError or AuthenticationError exceptions: - ->>> try: -... with tweetstream.TweetStream("username", "password") as stream -... for tweet in stream: -... print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % ( -... tweet["user"]["screen_name"], stream.count, stream.rate ) -... except tweetstream.ConnectionError, e: -... print "Disconnected from twitter. Reason:", e.reason - -To get tweets that match specific criteria, use the FilterStream. FilterStreams -take three keyword arguments: "locations", "follow" and "track". - -Locations are a list of bounding boxes in which geotagged tweets should originate. -The argument should be an iterable of longitude/latitude pairs. - -Track specifies keywords to track. The argument should be an iterable of -strings. - -Follow returns statuses that reference given users. Argument should be an iterable -of twitter user IDs. The IDs are userid ints, not the screen names. - ->>> words = ["opera", "firefox", "safari"] ->>> people = [123,124,125] ->>> locations = ["-122.75,36.8", "-121.75,37.8"] ->>> with tweetstream.FilterStream("username", "password", track=words, -... follow=people, locations=locations) as stream -... for tweet in stream: -... print "Got interesting tweet:", tweet - - -Deprecated classes ------------------- - -tweetstream used to contain the classes TweetStream, FollowStream, TrackStream -LocationStream and ReconnectingTweetStream. These were deprecated when twitter -changed its API end points. The same functionality is now available in -SampleStream and FilterStream. The deprecated methods will emit a warning when -used, but will remain functional for a while longer. - - -Changelog ---------- - -See the CHANGELOG file - -Contact -------- - -The author is Rune Halvorsen . The project resides at -http://bitbucket.org/runeh/tweetstream . If you find bugs, or have feature -requests, please report them in the project site issue tracker. 
Patches are -also very welcome. - -Contributors ------------- - -- Rune Halvorsen -- Christopher Schierkolk - -License -------- - -This software is licensed under the ``New BSD License``. See the ``LICENCE`` -file in the top distribution directory for the full license text. diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/conftest.py --- a/script/lib/tweetstream/conftest.py Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,10 +0,0 @@ -# content of conftest.py - -import pytest -def pytest_addoption(parser): - parser.addoption("--runslow", action="store_true", - help="run slow tests") - -def pytest_runtest_setup(item): - if 'slow' in item.keywords and not item.config.getvalue("runslow"): - pytest.skip("need --runslow option to run") \ No newline at end of file diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/servercontext.py --- a/script/lib/tweetstream/servercontext.py Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,221 +0,0 @@ -import threading -import contextlib -import time -import os -import socket -import random -from functools import partial -from inspect import isclass -from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler -from SimpleHTTPServer import SimpleHTTPRequestHandler -from SocketServer import BaseRequestHandler - - -class ServerError(Exception): - pass - - -class ServerContext(object): - """Context object with information about a running test server.""" - - def __init__(self, address, port): - self.address = address or "localhost" - self.port = port - - @property - def baseurl(self): - return "http://%s:%s" % (self.address, self.port) - - def __str__(self): - return "" % self.baseurl - - __repr__ = __str__ - - -class _SilentSimpleHTTPRequestHandler(SimpleHTTPRequestHandler): - - def __init__(self, *args, **kwargs): - self.logging = kwargs.get("logging", False) - SimpleHTTPRequestHandler.__init__(self, *args, **kwargs) - - def log_message(self, *args, **kwargs): - if self.logging: - SimpleHTTPRequestHandler.log_message(self, *args, **kwargs) - - -class _TestHandler(BaseHTTPRequestHandler): - """RequestHandler class that handles requests that use a custom handler - callable.""" - - def __init__(self, handler, methods, *args, **kwargs): - self._handler = handler - self._methods = methods - self._response_sent = False - self._headers_sent = False - self.logging = kwargs.get("logging", False) - BaseHTTPRequestHandler.__init__(self, *args, **kwargs) - - def log_message(self, *args, **kwargs): - if self.logging: - BaseHTTPRequestHandler.log_message(self, *args, **kwargs) - - def send_response(self, *args, **kwargs): - self._response_sent = True - BaseHTTPRequestHandler.send_response(self, *args, **kwargs) - - def end_headers(self, *args, **kwargs): - self._headers_sent = True - BaseHTTPRequestHandler.end_headers(self, *args, **kwargs) - - def _do_whatever(self): - """Called in place of do_METHOD""" - data = self._handler(self) - - if hasattr(data, "next"): - # assume it's something supporting generator protocol - self._handle_with_iterator(data) - else: - # Nothing more to do then. - pass - - - def __getattr__(self, name): - if name.startswith("do_") and name[3:].lower() in self._methods: - return self._do_whatever - else: - # fixme instance or class? 
- raise AttributeError(name) - - def _handle_with_iterator(self, iterator): - self.connection.settimeout(0.1) - for data in iterator: - if not self.server.server_thread.running: - return - - if not self._response_sent: - self.send_response(200) - if not self._headers_sent: - self.end_headers() - - self.wfile.write(data) - # flush immediatly. We may want to do trickling writes - # or something else tha trequires bypassing normal caching - self.wfile.flush() - -class _TestServerThread(threading.Thread): - """Thread class for a running test server""" - - def __init__(self, handler, methods, cwd, port, address): - threading.Thread.__init__(self) - self.startup_finished = threading.Event() - self._methods = methods - self._cwd = cwd - self._orig_cwd = None - self._handler = self._wrap_handler(handler, methods) - self._setup() - self.running = True - self.serverloc = (address, port) - self.error = None - - def _wrap_handler(self, handler, methods): - if isclass(handler) and issubclass(handler, BaseRequestHandler): - return handler # It's OK. user passed in a proper handler - elif callable(handler): - return partial(_TestHandler, handler, methods) - # it's a callable, so wrap in a req handler - else: - raise ServerError("handler must be callable or RequestHandler") - - def _setup(self): - if self._cwd != "./": - self._orig_cwd = os.getcwd() - os.chdir(self._cwd) - - def _init_server(self): - """Hooks up the server socket""" - try: - if self.serverloc[1] == "random": - retries = 10 # try getting an available port max this many times - while True: - try: - self.serverloc = (self.serverloc[0], - random.randint(1025, 49151)) - self._server = HTTPServer(self.serverloc, self._handler) - except socket.error: - retries -= 1 - if not retries: # not able to get a port. - raise - else: - break - else: # use specific port. this might throw, that's expected - self._server = HTTPServer(self.serverloc, self._handler) - except socket.error, e: - self.running = False - self.error = e - # set this here, since we'll never enter the serve loop where - # it is usually set: - self.startup_finished.set() - return - - self._server.allow_reuse_address = True # lots of tests, same port - self._server.timeout = 0.1 - self._server.server_thread = self - - - def run(self): - self._init_server() - - while self.running: - self._server.handle_request() # blocks for self.timeout secs - # First time this falls through, signal the parent thread that - # the server is ready for incomming connections - if not self.startup_finished.is_set(): - self.startup_finished.set() - - self._cleanup() - - def stop(self): - """Stop the server and attempt to make the thread terminate. - This happens async but the calling code can check periodically - the isRunning flag on the thread object. - """ - # actual stopping happens in the run method - self.running = False - - def _cleanup(self): - """Do some rudimentary cleanup.""" - if self._orig_cwd: - os.chdir(self._orig_cwd) - - -@contextlib.contextmanager -def test_server(handler=_SilentSimpleHTTPRequestHandler, port=8514, - address="", methods=("get", "head"), cwd="./"): - """Context that makes available a web server in a separate thread""" - thread = _TestServerThread(handler=handler, methods=methods, cwd=cwd, - port=port, address=address) - thread.start() - - # fixme: should this be daemonized? If it isn't it will block the entire - # app, but that should never happen anyway.. - thread.startup_finished.wait() - - if thread.error: # startup failed! 
Bail, throw whatever the server did - raise thread.error - - exc = None - try: - yield ServerContext(*thread.serverloc) - except Exception, exc: - pass - thread.stop() - thread.join(5) # giving it a lot of leeway. should never happen - - if exc: - raise exc - - # fixme: this takes second priorty after the internal exception but would - # still be nice to signal back to calling code. - - if thread.isAlive(): - raise Warning("Test server could not be stopped") diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/setup.py --- a/script/lib/tweetstream/setup.py Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,82 +0,0 @@ -#@PydevCodeAnalysisIgnore -import sys -import os - -extra = {} -if sys.version_info >= (3, 0): - extra.update(use_2to3=True) - - -try: - from setuptools import setup, find_packages -except ImportError: - from distutils.core import setup, find_packages - - -# -*- Distribution Meta -*- -import re -re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)') -re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)') -re_doc = re.compile(r'^"""(.+?)"""', re.M|re.S) -rq = lambda s: s.strip("\"'") - - -def add_default(m): - attr_name, attr_value = m.groups() - return ((attr_name, rq(attr_value)), ) - - -def add_version(m): - v = list(map(rq, m.groups()[0].split(", "))) - return (("VERSION", ".".join(v[0:3]) + "".join(v[3:])), ) - - -def add_doc(m): - return (("doc", m.groups()[0].replace("\n", " ")), ) - -pats = {re_meta: add_default, - re_vers: add_version} -here = os.path.abspath(os.path.dirname(__file__)) -meta_fh = open(os.path.join(here, "tweetstream/__init__.py")) -try: - meta = {} - acc = [] - for line in meta_fh: - if line.strip() == '# -eof meta-': - break - acc.append(line) - for pattern, handler in pats.items(): - m = pattern.match(line.strip()) - if m: - meta.update(handler(m)) - m = re_doc.match("".join(acc).strip()) - if m: - meta.update(add_doc(m)) -finally: - meta_fh.close() - - -setup(name='tweetstream', - version=meta["VERSION"], - description=meta["doc"], - long_description=open("README").read(), - classifiers=[ - 'License :: OSI Approved :: BSD License', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 2.6', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.1', - ], - keywords='twitter', - author=meta["author"], - author_email=meta["contact"], - url=meta["homepage"], - license='BSD', - packages=find_packages(exclude=['ez_setup', 'examples', 'tests']), - include_package_data=True, - zip_safe=False, - platforms=["any"], - install_requires=['anyjson'], - **extra -) diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/tests/test_tweetstream.py --- a/script/lib/tweetstream/tests/test_tweetstream.py Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,188 +0,0 @@ -import contextlib -import threading -import time - -from tweetstream import TweetStream, FollowStream, TrackStream, LocationStream -from tweetstream import ConnectionError, AuthenticationError, SampleStream, FilterStream -from tweepy.auth import BasicAuthHandler - -import pytest -from pytest import raises -slow = pytest.mark.slow - -from servercontext import test_server - -single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody 
has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"Twitter Opera widget<\/a>"}""" + "\r" - - -def parameterized(funcarglist): - def wrapper(function): - function.funcarglist = funcarglist - return function - return wrapper - -def pytest_generate_tests(metafunc): - for funcargs in getattr(metafunc.function, 'funcarglist', ()): - metafunc.addcall(funcargs=funcargs) - - -streamtypes = [ - dict(cls=TweetStream, args=[], kwargs=dict()), - dict(cls=SampleStream, args=[], kwargs=dict()), - dict(cls=FilterStream, args=[], kwargs=dict(track=("test",))), - dict(cls=FollowStream, args=[[1, 2, 3]], kwargs=dict()), - dict(cls=TrackStream, args=["opera"], kwargs=dict()), - dict(cls=LocationStream, args=["123,4321"], kwargs=dict()) -] - - -@parameterized(streamtypes) -def test_bad_auth(cls, args, kwargs): - """Test that the proper exception is raised when the user could not be - authenticated""" - def auth_denied(request): - request.send_error(401) - - with raises(AuthenticationError): - with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server: - auth = BasicAuthHandler("user", "passwd") - stream = cls(auth, *args, url=server.baseurl) - for e in stream: pass - - -@parameterized(streamtypes) -def test_404_url(cls, args, kwargs): - """Test that the proper exception is raised when the stream URL can't be - found""" - def not_found(request): - request.send_error(404) - - with raises(ConnectionError): - with test_server(handler=not_found, methods=("post", "get"), port="random") as server: - auth = BasicAuthHandler("user", "passwd") - stream = cls(auth, *args, url=server.baseurl) - for e in stream: pass - - -@parameterized(streamtypes) -def test_bad_content(cls, args, kwargs): - """Test error handling if we are given invalid data""" - def bad_content(request): - for n in xrange(10): - # what json we pass doesn't matter. It's not verifying the - # strcuture, only checking that it's parsable - yield "[1,2,3]\r" - yield "[1,2, I need no stinking close brace\r" - yield "[1,2,3]\r" - - - with raises(ConnectionError): - with test_server(handler=bad_content, methods=("post", "get"), port="random") as server: - auth = BasicAuthHandler("user", "passwd") - stream = cls(auth, *args, url=server.baseurl) - for tweet in stream: - pass - - -@parameterized(streamtypes) -def test_closed_connection(cls, args, kwargs): - """Test error handling if server unexpectedly closes connection""" - cnt = 1000 - def bad_content(request): - for n in xrange(cnt): - # what json we pass doesn't matter. 
It's not verifying the - # strcuture, only checking that it's parsable - yield "[1,2,3]\r" - - with raises(ConnectionError): - with test_server(handler=bad_content, methods=("post", "get"), port="random") as server: - auth = BasicAuthHandler("foo", "bar") - stream = cls(auth, *args, url=server.baseurl) - for tweet in stream: - pass - - -@parameterized(streamtypes) -def test_bad_host(cls, args, kwargs): - """Test behaviour if we can't connect to the host""" - with raises(ConnectionError): - stream = cls("username", "passwd", *args, url="http://wedfwecfghhreewerewads.foo") - stream.next() - - -@parameterized(streamtypes) -def smoke_test_receive_tweets(cls, args, kwargs): - """Receive 100k tweets and disconnect (slow)""" - total = 100000 - - def tweetsource(request): - while True: - yield single_tweet + "\n" - - with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server: - auth = BasicAuthHandler("foo", "bar") - stream = cls(auth, *args, url=server.baseurl) - for tweet in stream: - if stream.count == total: - break - - -@parameterized(streamtypes) -def test_keepalive(cls, args, kwargs): - """Make sure we behave sanely when there are keepalive newlines in the - data recevived from twitter""" - def tweetsource(request): - yield single_tweet+"\n" - yield "\n" - yield "\n" - yield single_tweet+"\n" - yield "\n" - yield "\n" - yield "\n" - yield "\n" - yield "\n" - yield "\n" - yield "\n" - yield single_tweet+"\n" - yield "\n" - - - with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server: - auth = BasicAuthHandler("foo", "bar") - stream = cls(auth, *args, url=server.baseurl) - try: - for tweet in stream: - pass - except ConnectionError: - assert stream.count == 3, "Got %s, wanted 3" % stream.count - else: - assert False, "Didn't handle keepalive" - - -@slow -@parameterized(streamtypes) -def test_buffering(cls, args, kwargs): - """Test if buffering stops data from being returned immediately. - If there is some buffering in play that might mean data is only returned - from the generator when the buffer is full. If buffer is bigger than a - tweet, this will happen. Default buffer size in the part of socket lib - that enables readline is 8k. Max tweet length is around 3k.""" - - def tweetsource(request): - yield single_tweet+"\n" - time.sleep(2) - # need to yield a bunch here so we're sure we'll return from the - # blocking call in case the buffering bug is present. - for n in xrange(100): - yield single_tweet+"\n" - - - with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server: - auth = BasicAuthHandler("foo", "bar") - stream = cls(auth, *args, url=server.baseurl) - start = time.time() - stream.next() - first = time.time() - diff = first - start - assert diff < 1, "Getting first tweet took more than a second!" 
- diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/tox.ini --- a/script/lib/tweetstream/tox.ini Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,16 +0,0 @@ -[tox] -envlist = py25,py26,py27,py30,py31,py32 - -[testenv] -deps=pytest -sitepackages=False -commands=py.test --runslow - -[testenv:py30] -changedir = .tox - -[testenv:py31] -changedir = .tox - -[testenv:py32] -changedir = .tox \ No newline at end of file diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/tweetstream/__init__.py --- a/script/lib/tweetstream/tweetstream/__init__.py Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,45 +0,0 @@ -"""Simple access to Twitter's streaming API""" - -VERSION = (1, 1, 1, 'iri') -__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:]) -__author__ = "Rune Halvorsen" -__contact__ = "runefh@gmail.com" -__homepage__ = "http://bitbucket.org/runeh/tweetstream/" -__docformat__ = "restructuredtext" - -# -eof meta- - - -""" - .. data:: USER_AGENT - - The default user agent string for stream objects -""" - -USER_AGENT = "TweetStream %s" % __version__ - - -class TweetStreamError(Exception): - """Base class for all tweetstream errors""" - pass - - -class AuthenticationError(TweetStreamError): - """Exception raised if the username/password is not accepted""" - pass - - -class ConnectionError(TweetStreamError): - """Raised when there are network problems. This means when there are - dns errors, network errors, twitter issues""" - - def __init__(self, reason, details=None): - self.reason = reason - self.details = details - - def __str__(self): - return '' % self.reason - - -from .streamclasses import SampleStream, FilterStream -from .deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/tweetstream/deprecated.py --- a/script/lib/tweetstream/tweetstream/deprecated.py Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,82 +0,0 @@ -from .streamclasses import FilterStream, SampleStream, ConnectionError -import time - -class DeprecatedStream(FilterStream): - def __init__(self, *args, **kwargs): - import warnings - warnings.warn("%s is deprecated. Use FilterStream instead" % self.__class__.__name__, DeprecationWarning) - super(DeprecatedStream, self).__init__(*args, **kwargs) - - -class FollowStream(DeprecatedStream): - def __init__(self, auth, follow, catchup=None, url=None): - super(FollowStream, self).__init__(auth, follow=follow, catchup=catchup, url=url) - - -class TrackStream(DeprecatedStream): - def __init__(self, auth, track, catchup=None, url=None, slow=False): - super(TrackStream, self).__init__(auth, track=track, catchup=catchup, url=url) - - -class LocationStream(DeprecatedStream): - def __init__(self, auth, locations, catchup=None, url=None, slow=False): - super(LocationStream, self).__init__(auth, locations=locations, catchup=catchup, url=url) - - -class TweetStream(SampleStream): - def __init__(self, *args, **kwargs): - import warnings - warnings.warn("%s is deprecated. Use SampleStream instead" % self.__class__.__name__, DeprecationWarning) - SampleStream.__init__(self, *args, **kwargs) - - -class ReconnectingTweetStream(TweetStream): - """TweetStream class that automatically tries to reconnect if the - connecting goes down. Reconnecting, and waiting for reconnecting, is - blocking. 
- - :param username: See :TweetStream: - - :param password: See :TweetStream: - - :keyword url: See :TweetStream: - - :keyword reconnects: Number of reconnects before a ConnectionError is - raised. Default is 3 - - :error_cb: Optional callable that will be called just before trying to - reconnect. The callback will be called with a single argument, the - exception that caused the reconnect attempt. Default is None - - :retry_wait: Time to wait before reconnecting in seconds. Default is 5 - - """ - - def __init__(self, auth, url="sample", - reconnects=3, error_cb=None, retry_wait=5): - self.max_reconnects = reconnects - self.retry_wait = retry_wait - self._reconnects = 0 - self._error_cb = error_cb - TweetStream.__init__(self, auth, url=url) - - def next(self): - while True: - try: - return TweetStream.next(self) - except ConnectionError, e: - self._reconnects += 1 - if self._reconnects > self.max_reconnects: - raise ConnectionError("Too many retries") - - # Note: error_cb is not called on the last error since we - # raise a ConnectionError instead - if callable(self._error_cb): - self._error_cb(e) - - time.sleep(self.retry_wait) - # Don't listen to auth error, since we can't reasonably reconnect - # when we get one. - - - diff -r b46cfa1d188b -r 8ae3d91ea4ae script/lib/tweetstream/tweetstream/streamclasses.py --- a/script/lib/tweetstream/tweetstream/streamclasses.py Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,256 +0,0 @@ -import time -import urllib -import urllib2 -import socket -from platform import python_version_tuple -import anyjson - -from . import AuthenticationError, ConnectionError, USER_AGENT - -class BaseStream(object): - """A network connection to Twitters streaming API - - :param auth: tweepy auth object. - :keyword catchup: Number of tweets from the past to get before switching to - live stream. - :keyword raw: If True, return each tweet's raw data direct from the socket, - without UTF8 decoding or parsing, rather than a parsed object. The - default is False. - :keyword timeout: If non-None, set a timeout in seconds on the receiving - socket. Certain types of network problems (e.g., disconnecting a VPN) - can cause the connection to hang, leading to indefinite blocking that - requires kill -9 to resolve. Setting a timeout leads to an orderly - shutdown in these cases. The default is None (i.e., no timeout). - :keyword url: Endpoint URL for the object. Note: you should not - need to edit this. It's present to make testing easier. - - .. attribute:: connected - - True if the object is currently connected to the stream. - - .. attribute:: url - - The URL to which the object is connected - - .. attribute:: starttime - - The timestamp, in seconds since the epoch, the object connected to the - streaming api. - - .. attribute:: count - - The number of tweets that have been returned by the object. - - .. attribute:: rate - - The rate at which tweets have been returned from the object as a - float. see also :attr: `rate_period`. - - .. attribute:: rate_period - - The ammount of time to sample tweets to calculate tweet rate. By - default 10 seconds. Changes to this attribute will not be reflected - until the next time the rate is calculated. The rate of tweets vary - with time of day etc. so it's usefull to set this to something - sensible. - - .. attribute:: user_agent - - User agent string that will be included in the request. NOTE: This can - not be changed after the connection has been made. 
This property must - thus be set before accessing the iterator. The default is set in - :attr: `USER_AGENT`. - """ - - def __init__(self, auth, - catchup=None, raw=False, timeout=None, url=None): - self._conn = None - self._rate_ts = None - self._rate_cnt = 0 - self._auth = auth - self._catchup_count = catchup - self._raw_mode = raw - self._timeout = timeout - self._iter = self.__iter__() - - self.rate_period = 10 # in seconds - self.connected = False - self.starttime = None - self.count = 0 - self.rate = 0 - self.user_agent = USER_AGENT - if url: self.url = url - - self.muststop = False - - def __enter__(self): - return self - - def __exit__(self, *params): - self.close() - return False - - def _init_conn(self): - """Open the connection to the twitter server""" - headers = {'User-Agent': self.user_agent} - - postdata = self._get_post_data() or {} - if self._catchup_count: - postdata["count"] = self._catchup_count - - poststring = urllib.urlencode(postdata) if postdata else None - - if self._auth: - self._auth.apply_auth(self.url, "POST", headers, postdata) - - req = urllib2.Request(self.url, poststring, headers) - - try: - self._conn = urllib2.urlopen(req, timeout=self._timeout) - - except urllib2.HTTPError, exception: - if exception.code == 401: - raise AuthenticationError("Access denied") - elif exception.code == 404: - raise ConnectionError("URL not found: %s" % self.url) - else: # re raise. No idea what would cause this, so want to know - raise - except urllib2.URLError, exception: - raise ConnectionError(exception.reason) - - # This is horrible. This line grabs the raw socket (actually an ssl - # wrapped socket) from the guts of urllib2/httplib. We want the raw - # socket so we can bypass the buffering that those libs provide. - # The buffering is reasonable when dealing with connections that - # try to finish as soon as possible. With twitters' never ending - # connections, it causes a bug where we would not deliver tweets - # until the buffer was full. That's problematic for very low volume - # filterstreams, since you might not see a tweet for minutes or hours - # after they occured while the buffer fills. - # - # Oh, and the inards of the http libs are different things on in - # py2 and 3, so need to deal with that. py3 libs do more of what I - # want by default, but I wont do more special casing for it than - # neccessary. - - major, _, _ = python_version_tuple() - # The cast is needed because apparently some versions return strings - # and some return ints. - # On my ubuntu with stock 2.6 I get strings, which match the docs. - # Someone reported the issue on 2.6.1 on macos, but that was - # manually built, not the bundled one. Anyway, cast for safety. - major = int(major) - if major == 2: - self._socket = self._conn.fp._sock.fp._sock - else: - self._socket = self._conn.fp.raw - # our code that reads from the socket expects a method called recv. - # py3 socket.SocketIO uses the name read, so alias it. - self._socket.recv = self._socket.read - - self.connected = True - if not self.starttime: - self.starttime = time.time() - if not self._rate_ts: - self._rate_ts = time.time() - - def _get_post_data(self): - """Subclasses that need to add post data to the request can override - this method and return post data. 
The data should be in the format
-        returned by urllib.urlencode."""
-        return None
-
-    def __muststop(self):
-        if callable(self.muststop):
-            return self.muststop()
-        else:
-            return self.muststop
-
-    def _update_rate(self):
-        rate_time = time.time() - self._rate_ts
-        if not self._rate_ts or rate_time > self.rate_period:
-            self.rate = self._rate_cnt / rate_time
-            self._rate_cnt = 0
-            self._rate_ts = time.time()
-
-    def __iter__(self):
-        buf = b""
-        while True:
-            try:
-                if self.__muststop():
-                    raise StopIteration()
-
-                if not self.connected:
-                    self._init_conn()
-
-                buf += self._socket.recv(8192)
-                if buf == b"":  # something is wrong
-                    self.close()
-                    raise ConnectionError("Got entry of length 0. Disconnected")
-                elif buf.isspace():
-                    buf = b""
-                elif b"\r" not in buf:  # not enough data yet. Loop around
-                    continue
-
-                lines = buf.split(b"\r")
-                buf = lines[-1]
-                lines = lines[:-1]
-
-                for line in lines:
-                    if (self._raw_mode):
-                        tweet = line
-                    else:
-                        line = line.decode("utf8")
-                        try:
-                            tweet = anyjson.deserialize(line)
-                        except ValueError, e:
-                            self.close()
-                            raise ConnectionError("Got invalid data from twitter", details=line)
-
-                    if 'text' in tweet:
-                        self.count += 1
-                        self._rate_cnt += 1
-                    self._update_rate()
-                    yield tweet
-
-
-            except socket.error, e:
-                self.close()
-                raise ConnectionError("Server disconnected")
-
-
-    def next(self):
-        """Return the next available tweet. This call is blocking!"""
-        return self._iter.next()
-
-
-    def close(self):
-        """
-        Close the connection to the streaming server.
-        """
-        self.connected = False
-        if self._conn:
-            self._conn.close()
-
-
-class SampleStream(BaseStream):
-    url = "https://stream.twitter.com/1/statuses/sample.json"
-
-
-class FilterStream(BaseStream):
-    url = "https://stream.twitter.com/1/statuses/filter.json"
-
-    def __init__(self, auth, follow=None, locations=None,
-                 track=None, catchup=None, url=None, raw=False, timeout=None):
-        self._follow = follow
-        self._locations = locations
-        self._track = track
-        # remove follow, locations, track
-        BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout)
-
-    def _get_post_data(self):
-        postdata = {}
-        if self._follow: postdata["follow"] = ",".join([str(e) for e in self._follow])
-        if self._locations: postdata["locations"] = ",".join(self._locations)
-        if self._track: postdata["track"] = ",".join(self._track)
-        return postdata

diff -r b46cfa1d188b -r 8ae3d91ea4ae script/stream/recorder_stream.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/stream/recorder_stream.py	Tue Dec 18 12:26:05 2012 +0100
@@ -0,0 +1,570 @@
+from getpass import getpass
+from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
+    get_logger)
+from optparse import OptionParser
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+import Queue
+import StringIO
+import anyjson
+import datetime
+import inspect
+import iri_tweet.stream
+import logging
+import os
+import re
+import requests.auth
+import shutil
+import signal
+import socket
+import sqlalchemy.schema
+import sys
+import threading
+import time
+import traceback
+import urllib2
+socket._fileobject.default_bufsize = 0
+
+
+
+# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
+columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
+# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
+columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
+# just put it in a sqlite3 table
+
+DEFAULT_TIMEOUT = 5
+
+def set_logging(options):
+    loggers = []
+
+    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
+    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
+    if options.debug >= 2:
+        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+    return loggers
+
+def set_logging_process(options, queue):
+    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
+    qlogger.propagate = 0
+    return qlogger
+
+def get_auth(options, access_token):
+    if options.username and options.password:
+        # requests 1.x: HTTPBasicAuth replaces tweepy's BasicAuthHandler
+        auth = requests.auth.HTTPBasicAuth(options.username, options.password)
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        # OAuth1 takes the client (consumer) credentials first, then the
+        # resource owner (access token) pair
+        auth = requests.auth.OAuth1(consumer_key, consumer_secret, access_token[0], access_token[1], signature_type='auth_header')
+    return auth
+
+
+def add_process_event(type, args, session_maker):
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
+
+class BaseProcess(Process):
+
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        self.parent_pid = parent_pid
+        self.session_maker = session_maker
+        self.queue = queue
+        self.options = options
+        self.logger_queue = logger_queue
+        self.stop_event = stop_event
+        self.access_token = access_token
+
+        super(BaseProcess, self).__init__()
+
+    #
+    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
+    #
+    def parent_is_alive(self):
+        try:
+            # try to call Parent
+            os.kill(self.parent_pid, 0)
+        except OSError:
+            # *beeep* oh no! The phone's disconnected!
+            return False
+        else:
+            # *ring* Hi mom!
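+            # os.kill(pid, 0) delivers no signal at all: with POSIX
+            # semantics it only checks that the pid still exists and may
+            # be signalled, raising OSError otherwise, so reaching this
+            # branch means the parent process is still alive.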
+ return True + + + def __get_process_event_args(self): + return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token} + + def run(self): + try: + add_process_event("start_worker", self.__get_process_event_args(), self.session_maker) + self.do_run() + finally: + add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker) + + def do_run(self): + raise NotImplementedError() + + + +class SourceProcess(BaseProcess): + + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): + self.track = options.track + self.token_filename = options.token_filename + self.catchup = options.catchup + self.timeout = options.timeout + self.stream = None + super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) + + def __source_stream_iter(self): + + self.logger = set_logging_process(self.options, self.logger_queue) + self.logger.debug("SourceProcess : run ") + + self.auth = get_auth(self.options, self.access_token) + self.logger.debug("SourceProcess : auth set ") + + track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() + self.logger.debug("SourceProcess : track list " + track_list) + + track_list = [k.strip() for k in track_list.split(',')] + + self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) + self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger) + self.logger.debug("SourceProcess : after connecting to stream") + self.stream.muststop = lambda: self.stop_event.is_set() + + stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger) + + session = self.session_maker() + + try: + for tweet in stream_wrapper: + if not self.parent_is_alive(): + self.stop_event.set() + stop_thread.join(5) + sys.exit() + self.logger.debug("SourceProcess : tweet " + repr(tweet)) + source = TweetSource(original_json=tweet) + self.logger.debug("SourceProcess : source created") + add_retries = 0 + while add_retries < 10: + try: + add_retries += 1 + session.add(source) + session.flush() + break + except OperationalError as e: + session.rollback() + self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) + if add_retries == 10: + raise + + source_id = source.id + self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) + self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime))) + session.commit() + self.queue.put((source_id, tweet), False) + + except Exception as e: + self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) + raise + finally: + session.rollback() + session.close() + self.logger_queue.close() + self.queue.close() + self.stream.close() + self.stream = None + if not self.stop_event.is_set(): + self.stop_event.set() + + + def do_run(self): + + # import pydevd + # pydevd.settrace(suspend=False) + + source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") + + source_stream_iter_thread.start() + + while not self.stop_event.is_set(): + self.logger.debug("SourceProcess : In while after start") + self.stop_event.wait(DEFAULT_TIMEOUT) + if self.stop_event.is_set() 
and self.stream: + self.stream.close() + elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: + self.stop_event.set() + + self.logger.info("SourceProcess : join") + source_stream_iter_thread.join(30) + + +def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger): + try: + if not tweet.strip(): + return + tweet_obj = anyjson.deserialize(tweet) + if 'text' not in tweet_obj: + tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) + session.add(tweet_log) + return + screen_name = "" + if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: + screen_name = tweet_obj['user']['screen_name'] + logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) + logger.debug(u"Process_tweet :" + repr(tweet)) + processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user) + processor.process() + except ValueError as e: + message = u"Value Error %s processing tweet %s" % (repr(e), tweet) + output = StringIO.StringIO() + try: + traceback.print_exc(file=output) + error_stack = output.getvalue() + finally: + output.close() + tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack) + session.add(tweet_log) + session.commit() + except Exception as e: + message = u"Error %s processing tweet %s" % (repr(e), tweet) + logger.exception(message) + output = StringIO.StringIO() + try: + traceback.print_exc(file=output) + error_stack = output.getvalue() + finally: + output.close() + session.rollback() + tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack) + session.add(tweet_log) + session.commit() + + + +class TweetProcess(BaseProcess): + + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): + super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) + self.twitter_query_user = options.twitter_query_user + + + def do_run(self): + + self.logger = set_logging_process(self.options, self.logger_queue) + session = self.session_maker() + try: + while not self.stop_event.is_set() and self.parent_is_alive(): + try: + source_id, tweet_txt = self.queue.get(True, 3) + self.logger.debug("Processing source id " + repr(source_id)) + except Exception as e: + self.logger.debug('Process tweet exception in loop : ' + repr(e)) + continue + process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger) + session.commit() + finally: + session.rollback() + session.close() + + +def get_sessionmaker(conn_str): + engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) + Session = scoped_session(Session) + return Session, engine, metadata + + +def process_leftovers(session, access_token, twitter_query_user, logger): + + sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) + + for src in sources: + tweet_txt = src.original_json + process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger) + session.commit() + + + + # get tweet source that do not match any message + # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; +def process_log(logger_queues, stop_event): + while not stop_event.is_set(): + for lqueue in 
logger_queues: + try: + record = lqueue.get_nowait() + logging.getLogger(record.name).handle(record) + except Queue.Empty: + continue + except IOError: + continue + time.sleep(0.1) + + +def get_options(): + + usage = "usage: %prog [options]" + + parser = OptionParser(usage=usage) + + parser.add_option("-f", "--file", dest="conn_str", + help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") + parser.add_option("-u", "--user", dest="username", + help="Twitter user", metavar="USER", default=None) + parser.add_option("-w", "--password", dest="password", + help="Twitter password", metavar="PASSWORD", default=None) + parser.add_option("-T", "--track", dest="track", + help="Twitter track", metavar="TRACK") + parser.add_option("-n", "--new", dest="new", action="store_true", + help="new database", default=False) + parser.add_option("-D", "--daemon", dest="daemon", action="store_true", + help="launch daemon", default=False) + parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", + help="Token file name") + parser.add_option("-d", "--duration", dest="duration", + help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') + parser.add_option("-N", "--nb-process", dest="process_nb", + help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') + parser.add_option("--url", dest="url", + help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url) + parser.add_option("--query-user", dest="twitter_query_user", action="store_true", + help="Query twitter for users", default=False, metavar="QUERY_USER") + parser.add_option("--catchup", dest="catchup", + help="catchup count for tweets", default=None, metavar="CATCHUP", type='int') + parser.add_option("--timeout", dest="timeout", + help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int') + + + + + utils.set_logging_options(parser) + + return parser.parse_args() + + +def do_run(options, session_maker): + + stop_args = {} + + access_token = None + if not options.username or not options.password: + access_token = utils.get_oauth_token(options.token_filename) + + session = session_maker() + try: + process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) + session.commit() + finally: + session.rollback() + session.close() + + if options.process_nb <= 0: + utils.get_logger().debug("Leftovers processed. 
Exiting.") + return None + + queue = mQueue() + stop_event = Event() + + # workaround for bug on using urllib2 and multiprocessing + req = urllib2.Request('http://localhost') + conn = None + try: + conn = urllib2.urlopen(req) + except: + utils.get_logger().debug("could not open localhost") + # donothing + finally: + if conn is not None: + conn.close() + + process_engines = [] + logger_queues = [] + + SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + process_engines.append(engine_process) + lqueue = mQueue(50) + logger_queues.append(lqueue) + pid = os.getpid() + sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) + + tweet_processes = [] + + for i in range(options.process_nb - 1): + SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + process_engines.append(engine_process) + lqueue = mQueue(50) + logger_queues.append(lqueue) + cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) + tweet_processes.append(cprocess) + + log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) + log_thread.daemon = True + + log_thread.start() + + sprocess.start() + for cprocess in tweet_processes: + cprocess.start() + + add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) + + if options.duration >= 0: + end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + + def interupt_handler(signum, frame): + utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) + stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) + stop_event.set() + + signal.signal(signal.SIGINT , interupt_handler) + signal.signal(signal.SIGHUP , interupt_handler) + signal.signal(signal.SIGALRM, interupt_handler) + signal.signal(signal.SIGTERM, interupt_handler) + + + while not stop_event.is_set(): + if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: + stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) + stop_event.set() + break + if sprocess.is_alive(): + utils.get_logger().debug("Source process alive") + time.sleep(1) + else: + stop_args.update({'message': 'Source process killed'}) + stop_event.set() + break + utils.get_logger().debug("Joining Source Process") + try: + sprocess.join(10) + except: + utils.get_logger().debug("Pb joining Source Process - terminating") + sprocess.terminate() + + for i, cprocess in enumerate(tweet_processes): + utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) + try: + cprocess.join(3) + except: + utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) + cprocess.terminate() + + + utils.get_logger().debug("Close queues") + try: + queue.close() + for lqueue in logger_queues: + lqueue.close() + except exception as e: + utils.get_logger().error("error when closing queues %s", repr(e)) + # do nothing + + + if options.process_nb > 1: + utils.get_logger().debug("Processing leftovers") + session = session_maker() + try: + process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) + session.commit() + finally: + session.rollback() + session.close() + + for pengine in process_engines: + pengine.dispose() + + return stop_args + + +def main(options, args): + + global 
+def main(options, args): + + global conn_str + + conn_str = options.conn_str.strip() + if not re.match("^\w+://.+", conn_str): + conn_str = 'sqlite:///' + options.conn_str + + if conn_str.startswith("sqlite") and options.new: + filepath = conn_str[conn_str.find(":///") + 4:] + if os.path.exists(filepath): + i = 1 + basename, extension = os.path.splitext(filepath) + new_path = '%s.%d%s' % (basename, i, extension) + while i < 1000000 and os.path.exists(new_path): + i += 1 + new_path = '%s.%d%s' % (basename, i, extension) + if i >= 1000000: + raise Exception("Unable to find new filename for " + filepath) + else: + shutil.move(filepath, new_path) + + Session, engine, metadata = get_sessionmaker(conn_str) + + if options.new: + check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True) + if len(check_metadata.sorted_tables) > 0: + message = "Database %s not empty, exiting" % conn_str + utils.get_logger().error(message) + sys.exit(message) + + metadata.create_all(engine) + session = Session() + try: + models.add_model_version(session) + finally: + session.close() + + stop_args = {} + try: + add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) + stop_args = do_run(options, Session) + except Exception as e: + utils.get_logger().exception("Error in main thread") + outfile = StringIO.StringIO() + try: + traceback.print_exc(file=outfile) + stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()} + finally: + outfile.close() + raise + finally: + add_process_event(type="shutdown", args=stop_args, session_maker=Session) + + utils.get_logger().debug("Done. Exiting. " + repr(stop_args)) + + + +if __name__ == '__main__': + + (options, args) = get_options() + + loggers = set_logging(options) + + utils.get_logger().debug("OPTIONS : " + repr(options)) + + if options.daemon: + import daemon + import lockfile + + hdlr_preserve = [] + for logger in loggers: + hdlr_preserve.extend([h.stream for h in logger.handlers]) + + context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) + with context: + main(options, args) + else: + main(options, args) + diff -r b46cfa1d188b -r 8ae3d91ea4ae script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Tue Dec 18 10:40:15 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,571 +0,0 @@ -from getpass import getpass -from iri_tweet import models, utils -from iri_tweet.models import TweetSource, TweetLog, ProcessEvent -from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, - get_logger) -from optparse import OptionParser -from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import scoped_session -import Queue -import StringIO -import anyjson -import datetime -import inspect -import logging -import os -import re -import shutil -import signal -import socket -import sqlalchemy.schema -import sys -import threading -import time -import traceback -import tweepy.auth -import iri_tweet.stream as tweetstream -import urllib2 -socket._fileobject.default_bufsize = 0 - - - -#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user'] -columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place',
u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source'] -#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following'] -columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count'] -#just put it in a sqlite3 tqble - -DEFAULT_TIMEOUT = 5 - -def set_logging(options): - loggers = [] - - loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet'))) - loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing'))) - if options.debug >= 2: - loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))) - #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects')) - #utils.set_logging(options, logging.getLogger('sqlalchemy.pool')) - #utils.set_logging(options, logging.getLogger('sqlalchemy.orm')) - return loggers - -def set_logging_process(options, queue): - qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue) - qlogger.propagate = 0 - return qlogger - -def get_auth(options, access_token): - if options.username and options.password: - auth = tweepy.auth.BasicAuthHandler(options.username, options.password) - else: - consumer_key = models.CONSUMER_KEY - consumer_secret = models.CONSUMER_SECRET - auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True) - auth.set_access_token(*access_token) - return auth - - -def add_process_event(type, args, session_maker): - session = session_maker() - try: - evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) - session.add(evt) - session.commit() - finally: - session.close() - - -class BaseProcess(Process): - - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): - self.parent_pid = parent_pid - self.session_maker = session_maker - self.queue = queue - self.options = options - self.logger_queue = logger_queue - self.stop_event = stop_event - self.access_token = access_token - - super(BaseProcess, self).__init__() - - # - # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids - # - def parent_is_alive(self): - try: - # try to call Parent - os.kill(self.parent_pid, 0) - except OSError: - # *beeep* oh no! The phone's disconnected! - return False - else: - # *ring* Hi mom! 
- return True - - - def __get_process_event_args(self): - return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token} - - def run(self): - try: - add_process_event("start_worker", self.__get_process_event_args(), self.session_maker) - self.do_run() - finally: - add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker) - - def do_run(self): - raise NotImplementedError() - - - -class SourceProcess(BaseProcess): - - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): - self.track = options.track - self.token_filename = options.token_filename - self.catchup = options.catchup - self.timeout = options.timeout - self.stream = None - super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) - - def __source_stream_iter(self): - - self.logger = set_logging_process(self.options, self.logger_queue) - self.logger.debug("SourceProcess : run ") - - self.auth = get_auth(self.options, self.access_token) - self.logger.debug("SourceProcess : auth set ") - - track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() - self.logger.debug("SourceProcess : track list " + track_list) - - track_list = [k.strip() for k in track_list.split(',')] - - self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) - self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger) - self.logger.debug("SourceProcess : after connecting to stream") - self.stream.muststop = lambda: self.stop_event.is_set() - - stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger) - - session = self.session_maker() - - try: - for tweet in stream_wrapper: - if not self.parent_is_alive(): - self.stop_event.set() - stop_thread.join(5) - sys.exit() - self.logger.debug("SourceProcess : tweet " + repr(tweet)) - source = TweetSource(original_json=tweet) - self.logger.debug("SourceProcess : source created") - add_retries = 0 - while add_retries < 10: - try: - add_retries += 1 - session.add(source) - session.flush() - break - except OperationalError as e: - session.rollback() - self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) - if add_retries == 10: - raise - - source_id = source.id - self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) - self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime))) - session.commit() - self.queue.put((source_id, tweet), False) - - except Exception as e: - self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) - raise - finally: - session.rollback() - session.close() - self.logger_queue.close() - self.queue.close() - self.stream.close() - self.stream = None - if not self.stop_event.is_set(): - self.stop_event.set() - - - def do_run(self): - - #import pydevd - #pydevd.settrace(suspend=False) - - source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") - - source_stream_iter_thread.start() - - while not self.stop_event.is_set(): - self.logger.debug("SourceProcess : In while after start") - self.stop_event.wait(DEFAULT_TIMEOUT) - if self.stop_event.is_set() and 
self.stream: - self.stream.close() - elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: - self.stop_event.set() - - self.logger.info("SourceProcess : join") - source_stream_iter_thread.join(30) - - -def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger): - try: - if not tweet.strip(): - return - tweet_obj = anyjson.deserialize(tweet) - if 'text' not in tweet_obj: - tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) - session.add(tweet_log) - return - screen_name = "" - if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: - screen_name = tweet_obj['user']['screen_name'] - logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) - logger.debug(u"Process_tweet :" + repr(tweet)) - processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user) - processor.process() - except ValueError as e: - message = u"Value Error %s processing tweet %s" % (repr(e), tweet) - output = StringIO.StringIO() - try: - traceback.print_exc(file=output) - error_stack = output.getvalue() - finally: - output.close() - tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack) - session.add(tweet_log) - session.commit() - except Exception as e: - message = u"Error %s processing tweet %s" % (repr(e), tweet) - logger.exception(message) - output = StringIO.StringIO() - try: - traceback.print_exc(file=output) - error_stack = output.getvalue() - finally: - output.close() - session.rollback() - tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack) - session.add(tweet_log) - session.commit() - - - -class TweetProcess(BaseProcess): - - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): - super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) - self.twitter_query_user = options.twitter_query_user - - - def do_run(self): - - self.logger = set_logging_process(self.options, self.logger_queue) - session = self.session_maker() - try: - while not self.stop_event.is_set() and self.parent_is_alive(): - try: - source_id, tweet_txt = self.queue.get(True, 3) - self.logger.debug("Processing source id " + repr(source_id)) - except Exception as e: - self.logger.debug('Process tweet exception in loop : ' + repr(e)) - continue - process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger) - session.commit() - finally: - session.rollback() - session.close() - - -def get_sessionmaker(conn_str): - engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) - Session = scoped_session(Session) - return Session, engine, metadata - - -def process_leftovers(session, access_token, twitter_query_user, logger): - - sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) - - for src in sources: - tweet_txt = src.original_json - process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger) - session.commit() - - - - #get tweet source that do not match any message - #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; -def process_log(logger_queues, stop_event): - while not stop_event.is_set(): - for lqueue in logger_queues: - 
try: - record = lqueue.get_nowait() - logging.getLogger(record.name).handle(record) - except Queue.Empty: - continue - except IOError: - continue - time.sleep(0.1) - - -def get_options(): - - usage = "usage: %prog [options]" - - parser = OptionParser(usage=usage) - - parser.add_option("-f", "--file", dest="conn_str", - help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") - parser.add_option("-u", "--user", dest="username", - help="Twitter user", metavar="USER", default=None) - parser.add_option("-w", "--password", dest="password", - help="Twitter password", metavar="PASSWORD", default=None) - parser.add_option("-T", "--track", dest="track", - help="Twitter track", metavar="TRACK") - parser.add_option("-n", "--new", dest="new", action="store_true", - help="new database", default=False) - parser.add_option("-D", "--daemon", dest="daemon", action="store_true", - help="launch daemon", default=False) - parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", - help="Token file name") - parser.add_option("-d", "--duration", dest="duration", - help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') - parser.add_option("-N", "--nb-process", dest="process_nb", - help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') - parser.add_option("--url", dest="url", - help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url) - parser.add_option("--query-user", dest="twitter_query_user", action="store_true", - help="Query twitter for users", default=False, metavar="QUERY_USER") - parser.add_option("--catchup", dest="catchup", - help="catchup count for tweets", default=None, metavar="CATCHUP", type='int') - parser.add_option("--timeout", dest="timeout", - help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int') - - - - - utils.set_logging_options(parser) - - return parser.parse_args() - - -def do_run(options, session_maker): - - stop_args = {} - - access_token = None - if not options.username or not options.password: - access_token = utils.get_oauth_token(options.token_filename) - - session = session_maker() - try: - process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) - session.commit() - finally: - session.rollback() - session.close() - - if options.process_nb <= 0: - utils.get_logger().debug("Leftovers processed. 
Exiting.") - return None - - queue = mQueue() - stop_event = Event() - - #workaround for bug on using urllib2 and multiprocessing - req = urllib2.Request('http://localhost') - conn = None - try: - conn = urllib2.urlopen(req) - except: - utils.get_logger().debug("could not open localhost") - #donothing - finally: - if conn is not None: - conn.close() - - process_engines = [] - logger_queues = [] - - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) - process_engines.append(engine_process) - lqueue = mQueue(50) - logger_queues.append(lqueue) - pid = os.getpid() - sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) - - tweet_processes = [] - - for i in range(options.process_nb - 1): - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) - process_engines.append(engine_process) - lqueue = mQueue(50) - logger_queues.append(lqueue) - cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) - tweet_processes.append(cprocess) - - log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) - log_thread.daemon = True - - log_thread.start() - - sprocess.start() - for cprocess in tweet_processes: - cprocess.start() - - add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) - - if options.duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) - - def interupt_handler(signum, frame): - utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) - stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) - stop_event.set() - - signal.signal(signal.SIGINT , interupt_handler) - signal.signal(signal.SIGHUP , interupt_handler) - signal.signal(signal.SIGALRM, interupt_handler) - signal.signal(signal.SIGTERM, interupt_handler) - - - while not stop_event.is_set(): - if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: - stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) - stop_event.set() - break - if sprocess.is_alive(): - utils.get_logger().debug("Source process alive") - time.sleep(1) - else: - stop_args.update({'message': 'Source process killed'}) - stop_event.set() - break - utils.get_logger().debug("Joining Source Process") - try: - sprocess.join(10) - except: - utils.get_logger().debug("Pb joining Source Process - terminating") - sprocess.terminate() - - for i, cprocess in enumerate(tweet_processes): - utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) - try: - cprocess.join(3) - except: - utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) - cprocess.terminate() - - - utils.get_logger().debug("Close queues") - try: - queue.close() - for lqueue in logger_queues: - lqueue.close() - except exception as e: - utils.get_logger().error("error when closing queues %s", repr(e)) - #do nothing - - - if options.process_nb > 1: - utils.get_logger().debug("Processing leftovers") - session = session_maker() - try: - process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) - session.commit() - finally: - session.rollback() - session.close() - - for pengine in process_engines: - pengine.dispose() - - return stop_args - - -def main(options, args): - - global conn_str 
- - conn_str = options.conn_str.strip() - if not re.match("^\w+://.+", conn_str): - conn_str = 'sqlite:///' + options.conn_str - - if conn_str.startswith("sqlite") and options.new: - filepath = conn_str[conn_str.find(":///") + 4:] - if os.path.exists(filepath): - i = 1 - basename, extension = os.path.splitext(filepath) - new_path = '%s.%d%s' % (basename, i, extension) - while i < 1000000 and os.path.exists(new_path): - i += 1 - new_path = '%s.%d%s' % (basename, i, extension) - if i >= 1000000: - raise Exception("Unable to find new filename for " + filepath) - else: - shutil.move(filepath, new_path) - - Session, engine, metadata = get_sessionmaker(conn_str) - - if options.new: - check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True) - if len(check_metadata.sorted_tables) > 0: - message = "Database %s not empty exiting" % conn_str - utils.get_logger().error(message) - sys.exit(message) - - metadata.create_all(engine) - session = Session() - try: - models.add_model_version(session) - finally: - session.close() - - stop_args = {} - try: - add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) - stop_args = do_run(options, Session) - except Exception as e: - utils.get_logger().exception("Error in main thread") - outfile = StringIO.StringIO() - try: - traceback.print_exc(file=outfile) - stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()} - finally: - outfile.close() - raise - finally: - add_process_event(type="shutdown", args=stop_args, session_maker=Session) - - utils.get_logger().debug("Done. Exiting. " + repr(stop_args)) - - - -if __name__ == '__main__': - - (options, args) = get_options() - - loggers = set_logging(options) - - utils.get_logger().debug("OPTIONS : " + repr(options)) - - if options.daemon: - import daemon - import lockfile - - hdlr_preserve = [] - for logger in loggers: - hdlr_preserve.extend([h.stream for h in logger.handlers]) - - context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) - with context: - main(options, args) - else: - main(options, args) - diff -r b46cfa1d188b -r 8ae3d91ea4ae script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz Binary file script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz has changed diff -r b46cfa1d188b -r 8ae3d91ea4ae script/virtualenv/res/blessings-1.5.tar.gz Binary file script/virtualenv/res/blessings-1.5.tar.gz has changed diff -r b46cfa1d188b -r 8ae3d91ea4ae script/virtualenv/res/lib/lib_create_env.py --- a/script/virtualenv/res/lib/lib_create_env.py Tue Dec 18 10:40:15 2012 +0100 +++ b/script/virtualenv/res/lib/lib_create_env.py Tue Dec 18 12:26:05 2012 +0100 @@ -26,7 +26,6 @@ 'PYTZ': {'setup': 'pytz', 'url':'http://pypi.python.org/packages/source/p/pytz/pytz-2012h.tar.bz2', 'local':"pytz-2012h.tar.bz2", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'SIMPLEJSON': {'setup': 'simplejson', 'url':'http://pypi.python.org/packages/source/s/simplejson/simplejson-2.6.2.tar.gz', 'local':"simplejson-2.6.2.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'SQLALCHEMY': {'setup': 'sqlalchemy', 'url':'http://downloads.sourceforge.net/project/sqlalchemy/sqlalchemy/0.8.0b1/SQLAlchemy-0.8.0b1.tar.gz?r=http%3A%2F%2Fwww.sqlalchemy.org%2Fdownload.html&ts=1355091775&use_mirror=ignum', 'local':"SQLAlchemy-0.8.0b1.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, - 'TWEEPY': {'setup': 'tweepy', 
'url':'https://github.com/tweepy/tweepy/archive/1.12.tar.gz', 'local':"tweepy-1.12.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'TWITTER': {'setup': 'twitter', 'url':'http://pypi.python.org/packages/source/t/twitter/twitter-1.9.0.tar.gz', 'local':"twitter-1.9.0.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'TWITTER-TEXT': {'setup': 'twitter-text', 'url':'https://github.com/dryan/twitter-text-py/archive/master.tar.gz', 'local':"twitter-text-1.0.4.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'REQUESTS': {'setup': 'requests', 'url':'https://github.com/kennethreitz/requests/archive/v1.0.2.tar.gz', 'local':'requests-v1.0.2.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}}, diff -r b46cfa1d188b -r 8ae3d91ea4ae script/virtualenv/res/requests-0.13.1.tar.gz Binary file script/virtualenv/res/requests-0.13.1.tar.gz has changed diff -r b46cfa1d188b -r 8ae3d91ea4ae script/virtualenv/res/src/tweepy-1.12.tar.gz Binary file script/virtualenv/res/src/tweepy-1.12.tar.gz has changed diff -r b46cfa1d188b -r 8ae3d91ea4ae script/virtualenv/res/twitter-1.9.0.tar.gz Binary file script/virtualenv/res/twitter-1.9.0.tar.gz has changed diff -r b46cfa1d188b -r 8ae3d91ea4ae script/virtualenv/script/res/res_create_env.py --- a/script/virtualenv/script/res/res_create_env.py Tue Dec 18 10:40:15 2012 +0100 +++ b/script/virtualenv/script/res/res_create_env.py Tue Dec 18 12:26:05 2012 +0100 @@ -17,7 +17,6 @@ 'DATEUTIL', 'SIMPLEJSON', 'SQLALCHEMY', - 'TWEEPY', 'TWITTER', 'TWITTER-TEXT', 'REQUESTS',
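With tweepy dropped from the virtualenv resources above, the recorder authenticates through a requests-compatible auth object rather than tweepy's handlers. A minimal sketch of how such an object can be built, assuming the requests-oauthlib package is available (an assumption; it is not pinned by this changeset) and reusing this repository's models constants; the stream URL and track keyword are illustrative only:

    import requests
    from requests_oauthlib import OAuth1  # assumed helper package, not part of this changeset
    from iri_tweet import models, utils

    # get_oauth_token returns the (key, secret) pair used throughout the recorder
    token_key, token_secret = utils.get_oauth_token(".oauth_token")
    auth = OAuth1(models.CONSUMER_KEY, models.CONSUMER_SECRET,
                  token_key, token_secret)

    # stream=True keeps the HTTP body open so tweets can be read line by line
    resp = requests.post("https://stream.twitter.com/1/statuses/filter.json",  # illustrative URL
                         auth=auth, data={"track": "enmi"}, stream=True)
    for line in resp.iter_lines():
        if line:
            print line  # Python 2 print, matching the codebase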