Starting 'listener_update' branch
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Mon, 15 Oct 2012 17:01:50 +0200
changeset 693 2ef837069108
parent 692 51072e5e6ea9
child 736 87e1b1d5f644
Starting 'listener_update' branch
sbin/sync/sync_live
script/lib/iri_tweet/iri_tweet/__init__.py
script/lib/iri_tweet/iri_tweet/stream.py
script/lib/iri_tweet/iri_tweet/utils.py
script/stream/recorder_tweetstream.py
script/utils/export_pad.py
script/utils/export_twitter_alchemy.py
script/utils/merge_tweets.py
script/utils/search_topsy.py
script/utils/tweet_twitter_user.py
script/virtualenv/res/SQLAlchemy-0.7.3.tar.gz
script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz
script/virtualenv/res/blessings-1.5.tar.gz
script/virtualenv/res/requests-0.13.1.tar.gz
script/virtualenv/res/twitter-1.4.2.tar.gz
script/virtualenv/res/twitter-1.9.0.tar.gz
--- a/sbin/sync/sync_live	Mon Oct 15 16:56:57 2012 +0200
+++ b/sbin/sync/sync_live	Mon Oct 15 17:01:50 2012 +0200
@@ -9,7 +9,8 @@
 #text2unix ~/tmp/tweet_live_V$1
 
 if [ -d ~/tmp/tweet_live_V$1 ]; then
-    cat <<EOT | rsync -Cvrlz --delete --filter=". -" ~/tmp/tweet_live_V$1/web/ iri@www.iri.centrepompidou.fr:/home/polemictweet/
+#    cat <<EOT | rsync -Cvrlz --delete --filter=". -" ~/tmp/tweet_live_V$1/web/ iri@www.iri.centrepompidou.fr:/home/polemictweet/
+    cat <<EOT | rsync -Cvrlz --delete --filter=". -" ~/tmp/tweet_live_V$1/web/ iri@ftv.iri-research.org:/srv/www/pt/
 + core
 P config.php
 P .htaccess
@@ -23,4 +24,4 @@
     rm -fr ~/tmp/tweet_live_V$1;
 fi
 
-ssh iri@www.iri.centrepompidou.fr sudo apache2ctl restart
+#ssh iri@www.iri.centrepompidou.fr sudo apache2ctl restart
--- a/script/lib/iri_tweet/iri_tweet/__init__.py	Mon Oct 15 16:56:57 2012 +0200
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py	Mon Oct 15 17:01:50 2012 +0200
@@ -21,3 +21,34 @@
 __contact__ = "ymh.work@gmail.com"
 __homepage__ = ""
 __docformat__ = "restructuredtext"
+
+
+"""
+ .. data:: USER_AGENT
+
+     The default user agent string for stream objects
+"""
+
+USER_AGENT = "IRITweet %s" % __version__
+
+
+class IRITweetError(Exception):
+    """Base class for all tweetstream errors"""
+    pass
+
+
+class AuthenticationError(IRITweetError):
+    """Exception raised if the username/password is not accepted"""
+    pass
+
+
+class ConnectionError(IRITweetError):
+    """Raised when there are network problems. This means when there are
+    dns errors, network errors, twitter issues"""
+
+    def __init__(self, reason, details=None):
+        self.reason = reason
+        self.details = details
+
+    def __str__(self):
+        return '<ConnectionError %s>' % self.reason
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/stream.py	Mon Oct 15 17:01:50 2012 +0200
@@ -0,0 +1,364 @@
+# -*- coding: utf-8 -*-
+'''
+Created on Mar 22, 2012
+
+@author: ymh
+
+Module directly inspired by tweetstream
+
+'''
+import time
+import requests
+from requests.utils import stream_untransfer, stream_decode_response_unicode
+import anyjson
+import select
+from ssl import SSLError
+
+from . import USER_AGENT, ConnectionError, AuthenticationError
+
+
+def iter_content_non_blocking(req, max_chunk_size=4096, decode_unicode=False, timeout=-1):
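+    # Non-blocking variant of Response.iter_content: the socket is switched
+    # to non-blocking mode and select() is used to wait for data, so the
+    # generator can honour `timeout` instead of hanging on a stalled stream.
+    # (Currently unused: BaseStream._iter_object calls iter_content instead.)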
+    
+    if req._content_consumed:
+        raise RuntimeError(
+            'The content for this response was already consumed'
+        )
+    
+    req.raw._fp.fp._sock.setblocking(False)
+    
+    def generate():
+        chunk_size = 1        
+        while True:
+            if timeout < 0:
+                rlist,_,_ = select.select([req.raw._fp.fp._sock], [], [])
+            else:
+                rlist,_,_ = select.select([req.raw._fp.fp._sock], [], [], timeout)
+                
+            if not rlist:                 
+                continue
+            
+            try:
+                chunk = req.raw.read(chunk_size)            
+                if not chunk:
+                    break
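+                # adaptive sizing: double the read size (up to max_chunk_size)
+                # while reads come back full, halve it when a read comes back
+                # less than half full, so slow trickles stay responsive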
+                if len(chunk) >= chunk_size and chunk_size < max_chunk_size:
+                    chunk_size = min(chunk_size*2, max_chunk_size)
+                elif len(chunk) < chunk_size/2 and chunk_size > 1:
+                    chunk_size = max(chunk_size/2, 1)
+                yield chunk
+            except SSLError as e:
+                if e.errno == 2:
+                    # Apparently this means there was nothing in the socket buf
+                    pass
+                else:
+                    raise                
+            
+        req._content_consumed = True
+
+    gen = stream_untransfer(generate(), req)
+
+    if decode_unicode:
+        gen = stream_decode_response_unicode(gen, req)
+
+    return gen
+
+    
+    
+
+class BaseStream(object):
+
+    """A network connection to Twitters streaming API
+
+    :param auth: tweepy auth object.
+    :keyword catchup: Number of tweets from the past to get before switching to
+      live stream.
+    :keyword raw: If True, return each tweet's raw data direct from the socket,
+      without UTF8 decoding or parsing, rather than a parsed object. The
+      default is False.
+    :keyword timeout: If non-None, set a timeout in seconds on the receiving
+      socket. Certain types of network problems (e.g., disconnecting a VPN)
+      can cause the connection to hang, leading to indefinite blocking that
+      requires kill -9 to resolve. Setting a timeout leads to an orderly
+      shutdown in these cases. The default is None (i.e., no timeout).
+    :keyword url: Endpoint URL for the object. Note: you should not
+      need to edit this. It's present to make testing easier.
+
+    .. attribute:: connected
+
+        True if the object is currently connected to the stream.
+
+    .. attribute:: url
+
+        The URL to which the object is connected
+
+    .. attribute:: starttime
+
+        The timestamp, in seconds since the epoch, the object connected to the
+        streaming api.
+
+    .. attribute:: count
+
+        The number of tweets that have been returned by the object.
+
+    .. attribute:: rate
+
+        The rate at which tweets have been returned from the object as a
+        float. See also :attr:`rate_period`.
+
+    .. attribute:: rate_period
+
+        The amount of time over which to sample tweets when calculating the
+        tweet rate. By default 10 seconds. Changes to this attribute will not
+        be reflected until the next time the rate is calculated. The rate of
+        tweets varies with time of day etc., so it's useful to set this to
+        something sensible.
+
+    .. attribute:: user_agent
+
+        User agent string that will be included in the request. NOTE: this
+        cannot be changed after the connection has been made. This property
+        must thus be set before accessing the iterator. The default is set in
+        :attr:`USER_AGENT`.
+    """
+
+    def __init__(self, auth,
+                 catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096):
+        self._conn = None
+        self._rate_ts = None
+        self._rate_cnt = 0
+        self._auth = auth
+        self._catchup_count = catchup
+        self.raw_mode = raw
+        self.timeout = timeout
+        self._compressed = compressed
+
+        self.rate_period = 10  # in seconds
+        self.connected = False
+        self.starttime = None
+        self.count = 0
+        self.rate = 0
+        self.user_agent = USER_AGENT
+        self.chunk_size = chunk_size
+        if url: self.url = url
+        
+        self.muststop = False
+        
+        self._iter = self.__iter__()
+         
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *params):
+        self.close()
+        return False
+
+    def _init_conn(self):
+        """Open the connection to the twitter server"""
+        headers = {'User-Agent': self.user_agent}
+        
+        if self._compressed:
+            headers['Accept-Encoding'] = "deflate, gzip"
+
+        postdata = self._get_post_data() or {}
+        postdata['stall_warnings'] = 'true'
+        if self._catchup_count:
+            postdata["count"] = self._catchup_count
+        
+        if self._auth:
+            self._auth.apply_auth(self.url, "POST", headers, postdata)
+
+        self._resp = requests.post(self.url, headers=headers, data=postdata)
+        self._resp.raise_for_status()
+        self.connected = True
+        if not self._rate_ts:
+            self._rate_ts = time.time()
+        if not self.starttime:
+            self.starttime = time.time()
+
+
+
+    def _get_post_data(self):
+        """Subclasses that need to add post data to the request can override
+        this method and return post data. The data should be in the format
+        returned by urllib.urlencode."""
+        return None
+
+    def testmuststop(self):
+        if callable(self.muststop):
+            return self.muststop()
+        else:
+            return self.muststop
+    
+    def _update_rate(self):
+        if not self._rate_ts:
+            self._rate_ts = time.time()
+            return
+        rate_time = time.time() - self._rate_ts
+        if rate_time > self.rate_period:
+            self.rate = self._rate_cnt / rate_time
+            self._rate_cnt = 0
+            self._rate_ts = time.time()
+            
+    def _iter_object(self):
+        pending = None
+        has_stopped = False
+
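+        # The stream is a sequence of '\r'-delimited lines; a chunk can end
+        # mid-line, so the trailing partial line is buffered in `pending` and
+        # prepended to the next chunk before splitting.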
+#        for chunk in iter_content_non_blocking(self._resp,
+#            max_chunk_size=self.chunk_size,
+#            decode_unicode=False,
+#            timeout=self.timeout):
+        for chunk in self._resp.iter_content(
+            chunk_size=self.chunk_size,
+            decode_unicode=False):
+
+            if self.testmuststop():
+                has_stopped = True
+                break
+
+            if pending is not None:
+                chunk = pending + chunk
+            lines = chunk.split('\r')
+
+            if chunk and lines[-1] and lines[-1][-1] == chunk[-1]:
+                pending = lines.pop()
+            else:
+                pending = None
+
+            for line in lines:
+                yield line.strip('\n')
+
+            if self.testmuststop():
+                has_stopped = True
+                break
+
+        if pending is not None:
+            yield pending
+        if has_stopped:
+            raise StopIteration()
+            
+    def __iter__(self):
+        
+        if not self.connected:
+            self._init_conn()
+
+        for line in self._iter_object():
+
+            if not line:
+                continue
+                            
+            if self.raw_mode:
+                tweet = line
+            else:
+                line = line.decode("utf8")
+                try:
+                    tweet = anyjson.deserialize(line)
+                except ValueError:
+                    self.close()
+                    raise ConnectionError("Got invalid data from twitter", details=line)
+            if 'text' in tweet:
+                self.count += 1
+                self._rate_cnt += 1
+            self._update_rate()
+            yield tweet
+
+
+    def next(self):
+        """Return the next available tweet. This call is blocking!"""
+        return self._iter.next()
+
+
+    def close(self):
+        """
+        Close the connection to the streaming server.
+        """
+        self.connected = False
+
+
+class FilterStream(BaseStream):
+    url = "https://stream.twitter.com/1/statuses/filter.json"
+
+    def __init__(self, auth, follow=None, locations=None,
+                 track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096):
+        self._follow = follow
+        self._locations = locations
+        self._track = track
+        # remove follow, locations, track
+        BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size)
+
+    def _get_post_data(self):
+        postdata = {}
+        if self._follow: postdata["follow"] = ",".join([str(e) for e in self._follow])
+        if self._locations: postdata["locations"] = ",".join(self._locations)
+        if self._track: postdata["track"] = ",".join(self._track)
+        return postdata
+
+
+class SafeStreamWrapper(object):
+    
+    def __init__(self, base_stream, logger=None, error_cb=None, max_reconnects=-1, initial_tcp_wait=250, initial_http_wait=5000, max_wait=240000):
+        self._stream = base_stream
+        self._logger = logger
+        self._error_cb = error_cb
+        self._max_reconnects = max_reconnects
+        self._initial_tcp_wait = initial_tcp_wait
+        self._initial_http_wait = initial_http_wait
+        self._max_wait = max_wait
+        self._retry_wait = 0
+        self._retry_nb = 0
+        self._reconnects = 0
+
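+    # Reconnect policy: network (TCP) errors back off linearly in steps of
+    # initial_tcp_wait, while HTTP errors back off exponentially, doubling
+    # from initial_http_wait; both are capped at max_wait milliseconds, so
+    # with the defaults HTTP waits run 5s, 10s, 20s, ... up to 240s.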
+    def __post_process_error(self, e):
+        # Note: error_cb is not called on the last error since we
+        # raise a ConnectionError instead
+        if callable(self._error_cb):
+            self._error_cb(e)
+        if self._logger: self._logger.info("stream sleeping for %d ms" % self._retry_wait)
+        time.sleep(float(self._retry_wait)/1000.0)
+        
+        
+    def __process_tcp_error(self,e):
+        if self._logger: self._logger.debug("connection error :" + str(e))
+        self._reconnects += 1
+        if self._max_reconnects >= 0 and self._reconnects > self._max_reconnects:
+            raise ConnectionError("Too many retries")
+        if self._retry_wait < self._max_wait:
+            self._retry_wait += self._initial_tcp_wait
+            if self._retry_wait > self._max_wait:
+                self._retry_wait = self._max_wait
+        
+        self.__post_process_error(e)
+
+        
+    def __process_http_error(self,e):
+        if self._logger: self._logger.debug("http error on %s : %s" % (e.response.url,e.message))
+        if self._retry_wait < self._max_wait:
+            self._retry_wait = 2*self._retry_wait if self._retry_wait > 0 else self._initial_http_wait
+            if self._retry_wait > self._max_wait:
+                self._retry_wait = self._max_wait
+        
+        self.__post_process_error(e)
+        
+    def __iter__(self):
+        while not self._stream.testmuststop():
+            self._retry_nb += 1
+            try:
+                if self._logger: self._logger.debug("inner loop")
+                for tweet in self._stream:
+                    self._reconnects = 0
+                    self._retry_wait = 0
+                    if "warning" in tweet:
+                        if self._logger: self._logger.warning("Tweet warning received : %s" % repr(tweet))
+                        continue
+                    if not tweet.strip():
+                        if self._logger: self._logger.debug("Empty Tweet received : PING")
+                        continue
+                    yield tweet
+            except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
+                self.__process_tcp_error(e)
+            except requests.exceptions.HTTPError as e:
+                if e.response.status_code == 401 and self._retry_nb <= 1:
+                    raise AuthenticationError("Error connecting to %s : %s" % (e.response.url,e.message))
+                if e.response.status_code > 200:
+                    self.__process_http_error(e)
+                else:
+                    self.__process_tcp_error(e)
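+
+    # Typical use (sketch mirroring recorder_tweetstream.py; process() is a
+    # hypothetical consumer):
+    #
+    #     stream = FilterStream(auth, track=["iri"], raw=True)
+    #     for tweet in SafeStreamWrapper(stream, logger=logger):
+    #         process(tweet)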
+
+        
+    
+    
\ No newline at end of file
--- a/script/lib/iri_tweet/iri_tweet/utils.py	Mon Oct 15 16:56:57 2012 +0200
+++ b/script/lib/iri_tweet/iri_tweet/utils.py	Mon Oct 15 17:01:50 2012 +0200
@@ -48,7 +48,7 @@
 
     if res is None:
         get_logger().debug("get_oauth_token : doing the oauth dance")
-        res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
+        res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
     
     CACHE_ACCESS_TOKEN[application_name] = res
     
@@ -615,9 +615,9 @@
     spaces = math.floor(width - marks)
 
     loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']'
-        
+
     s = u"%s %3d%% %*d/%d - %*s\r" % (loader, percent, len(str(total_line)), current_line, total_line, width, label[:width])
-    
+
     writer.write(s) #takes the header into account
     if percent >= 100:
         writer.write("\n")
--- a/script/stream/recorder_tweetstream.py	Mon Oct 15 16:56:57 2012 +0200
+++ b/script/stream/recorder_tweetstream.py	Mon Oct 15 17:01:50 2012 +0200
@@ -23,7 +23,7 @@
 import time
 import traceback
 import tweepy.auth
-import tweetstream
+import iri_tweet.stream as tweetstream
 import urllib2
 socket._fileobject.default_bufsize = 0
 
@@ -35,6 +35,7 @@
 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
 #just put it in a sqlite3 table
 
+DEFAULT_TIMEOUT = 5
 
 def set_logging(options):
     loggers = []
@@ -124,32 +125,36 @@
         self.token_filename = options.token_filename
         self.catchup = options.catchup
         self.timeout = options.timeout
+        self.stream = None
         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
-    def do_run(self):
+                    
+    def __source_stream_iter(self):
         
-        #import pydevd
-        #pydevd.settrace(suspend=True)
-
         self.logger = set_logging_process(self.options, self.logger_queue)
-        self.auth = get_auth(self.options, self.access_token) 
+        self.logger.debug("SourceProcess : run ")
         
-        self.logger.debug("SourceProcess : run ")
+        self.auth = get_auth(self.options, self.access_token) 
+        self.logger.debug("SourceProcess : auth set ")
+        
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
         self.logger.debug("SourceProcess : track list " + track_list)
         
         track_list = [k.strip() for k in track_list.split(',')]
 
         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
-        stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
+        self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
         self.logger.debug("SourceProcess : after connecting to stream")
-        stream.muststop = lambda: self.stop_event.is_set()
+        self.stream.muststop = lambda: self.stop_event.is_set()        
+        
+        stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
         
         session = self.session_maker()
         
         try:
-            for tweet in stream:
+            for tweet in stream_wrapper:
                 if not self.parent_is_alive():
+                    self.stop_event.set()
                     sys.exit()
                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
@@ -165,26 +170,56 @@
                         session.rollback()
                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                         if add_retries == 10:
-                            raise e
+                            raise
                      
                 source_id = source.id
                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
-                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
                 session.commit()
                 self.queue.put((source_id, tweet), False)
 
         except Exception as e:
             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
+            raise
         finally:
             session.rollback()
-            stream.close()
             session.close()
-            self.queue.close()
-            self.stop_event.set()
+            # multiprocessing queues cannot be cleared; cancel_join_thread()
+            # keeps this process from blocking on unflushed data at exit
+            self.logger_queue.cancel_join_thread()
+            self.logger_queue.close()
+            self.queue.cancel_join_thread()
+            self.queue.close()
+            self.stream.close()
+            self.stream = None
+            if not self.stop_event.is_set():
+                self.stop_event.set()
+
+
+    def do_run(self):
+        
+        # left over from debugging; enable only with a PyDev server running
+        #import pydevd
+        #pydevd.settrace(suspend=False)
+        
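+        # Watchdog pattern: the blocking stream iteration runs in a worker
+        # thread while this loop polls stop_event every DEFAULT_TIMEOUT
+        # seconds; closing the stream from here is what unblocks the worker
+        # thread on shutdown.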
+        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
+        
+        source_stream_iter_thread.start()
+        
+        while not self.stop_event.is_set():        
+            self.stop_event.wait(DEFAULT_TIMEOUT)
+            if self.stop_event.is_set() and self.stream:
+                self.stream.close()
+            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+                self.stop_event.set()
+
+        source_stream_iter_thread.join(30)
 
 
 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
     try:
+        if not tweet.strip():
+            return
         tweet_obj = anyjson.deserialize(tweet)
         if 'text' not in tweet_obj:
             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
@@ -197,6 +232,17 @@
         logger.debug(u"Process_tweet :" + repr(tweet))
         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
         processor.process()
+    except ValueError as e:
+        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
+        output = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()        
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
         logger.exception(message)
@@ -237,6 +283,10 @@
         finally:
             session.rollback()
             self.stop_event.set()
+            self.logger_queue.cancel_join_thread()
+            self.queue.cancel_join_thread()
             session.close()
 
 
@@ -353,7 +403,7 @@
     
     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
     process_engines.append(engine_process)
-    lqueue = mQueue(1)
+    lqueue = mQueue(50)
     logger_queues.append(lqueue)
     pid = os.getpid()
     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
@@ -363,7 +413,7 @@
     for i in range(options.process_nb - 1):
         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
         process_engines.append(engine_process)
-        lqueue = mQueue(1)
+        lqueue = mQueue(50)
         logger_queues.append(lqueue)
         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
         tweet_processes.append(cprocess)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/utils/export_pad.py	Mon Oct 15 17:01:50 2012 +0200
@@ -0,0 +1,324 @@
+#!/usr/bin/env python
+# coding=utf-8
+
+from lxml import etree
+from iri_tweet.models import setup_database
+from optparse import OptionParser #@UnresolvedImport
+from sqlalchemy import Table, Column, BigInteger
+from iri_tweet.utils import (set_logging_options, set_logging, get_filter_query, 
+    get_logger)
+import anyjson
+import datetime
+import httplib2
+import os.path
+import re
+import sys
+import time
+import uuid #@UnresolvedImport
+from dateutil.parser import parse as parse_date
+import json
+import functools
+import requests
+
+
+class EtherpadRequestException(Exception):
+    def __init__(self, original_resp):
+        super(EtherpadRequestException, self).__init__(original_resp["message"])
+        self.status = original_resp["status"]
+        self.original_resp = original_resp
+
+
+class EtherpadRequest():
+    
+    def __init__(self, base_url, api_key):
+        self.base_url = base_url
+        self.api_key = api_key
+        self.__request = None
+
+    def __getattr__(self, name):
+        return functools.partial(self.__action, name)
+
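+    # Attribute access is mapped onto the etherpad-lite HTTP API: e.g.
+    # etp_req.getText(padID="p", rev=3) issues a GET request to
+    # "<base_url>/getText" with apikey, padID and rev as query parameters,
+    # so method names must match the API functions exposed by the server.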
+    def __action(self, action, **kwargs):
+        url = "%s/%s" % (self.base_url, action)
+        params = dict(kwargs)
+        params['apikey'] = self.api_key
+        
+        r = requests.get(url, params)
+        
+        resp = anyjson.deserialize(r.text)
+        
+        if resp["code"] == 0:
+            return resp["data"]
+        else:
+            raise EtherpadRequestException(resp)
+        
+    
+    def getRevisionsCount(self, padID):
+        f = self.__getattr__("getRevisionsCount")
+        res = f(padID=padID)
+        
+        return res["revisions"]
+    
+    def getPadUrl(self, padID):
+        
+        return "%s/%s" % (self.base_url,padID)
+    
+    
+
+def abort(message):
+    # uses the module-level `parser` bound in the __main__ block below
+    if message is not None:
+        sys.stderr.write(message + "\n")
+    parser.print_help()
+    sys.exit(1)
+
+def get_options():
+    
+    parser = OptionParser()
+    parser.add_option("-u", "--api-url", dest="api_url",
+                      help="Base etherpad-lite api url", metavar="API_URL", default=None)
+    parser.add_option("-k", "--api-key", dest="api_key",
+                      help="Base etherpad-lite api url", metavar="API_KEY", default=None)
+    parser.add_option("-p", "--pad-id", dest="pad_id",
+                      help="pad id", metavar="PADID")
+    parser.add_option("-s", "--start-date", dest="start_date",
+                      help="start date", metavar="START_DATE", default=None)
+    parser.add_option("-e", "--end-date", dest="end_date",
+                      help="end date", metavar="END_DATE", default=None)
+    parser.add_option("-f", "--format", dest="format", type="choice",
+                      help="format", metavar="FORMAT", choice=['html', 'text'], default='html')
+    parser.add_option("-I", "--content-file", dest="content_file",
+                      help="Content file", metavar="CONTENT_FILE")
+    parser.add_option("-C", "--color", dest="color",
+                      help="Color code", metavar="COLOR", default="16763904")
+    parser.add_option("-D", "--duration", dest="duration", type="int",
+                      help="Duration", metavar="DURATION", default=None)
+    parser.add_option("-n", "--name", dest="name",
+                      help="Cutting name", metavar="NAME", default=u"pads")
+    parser.add_option("-R", "--replace", dest="replace", action="store_true",
+                      help="Replace tweet ensemble", metavar="REPLACE", default=False)
+    parser.add_option("-m", "--merge", dest="merge", action="store_true",
+                      help="merge tweet ensemble, choose the first ensemble", metavar="MERGE", default=False)
+    parser.add_option("-E", "--extended", dest="extended_mode", action="store_true",
+                      help="Trigger polemic extended mode", metavar="EXTENDED", default=False)
+    parser.add_option("-S", "--step", dest="step", type=1,
+                      help="step for version", metavar="STEP", default=False)
+
+    
+    
+    set_logging_options(parser)
+
+    
+    return parser.parse_args() + (parser,)
+
+
+if __name__ == "__main__" :
+
+    (options, args, parser) = get_options()
+    
+    set_logging(options)
+    get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+    
+    if len(sys.argv) == 1:
+        abort(None)
+
+    # optparse returns a Values object: options are attributes, not dict keys
+    base_url = options.api_url
+    if not base_url:
+        abort("No base url")
+
+    api_key = options.api_key
+    if not api_key:
+        abort("api key missing")
+
+    pad_id = options.pad_id
+    if not pad_id:
+        abort("No pad id")
+
+    start_date_str = options.start_date
+    end_date_str = options.end_date
+    duration = options.duration
+    
+    start_date = None
+    start_ts = None
+    if start_date_str:
+        start_date = parse_date(start_date_str) 
+        start_ts = time.mktime(start_date.timetuple())*1000
+
+    end_date = None
+    if end_date_str:
+        end_date = parse_date(end_date_str)
+    elif start_date and duration:
+        end_date = start_date + datetime.timedelta(seconds=duration)
+        
+    if start_date is None or start_ts is None:
+        abort("No start date found")
+
+    end_ts = None
+    if end_date is not None:
+        end_ts = time.mktime(end_date.timetuple())*1000
+
+    content_file = options.content_file
+    
+    if not content_file:
+        abort("No content file")        
+
+    root = None
+
+    if content_file.find("http") == 0:
+
+        get_logger().debug("url : " + content_file) #@UndefinedVariable
+        
+        h = httplib2.Http()
+        resp, content = h.request(content_file)
+        
+        get_logger().debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
+        
+        project = anyjson.deserialize(content)
+        root = etree.fromstring(project["ldt"])
+                
+    elif os.path.exists(content_file):
+
+        doc = etree.parse(content_file)
+        root = doc.getroot()
+            
+    if root is None:
+        abort("No content file, file not found")
+
+    cutting_name = options.name
+    if cutting_name is None:
+        cutting_name = "pad_%s" % pad_id
+
+    format = options.format
+    ensemble_parent = None
+                
+    file_type = None
+    for node in root:
+        if node.tag == "project":
+            file_type = "ldt"
+            break
+        elif node.tag == "head":
+            file_type = "iri"
+            break
+    if file_type is None:
+        abort("Unknown file type")
+
+    if file_type == "ldt":
+        media_nodes = root.xpath("//media")
+        if len(media_nodes) > 0:
+            media = media_nodes[0]
+        annotations_node = root.find(u"annotations")
+        if annotations_node is None:
+            annotations_node = etree.SubElement(root, u"annotations")
+        content_node = annotations_node.find(u"content")
+        if content_node is None:
+            content_node = etree.SubElement(annotations_node,u"content", id=media.get(u"id"))
+        ensemble_parent = content_node
+    elif file_type == "iri":
+        body_node = root.find(u"body")
+        if body_node is None:
+            body_node = etree.SubElement(root, u"body")
+        ensembles_node = body_node.find(u"ensembles")
+        if ensembles_node is None:
+            ensembles_node = etree.SubElement(body_node, u"ensembles")
+        ensemble_parent = ensembles_node
+
+    if ensemble_parent is None:
+        abort("Can not add cutting")
+
+    if options.replace:
+        for ens in ensemble_parent.iterchildren(tag=u"ensemble"):
+            if ens.get("id","").startswith(cutting_name):
+                ensemble_parent.remove(ens)
+                
+    ensemble = None
+    elements = None
+                
+    if options.merge:
+        ensemble = ensemble_parent.find(u"ensemble")
+        if ensemble is not None:
+            elements = ensemble.find(u".//elements")                
+        
+    if ensemble is None or elements is None:
+        ensemble = etree.SubElement(ensemble_parent, u"ensemble", {u"id":u"tweet_" + unicode(uuid.uuid4()), u"title":u"Ensemble pad", u"author":u"IRI Web", u"abstract":u"Ensemble Pad"})
+        decoupage = etree.SubElement(ensemble, u"decoupage", {u"id": unicode(uuid.uuid4()), u"author": u"IRI Web"})
+    
+        etree.SubElement(decoupage, u"title").text = unicode(cutting_name)
+        etree.SubElement(decoupage, u"abstract").text = unicode(cutting_name)
+    
+        elements = etree.SubElement(decoupage, u"elements")
+
+
+    etp_req = EtherpadRequest(base_url, api_key)
+    rev_count = etp_req.getRevisionsCount(pad_id)
+
+    version_range = range(1, rev_count+1, options.step)
+    #make sure that the last version is exported
+    if rev_count not in version_range:
+        version_range.append(rev_count)
+    for rev in version_range:
+        
+        data = None
+        text = ""
+
+        if format == "html":
+            # etherpad-lite's API verb is getHTML (the original getHtml would
+            # hit a non-existent endpoint via __getattr__)
+            data = etp_req.getHTML(padID=pad_id, rev=rev)
+            text = data.get("html", "")
+        else:
+            data = etp_req.getText(padID=pad_id, rev=rev)
+            text = data.get("text","")
+
+        pad_ts = data['timestamp']
+        
+        if pad_ts < start_ts:
+            continue
+        
+        if end_ts is not None and pad_ts > end_ts:
+            break
+
+        pad_dt = datetime.datetime.fromtimestamp(float(pad_ts)/1000.0)
+        pad_ts_rel = pad_ts - start_ts
+        
+        username = None
+        color = ""
+        if 'author' in data:
+            username = data['author']['name'] if ('name' in data['author'] and data['author']['name']) else data['author']['id']
+            color =  data['author']['color'] if ('color' in data['author'] and data['author']['color']) else ""
+        
+        if not username:
+            username = "anon."
+            
+            
+        element = etree.SubElement(elements, u"element" , {u"id":"%s-%s-%d" %(unicode(uuid.uuid4()),unicode(pad_id),rev), u"color":unicode(color), u"author":unicode(username), u"date":unicode(pad_dt.strftime("%Y/%m/%d")), u"begin": unicode(pad_ts_rel), u"dur":u"0", u"src":""})
+        etree.SubElement(element, u"title").text = "%s: %s - rev %d" % (unicode(username), unicode(pad_id), rev)
+        etree.SubElement(element, u"abstract").text = unicode(text)
+        
+        meta_element = etree.SubElement(element, u'meta')
+        etree.SubElement(meta_element, "pad_url").text = etree.CDATA(unicode(etp_req.getPadUrl(pad_id)))
+        etree.SubElement(meta_element, "revision").text = etree.CDATA(unicode(rev))
+
+    # sort by tc in
+    if options.merge :
+        elements[:] = sorted(elements,key=lambda n: int(n.get('begin')))
+    
+    output_data = etree.tostring(root, encoding="utf-8", method="xml", pretty_print=False, xml_declaration=True)  
+    
+    if content_file and content_file.find("http") == 0:
+        
+        project["ldt"] = output_data
+        body = anyjson.serialize(project)
+        h = httplib2.Http()
+        resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body)
+        if resp.status != 200:
+            raise Exception("Error writing content : %d : %s"%(resp.status, resp.reason))                        
+    else:
+        if content_file and os.path.exists(content_file):
+            dest_file_name = content_file 
+        else:
+            dest_file_name = options.filename
+
+        output = open(dest_file_name, "w")
+        output.write(output_data)
+        output.flush()
+        output.close()
+        
+
--- a/script/utils/export_twitter_alchemy.py	Mon Oct 15 16:56:57 2012 +0200
+++ b/script/utils/export_twitter_alchemy.py	Mon Oct 15 17:01:50 2012 +0200
@@ -295,7 +295,7 @@
                     username = None
                     profile_url = ""
                     if tw.user is not None:
-                        username = tw.user.name                    
+                        username = tw.user.screen_name
                         profile_url = tw.user.profile_image_url if tw.user.profile_image_url is not None else ""
                     if not username:
                         username = "anon."
--- a/script/utils/merge_tweets.py	Mon Oct 15 16:56:57 2012 +0200
+++ b/script/utils/merge_tweets.py	Mon Oct 15 17:01:50 2012 +0200
@@ -31,9 +31,7 @@
     return parser.parse_args()
 
 if __name__ == "__main__":
-    
-    sys.stdout = codecs.getwriter(sys.stdout.encoding)(sys.stdout)
-    
+        
     options = get_option()
     
     access_token = None
@@ -91,7 +89,8 @@
                 
                 session_tgt.flush()
                 
-            show_progress(i+1, count_tw, progress_text+tweet.text, 70)
+            ptext = progress_text + tweet.text
+            show_progress(i+1, count_tw, ptext.replace("\n",""), 70)
                             
         session_tgt.commit()
         print u"%d new tweet added" % (added)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/utils/search_topsy.py	Mon Oct 15 17:01:50 2012 +0200
@@ -0,0 +1,170 @@
+from iri_tweet import models, utils
+from sqlalchemy.orm import sessionmaker
+import anyjson
+import sqlite3
+import twitter
+import re
+import requests
+from optparse import OptionParser
+import simplejson
+import time
+from blessings import Terminal
+import sys
+import math
+
+APPLICATION_NAME = "Tweet recorder user"
+CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
+CONSUMER_SECRET = "LMhNrY99R6a7E0YbZZkRFpUZpX5EfB1qATbDk1sIVLs"
+
+
+class TopsyResource(object):
+    
+    def __init__(self, query, **kwargs):
+                
+        self.options = kwargs
+        self.options['q'] = query
+        self.url = kwargs.get("url", "http://otter.topsy.com/search.json")
+        self.page = 0
+        self.req = None
+        self.res = {}
+        
+    def __initialize(self):
+        
+        params = {}
+        params.update(self.options)
+        self.req = requests.get(self.url, params=params)
+        self.res = self.req.json
+        
+    def __next_page(self):
+        page = self.res.get("response").get("page") + 1
+        params = {}
+        params.update(self.options)
+        params['page'] = page
+        self.req = requests.get(self.url, params=params)
+        self.res = self.req.json
+
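+    # Iteration walks the paged Topsy result set transparently: the first
+    # access fetches page 1, and each exhausted page triggers a request for
+    # the next page until the API returns an empty list.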
+    def __iter__(self):        
+        if not self.req:
+            self.__initialize()
+        while "response" in self.res and "list" in self.res.get("response") and self.res.get("response").get("list"):
+            for item in  self.res.get("response").get("list"):
+                yield item
+            self.__next_page()
+            
+    def total(self):
+        if not self.res:
+            return 0
+        else:
+            return self.res.get("response",{}).get("total",0)
+            
+
+
+def get_option():
+    
+    parser = OptionParser()
+
+    parser.add_option("-d", "--database", dest="database",
+                      help="Input database", metavar="DATABASE")
+    parser.add_option("-Q", dest="query",
+                      help="query", metavar="QUERY")
+    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+                      help="Token file name")
+    parser.add_option("-T", dest="topsy_apikey", metavar="TOPSY_APIKEY", default=None,
+                      help="Topsy apikey")
+    
+    utils.set_logging_options(parser)
+
+    return parser.parse_args()
+
+
+
+if __name__ == "__main__":
+
+    (options, args) = get_option()
+    
+    utils.set_logging(options)
+
+
+    access_token_key, access_token_secret = utils.get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
+
+    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET), secure=True)
+    t.secure = True
+    
+    conn_str = options.database.strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + conn_str
+    
+    engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
+    session = None
+    
+    
+    topsy_parameters = {
+        'apikey': options.topsy_apikey,
+        'perpage': 100,
+        'window': 'a',
+        'type': 'tweet',
+        'hidden': True,
+    }
+    
+    term = Terminal()
+    
+    try:
+        session = Session()
+        
+        results = None        
+        page = 1
+        print options.query
+
+        tr = TopsyResource(options.query, **topsy_parameters)
+        
+        move_up = 0
+        
+        for i,item in enumerate(tr):
+            # get id
+            url = item.get("url")
+            tweet_id = url.split("/")[-1]
+            
+            if move_up > 0:
+                print((move_up+1)*term.move_up())
+                move_up = 0
+            
+            print ("%d/%d:%03d%% - %s - %r" % (i+1, tr.total(), int(float(i+1)/float(tr.total())*100.0), tweet_id, item.get("content") ) + term.clear_eol())            
+            move_up += 1
+            
+            count_tweet = session.query(models.Tweet).filter_by(id_str=tweet_id).count()
+            
+            if count_tweet:
+                continue
+            try:                                    
+                tweet = t.statuses.show(id=tweet_id, include_entities=True)
+            except twitter.api.TwitterHTTPError as e:
+                if e.e.code == 404 or e.e.code == 403:
+                    continue
+                else:
+                    raise
+            
+            processor = utils.TwitterProcessor(tweet, None, None, session, None, options.token_filename)
+            processor.process()
+            session.flush()
+            session.commit()
+                        
+            time_to_sleep = int(math.ceil((tweet.rate_limit_reset - time.mktime(time.gmtime())) / max(tweet.rate_limit_remaining, 1)))
+            
+            print "rate limit remaining %s of %s" % (str(tweet.rate_limit_remaining), str(tweet.headers.getheader('x-ratelimit-limit'))) + term.clear_eol()
+            move_up += 1
+            for j in xrange(time_to_sleep):
+                if j:
+                    print(2*term.move_up())
+                else:
+                    move_up += 1
+                print(("Sleeping for %d seconds, %d remaining" % (time_to_sleep, time_to_sleep-j)) + term.clear_eol())
+                time.sleep(1)
+                
+    except twitter.api.TwitterHTTPError as e:
+        fmt = ("." + e.format) if e.format else ""
+        print "Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data))
+        
+    finally:
+        if session:
+            session.close()
--- a/script/utils/tweet_twitter_user.py	Mon Oct 15 16:56:57 2012 +0200
+++ b/script/utils/tweet_twitter_user.py	Mon Oct 15 17:01:50 2012 +0200
@@ -38,6 +38,7 @@
     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                       help="Token file name")
     parser.add_option("-S", dest="simulate", metavar="SIMULATE", default=False, action="store_true", help="Simulate call to twitter. Do not change the database")
+    parser.add_option("--direct-message", dest="direct_message", metavar="DIRECT_MESSAGE", default=False, action="store_true", help="send direc t message to the user, else create a status update mentioning the user (@username)")
     parser.add_option("-f", dest="force", metavar="FORCE", default=False, action="store_true", help="force sending message to all user even if it has already been sent")
 
 
@@ -109,10 +110,19 @@
             for user in query_res:
                 screen_name = user.screen_name
                 
-                message = u"@%s: %s" % (screen_name, base_message)
-                get_logger().debug("new status : " + message) #@UndefinedVariable
+                if options.direct_message:
+                    message = base_message
+                else:
+                    message = u"@%s: %s" % (screen_name, base_message)
+
+                print("new message : " + message)
+                get_logger().debug("new message : " + message) #@UndefinedVariable
+
                 if not options.simulate:
-                    t.statuses.update(status=message)
+                    if options.direct_message:
+                        t.direct_messages.new(user_id=user.id, screen_name=screen_name, text=message)
+                    else:
+                        t.statuses.update(status=message)
                     user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
                     session.add(user_message)
                     session.flush()
Binary file script/virtualenv/res/SQLAlchemy-0.7.3.tar.gz has changed
Binary file script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz has changed
Binary file script/virtualenv/res/blessings-1.5.tar.gz has changed
Binary file script/virtualenv/res/requests-0.13.1.tar.gz has changed
Binary file script/virtualenv/res/twitter-1.4.2.tar.gz has changed
Binary file script/virtualenv/res/twitter-1.9.0.tar.gz has changed