--- a/sbin/sync/sync_live Mon Oct 15 16:56:57 2012 +0200
+++ b/sbin/sync/sync_live Mon Oct 15 17:01:50 2012 +0200
@@ -9,7 +9,8 @@
#text2unix ~/tmp/tweet_live_V$1
if [ -d ~/tmp/tweet_live_V$1 ]; then
- cat <<EOT | rsync -Cvrlz --delete --filter=". -" ~/tmp/tweet_live_V$1/web/ iri@www.iri.centrepompidou.fr:/home/polemictweet/
+# cat <<EOT | rsync -Cvrlz --delete --filter=". -" ~/tmp/tweet_live_V$1/web/ iri@www.iri.centrepompidou.fr:/home/polemictweet/
+ cat <<EOT | rsync -Cvrlz --delete --filter=". -" ~/tmp/tweet_live_V$1/web/ iri@ftv.iri-research.org:/srv/www/pt/
+ core
P config.php
P .htaccess
@@ -23,4 +24,4 @@
rm -fr ~/tmp/tweet_live_V$1;
fi
-ssh iri@www.iri.centrepompidou.fr sudo apache2ctl restart
+#ssh iri@www.iri.centrepompidou.fr sudo apache2ctl restart
--- a/script/lib/iri_tweet/iri_tweet/__init__.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py Mon Oct 15 17:01:50 2012 +0200
@@ -21,3 +21,34 @@
__contact__ = "ymh.work@gmail.com"
__homepage__ = ""
__docformat__ = "restructuredtext"
+
+
+"""
+ .. data:: USER_AGENT
+
+ The default user agent string for stream objects
+"""
+
+USER_AGENT = "IRITweet %s" % __version__
+
+
+class IRITweetError(Exception):
+    """Base class for all iri_tweet stream errors"""
+    pass
+
+
+class AuthenticationError(IRITweetError):
+ """Exception raised if the username/password is not accepted"""
+ pass
+
+
+class ConnectionError(IRITweetError):
+    """Raised when there are network problems: DNS errors, dropped
+    connections, or Twitter-side issues."""
+
+ def __init__(self, reason, details=None):
+ self.reason = reason
+ self.details = details
+
+ def __str__(self):
+ return '<ConnectionError %s>' % self.reason
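+
+# Illustrative usage sketch (not part of the original code; `stream`, `process`
+# and the logging setup are assumptions):
+#
+#   try:
+#       for tweet in stream:
+#           process(tweet)
+#   except AuthenticationError:
+#       # credentials rejected: re-run the oauth dance
+#       raise
+#   except ConnectionError as e:
+#       logging.warning("stream dropped: %s (%r)", e.reason, e.details)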
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/stream.py Mon Oct 15 17:01:50 2012 +0200
@@ -0,0 +1,364 @@
+# -*- coding: utf-8 -*-
+'''
+Created on Mar 22, 2012
+
+@author: ymh
+
+Module directly inspired by tweetstream
+
+'''
+import time
+import requests
+from requests.utils import stream_untransfer, stream_decode_response_unicode
+import anyjson
+import select
+from ssl import SSLError
+
+from . import USER_AGENT, ConnectionError, AuthenticationError
+
+
+def iter_content_non_blocking(req, max_chunk_size=4096, decode_unicode=False, timeout=-1):
+
+ if req._content_consumed:
+ raise RuntimeError(
+ 'The content for this response was already consumed'
+ )
+
+ req.raw._fp.fp._sock.setblocking(False)
+
+ def generate():
+ chunk_size = 1
+ while True:
+ if timeout < 0:
+ rlist,_,_ = select.select([req.raw._fp.fp._sock], [], [])
+ else:
+ rlist,_,_ = select.select([req.raw._fp.fp._sock], [], [], timeout)
+
+ if not rlist:
+ continue
+
+ try:
+ chunk = req.raw.read(chunk_size)
+ if not chunk:
+ break
+ if len(chunk) >= chunk_size and chunk_size < max_chunk_size:
+ chunk_size = min(chunk_size*2, max_chunk_size)
+                elif len(chunk) < chunk_size/2 and chunk_size > 1:
+                    chunk_size = max(chunk_size/2, 1)
+ yield chunk
+ except SSLError as e:
+ if e.errno == 2:
+ # Apparently this means there was nothing in the socket buf
+ pass
+ else:
+ raise
+
+ req._content_consumed = True
+
+ gen = stream_untransfer(generate(), req)
+
+ if decode_unicode:
+ gen = stream_decode_response_unicode(gen, req)
+
+ return gen
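+
+# A minimal usage sketch (an assumption, not part of the module): read a
+# streaming requests.Response without blocking forever on a stalled socket.
+# `url`, `headers`, `postdata` and `handle` are illustrative names:
+#
+#   resp = requests.post(url, headers=headers, data=postdata)
+#   for chunk in iter_content_non_blocking(resp, max_chunk_size=4096, timeout=5.0):
+#       handle(chunk)   # chunk size adapts: grows on full reads, shrinks on short ones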
+
+
+
+
+class BaseStream(object):
+
+    """A network connection to Twitter's streaming API
+
+ :param auth: tweepy auth object.
+ :keyword catchup: Number of tweets from the past to get before switching to
+ live stream.
+ :keyword raw: If True, return each tweet's raw data direct from the socket,
+ without UTF8 decoding or parsing, rather than a parsed object. The
+ default is False.
+ :keyword timeout: If non-None, set a timeout in seconds on the receiving
+ socket. Certain types of network problems (e.g., disconnecting a VPN)
+ can cause the connection to hang, leading to indefinite blocking that
+ requires kill -9 to resolve. Setting a timeout leads to an orderly
+ shutdown in these cases. The default is None (i.e., no timeout).
+ :keyword url: Endpoint URL for the object. Note: you should not
+ need to edit this. It's present to make testing easier.
+
+ .. attribute:: connected
+
+ True if the object is currently connected to the stream.
+
+ .. attribute:: url
+
+ The URL to which the object is connected
+
+ .. attribute:: starttime
+
+ The timestamp, in seconds since the epoch, the object connected to the
+ streaming api.
+
+ .. attribute:: count
+
+ The number of tweets that have been returned by the object.
+
+ .. attribute:: rate
+
+        The rate at which tweets have been returned from the object as a
+        float. See also :attr:`rate_period`.
+
+ .. attribute:: rate_period
+
+        The amount of time over which to sample tweets when calculating the
+        tweet rate. By default 10 seconds. Changes to this attribute will not
+        be reflected until the next time the rate is calculated. The rate of
+        tweets varies with time of day etc., so it's useful to set this to
+        something sensible.
+
+ .. attribute:: user_agent
+
+        User agent string that will be included in the request. NOTE: This
+        cannot be changed after the connection has been made. This property
+        must thus be set before accessing the iterator. The default is set in
+        :attr:`USER_AGENT`.
+ """
+
+ def __init__(self, auth,
+ catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096):
+ self._conn = None
+ self._rate_ts = None
+ self._rate_cnt = 0
+ self._auth = auth
+ self._catchup_count = catchup
+ self.raw_mode = raw
+ self.timeout = timeout
+ self._compressed = compressed
+
+ self.rate_period = 10 # in seconds
+ self.connected = False
+ self.starttime = None
+ self.count = 0
+ self.rate = 0
+ self.user_agent = USER_AGENT
+ self.chunk_size = chunk_size
+ if url: self.url = url
+
+ self.muststop = False
+
+ self._iter = self.__iter__()
+
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *params):
+ self.close()
+ return False
+
+ def _init_conn(self):
+ """Open the connection to the twitter server"""
+ headers = {'User-Agent': self.user_agent}
+
+ if self._compressed:
+ headers['Accept-Encoding'] = "deflate, gzip"
+
+ postdata = self._get_post_data() or {}
+ postdata['stall_warnings'] = 'true'
+ if self._catchup_count:
+ postdata["count"] = self._catchup_count
+
+ if self._auth:
+ self._auth.apply_auth(self.url, "POST", headers, postdata)
+
+ self._resp = requests.post(self.url, headers=headers, data=postdata)
+ self._resp.raise_for_status()
+ self.connected = True
+ if not self._rate_ts:
+ self._rate_ts = time.time()
+ if not self.starttime:
+ self.starttime = time.time()
+
+
+
+ def _get_post_data(self):
+ """Subclasses that need to add post data to the request can override
+ this method and return post data. The data should be in the format
+ returned by urllib.urlencode."""
+ return None
+
+ def testmuststop(self):
+ if callable(self.muststop):
+ return self.muststop()
+ else:
+ return self.muststop
+
+    def _update_rate(self):
+        # check _rate_ts before using it, otherwise an unset timestamp
+        # would raise a TypeError in the subtraction below
+        if not self._rate_ts:
+            self._rate_ts = time.time()
+            return
+        rate_time = time.time() - self._rate_ts
+        if rate_time > self.rate_period:
+            self.rate = self._rate_cnt / rate_time
+            self._rate_cnt = 0
+            self._rate_ts = time.time()
+
+ def _iter_object(self):
+ pending = None
+ has_stopped = False
+
+# for chunk in iter_content_non_blocking(self._resp,
+# max_chunk_size=self.chunk_size,
+# decode_unicode=False,
+# timeout=self.timeout):
+ for chunk in self._resp.iter_content(
+ chunk_size=self.chunk_size,
+ decode_unicode=False):
+
+ if self.testmuststop():
+ has_stopped = True
+ break
+
+ if pending is not None:
+ chunk = pending + chunk
+ lines = chunk.split('\r')
+
+ if chunk and lines[-1] and lines[-1][-1] == chunk[-1]:
+ pending = lines.pop()
+ else:
+ pending = None
+
+ for line in lines:
+ yield line.strip('\n')
+
+ if self.testmuststop():
+ has_stopped = True
+ break
+
+ if pending is not None:
+ yield pending
+        if has_stopped:
+            return
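+
+        # Illustrative note on the split logic above: the stream is
+        # "\r"-delimited, so a chunk like "a\r\nb\r\npart" yields "a" and "b"
+        # while "part" stays pending until a later chunk completes the line.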
+
+ def __iter__(self):
+
+ if not self.connected:
+ self._init_conn()
+
+ for line in self._iter_object():
+
+ if not line:
+ continue
+
+            if self.raw_mode:
+ tweet = line
+ else:
+ line = line.decode("utf8")
+ try:
+ tweet = anyjson.deserialize(line)
+ except ValueError:
+ self.close()
+ raise ConnectionError("Got invalid data from twitter", details=line)
+ if 'text' in tweet:
+ self.count += 1
+ self._rate_cnt += 1
+ self._update_rate()
+ yield tweet
+
+
+ def next(self):
+ """Return the next available tweet. This call is blocking!"""
+ return self._iter.next()
+
+
+ def close(self):
+ """
+ Close the connection to the streaming server.
+ """
+ self.connected = False
+
+
+class FilterStream(BaseStream):
+ url = "https://stream.twitter.com/1/statuses/filter.json"
+
+ def __init__(self, auth, follow=None, locations=None,
+ track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096):
+ self._follow = follow
+ self._locations = locations
+ self._track = track
+ # remove follow, locations, track
+ BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size)
+
+ def _get_post_data(self):
+ postdata = {}
+ if self._follow: postdata["follow"] = ",".join([str(e) for e in self._follow])
+ if self._locations: postdata["locations"] = ",".join(self._locations)
+ if self._track: postdata["track"] = ",".join(self._track)
+ return postdata
+
+
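+# A minimal usage sketch (assumes a tweepy-style auth object; the names below
+# are illustrative, not part of this module):
+#
+#   auth = tweepy.auth.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
+#   auth.set_access_token(token_key, token_secret)
+#   with FilterStream(auth, track=["polemictweet"], compressed=True) as stream:
+#       for tweet in stream:        # blocking; parsed dicts unless raw=True
+#           print tweet.get("text")
+
+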
+class SafeStreamWrapper(object):
+
+ def __init__(self, base_stream, logger=None, error_cb=None, max_reconnects=-1, initial_tcp_wait=250, initial_http_wait=5000, max_wait=240000):
+ self._stream = base_stream
+ self._logger = logger
+ self._error_cb = error_cb
+ self._max_reconnects = max_reconnects
+ self._initial_tcp_wait = initial_tcp_wait
+ self._initial_http_wait = initial_http_wait
+ self._max_wait = max_wait
+        self._retry_wait = 0
+        self._retry_nb = 0
+        # must be initialized here: __process_tcp_error increments it and may
+        # run before the first tweet ever resets it
+        self._reconnects = 0
+
+ def __post_process_error(self,e):
+ # Note: error_cb is not called on the last error since we
+ # raise a ConnectionError instead
+ if callable(self._error_cb):
+ self._error_cb(e)
+        if self._logger: self._logger.info("stream sleeping for %d ms " % self._retry_wait)
+ time.sleep(float(self._retry_wait)/1000.0)
+
+
+ def __process_tcp_error(self,e):
+ if self._logger: self._logger.debug("connection error :" + str(e))
+ self._reconnects += 1
+ if self._max_reconnects >= 0 and self._reconnects > self._max_reconnects:
+ raise ConnectionError("Too many retries")
+ if self._retry_wait < self._max_wait:
+ self._retry_wait += self._initial_tcp_wait
+ if self._retry_wait > self._max_wait:
+ self._retry_wait = self._max_wait
+
+ self.__post_process_error(e)
+
+
+ def __process_http_error(self,e):
+ if self._logger: self._logger.debug("http error on %s : %s" % (e.response.url,e.message))
+ if self._retry_wait < self._max_wait:
+ self._retry_wait = 2*self._retry_wait if self._retry_wait > 0 else self._initial_http_wait
+ if self._retry_wait > self._max_wait:
+ self._retry_wait = self._max_wait
+
+ self.__post_process_error(e)
+
+ def __iter__(self):
+ while not self._stream.testmuststop():
+ self._retry_nb += 1
+ try:
+ if self._logger: self._logger.debug("inner loop")
+ for tweet in self._stream:
+ self._reconnects = 0
+ self._retry_wait = 0
+                    if "warning" in tweet:
+                        if self._logger: self._logger.warning("Tweet warning received : %s" % repr(tweet))
+                        continue
+                    if not tweet.strip():
+                        if self._logger: self._logger.debug("Empty Tweet received : PING")
+                        continue
+ yield tweet
+            # requests' HTTPError subclasses RequestException, so it must be
+            # caught first; otherwise the broad handler below swallows it
+            except requests.exceptions.HTTPError as e:
+                if e.response.status_code == 401 and self._retry_nb <= 1:
+                    raise AuthenticationError("Error connecting to %s : %s" % (e.response.url, e.message))
+                if e.response.status_code > 200:
+                    self.__process_http_error(e)
+                else:
+                    self.__process_tcp_error(e)
+            except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
+                self.__process_tcp_error(e)
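+
+    # Reconnect policy sketch (an illustrative summary of the handlers above):
+    # TCP-level errors back off linearly by initial_tcp_wait per retry; HTTP
+    # errors back off exponentially from initial_http_wait; both are capped at
+    # max_wait, and max_reconnects bounds TCP retries. Hypothetical use, with
+    # `stream` a FilterStream and `process` a caller-supplied function:
+    #
+    #   wrapper = SafeStreamWrapper(stream, logger=log, max_reconnects=10)
+    #   for tweet in wrapper:    # raises ConnectionError after too many retries
+    #       process(tweet)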
+
+
+
+
\ No newline at end of file
--- a/script/lib/iri_tweet/iri_tweet/utils.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/lib/iri_tweet/iri_tweet/utils.py Mon Oct 15 17:01:50 2012 +0200
@@ -48,7 +48,7 @@
if res is None:
get_logger().debug("get_oauth_token : doing the oauth dance")
- res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
+ res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
CACHE_ACCESS_TOKEN[application_name] = res
@@ -615,9 +615,9 @@
spaces = math.floor(width - marks)
loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']'
-
+
s = u"%s %3d%% %*d/%d - %*s\r" % (loader, percent, len(str(total_line)), current_line, total_line, width, label[:width])
-
+
writer.write(s) #takes the header into account
if percent >= 100:
writer.write("\n")
--- a/script/stream/recorder_tweetstream.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/stream/recorder_tweetstream.py Mon Oct 15 17:01:50 2012 +0200
@@ -23,7 +23,7 @@
import time
import traceback
import tweepy.auth
-import tweetstream
+import iri_tweet.stream as tweetstream
import urllib2
socket._fileobject.default_bufsize = 0
@@ -35,6 +35,7 @@
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
#just put it in a sqlite3 table
+DEFAULT_TIMEOUT = 5
def set_logging(options):
loggers = []
@@ -124,32 +125,36 @@
self.token_filename = options.token_filename
self.catchup = options.catchup
self.timeout = options.timeout
+ self.stream = None
super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
- def do_run(self):
+
+ def __source_stream_iter(self):
- #import pydevd
- #pydevd.settrace(suspend=True)
-
self.logger = set_logging_process(self.options, self.logger_queue)
- self.auth = get_auth(self.options, self.access_token)
+ self.logger.debug("SourceProcess : run ")
- self.logger.debug("SourceProcess : run ")
+ self.auth = get_auth(self.options, self.access_token)
+ self.logger.debug("SourceProcess : auth set ")
+
track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
self.logger.debug("SourceProcess : track list " + track_list)
track_list = [k.strip() for k in track_list.split(',')]
self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
- stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
+ self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
self.logger.debug("SourceProcess : after connecting to stream")
- stream.muststop = lambda: self.stop_event.is_set()
+ self.stream.muststop = lambda: self.stop_event.is_set()
+
+ stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
session = self.session_maker()
try:
- for tweet in stream:
+ for tweet in stream_wrapper:
if not self.parent_is_alive():
+                    self.stop_event.set()
sys.exit()
self.logger.debug("SourceProcess : tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
@@ -165,26 +170,56 @@
session.rollback()
self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
- raise e
+ raise
source_id = source.id
self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
- self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
session.commit()
self.queue.put((source_id, tweet), False)
except Exception as e:
self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
+ raise
finally:
session.rollback()
- stream.close()
session.close()
- self.queue.close()
- self.stop_event.set()
+            # multiprocessing queues have no mutex/clear(); drain before closing
+            while not self.logger_queue.empty():
+                self.logger_queue.get_nowait()
+            self.logger_queue.close()
+            while not self.queue.empty():
+                self.queue.get_nowait()
+            self.queue.close()
+ self.stream.close()
+ self.stream = None
+ if not self.stop_event.is_set():
+ self.stop_event.set()
+
+
+ def do_run(self):
+
+        #import pydevd
+        #pydevd.settrace(suspend=False)
+
+        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter, name="SourceStreamIterThread")
+
+ source_stream_iter_thread.start()
+
+ while not self.stop_event.is_set():
+ self.stop_event.wait(DEFAULT_TIMEOUT)
+ if self.stop_event.is_set() and self.stream:
+ self.stream.close()
+            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+                self.stop_event.set()
+
+ source_stream_iter_thread.join(30)
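+
+        # Watchdog summary (descriptive comment): the loop above wakes every
+        # DEFAULT_TIMEOUT seconds, closes the stream once stop_event is set so
+        # the reader thread unblocks, sets the event itself if the reader
+        # thread died, then gives the thread 30s to wind down.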
def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
try:
+ if not tweet.strip():
+ return
tweet_obj = anyjson.deserialize(tweet)
if 'text' not in tweet_obj:
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
@@ -197,6 +232,17 @@
logger.debug(u"Process_tweet :" + repr(tweet))
processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
processor.process()
+ except ValueError as e:
+ message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
+ output = StringIO.StringIO()
+ try:
+ traceback.print_exc(file=output)
+ error_stack = output.getvalue()
+ finally:
+ output.close()
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
+ session.add(tweet_log)
+ session.commit()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
logger.exception(message)
@@ -237,6 +283,10 @@
finally:
session.rollback()
self.stop_event.set()
+            # multiprocessing queues have no mutex/clear(); drain them instead
+            while not self.logger_queue.empty():
+                self.logger_queue.get_nowait()
+            while not self.queue.empty():
+                self.queue.get_nowait()
session.close()
@@ -353,7 +403,7 @@
SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
process_engines.append(engine_process)
- lqueue = mQueue(1)
+ lqueue = mQueue(50)
logger_queues.append(lqueue)
pid = os.getpid()
sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
@@ -363,7 +413,7 @@
for i in range(options.process_nb - 1):
SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
process_engines.append(engine_process)
- lqueue = mQueue(1)
+ lqueue = mQueue(50)
logger_queues.append(lqueue)
cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes.append(cprocess)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/utils/export_pad.py Mon Oct 15 17:01:50 2012 +0200
@@ -0,0 +1,324 @@
+#!/usr/bin/env python
+# coding=utf-8
+
+from lxml import etree
+from iri_tweet.models import setup_database
+from optparse import OptionParser #@UnresolvedImport
+from sqlalchemy import Table, Column, BigInteger
+from iri_tweet.utils import (set_logging_options, set_logging, get_filter_query,
+ get_logger)
+import anyjson
+import datetime
+import httplib2
+import os.path
+import re
+import sys
+import time
+import uuid #@UnresolvedImport
+from dateutil.parser import parse as parse_date
+import json
+import functools
+import requests
+
+
+class EtherpadRequestException(Exception):
+ def __init__(self, original_resp):
+ super(EtherpadRequestException, self).__init__(original_resp["message"])
+ self.status = original_resp["status"]
+ self.original_resp = original_resp
+
+
+class EtherpadRequest(object):
+
+ def __init__(self, base_url, api_key):
+ self.base_url = base_url
+ self.api_key = api_key
+ self.__request = None
+
+ def __getattr__(self, name):
+ return functools.partial(self.__action, name)
+
+ def __action(self, action, **kwargs):
+ url = "%s/%s" % (self.base_url, action)
+ params = dict(kwargs)
+ params['apikey'] = self.api_key
+
+        r = requests.get(url, params=params)
+
+ resp = anyjson.deserialize(r.text)
+
+ if resp["code"] == 0:
+ return resp["data"]
+ else:
+ raise EtherpadRequestException(resp)
+
+
+ def getRevisionsCount(self, padID):
+ f = self.__getattr__("getRevisionsCount")
+ res = f(padID=padID)
+
+ return res["revisions"]
+
+ def getPadUrl(self, padID):
+
+ return "%s/%s" % (self.base_url,padID)
+
+
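+# Usage sketch (illustrative; the base url, key and pad id are made up):
+# __getattr__ turns any unknown attribute into an HTTP API call, so the
+# second call below maps to GET <base_url>/getText?apikey=...&padID=...&rev=...
+#
+#   req = EtherpadRequest("http://pad.example.org/api/1", "APIKEY")
+#   revs = req.getRevisionsCount("mypad")          # explicit helper above
+#   data = req.getText(padID="mypad", rev=3)       # dispatched via __getattr__
+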
+
+def abort(message, parser=None):
+    if message is not None:
+        sys.stderr.write(message + "\n")
+    if parser is not None:
+        parser.print_help()
+    sys.exit(1)
+
+def get_options():
+
+ parser = OptionParser()
+ parser.add_option("-u", "--api-url", dest="api_url",
+ help="Base etherpad-lite api url", metavar="API_URL", default=None)
+    parser.add_option("-k", "--api-key", dest="api_key",
+                      help="Etherpad-lite api key", metavar="API_KEY", default=None)
+ parser.add_option("-p", "--pad-id", dest="pad_id",
+ help="pad id", metavar="PADID")
+ parser.add_option("-s", "--start-date", dest="start_date",
+ help="start date", metavar="START_DATE", default=None)
+ parser.add_option("-e", "--end-date", dest="end_date",
+ help="end date", metavar="END_DATE", default=None)
+    parser.add_option("-f", "--format", dest="format", type="choice",
+                      help="format", metavar="FORMAT", choices=['html', 'text'], default='html')
+ parser.add_option("-I", "--content-file", dest="content_file",
+ help="Content file", metavar="CONTENT_FILE")
+ parser.add_option("-C", "--color", dest="color",
+ help="Color code", metavar="COLOR", default="16763904")
+ parser.add_option("-D", "--duration", dest="duration", type="int",
+ help="Duration", metavar="DURATION", default=None)
+ parser.add_option("-n", "--name", dest="name",
+ help="Cutting name", metavar="NAME", default=u"pads")
+ parser.add_option("-R", "--replace", dest="replace", action="store_true",
+ help="Replace tweet ensemble", metavar="REPLACE", default=False)
+ parser.add_option("-m", "--merge", dest="merge", action="store_true",
+ help="merge tweet ensemble, choose the first ensemble", metavar="MERGE", default=False)
+ parser.add_option("-E", "--extended", dest="extended_mode", action="store_true",
+ help="Trigger polemic extended mode", metavar="EXTENDED", default=False)
+    parser.add_option("-S", "--step", dest="step", type="int",
+                      help="step for version", metavar="STEP", default=1)
+
+
+
+ set_logging_options(parser)
+
+
+ return parser.parse_args() + (parser,)
+
+
+if __name__ == "__main__" :
+
+ (options, args, parser) = get_options()
+
+ set_logging(options)
+ get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+
+ if len(sys.argv) == 1:
+        abort(None, parser)
+
+    base_url = options.api_url
+    if not base_url:
+        abort("No base url", parser)
+
+    api_key = options.api_key
+    if not api_key:
+        abort("api key missing", parser)
+
+    pad_id = options.pad_id
+    if not pad_id:
+        abort("No pad id", parser)
+
+    start_date_str = options.start_date
+    end_date_str = options.end_date
+    duration = options.duration
+
+ start_date = None
+ start_ts = None
+ if start_date_str:
+ start_date = parse_date(start_date_str)
+ start_ts = time.mktime(start_date.timetuple())*1000
+
+ end_date = None
+ if end_date_str:
+ end_date = parse_date(end_date_str)
+ elif start_date and duration:
+ end_date = start_date + datetime.timedelta(seconds=duration)
+
+    if start_date is None or start_ts is None:
+        abort("No start date found", parser)
+
+ end_ts = None
+ if end_date is not None:
+ end_ts = time.mktime(end_date.timetuple())*1000
+
+    content_file = options.content_file
+
+    if not content_file:
+        abort("No content file", parser)
+
+ root = None
+
+ if content_file.find("http") == 0:
+
+ get_logger().debug("url : " + content_file) #@UndefinedVariable
+
+ h = httplib2.Http()
+ resp, content = h.request(content_file)
+
+ get_logger().debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
+
+ project = anyjson.deserialize(content)
+ root = etree.fromstring(project["ldt"])
+
+ elif os.path.exists(content_file):
+
+ doc = etree.parse(content_file)
+ root = doc.getroot()
+
+    if root is None:
+        abort("Content file not found", parser)
+
+    cutting_name = options.name
+    if cutting_name is None:
+        cutting_name = "pad_%s" % pad_id
+
+    format = options.format
+ ensemble_parent = None
+
+ file_type = None
+ for node in root:
+ if node.tag == "project":
+ file_type = "ldt"
+ break
+ elif node.tag == "head":
+ file_type = "iri"
+ break
+    if file_type is None:
+        abort("Unknown file type", parser)
+
+ if file_type == "ldt":
+ media_nodes = root.xpath("//media")
+ if len(media_nodes) > 0:
+ media = media_nodes[0]
+ annotations_node = root.find(u"annotations")
+ if annotations_node is None:
+ annotations_node = etree.SubElement(root, u"annotations")
+ content_node = annotations_node.find(u"content")
+ if content_node is None:
+ content_node = etree.SubElement(annotations_node,u"content", id=media.get(u"id"))
+ ensemble_parent = content_node
+ elif file_type == "iri":
+ body_node = root.find(u"body")
+ if body_node is None:
+ body_node = etree.SubElement(root, u"body")
+ ensembles_node = body_node.find(u"ensembles")
+ if ensembles_node is None:
+ ensembles_node = etree.SubElement(body_node, u"ensembles")
+ ensemble_parent = ensembles_node
+
+    if ensemble_parent is None:
+        abort("Cannot add cutting", parser)
+
+ if options.replace:
+ for ens in ensemble_parent.iterchildren(tag=u"ensemble"):
+ if ens.get("id","").startswith(cutting_name):
+ ensemble_parent.remove(ens)
+
+ ensemble = None
+ elements = None
+
+ if options.merge:
+ ensemble = ensemble_parent.find(u"ensemble")
+ if ensemble is not None:
+ elements = ensemble.find(u".//elements")
+
+ if ensemble is None or elements is None:
+ ensemble = etree.SubElement(ensemble_parent, u"ensemble", {u"id":u"tweet_" + unicode(uuid.uuid4()), u"title":u"Ensemble pad", u"author":u"IRI Web", u"abstract":u"Ensemble Pad"})
+ decoupage = etree.SubElement(ensemble, u"decoupage", {u"id": unicode(uuid.uuid4()), u"author": u"IRI Web"})
+
+ etree.SubElement(decoupage, u"title").text = unicode(cutting_name)
+ etree.SubElement(decoupage, u"abstract").text = unicode(cutting_name)
+
+ elements = etree.SubElement(decoupage, u"elements")
+
+
+    etp_req = EtherpadRequest(base_url, api_key)
+    rev_count = etp_req.getRevisionsCount(pad_id)
+
+
+    version_range = range(1, rev_count+1, options.step)
+    #make sure that the last version is exported
+ if rev_count not in version_range:
+ version_range.append(rev_count)
+ for rev in version_range:
+
+ data = None
+ text = ""
+
+        if format == "html":
+            data = etp_req.getHTML(padID=pad_id, rev=rev)
+            text = data.get("html", "")
+        else:
+            data = etp_req.getText(padID=pad_id, rev=rev)
+            text = data.get("text", "")
+
+ pad_ts = data['timestamp']
+
+ if pad_ts < start_ts:
+ continue
+
+ if end_ts is not None and pad_ts > end_ts:
+ break
+
+        pad_dt = datetime.datetime.fromtimestamp(float(pad_ts)/1000.0)
+ pad_ts_rel = pad_ts - start_ts
+
+ username = None
+ color = ""
+ if 'author' in data:
+ username = data['author']['name'] if ('name' in data['author'] and data['author']['name']) else data['author']['id']
+ color = data['author']['color'] if ('color' in data['author'] and data['author']['color']) else ""
+
+ if not username:
+ username = "anon."
+
+
+ element = etree.SubElement(elements, u"element" , {u"id":"%s-%s-%d" %(unicode(uuid.uuid4()),unicode(pad_id),rev), u"color":unicode(color), u"author":unicode(username), u"date":unicode(pad_dt.strftime("%Y/%m/%d")), u"begin": unicode(pad_ts_rel), u"dur":u"0", u"src":""})
+ etree.SubElement(element, u"title").text = "%s: %s - rev %d" % (unicode(username), unicode(pad_id), rev)
+ etree.SubElement(element, u"abstract").text = unicode(text)
+
+ meta_element = etree.SubElement(element, u'meta')
+        etree.SubElement(meta_element, "pad_url").text = etree.CDATA(unicode(etp_req.getPadUrl(pad_id)))
+ etree.SubElement(meta_element, "revision").text = etree.CDATA(unicode(rev))
+
+ # sort by tc in
+ if options.merge :
+ elements[:] = sorted(elements,key=lambda n: int(n.get('begin')))
+
+ output_data = etree.tostring(root, encoding="utf-8", method="xml", pretty_print=False, xml_declaration=True)
+
+ if content_file and content_file.find("http") == 0:
+
+ project["ldt"] = output_data
+ body = anyjson.serialize(project)
+ h = httplib2.Http()
+ resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body)
+ if resp.status != 200:
+ raise Exception("Error writing content : %d : %s"%(resp.status, resp.reason))
+ else:
+ if content_file and os.path.exists(content_file):
+ dest_file_name = content_file
+ else:
+ dest_file_name = options.filename
+
+ output = open(dest_file_name, "w")
+ output.write(output_data)
+ output.flush()
+ output.close()
+
+
--- a/script/utils/export_twitter_alchemy.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/utils/export_twitter_alchemy.py Mon Oct 15 17:01:50 2012 +0200
@@ -295,7 +295,7 @@
username = None
profile_url = ""
if tw.user is not None:
- username = tw.user.name
+ username = tw.user.screen_name
profile_url = tw.user.profile_image_url if tw.user.profile_image_url is not None else ""
if not username:
username = "anon."
--- a/script/utils/merge_tweets.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/utils/merge_tweets.py Mon Oct 15 17:01:50 2012 +0200
@@ -31,9 +31,7 @@
return parser.parse_args()
if __name__ == "__main__":
-
- sys.stdout = codecs.getwriter(sys.stdout.encoding)(sys.stdout)
-
+
options = get_option()
access_token = None
@@ -91,7 +89,8 @@
session_tgt.flush()
- show_progress(i+1, count_tw, progress_text+tweet.text, 70)
+ ptext = progress_text + tweet.text
+ show_progress(i+1, count_tw, ptext.replace("\n",""), 70)
session_tgt.commit()
print u"%d new tweet added" % (added)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/utils/search_topsy.py Mon Oct 15 17:01:50 2012 +0200
@@ -0,0 +1,170 @@
+from iri_tweet import models, utils
+from sqlalchemy.orm import sessionmaker
+import anyjson
+import sqlite3
+import twitter
+import re
+import requests
+from optparse import OptionParser
+import simplejson
+import time
+from blessings import Terminal
+import sys
+import math
+
+APPLICATION_NAME = "Tweet recorder user"
+CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
+CONSUMER_SECRET = "LMhNrY99R6a7E0YbZZkRFpUZpX5EfB1qATbDk1sIVLs"
+
+
+class TopsyResource(object):
+
+ def __init__(self, query, **kwargs):
+
+ self.options = kwargs
+ self.options['q'] = query
+ self.url = kwargs.get("url", "http://otter.topsy.com/search.json")
+ self.page = 0
+ self.req = None
+ self.res = {}
+
+ def __initialize(self):
+
+ params = {}
+ params.update(self.options)
+ self.req = requests.get(self.url, params=params)
+ self.res = self.req.json
+
+ def __next_page(self):
+ page = self.res.get("response").get("page") + 1
+ params = {}
+ params.update(self.options)
+ params['page'] = page
+ self.req = requests.get(self.url, params=params)
+ self.res = self.req.json
+
+ def __iter__(self):
+ if not self.req:
+ self.__initialize()
+ while "response" in self.res and "list" in self.res.get("response") and self.res.get("response").get("list"):
+ for item in self.res.get("response").get("list"):
+ yield item
+ self.__next_page()
+
+ def total(self):
+ if not self.res:
+ return 0
+ else:
+ return self.res.get("response",{}).get("total",0)
+
+
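+# A minimal sketch of paging through Otter results (values are illustrative;
+# the apikey is an assumption):
+#
+#   tr = TopsyResource("#debat", apikey="KEY", perpage=100, type="tweet")
+#   for item in tr:          # __iter__ fetches successive pages until empty
+#       print item.get("url")
+#   print "total results:", tr.total()
+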
+
+def get_option():
+
+ parser = OptionParser()
+
+ parser.add_option("-d", "--database", dest="database",
+ help="Input database", metavar="DATABASE")
+ parser.add_option("-Q", dest="query",
+ help="query", metavar="QUERY")
+ parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+ help="Token file name")
+ parser.add_option("-T", dest="topsy_apikey", metavar="TOPSY_APIKEY", default=None,
+ help="Topsy apikey")
+
+ utils.set_logging_options(parser)
+
+ return parser.parse_args()
+
+
+
+if __name__ == "__main__":
+
+ (options, args) = get_option()
+
+    utils.set_logging(options)
+
+
+    access_token_key, access_token_secret = utils.get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
+
+    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET), secure=True)
+ t.secure = True
+
+ conn_str = options.database.strip()
+ if not re.match("^\w+://.+", conn_str):
+ conn_str = 'sqlite:///' + conn_str
+
+ engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
+ session = None
+
+
+ topsy_parameters = {
+ 'apikey': options.topsy_apikey,
+ 'perpage': 100,
+ 'window': 'a',
+ 'type': 'tweet',
+ 'hidden': True,
+ }
+
+ term = Terminal()
+
+ try:
+ session = Session()
+
+ results = None
+ page = 1
+ print options.query
+
+ tr = TopsyResource(options.query, **topsy_parameters)
+
+ move_up = 0
+
+ for i,item in enumerate(tr):
+ # get id
+ url = item.get("url")
+ tweet_id = url.split("/")[-1]
+
+ if move_up > 0:
+ print((move_up+1)*term.move_up())
+ move_up = 0
+
+ print ("%d/%d:%03d%% - %s - %r" % (i+1, tr.total(), int(float(i+1)/float(tr.total())*100.0), tweet_id, item.get("content") ) + term.clear_eol())
+ move_up += 1
+
+ count_tweet = session.query(models.Tweet).filter_by(id_str=tweet_id).count()
+
+ if count_tweet:
+ continue
+ try:
+ tweet = t.statuses.show(id=tweet_id, include_entities=True)
+ except twitter.api.TwitterHTTPError as e:
+ if e.e.code == 404 or e.e.code == 403:
+ continue
+ else:
+ raise
+
+ processor = utils.TwitterProcessor(tweet, None, None, session, None, options.token_filename)
+ processor.process()
+ session.flush()
+ session.commit()
+
+ time_to_sleep = int(math.ceil((tweet.rate_limit_reset - time.mktime(time.gmtime())) / tweet.rate_limit_remaining))
+
+ print "rate limit remaining %s of %s" % (str(tweet.rate_limit_remaining), str(tweet.headers.getheader('x-ratelimit-limit'))) + term.clear_eol()
+ move_up += 1
+ for i in xrange(time_to_sleep):
+ if i:
+ print(2*term.move_up())
+ else:
+ move_up += 1
+ print(("Sleeping for %d seconds, %d remaining" % (time_to_sleep, time_to_sleep-i)) + term.clear_eol())
+ time.sleep(1)
+
+ except twitter.api.TwitterHTTPError as e:
+ fmt = ("." + e.format) if e.format else ""
+ print "Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data))
+
+ finally:
+ if session:
+ session.close()
--- a/script/utils/tweet_twitter_user.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/utils/tweet_twitter_user.py Mon Oct 15 17:01:50 2012 +0200
@@ -38,6 +38,7 @@
parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
help="Token file name")
parser.add_option("-S", dest="simulate", metavar="SIMULATE", default=False, action="store_true", help="Simulate call to twitter. Do not change the database")
+    parser.add_option("--direct-message", dest="direct_message", metavar="DIRECT_MESSAGE", default=False, action="store_true", help="send a direct message to the user; otherwise create a status update mentioning the user (@username)")
parser.add_option("-f", dest="force", metavar="FORCE", default=False, action="store_true", help="force sending message to all user even if it has already been sent")
@@ -109,10 +110,19 @@
for user in query_res:
screen_name = user.screen_name
- message = u"@%s: %s" % (screen_name, base_message)
- get_logger().debug("new status : " + message) #@UndefinedVariable
+ if options.direct_message:
+ message = base_message
+ else:
+ message = u"@%s: %s" % (screen_name, base_message)
+
+ print("new message : " + message)
+ get_logger().debug("new message : " + message) #@UndefinedVariable
+
if not options.simulate:
- t.statuses.update(status=message)
+ if options.direct_message:
+ t.direct_messages.new(user_id=user.id, screen_name=screen_name, text=message)
+ else:
+ t.statuses.update(status=message)
user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
session.add(user_message)
session.flush()
Binary file script/virtualenv/res/SQLAlchemy-0.7.3.tar.gz has changed
Binary file script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz has changed
Binary file script/virtualenv/res/blessings-1.5.tar.gz has changed
Binary file script/virtualenv/res/requests-0.13.1.tar.gz has changed
Binary file script/virtualenv/res/twitter-1.4.2.tar.gz has changed
Binary file script/virtualenv/res/twitter-1.9.0.tar.gz has changed