--- a/script/lib/iri_tweet/iri_tweet/__init__.py Tue Dec 18 10:40:15 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py Tue Dec 18 12:26:05 2012 +0100
@@ -33,7 +33,7 @@
class IRITweetError(Exception):
- """Base class for all tweetstream errors"""
+ """Base class for all IRITweet errors"""
pass
--- a/script/lib/iri_tweet/iri_tweet/stream.py Tue Dec 18 10:40:15 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/stream.py Tue Dec 18 12:26:05 2012 +0100
@@ -68,7 +68,7 @@
"""A network connection to Twitters streaming API
- :param auth: tweepy auth object.
+ :param auth: requests auth object.
:keyword catchup: Number of tweets from the past to get before switching to
live stream.
:keyword raw: If True, return each tweet's raw data direct from the socket,
@@ -168,14 +168,11 @@
if self._catchup_count:
postdata["count"] = self._catchup_count
- if self._auth:
- self._auth.apply_auth(self.url, "POST", headers, postdata)
-
if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url))
if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers))
if self._logger : self._logger.debug("BaseStream init connection data " + repr(postdata))
- self._resp = requests.post(self.url, headers=headers, data=postdata, prefetch=False)
+ self._resp = requests.post(self.url, auth=self._auth, headers=headers, data=postdata, stream=True)
if self._logger : self._logger.debug("BaseStream init connection " + repr(self._resp))
self._resp.raise_for_status()
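
For reference, the auth objects that the revised call expects can be built
roughly as follows. This is a sketch, not part of the changeset: HTTPBasicAuth
is requests' bundled basic-auth class, OAuth1 assumes the oauthlib-backed
class that requests shipped at the time (consumer credentials first, then the
access token pair), and url/postdata stand for the values computed earlier in
_init_conn:

    import requests
    import requests.auth

    # username/password: requests applies the header itself, so the old
    # manual apply_auth step is no longer needed
    auth = requests.auth.HTTPBasicAuth("username", "password")

    # OAuth: consumer key/secret first, then the access token key/secret
    auth = requests.auth.OAuth1("CONSUMER_KEY", "CONSUMER_SECRET",
                                "ACCESS_TOKEN_KEY", "ACCESS_TOKEN_SECRET",
                                signature_type='auth_header')

    # streamed POST, as in the hunk above: stream=True (formerly
    # prefetch=False) leaves the body unconsumed so it can be iterated
    resp = requests.post(url, auth=auth, data=postdata, stream=True)
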
--- a/script/lib/iri_tweet/iri_tweet/utils.py Tue Dec 18 10:40:15 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/utils.py Tue Dec 18 12:26:05 2012 +0100
@@ -1,7 +1,7 @@
-from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
- EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
- ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType,
- Media, EntityMedia, Entity, EntityType)
+from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, #@UnresolvedImport
+ EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,#@UnresolvedImport
+ ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, #@UnresolvedImport
+ Media, EntityMedia, Entity, EntityType) #@UnresolvedImport
from sqlalchemy.sql import select, or_ #@UnresolvedImport
import Queue #@UnresolvedImport
import anyjson #@UnresolvedImport
--- a/script/lib/tweetstream/CHANGELOG Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,68 +0,0 @@
-0.1
-
- - Initial version
-
-0.2
-
- - Improved error handling
- - Added AuthenticationError and ConnectionError exceptions
- - Added ReconnectingTweetStream class that supports automatically
- reconnecting if the connection is dropped
-
-0.3
-
- - Fixed bugs in authentication
- - Added TrackStream and FollowStream classes
- - Added list of endpoint names, and made them legal values for the url arg
-
-0.3.1
-
- - Added lots of tests
- - Added proper handling of keepalive newlines
- - Improved handling of closing streams
- - Added missing anyjson dependency to setup
- - Fixed bug where newlines and malformed content were counted as a tweet
-
-0.3.2
-
- - This release was skipped over, due to maintainer brainfart.
-
-0.3.3
-
- - Fixed setup.py so it won't attempt to load modules that aren't installed
- yet. Fixes installation issue.
-
-0.3.4
-
- - Updated to latest twitter streaming urls
- - Fixed a bug where we tried to call a method on None
-
-0.3.5
-
- - Removed a spurious print statement left over from debugging
- - Introduced common base class for all tweetstream exceptions
- - Make sure we raise a sensible error on 404. Include url in desc of that error
-
-0.3.6
-
- - Added LocationStream class for filtering on location bounding boxes.
-
-1.0.0
-
- - Changed API to match latest twitter endpoints. This adds SampleStream and
- FilterStream and deprecates TweetStream, FollowStream, LocationStream,
- TrackStream and ReconnectingTweetStream.
-
-1.1.0
-
- - Fixed issues #2 and #12, related to low volume streams not yielding tweets
- until a relatively large buffer was filled. This meant that tweets would
-   arrive in bunches, not immediately.
- - Switched to HTTPS urls for streams. Twitter will switch off HTTP streams
-   on September 29, 2011.
- - Added support for Python 3
-
-1.1.1
-
- - Fixed issue #16. Odd case where python_version_tuple was returning ints
-   rather than the strings the docs promise. Make sure we always cast to int.
--- a/script/lib/tweetstream/LICENSE Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,27 +0,0 @@
-Copyright (c) 2009, Rune Halvorsen
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
- * Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above copyright
- notice, this list of conditions and the following disclaimer in the
- documentation and/or other materials provided with the distribution.
-
-Neither the name of Rune Halvorsen nor the names of its contributors may be
-used to endorse or promote products derived from this software without
-specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
-BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-POSSIBILITY OF SUCH DAMAGE.
--- a/script/lib/tweetstream/README Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,110 +0,0 @@
-.. -*- restructuredtext -*-
-
-##########################################
-tweetstream - Simple twitter streaming API
-##########################################
-
-Introduction
-------------
-
-tweetstream provides two classes, SampleStream and FilterStream, that can be
-used to get tweets from Twitter's streaming API. An instance of one of the
-classes can be used as an iterator. In addition to fetching tweets, the
-object keeps track of the number of tweets collected and the rate at which
-tweets are received.
-
-SampleStream delivers a sample of all tweets. FilterStream delivers
-tweets that match one or more criteria. Note that it's not possible
-to get all tweets without access to the "firehose" stream, which
-is not currently available to the public.
-
-Twitter's documentation about the streaming API can be found here:
-http://dev.twitter.com/pages/streaming_api_methods .
-
-**Note** that the API is blocking. If for some reason data is not immediately
-available, calls will block until enough data is available to yield a tweet.
-
-Examples
---------
-
-Printing incoming tweets:
-
->>> stream = tweetstream.SampleStream("username", "password")
->>> for tweet in stream:
-... print tweet
-
-
-The stream object can also be used as a context, as in this example that
-prints the author for each tweet as well as the tweet count and rate:
-
->>> with tweetstream.SampleStream("username", "password") as stream:
-... for tweet in stream:
-... print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % (
-... tweet["user"]["screen_name"], stream.count, stream.rate )
-
-
-Stream objects can raise ConnectionError or AuthenticationError exceptions:
-
->>> try:
-... with tweetstream.TweetStream("username", "password") as stream:
-... for tweet in stream:
-... print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % (
-... tweet["user"]["screen_name"], stream.count, stream.rate )
-... except tweetstream.ConnectionError, e:
-... print "Disconnected from twitter. Reason:", e.reason
-
-To get tweets that match specific criteria, use the FilterStream. FilterStreams
-take three keyword arguments: "locations", "follow" and "track".
-
-Locations are a list of bounding boxes in which geotagged tweets should originate.
-The argument should be an iterable of longitude/latitude pairs.
-
-Track specifies keywords to track. The argument should be an iterable of
-strings.
-
-Follow returns statuses that reference the given users. The argument should be
-an iterable of Twitter user IDs (the numeric IDs, not the screen names).
-
->>> words = ["opera", "firefox", "safari"]
->>> people = [123,124,125]
->>> locations = ["-122.75,36.8", "-121.75,37.8"]
->>> with tweetstream.FilterStream("username", "password", track=words,
-... follow=people, locations=locations) as stream:
-... for tweet in stream:
-... print "Got interesting tweet:", tweet
-
-
-Deprecated classes
-------------------
-
-tweetstream used to contain the classes TweetStream, FollowStream, TrackStream,
-LocationStream and ReconnectingTweetStream. These were deprecated when twitter
-changed its API end points. The same functionality is now available in
-SampleStream and FilterStream. The deprecated methods will emit a warning when
-used, but will remain functional for a while longer.
-
-
-Changelog
----------
-
-See the CHANGELOG file
-
-Contact
--------
-
-The author is Rune Halvorsen <runefh@gmail.com>. The project resides at
-http://bitbucket.org/runeh/tweetstream . If you find bugs, or have feature
-requests, please report them in the project site issue tracker. Patches are
-also very welcome.
-
-Contributors
-------------
-
-- Rune Halvorsen
-- Christopher Schierkolk
-
-License
--------
-
-This software is licensed under the ``New BSD License``. See the ``LICENSE``
-file in the top distribution directory for the full license text.
--- a/script/lib/tweetstream/conftest.py Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,10 +0,0 @@
-# content of conftest.py
-
-import pytest
-def pytest_addoption(parser):
- parser.addoption("--runslow", action="store_true",
- help="run slow tests")
-
-def pytest_runtest_setup(item):
- if 'slow' in item.keywords and not item.config.getvalue("runslow"):
- pytest.skip("need --runslow option to run")
\ No newline at end of file
--- a/script/lib/tweetstream/servercontext.py Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,221 +0,0 @@
-import threading
-import contextlib
-import time
-import os
-import socket
-import random
-from functools import partial
-from inspect import isclass
-from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from SimpleHTTPServer import SimpleHTTPRequestHandler
-from SocketServer import BaseRequestHandler
-
-
-class ServerError(Exception):
- pass
-
-
-class ServerContext(object):
- """Context object with information about a running test server."""
-
- def __init__(self, address, port):
- self.address = address or "localhost"
- self.port = port
-
- @property
- def baseurl(self):
- return "http://%s:%s" % (self.address, self.port)
-
- def __str__(self):
- return "<ServerContext %s >" % self.baseurl
-
- __repr__ = __str__
-
-
-class _SilentSimpleHTTPRequestHandler(SimpleHTTPRequestHandler):
-
- def __init__(self, *args, **kwargs):
- self.logging = kwargs.get("logging", False)
- SimpleHTTPRequestHandler.__init__(self, *args, **kwargs)
-
- def log_message(self, *args, **kwargs):
- if self.logging:
- SimpleHTTPRequestHandler.log_message(self, *args, **kwargs)
-
-
-class _TestHandler(BaseHTTPRequestHandler):
- """RequestHandler class that handles requests that use a custom handler
- callable."""
-
- def __init__(self, handler, methods, *args, **kwargs):
- self._handler = handler
- self._methods = methods
- self._response_sent = False
- self._headers_sent = False
- self.logging = kwargs.get("logging", False)
- BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
-
- def log_message(self, *args, **kwargs):
- if self.logging:
- BaseHTTPRequestHandler.log_message(self, *args, **kwargs)
-
- def send_response(self, *args, **kwargs):
- self._response_sent = True
- BaseHTTPRequestHandler.send_response(self, *args, **kwargs)
-
- def end_headers(self, *args, **kwargs):
- self._headers_sent = True
- BaseHTTPRequestHandler.end_headers(self, *args, **kwargs)
-
- def _do_whatever(self):
- """Called in place of do_METHOD"""
- data = self._handler(self)
-
- if hasattr(data, "next"):
- # assume it's something supporting generator protocol
- self._handle_with_iterator(data)
- else:
- # Nothing more to do then.
- pass
-
-
- def __getattr__(self, name):
- if name.startswith("do_") and name[3:].lower() in self._methods:
- return self._do_whatever
- else:
- # fixme instance or class?
- raise AttributeError(name)
-
- def _handle_with_iterator(self, iterator):
- self.connection.settimeout(0.1)
- for data in iterator:
- if not self.server.server_thread.running:
- return
-
- if not self._response_sent:
- self.send_response(200)
- if not self._headers_sent:
- self.end_headers()
-
- self.wfile.write(data)
-            # flush immediately. We may want to do trickling writes
-            # or something else that requires bypassing normal caching
- self.wfile.flush()
-
-class _TestServerThread(threading.Thread):
- """Thread class for a running test server"""
-
- def __init__(self, handler, methods, cwd, port, address):
- threading.Thread.__init__(self)
- self.startup_finished = threading.Event()
- self._methods = methods
- self._cwd = cwd
- self._orig_cwd = None
- self._handler = self._wrap_handler(handler, methods)
- self._setup()
- self.running = True
- self.serverloc = (address, port)
- self.error = None
-
- def _wrap_handler(self, handler, methods):
- if isclass(handler) and issubclass(handler, BaseRequestHandler):
- return handler # It's OK. user passed in a proper handler
- elif callable(handler):
- return partial(_TestHandler, handler, methods)
- # it's a callable, so wrap in a req handler
- else:
- raise ServerError("handler must be callable or RequestHandler")
-
- def _setup(self):
- if self._cwd != "./":
- self._orig_cwd = os.getcwd()
- os.chdir(self._cwd)
-
- def _init_server(self):
- """Hooks up the server socket"""
- try:
- if self.serverloc[1] == "random":
- retries = 10 # try getting an available port max this many times
- while True:
- try:
- self.serverloc = (self.serverloc[0],
- random.randint(1025, 49151))
- self._server = HTTPServer(self.serverloc, self._handler)
- except socket.error:
- retries -= 1
- if not retries: # not able to get a port.
- raise
- else:
- break
- else: # use specific port. this might throw, that's expected
- self._server = HTTPServer(self.serverloc, self._handler)
- except socket.error, e:
- self.running = False
- self.error = e
- # set this here, since we'll never enter the serve loop where
- # it is usually set:
- self.startup_finished.set()
- return
-
- self._server.allow_reuse_address = True # lots of tests, same port
- self._server.timeout = 0.1
- self._server.server_thread = self
-
-
- def run(self):
- self._init_server()
-
- while self.running:
- self._server.handle_request() # blocks for self.timeout secs
- # First time this falls through, signal the parent thread that
-            # the server is ready for incoming connections
- if not self.startup_finished.is_set():
- self.startup_finished.set()
-
- self._cleanup()
-
- def stop(self):
- """Stop the server and attempt to make the thread terminate.
- This happens async but the calling code can check periodically
- the isRunning flag on the thread object.
- """
- # actual stopping happens in the run method
- self.running = False
-
- def _cleanup(self):
- """Do some rudimentary cleanup."""
- if self._orig_cwd:
- os.chdir(self._orig_cwd)
-
-
-@contextlib.contextmanager
-def test_server(handler=_SilentSimpleHTTPRequestHandler, port=8514,
- address="", methods=("get", "head"), cwd="./"):
- """Context that makes available a web server in a separate thread"""
- thread = _TestServerThread(handler=handler, methods=methods, cwd=cwd,
- port=port, address=address)
- thread.start()
-
- # fixme: should this be daemonized? If it isn't it will block the entire
- # app, but that should never happen anyway..
- thread.startup_finished.wait()
-
- if thread.error: # startup failed! Bail, throw whatever the server did
- raise thread.error
-
- exc = None
- try:
- yield ServerContext(*thread.serverloc)
- except Exception, exc:
- pass
- thread.stop()
- thread.join(5) # giving it a lot of leeway. should never happen
-
- if exc:
- raise exc
-
-    # fixme: this takes second priority after the internal exception but would
- # still be nice to signal back to calling code.
-
- if thread.isAlive():
- raise Warning("Test server could not be stopped")
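
The module's entry point is the test_server context manager defined above. Its
intended use, reconstructed from the tests that exercise it, looks roughly
like this (hello is a hypothetical handler; any callable that yields response
chunks works):

    from servercontext import test_server

    def hello(request):
        # handlers may be generators; each yielded chunk is written to
        # the client as it is produced
        yield "hello\r"

    with test_server(handler=hello, methods=("get",), port="random") as ctx:
        print ctx.baseurl  # e.g. http://localhost:<random port>
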
--- a/script/lib/tweetstream/setup.py Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,82 +0,0 @@
-#@PydevCodeAnalysisIgnore
-import sys
-import os
-
-extra = {}
-if sys.version_info >= (3, 0):
- extra.update(use_2to3=True)
-
-
-try:
- from setuptools import setup, find_packages
-except ImportError:
- from distutils.core import setup, find_packages
-
-
-# -*- Distribution Meta -*-
-import re
-re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)')
-re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)')
-re_doc = re.compile(r'^"""(.+?)"""', re.M|re.S)
-rq = lambda s: s.strip("\"'")
-
-
-def add_default(m):
- attr_name, attr_value = m.groups()
- return ((attr_name, rq(attr_value)), )
-
-
-def add_version(m):
- v = list(map(rq, m.groups()[0].split(", ")))
- return (("VERSION", ".".join(v[0:3]) + "".join(v[3:])), )
-
-
-def add_doc(m):
- return (("doc", m.groups()[0].replace("\n", " ")), )
-
-pats = {re_meta: add_default,
- re_vers: add_version}
-here = os.path.abspath(os.path.dirname(__file__))
-meta_fh = open(os.path.join(here, "tweetstream/__init__.py"))
-try:
- meta = {}
- acc = []
- for line in meta_fh:
- if line.strip() == '# -eof meta-':
- break
- acc.append(line)
- for pattern, handler in pats.items():
- m = pattern.match(line.strip())
- if m:
- meta.update(handler(m))
- m = re_doc.match("".join(acc).strip())
- if m:
- meta.update(add_doc(m))
-finally:
- meta_fh.close()
-
-
-setup(name='tweetstream',
- version=meta["VERSION"],
- description=meta["doc"],
- long_description=open("README").read(),
- classifiers=[
- 'License :: OSI Approved :: BSD License',
- 'Intended Audience :: Developers',
- 'Programming Language :: Python :: 2.6',
- 'Programming Language :: Python :: 2.7',
- 'Programming Language :: Python :: 3',
- 'Programming Language :: Python :: 3.1',
- ],
- keywords='twitter',
- author=meta["author"],
- author_email=meta["contact"],
- url=meta["homepage"],
- license='BSD',
- packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
- include_package_data=True,
- zip_safe=False,
- platforms=["any"],
- install_requires=['anyjson'],
- **extra
-)
--- a/script/lib/tweetstream/tests/test_tweetstream.py Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,188 +0,0 @@
-import contextlib
-import threading
-import time
-
-from tweetstream import TweetStream, FollowStream, TrackStream, LocationStream
-from tweetstream import ConnectionError, AuthenticationError, SampleStream, FilterStream
-from tweepy.auth import BasicAuthHandler
-
-import pytest
-from pytest import raises
-slow = pytest.mark.slow
-
-from servercontext import test_server
-
-single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}""" + "\r"
-
-
-def parameterized(funcarglist):
- def wrapper(function):
- function.funcarglist = funcarglist
- return function
- return wrapper
-
-def pytest_generate_tests(metafunc):
- for funcargs in getattr(metafunc.function, 'funcarglist', ()):
- metafunc.addcall(funcargs=funcargs)
-
-
-streamtypes = [
- dict(cls=TweetStream, args=[], kwargs=dict()),
- dict(cls=SampleStream, args=[], kwargs=dict()),
- dict(cls=FilterStream, args=[], kwargs=dict(track=("test",))),
- dict(cls=FollowStream, args=[[1, 2, 3]], kwargs=dict()),
- dict(cls=TrackStream, args=["opera"], kwargs=dict()),
- dict(cls=LocationStream, args=["123,4321"], kwargs=dict())
-]
-
-
-@parameterized(streamtypes)
-def test_bad_auth(cls, args, kwargs):
- """Test that the proper exception is raised when the user could not be
- authenticated"""
- def auth_denied(request):
- request.send_error(401)
-
- with raises(AuthenticationError):
- with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server:
- auth = BasicAuthHandler("user", "passwd")
- stream = cls(auth, *args, url=server.baseurl)
- for e in stream: pass
-
-
-@parameterized(streamtypes)
-def test_404_url(cls, args, kwargs):
- """Test that the proper exception is raised when the stream URL can't be
- found"""
- def not_found(request):
- request.send_error(404)
-
- with raises(ConnectionError):
- with test_server(handler=not_found, methods=("post", "get"), port="random") as server:
- auth = BasicAuthHandler("user", "passwd")
- stream = cls(auth, *args, url=server.baseurl)
- for e in stream: pass
-
-
-@parameterized(streamtypes)
-def test_bad_content(cls, args, kwargs):
- """Test error handling if we are given invalid data"""
- def bad_content(request):
- for n in xrange(10):
- # what json we pass doesn't matter. It's not verifying the
-            # structure, only checking that it's parsable
- yield "[1,2,3]\r"
- yield "[1,2, I need no stinking close brace\r"
- yield "[1,2,3]\r"
-
-
- with raises(ConnectionError):
- with test_server(handler=bad_content, methods=("post", "get"), port="random") as server:
- auth = BasicAuthHandler("user", "passwd")
- stream = cls(auth, *args, url=server.baseurl)
- for tweet in stream:
- pass
-
-
-@parameterized(streamtypes)
-def test_closed_connection(cls, args, kwargs):
- """Test error handling if server unexpectedly closes connection"""
- cnt = 1000
- def bad_content(request):
- for n in xrange(cnt):
- # what json we pass doesn't matter. It's not verifying the
-            # structure, only checking that it's parsable
- yield "[1,2,3]\r"
-
- with raises(ConnectionError):
- with test_server(handler=bad_content, methods=("post", "get"), port="random") as server:
- auth = BasicAuthHandler("foo", "bar")
- stream = cls(auth, *args, url=server.baseurl)
- for tweet in stream:
- pass
-
-
-@parameterized(streamtypes)
-def test_bad_host(cls, args, kwargs):
- """Test behaviour if we can't connect to the host"""
- with raises(ConnectionError):
- stream = cls("username", "passwd", *args, url="http://wedfwecfghhreewerewads.foo")
- stream.next()
-
-
-@parameterized(streamtypes)
-def smoke_test_receive_tweets(cls, args, kwargs):
- """Receive 100k tweets and disconnect (slow)"""
- total = 100000
-
- def tweetsource(request):
- while True:
- yield single_tweet + "\n"
-
- with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server:
- auth = BasicAuthHandler("foo", "bar")
- stream = cls(auth, *args, url=server.baseurl)
- for tweet in stream:
- if stream.count == total:
- break
-
-
-@parameterized(streamtypes)
-def test_keepalive(cls, args, kwargs):
- """Make sure we behave sanely when there are keepalive newlines in the
-    data received from twitter"""
- def tweetsource(request):
- yield single_tweet+"\n"
- yield "\n"
- yield "\n"
- yield single_tweet+"\n"
- yield "\n"
- yield "\n"
- yield "\n"
- yield "\n"
- yield "\n"
- yield "\n"
- yield "\n"
- yield single_tweet+"\n"
- yield "\n"
-
-
- with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server:
- auth = BasicAuthHandler("foo", "bar")
- stream = cls(auth, *args, url=server.baseurl)
- try:
- for tweet in stream:
- pass
- except ConnectionError:
- assert stream.count == 3, "Got %s, wanted 3" % stream.count
- else:
- assert False, "Didn't handle keepalive"
-
-
-@slow
-@parameterized(streamtypes)
-def test_buffering(cls, args, kwargs):
- """Test if buffering stops data from being returned immediately.
- If there is some buffering in play that might mean data is only returned
- from the generator when the buffer is full. If buffer is bigger than a
- tweet, this will happen. Default buffer size in the part of socket lib
- that enables readline is 8k. Max tweet length is around 3k."""
-
- def tweetsource(request):
- yield single_tweet+"\n"
- time.sleep(2)
- # need to yield a bunch here so we're sure we'll return from the
- # blocking call in case the buffering bug is present.
- for n in xrange(100):
- yield single_tweet+"\n"
-
-
- with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server:
- auth = BasicAuthHandler("foo", "bar")
- stream = cls(auth, *args, url=server.baseurl)
- start = time.time()
- stream.next()
- first = time.time()
- diff = first - start
- assert diff < 1, "Getting first tweet took more than a second!"
-
--- a/script/lib/tweetstream/tox.ini Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,16 +0,0 @@
-[tox]
-envlist = py25,py26,py27,py30,py31,py32
-
-[testenv]
-deps=pytest
-sitepackages=False
-commands=py.test --runslow
-
-[testenv:py30]
-changedir = .tox
-
-[testenv:py31]
-changedir = .tox
-
-[testenv:py32]
-changedir = .tox
\ No newline at end of file
--- a/script/lib/tweetstream/tweetstream/__init__.py Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,45 +0,0 @@
-"""Simple access to Twitter's streaming API"""
-
-VERSION = (1, 1, 1, 'iri')
-__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
-__author__ = "Rune Halvorsen"
-__contact__ = "runefh@gmail.com"
-__homepage__ = "http://bitbucket.org/runeh/tweetstream/"
-__docformat__ = "restructuredtext"
-
-# -eof meta-
-
-
-"""
- .. data:: USER_AGENT
-
- The default user agent string for stream objects
-"""
-
-USER_AGENT = "TweetStream %s" % __version__
-
-
-class TweetStreamError(Exception):
- """Base class for all tweetstream errors"""
- pass
-
-
-class AuthenticationError(TweetStreamError):
- """Exception raised if the username/password is not accepted"""
- pass
-
-
-class ConnectionError(TweetStreamError):
- """Raised when there are network problems. This means when there are
- dns errors, network errors, twitter issues"""
-
- def __init__(self, reason, details=None):
- self.reason = reason
- self.details = details
-
- def __str__(self):
- return '<ConnectionError %s>' % self.reason
-
-
-from .streamclasses import SampleStream, FilterStream
-from .deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream
--- a/script/lib/tweetstream/tweetstream/deprecated.py Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,82 +0,0 @@
-from .streamclasses import FilterStream, SampleStream, ConnectionError
-import time
-
-class DeprecatedStream(FilterStream):
- def __init__(self, *args, **kwargs):
- import warnings
- warnings.warn("%s is deprecated. Use FilterStream instead" % self.__class__.__name__, DeprecationWarning)
- super(DeprecatedStream, self).__init__(*args, **kwargs)
-
-
-class FollowStream(DeprecatedStream):
- def __init__(self, auth, follow, catchup=None, url=None):
- super(FollowStream, self).__init__(auth, follow=follow, catchup=catchup, url=url)
-
-
-class TrackStream(DeprecatedStream):
- def __init__(self, auth, track, catchup=None, url=None, slow=False):
- super(TrackStream, self).__init__(auth, track=track, catchup=catchup, url=url)
-
-
-class LocationStream(DeprecatedStream):
- def __init__(self, auth, locations, catchup=None, url=None, slow=False):
- super(LocationStream, self).__init__(auth, locations=locations, catchup=catchup, url=url)
-
-
-class TweetStream(SampleStream):
- def __init__(self, *args, **kwargs):
- import warnings
- warnings.warn("%s is deprecated. Use SampleStream instead" % self.__class__.__name__, DeprecationWarning)
- SampleStream.__init__(self, *args, **kwargs)
-
-
-class ReconnectingTweetStream(TweetStream):
- """TweetStream class that automatically tries to reconnect if the
-    connection goes down. Reconnecting, and waiting to reconnect, is
- blocking.
-
- :param username: See :TweetStream:
-
- :param password: See :TweetStream:
-
- :keyword url: See :TweetStream:
-
- :keyword reconnects: Number of reconnects before a ConnectionError is
- raised. Default is 3
-
- :error_cb: Optional callable that will be called just before trying to
- reconnect. The callback will be called with a single argument, the
- exception that caused the reconnect attempt. Default is None
-
- :retry_wait: Time to wait before reconnecting in seconds. Default is 5
-
- """
-
- def __init__(self, auth, url="sample",
- reconnects=3, error_cb=None, retry_wait=5):
- self.max_reconnects = reconnects
- self.retry_wait = retry_wait
- self._reconnects = 0
- self._error_cb = error_cb
- TweetStream.__init__(self, auth, url=url)
-
- def next(self):
- while True:
- try:
- return TweetStream.next(self)
- except ConnectionError, e:
- self._reconnects += 1
- if self._reconnects > self.max_reconnects:
- raise ConnectionError("Too many retries")
-
- # Note: error_cb is not called on the last error since we
- # raise a ConnectionError instead
- if callable(self._error_cb):
- self._error_cb(e)
-
- time.sleep(self.retry_wait)
- # Don't listen to auth error, since we can't reasonably reconnect
- # when we get one.
-
-
-
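
A minimal usage sketch of the reconnect parameters described in the
ReconnectingTweetStream docstring above (auth is whatever auth object the
streams accept; log_disconnect is a hypothetical callback name):

    def log_disconnect(exc):
        print "reconnecting after:", exc.reason

    stream = ReconnectingTweetStream(auth, reconnects=3,
                                     error_cb=log_disconnect, retry_wait=5)
    for tweet in stream:
        print tweet["text"]
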
--- a/script/lib/tweetstream/tweetstream/streamclasses.py Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,256 +0,0 @@
-import time
-import urllib
-import urllib2
-import socket
-from platform import python_version_tuple
-import anyjson
-
-from . import AuthenticationError, ConnectionError, USER_AGENT
-
-class BaseStream(object):
- """A network connection to Twitters streaming API
-
- :param auth: tweepy auth object.
- :keyword catchup: Number of tweets from the past to get before switching to
- live stream.
- :keyword raw: If True, return each tweet's raw data direct from the socket,
- without UTF8 decoding or parsing, rather than a parsed object. The
- default is False.
- :keyword timeout: If non-None, set a timeout in seconds on the receiving
- socket. Certain types of network problems (e.g., disconnecting a VPN)
- can cause the connection to hang, leading to indefinite blocking that
- requires kill -9 to resolve. Setting a timeout leads to an orderly
- shutdown in these cases. The default is None (i.e., no timeout).
- :keyword url: Endpoint URL for the object. Note: you should not
- need to edit this. It's present to make testing easier.
-
- .. attribute:: connected
-
- True if the object is currently connected to the stream.
-
- .. attribute:: url
-
- The URL to which the object is connected
-
- .. attribute:: starttime
-
- The timestamp, in seconds since the epoch, the object connected to the
- streaming api.
-
- .. attribute:: count
-
- The number of tweets that have been returned by the object.
-
- .. attribute:: rate
-
- The rate at which tweets have been returned from the object as a
- float. see also :attr: `rate_period`.
-
- .. attribute:: rate_period
-
-        The amount of time to sample tweets to calculate tweet rate. By
-        default 10 seconds. Changes to this attribute will not be reflected
-        until the next time the rate is calculated. The rate of tweets varies
-        with time of day etc. so it's useful to set this to something
- sensible.
-
- .. attribute:: user_agent
-
- User agent string that will be included in the request. NOTE: This can
- not be changed after the connection has been made. This property must
- thus be set before accessing the iterator. The default is set in
- :attr: `USER_AGENT`.
- """
-
- def __init__(self, auth,
- catchup=None, raw=False, timeout=None, url=None):
- self._conn = None
- self._rate_ts = None
- self._rate_cnt = 0
- self._auth = auth
- self._catchup_count = catchup
- self._raw_mode = raw
- self._timeout = timeout
- self._iter = self.__iter__()
-
- self.rate_period = 10 # in seconds
- self.connected = False
- self.starttime = None
- self.count = 0
- self.rate = 0
- self.user_agent = USER_AGENT
- if url: self.url = url
-
- self.muststop = False
-
- def __enter__(self):
- return self
-
- def __exit__(self, *params):
- self.close()
- return False
-
- def _init_conn(self):
- """Open the connection to the twitter server"""
- headers = {'User-Agent': self.user_agent}
-
- postdata = self._get_post_data() or {}
- if self._catchup_count:
- postdata["count"] = self._catchup_count
-
- poststring = urllib.urlencode(postdata) if postdata else None
-
- if self._auth:
- self._auth.apply_auth(self.url, "POST", headers, postdata)
-
- req = urllib2.Request(self.url, poststring, headers)
-
- try:
- self._conn = urllib2.urlopen(req, timeout=self._timeout)
-
- except urllib2.HTTPError, exception:
- if exception.code == 401:
- raise AuthenticationError("Access denied")
- elif exception.code == 404:
- raise ConnectionError("URL not found: %s" % self.url)
- else: # re raise. No idea what would cause this, so want to know
- raise
- except urllib2.URLError, exception:
- raise ConnectionError(exception.reason)
-
- # This is horrible. This line grabs the raw socket (actually an ssl
- # wrapped socket) from the guts of urllib2/httplib. We want the raw
- # socket so we can bypass the buffering that those libs provide.
- # The buffering is reasonable when dealing with connections that
-        # try to finish as soon as possible. With Twitter's never ending
-        # connections, it causes a bug where we would not deliver tweets
-        # until the buffer was full. That's problematic for very low volume
-        # filterstreams, since you might not see a tweet for minutes or hours
-        # after they occurred while the buffer fills.
- #
-        # Oh, and the innards of the http libs are different things in
-        # py2 and 3, so need to deal with that. py3 libs do more of what I
-        # want by default, but I won't do more special casing for it than
-        # necessary.
-
- major, _, _ = python_version_tuple()
- # The cast is needed because apparently some versions return strings
- # and some return ints.
- # On my ubuntu with stock 2.6 I get strings, which match the docs.
- # Someone reported the issue on 2.6.1 on macos, but that was
- # manually built, not the bundled one. Anyway, cast for safety.
- major = int(major)
- if major == 2:
- self._socket = self._conn.fp._sock.fp._sock
- else:
- self._socket = self._conn.fp.raw
- # our code that reads from the socket expects a method called recv.
- # py3 socket.SocketIO uses the name read, so alias it.
- self._socket.recv = self._socket.read
-
- self.connected = True
- if not self.starttime:
- self.starttime = time.time()
- if not self._rate_ts:
- self._rate_ts = time.time()
-
- def _get_post_data(self):
- """Subclasses that need to add post data to the request can override
- this method and return post data. The data should be in the format
- returned by urllib.urlencode."""
- return None
-
- def __muststop(self):
- if callable(self.muststop):
- return self.muststop()
- else:
- return self.muststop
-
- def _update_rate(self):
- rate_time = time.time() - self._rate_ts
- if not self._rate_ts or rate_time > self.rate_period:
- self.rate = self._rate_cnt / rate_time
- self._rate_cnt = 0
- self._rate_ts = time.time()
-
- def __iter__(self):
- buf = b""
- while True:
- try:
- if self.__muststop():
- raise StopIteration()
-
- if not self.connected:
- self._init_conn()
-
- buf += self._socket.recv(8192)
- if buf == b"": # something is wrong
- self.close()
- raise ConnectionError("Got entry of length 0. Disconnected")
- elif buf.isspace():
- buf = b""
- elif b"\r" not in buf: # not enough data yet. Loop around
- continue
-
- lines = buf.split(b"\r")
- buf = lines[-1]
- lines = lines[:-1]
-
- for line in lines:
- if (self._raw_mode):
- tweet = line
- else:
- line = line.decode("utf8")
- try:
- tweet = anyjson.deserialize(line)
- except ValueError, e:
- self.close()
- raise ConnectionError("Got invalid data from twitter", details=line)
-
- if 'text' in tweet:
- self.count += 1
- self._rate_cnt += 1
- self._update_rate()
- yield tweet
-
-
- except socket.error, e:
- self.close()
- raise ConnectionError("Server disconnected")
-
-
- def next(self):
- """Return the next available tweet. This call is blocking!"""
- return self._iter.next()
-
-
- def close(self):
- """
- Close the connection to the streaming server.
- """
- self.connected = False
- if self._conn:
- self._conn.close()
-
-
-class SampleStream(BaseStream):
- url = "https://stream.twitter.com/1/statuses/sample.json"
-
-
-class FilterStream(BaseStream):
- url = "https://stream.twitter.com/1/statuses/filter.json"
-
- def __init__(self, auth, follow=None, locations=None,
- track=None, catchup=None, url=None, raw=False, timeout=None):
- self._follow = follow
- self._locations = locations
- self._track = track
- # remove follow, locations, track
- BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout)
-
- def _get_post_data(self):
- postdata = {}
- if self._follow: postdata["follow"] = ",".join([str(e) for e in self._follow])
- if self._locations: postdata["locations"] = ",".join(self._locations)
- if self._track: postdata["track"] = ",".join(self._track)
- return postdata
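
For reference, the post data FilterStream builds from its keyword arguments:
each iterable is joined into a comma-separated string. A sketch (auth is a
placeholder; instantiating a stream does not open a connection):

    stream = FilterStream(auth, track=["opera", "firefox"], follow=[123, 124])
    postdata = stream._get_post_data()
    # -> {'track': 'opera,firefox', 'follow': '123,124'}
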
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/stream/recorder_stream.py Tue Dec 18 12:26:05 2012 +0100
@@ -0,0 +1,570 @@
+from getpass import getpass
+from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
+ get_logger)
+from optparse import OptionParser
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+import Queue
+import StringIO
+import anyjson
+import datetime
+import inspect
+import iri_tweet.stream
+import logging
+import os
+import re
+import requests.auth
+import shutil
+import signal
+import socket
+import sqlalchemy.schema
+import sys
+import threading
+import time
+import traceback
+import urllib2
+socket._fileobject.default_bufsize = 0
+
+
+
+# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
+columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
+# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
+columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
+# just put it in a sqlite3 table
+
+DEFAULT_TIMEOUT = 5
+
+def set_logging(options):
+ loggers = []
+
+ loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
+ loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
+ if options.debug >= 2:
+ loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
+ # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+ # utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+ # utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+ return loggers
+
+def set_logging_process(options, queue):
+ qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
+ qlogger.propagate = 0
+ return qlogger
+
+def get_auth(options, access_token):
+ if options.username and options.password:
+        auth = requests.auth.HTTPBasicAuth(options.username, options.password)
+ else:
+ consumer_key = models.CONSUMER_KEY
+ consumer_secret = models.CONSUMER_SECRET
+        auth = requests.auth.OAuth1(consumer_key, consumer_secret, access_token[0], access_token[1], signature_type='auth_header')
+ return auth
+
+
+def add_process_event(type, args, session_maker):
+ session = session_maker()
+ try:
+ evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+ session.add(evt)
+ session.commit()
+ finally:
+ session.close()
+
+
+class BaseProcess(Process):
+
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+ self.parent_pid = parent_pid
+ self.session_maker = session_maker
+ self.queue = queue
+ self.options = options
+ self.logger_queue = logger_queue
+ self.stop_event = stop_event
+ self.access_token = access_token
+
+ super(BaseProcess, self).__init__()
+
+ #
+ # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
+ #
+ def parent_is_alive(self):
+ try:
+ # try to call Parent
+ os.kill(self.parent_pid, 0)
+ except OSError:
+ # *beeep* oh no! The phone's disconnected!
+ return False
+ else:
+ # *ring* Hi mom!
+ return True
+
+
+ def __get_process_event_args(self):
+ return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+
+ def run(self):
+ try:
+ add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+ self.do_run()
+ finally:
+ add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
+
+ def do_run(self):
+ raise NotImplementedError()
+
+
+
+class SourceProcess(BaseProcess):
+
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+ self.track = options.track
+ self.token_filename = options.token_filename
+ self.catchup = options.catchup
+ self.timeout = options.timeout
+ self.stream = None
+ super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+
+ def __source_stream_iter(self):
+
+ self.logger = set_logging_process(self.options, self.logger_queue)
+ self.logger.debug("SourceProcess : run ")
+
+ self.auth = get_auth(self.options, self.access_token)
+ self.logger.debug("SourceProcess : auth set ")
+
+        track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
+ self.logger.debug("SourceProcess : track list " + track_list)
+
+ track_list = [k.strip() for k in track_list.split(',')]
+
+ self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
+ self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
+ self.logger.debug("SourceProcess : after connecting to stream")
+ self.stream.muststop = lambda: self.stop_event.is_set()
+
+ stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
+
+ session = self.session_maker()
+
+ try:
+ for tweet in stream_wrapper:
+ if not self.parent_is_alive():
+ self.stop_event.set()
+ sys.exit()
+ self.logger.debug("SourceProcess : tweet " + repr(tweet))
+ source = TweetSource(original_json=tweet)
+ self.logger.debug("SourceProcess : source created")
+ add_retries = 0
+ while add_retries < 10:
+ try:
+ add_retries += 1
+ session.add(source)
+ session.flush()
+ break
+ except OperationalError as e:
+ session.rollback()
+ self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
+ if add_retries == 10:
+ raise
+
+ source_id = source.id
+ self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
+ self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
+ session.commit()
+ self.queue.put((source_id, tweet), False)
+
+ except Exception as e:
+ self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
+ raise
+ finally:
+ session.rollback()
+ session.close()
+ self.logger_queue.close()
+ self.queue.close()
+ self.stream.close()
+ self.stream = None
+ if not self.stop_event.is_set():
+ self.stop_event.set()
+
+
+ def do_run(self):
+
+ # import pydevd
+ # pydevd.settrace(suspend=False)
+
+ source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
+
+ source_stream_iter_thread.start()
+
+ while not self.stop_event.is_set():
+ self.logger.debug("SourceProcess : In while after start")
+ self.stop_event.wait(DEFAULT_TIMEOUT)
+ if self.stop_event.is_set() and self.stream:
+ self.stream.close()
+            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+ self.stop_event.set()
+
+ self.logger.info("SourceProcess : join")
+ source_stream_iter_thread.join(30)
+
+
+def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
+ try:
+ if not tweet.strip():
+ return
+ tweet_obj = anyjson.deserialize(tweet)
+ if 'text' not in tweet_obj:
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+ session.add(tweet_log)
+ return
+ screen_name = ""
+ if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+ screen_name = tweet_obj['user']['screen_name']
+ logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ logger.debug(u"Process_tweet :" + repr(tweet))
+ processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
+ processor.process()
+ except ValueError as e:
+ message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
+ output = StringIO.StringIO()
+ try:
+ traceback.print_exc(file=output)
+ error_stack = output.getvalue()
+ finally:
+ output.close()
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
+ session.add(tweet_log)
+ session.commit()
+ except Exception as e:
+ message = u"Error %s processing tweet %s" % (repr(e), tweet)
+ logger.exception(message)
+ output = StringIO.StringIO()
+ try:
+ traceback.print_exc(file=output)
+ error_stack = output.getvalue()
+ finally:
+ output.close()
+ session.rollback()
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+ session.add(tweet_log)
+ session.commit()
+
+
+
+class TweetProcess(BaseProcess):
+
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+ super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+ self.twitter_query_user = options.twitter_query_user
+
+
+ def do_run(self):
+
+ self.logger = set_logging_process(self.options, self.logger_queue)
+ session = self.session_maker()
+ try:
+ while not self.stop_event.is_set() and self.parent_is_alive():
+ try:
+ source_id, tweet_txt = self.queue.get(True, 3)
+ self.logger.debug("Processing source id " + repr(source_id))
+ except Exception as e:
+ self.logger.debug('Process tweet exception in loop : ' + repr(e))
+ continue
+ process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
+
+
+def get_sessionmaker(conn_str):
+ engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
+ Session = scoped_session(Session)
+ return Session, engine, metadata
+
+
+def process_leftovers(session, access_token, twitter_query_user, logger):
+
+ sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+
+ for src in sources:
+ tweet_txt = src.original_json
+ process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
+ session.commit()
+
+
+
+# process_leftovers picks up tweet sources that do not match any log entry:
+# select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+def process_log(logger_queues, stop_event):
+ while not stop_event.is_set():
+ for lqueue in logger_queues:
+ try:
+ record = lqueue.get_nowait()
+ logging.getLogger(record.name).handle(record)
+ except Queue.Empty:
+ continue
+ except IOError:
+ continue
+ time.sleep(0.1)
+
+
+def get_options():
+
+ usage = "usage: %prog [options]"
+
+ parser = OptionParser(usage=usage)
+
+ parser.add_option("-f", "--file", dest="conn_str",
+ help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
+ parser.add_option("-u", "--user", dest="username",
+ help="Twitter user", metavar="USER", default=None)
+ parser.add_option("-w", "--password", dest="password",
+ help="Twitter password", metavar="PASSWORD", default=None)
+ parser.add_option("-T", "--track", dest="track",
+ help="Twitter track", metavar="TRACK")
+ parser.add_option("-n", "--new", dest="new", action="store_true",
+ help="new database", default=False)
+ parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
+ help="launch daemon", default=False)
+ parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+ help="Token file name")
+ parser.add_option("-d", "--duration", dest="duration",
+ help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
+ parser.add_option("-N", "--nb-process", dest="process_nb",
+                      help="number of processes.\nIf 0, only the leftovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
+ parser.add_option("--url", dest="url",
+ help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
+ parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
+ help="Query twitter for users", default=False, metavar="QUERY_USER")
+ parser.add_option("--catchup", dest="catchup",
+ help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
+ parser.add_option("--timeout", dest="timeout",
+ help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
+
+
+
+
+ utils.set_logging_options(parser)
+
+ return parser.parse_args()
+
+
+def do_run(options, session_maker):
+
+ stop_args = {}
+
+ access_token = None
+ if not options.username or not options.password:
+ access_token = utils.get_oauth_token(options.token_filename)
+
+ session = session_maker()
+ try:
+ process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
+
+ if options.process_nb <= 0:
+ utils.get_logger().debug("Leftovers processed. Exiting.")
+ return None
+
+ queue = mQueue()
+ stop_event = Event()
+
+ # workaround for bug on using urllib2 and multiprocessing
+ req = urllib2.Request('http://localhost')
+ conn = None
+ try:
+ conn = urllib2.urlopen(req)
+ except:
+ utils.get_logger().debug("could not open localhost")
+ # donothing
+ finally:
+ if conn is not None:
+ conn.close()
+
+ process_engines = []
+ logger_queues = []
+
+ SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ process_engines.append(engine_process)
+ lqueue = mQueue(50)
+ logger_queues.append(lqueue)
+ pid = os.getpid()
+ sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+
+ tweet_processes = []
+
+ for i in range(options.process_nb - 1):
+ SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ process_engines.append(engine_process)
+ lqueue = mQueue(50)
+ logger_queues.append(lqueue)
+ cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+ tweet_processes.append(cprocess)
+
+ log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
+ log_thread.daemon = True
+
+ log_thread.start()
+
+ sprocess.start()
+ for cprocess in tweet_processes:
+ cprocess.start()
+
+ add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
+
+ if options.duration >= 0:
+ end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+ def interupt_handler(signum, frame):
+ utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
+ stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+ stop_event.set()
+
+ signal.signal(signal.SIGINT , interupt_handler)
+ signal.signal(signal.SIGHUP , interupt_handler)
+ signal.signal(signal.SIGALRM, interupt_handler)
+ signal.signal(signal.SIGTERM, interupt_handler)
+
+
+ while not stop_event.is_set():
+ if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+ stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
+ stop_event.set()
+ break
+ if sprocess.is_alive():
+ utils.get_logger().debug("Source process alive")
+ time.sleep(1)
+ else:
+ stop_args.update({'message': 'Source process killed'})
+ stop_event.set()
+ break
+ utils.get_logger().debug("Joining Source Process")
+ try:
+ sprocess.join(10)
+ except:
+ utils.get_logger().debug("Pb joining Source Process - terminating")
+ sprocess.terminate()
+
+ for i, cprocess in enumerate(tweet_processes):
+ utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+ try:
+ cprocess.join(3)
+ except:
+ utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
+ cprocess.terminate()
+
+
+ utils.get_logger().debug("Close queues")
+ try:
+ queue.close()
+ for lqueue in logger_queues:
+ lqueue.close()
+    except Exception as e:
+ utils.get_logger().error("error when closing queues %s", repr(e))
+ # do nothing
+
+
+ if options.process_nb > 1:
+ utils.get_logger().debug("Processing leftovers")
+ session = session_maker()
+ try:
+ process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
+
+ for pengine in process_engines:
+ pengine.dispose()
+
+ return stop_args
+
+
+def main(options, args):
+
+ global conn_str
+
+ conn_str = options.conn_str.strip()
+ if not re.match("^\w+://.+", conn_str):
+ conn_str = 'sqlite:///' + options.conn_str
+
+ if conn_str.startswith("sqlite") and options.new:
+ filepath = conn_str[conn_str.find(":///") + 4:]
+ if os.path.exists(filepath):
+ i = 1
+ basename, extension = os.path.splitext(filepath)
+ new_path = '%s.%d%s' % (basename, i, extension)
+ while i < 1000000 and os.path.exists(new_path):
+ i += 1
+ new_path = '%s.%d%s' % (basename, i, extension)
+ if i >= 1000000:
+ raise Exception("Unable to find new filename for " + filepath)
+ else:
+ shutil.move(filepath, new_path)
+
+ Session, engine, metadata = get_sessionmaker(conn_str)
+
+ if options.new:
+ check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+ if len(check_metadata.sorted_tables) > 0:
+ message = "Database %s not empty exiting" % conn_str
+ utils.get_logger().error(message)
+ sys.exit(message)
+
+ metadata.create_all(engine)
+ session = Session()
+ try:
+ models.add_model_version(session)
+ finally:
+ session.close()
+
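+ # Start and shutdown of a run are journaled in the database itself as
+ # ProcessEvent rows, so a recording session can be audited afterwards;
+ # stop_args carries the reason the run ended.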
+ stop_args = {}
+ try:
+ add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+ stop_args = do_run(options, Session)
+ except Exception as e:
+ utils.get_logger().exception("Error in main thread")
+ outfile = StringIO.StringIO()
+ try:
+ traceback.print_exc(file=outfile)
+ stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+ finally:
+ outfile.close()
+ raise
+ finally:
+ add_process_event(type="shutdown", args=stop_args, session_maker=Session)
+
+ utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
+
+
+
+if __name__ == '__main__':
+
+ (options, args) = get_options()
+
+ loggers = set_logging(options)
+
+ utils.get_logger().debug("OPTIONS : " + repr(options))
+
+ if options.daemon:
+ import daemon
+ import lockfile
+
+ hdlr_preserve = []
+ for logger in loggers:
+ hdlr_preserve.extend([h.stream for h in logger.handlers])
+
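+ # files_preserve keeps the log handlers' underlying streams open across
+ # daemonization; python-daemon closes every other file descriptor.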
+ context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
+ with context:
+ main(options, args)
+ else:
+ main(options, args)
+
--- a/script/stream/recorder_tweetstream.py Tue Dec 18 10:40:15 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,571 +0,0 @@
-from getpass import getpass
-from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
- get_logger)
-from optparse import OptionParser
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session
-import Queue
-import StringIO
-import anyjson
-import datetime
-import inspect
-import logging
-import os
-import re
-import shutil
-import signal
-import socket
-import sqlalchemy.schema
-import sys
-import threading
-import time
-import traceback
-import tweepy.auth
-import iri_tweet.stream as tweetstream
-import urllib2
-socket._fileobject.default_bufsize = 0
-
-
-
-#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
-columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
-#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
-columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
-#just put it in a sqlite3 tqble
-
-DEFAULT_TIMEOUT = 5
-
-def set_logging(options):
- loggers = []
-
- loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
- loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
- if options.debug >= 2:
- loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
- #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
- #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
- #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
- return loggers
-
-def set_logging_process(options, queue):
- qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
- qlogger.propagate = 0
- return qlogger
-
-def get_auth(options, access_token):
- if options.username and options.password:
- auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
- else:
- consumer_key = models.CONSUMER_KEY
- consumer_secret = models.CONSUMER_SECRET
- auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True)
- auth.set_access_token(*access_token)
- return auth
-
-
-def add_process_event(type, args, session_maker):
- session = session_maker()
- try:
- evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
- session.add(evt)
- session.commit()
- finally:
- session.close()
-
-
-class BaseProcess(Process):
-
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
- self.parent_pid = parent_pid
- self.session_maker = session_maker
- self.queue = queue
- self.options = options
- self.logger_queue = logger_queue
- self.stop_event = stop_event
- self.access_token = access_token
-
- super(BaseProcess, self).__init__()
-
- #
- # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
- #
- def parent_is_alive(self):
- try:
- # try to call Parent
- os.kill(self.parent_pid, 0)
- except OSError:
- # *beeep* oh no! The phone's disconnected!
- return False
- else:
- # *ring* Hi mom!
- return True
-
-
- def __get_process_event_args(self):
- return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
-
- def run(self):
- try:
- add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
- self.do_run()
- finally:
- add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
-
- def do_run(self):
- raise NotImplementedError()
-
-
-
-class SourceProcess(BaseProcess):
-
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
- self.track = options.track
- self.token_filename = options.token_filename
- self.catchup = options.catchup
- self.timeout = options.timeout
- self.stream = None
- super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
- def __source_stream_iter(self):
-
- self.logger = set_logging_process(self.options, self.logger_queue)
- self.logger.debug("SourceProcess : run ")
-
- self.auth = get_auth(self.options, self.access_token)
- self.logger.debug("SourceProcess : auth set ")
-
- track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
- self.logger.debug("SourceProcess : track list " + track_list)
-
- track_list = [k.strip() for k in track_list.split(',')]
-
- self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
- self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
- self.logger.debug("SourceProcess : after connecting to stream")
- self.stream.muststop = lambda: self.stop_event.is_set()
-
- stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
-
- session = self.session_maker()
-
- try:
- for tweet in stream_wrapper:
- if not self.parent_is_alive():
- self.stop_event.set()
- stop_thread.join(5)
- sys.exit()
- self.logger.debug("SourceProcess : tweet " + repr(tweet))
- source = TweetSource(original_json=tweet)
- self.logger.debug("SourceProcess : source created")
- add_retries = 0
- while add_retries < 10:
- try:
- add_retries += 1
- session.add(source)
- session.flush()
- break
- except OperationalError as e:
- session.rollback()
- self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
- if add_retries == 10:
- raise
-
- source_id = source.id
- self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
- self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
- session.commit()
- self.queue.put((source_id, tweet), False)
-
- except Exception as e:
- self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
- raise
- finally:
- session.rollback()
- session.close()
- self.logger_queue.close()
- self.queue.close()
- self.stream.close()
- self.stream = None
- if not self.stop_event.is_set():
- self.stop_event.set()
-
-
- def do_run(self):
-
- #import pydevd
- #pydevd.settrace(suspend=False)
-
- source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
-
- source_stream_iter_thread.start()
-
- while not self.stop_event.is_set():
- self.logger.debug("SourceProcess : In while after start")
- self.stop_event.wait(DEFAULT_TIMEOUT)
- if self.stop_event.is_set() and self.stream:
- self.stream.close()
- elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
- self.stop_event.set()
-
- self.logger.info("SourceProcess : join")
- source_stream_iter_thread.join(30)
-
-
-def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
- try:
- if not tweet.strip():
- return
- tweet_obj = anyjson.deserialize(tweet)
- if 'text' not in tweet_obj:
- tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
- session.add(tweet_log)
- return
- screen_name = ""
- if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
- screen_name = tweet_obj['user']['screen_name']
- logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
- logger.debug(u"Process_tweet :" + repr(tweet))
- processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
- processor.process()
- except ValueError as e:
- message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
- output = StringIO.StringIO()
- try:
- traceback.print_exc(file=output)
- error_stack = output.getvalue()
- finally:
- output.close()
- tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
- session.add(tweet_log)
- session.commit()
- except Exception as e:
- message = u"Error %s processing tweet %s" % (repr(e), tweet)
- logger.exception(message)
- output = StringIO.StringIO()
- try:
- traceback.print_exc(file=output)
- error_stack = output.getvalue()
- finally:
- output.close()
- session.rollback()
- tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
- session.add(tweet_log)
- session.commit()
-
-
-
-class TweetProcess(BaseProcess):
-
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
- super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
- self.twitter_query_user = options.twitter_query_user
-
-
- def do_run(self):
-
- self.logger = set_logging_process(self.options, self.logger_queue)
- session = self.session_maker()
- try:
- while not self.stop_event.is_set() and self.parent_is_alive():
- try:
- source_id, tweet_txt = self.queue.get(True, 3)
- self.logger.debug("Processing source id " + repr(source_id))
- except Exception as e:
- self.logger.debug('Process tweet exception in loop : ' + repr(e))
- continue
- process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
- session.commit()
- finally:
- session.rollback()
- session.close()
-
-
-def get_sessionmaker(conn_str):
- engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
- Session = scoped_session(Session)
- return Session, engine, metadata
-
-
-def process_leftovers(session, access_token, twitter_query_user, logger):
-
- sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
-
- for src in sources:
- tweet_txt = src.original_json
- process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
- session.commit()
-
-
-
- #get tweet source that do not match any message
- #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
-def process_log(logger_queues, stop_event):
- while not stop_event.is_set():
- for lqueue in logger_queues:
- try:
- record = lqueue.get_nowait()
- logging.getLogger(record.name).handle(record)
- except Queue.Empty:
- continue
- except IOError:
- continue
- time.sleep(0.1)
-
-
-def get_options():
-
- usage = "usage: %prog [options]"
-
- parser = OptionParser(usage=usage)
-
- parser.add_option("-f", "--file", dest="conn_str",
- help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
- parser.add_option("-u", "--user", dest="username",
- help="Twitter user", metavar="USER", default=None)
- parser.add_option("-w", "--password", dest="password",
- help="Twitter password", metavar="PASSWORD", default=None)
- parser.add_option("-T", "--track", dest="track",
- help="Twitter track", metavar="TRACK")
- parser.add_option("-n", "--new", dest="new", action="store_true",
- help="new database", default=False)
- parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
- help="launch daemon", default=False)
- parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
- help="Token file name")
- parser.add_option("-d", "--duration", dest="duration",
- help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
- parser.add_option("-N", "--nb-process", dest="process_nb",
- help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
- parser.add_option("--url", dest="url",
- help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
- parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
- help="Query twitter for users", default=False, metavar="QUERY_USER")
- parser.add_option("--catchup", dest="catchup",
- help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
- parser.add_option("--timeout", dest="timeout",
- help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
-
-
-
-
- utils.set_logging_options(parser)
-
- return parser.parse_args()
-
-
-def do_run(options, session_maker):
-
- stop_args = {}
-
- access_token = None
- if not options.username or not options.password:
- access_token = utils.get_oauth_token(options.token_filename)
-
- session = session_maker()
- try:
- process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
- session.commit()
- finally:
- session.rollback()
- session.close()
-
- if options.process_nb <= 0:
- utils.get_logger().debug("Leftovers processed. Exiting.")
- return None
-
- queue = mQueue()
- stop_event = Event()
-
- #workaround for bug on using urllib2 and multiprocessing
- req = urllib2.Request('http://localhost')
- conn = None
- try:
- conn = urllib2.urlopen(req)
- except:
- utils.get_logger().debug("could not open localhost")
- #donothing
- finally:
- if conn is not None:
- conn.close()
-
- process_engines = []
- logger_queues = []
-
- SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
- process_engines.append(engine_process)
- lqueue = mQueue(50)
- logger_queues.append(lqueue)
- pid = os.getpid()
- sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-
- tweet_processes = []
-
- for i in range(options.process_nb - 1):
- SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
- process_engines.append(engine_process)
- lqueue = mQueue(50)
- logger_queues.append(lqueue)
- cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
- tweet_processes.append(cprocess)
-
- log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
- log_thread.daemon = True
-
- log_thread.start()
-
- sprocess.start()
- for cprocess in tweet_processes:
- cprocess.start()
-
- add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
-
- if options.duration >= 0:
- end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
-
- def interupt_handler(signum, frame):
- utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
- stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
- stop_event.set()
-
- signal.signal(signal.SIGINT , interupt_handler)
- signal.signal(signal.SIGHUP , interupt_handler)
- signal.signal(signal.SIGALRM, interupt_handler)
- signal.signal(signal.SIGTERM, interupt_handler)
-
-
- while not stop_event.is_set():
- if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
- stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
- stop_event.set()
- break
- if sprocess.is_alive():
- utils.get_logger().debug("Source process alive")
- time.sleep(1)
- else:
- stop_args.update({'message': 'Source process killed'})
- stop_event.set()
- break
- utils.get_logger().debug("Joining Source Process")
- try:
- sprocess.join(10)
- except:
- utils.get_logger().debug("Pb joining Source Process - terminating")
- sprocess.terminate()
-
- for i, cprocess in enumerate(tweet_processes):
- utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
- try:
- cprocess.join(3)
- except:
- utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
- cprocess.terminate()
-
-
- utils.get_logger().debug("Close queues")
- try:
- queue.close()
- for lqueue in logger_queues:
- lqueue.close()
- except exception as e:
- utils.get_logger().error("error when closing queues %s", repr(e))
- #do nothing
-
-
- if options.process_nb > 1:
- utils.get_logger().debug("Processing leftovers")
- session = session_maker()
- try:
- process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
- session.commit()
- finally:
- session.rollback()
- session.close()
-
- for pengine in process_engines:
- pengine.dispose()
-
- return stop_args
-
-
-def main(options, args):
-
- global conn_str
-
- conn_str = options.conn_str.strip()
- if not re.match("^\w+://.+", conn_str):
- conn_str = 'sqlite:///' + options.conn_str
-
- if conn_str.startswith("sqlite") and options.new:
- filepath = conn_str[conn_str.find(":///") + 4:]
- if os.path.exists(filepath):
- i = 1
- basename, extension = os.path.splitext(filepath)
- new_path = '%s.%d%s' % (basename, i, extension)
- while i < 1000000 and os.path.exists(new_path):
- i += 1
- new_path = '%s.%d%s' % (basename, i, extension)
- if i >= 1000000:
- raise Exception("Unable to find new filename for " + filepath)
- else:
- shutil.move(filepath, new_path)
-
- Session, engine, metadata = get_sessionmaker(conn_str)
-
- if options.new:
- check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
- if len(check_metadata.sorted_tables) > 0:
- message = "Database %s not empty exiting" % conn_str
- utils.get_logger().error(message)
- sys.exit(message)
-
- metadata.create_all(engine)
- session = Session()
- try:
- models.add_model_version(session)
- finally:
- session.close()
-
- stop_args = {}
- try:
- add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
- stop_args = do_run(options, Session)
- except Exception as e:
- utils.get_logger().exception("Error in main thread")
- outfile = StringIO.StringIO()
- try:
- traceback.print_exc(file=outfile)
- stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
- finally:
- outfile.close()
- raise
- finally:
- add_process_event(type="shutdown", args=stop_args, session_maker=Session)
-
- utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
-
-
-
-if __name__ == '__main__':
-
- (options, args) = get_options()
-
- loggers = set_logging(options)
-
- utils.get_logger().debug("OPTIONS : " + repr(options))
-
- if options.daemon:
- import daemon
- import lockfile
-
- hdlr_preserve = []
- for logger in loggers:
- hdlr_preserve.extend([h.stream for h in logger.handlers])
-
- context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
- with context:
- main(options, args)
- else:
- main(options, args)
-
Binary file script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz has changed
Binary file script/virtualenv/res/blessings-1.5.tar.gz has changed
--- a/script/virtualenv/res/lib/lib_create_env.py Tue Dec 18 10:40:15 2012 +0100
+++ b/script/virtualenv/res/lib/lib_create_env.py Tue Dec 18 12:26:05 2012 +0100
@@ -26,7 +26,6 @@
'PYTZ': {'setup': 'pytz', 'url':'http://pypi.python.org/packages/source/p/pytz/pytz-2012h.tar.bz2', 'local':"pytz-2012h.tar.bz2", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'SIMPLEJSON': {'setup': 'simplejson', 'url':'http://pypi.python.org/packages/source/s/simplejson/simplejson-2.6.2.tar.gz', 'local':"simplejson-2.6.2.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'SQLALCHEMY': {'setup': 'sqlalchemy', 'url':'http://downloads.sourceforge.net/project/sqlalchemy/sqlalchemy/0.8.0b1/SQLAlchemy-0.8.0b1.tar.gz?r=http%3A%2F%2Fwww.sqlalchemy.org%2Fdownload.html&ts=1355091775&use_mirror=ignum', 'local':"SQLAlchemy-0.8.0b1.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
- 'TWEEPY': {'setup': 'tweepy', 'url':'https://github.com/tweepy/tweepy/archive/1.12.tar.gz', 'local':"tweepy-1.12.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'TWITTER': {'setup': 'twitter', 'url':'http://pypi.python.org/packages/source/t/twitter/twitter-1.9.0.tar.gz', 'local':"twitter-1.9.0.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'TWITTER-TEXT': {'setup': 'twitter-text', 'url':'https://github.com/dryan/twitter-text-py/archive/master.tar.gz', 'local':"twitter-text-1.0.4.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'REQUESTS': {'setup': 'requests', 'url':'https://github.com/kennethreitz/requests/archive/v1.0.2.tar.gz', 'local':'requests-v1.0.2.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}},
Binary file script/virtualenv/res/requests-0.13.1.tar.gz has changed
Binary file script/virtualenv/res/src/tweepy-1.12.tar.gz has changed
Binary file script/virtualenv/res/twitter-1.9.0.tar.gz has changed
--- a/script/virtualenv/script/res/res_create_env.py Tue Dec 18 10:40:15 2012 +0100
+++ b/script/virtualenv/script/res/res_create_env.py Tue Dec 18 12:26:05 2012 +0100
@@ -17,7 +17,6 @@
'DATEUTIL',
'SIMPLEJSON',
'SQLALCHEMY',
- 'TWEEPY',
'TWITTER',
'TWITTER-TEXT',
'REQUESTS',