# HG changeset patch
# User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
# Date 1350313310 -7200
# Node ID 2ef837069108740e92a313a4ca80c551c54fc4ef
# Parent 51072e5e6ea93eafabb1ba5a782e6fe4c1fa08e3
Starting 'listener_update' branch

diff -r 51072e5e6ea9 -r 2ef837069108 sbin/sync/sync_live
--- a/sbin/sync/sync_live Mon Oct 15 16:56:57 2012 +0200
+++ b/sbin/sync/sync_live Mon Oct 15 17:01:50 2012 +0200
@@ -9,7 +9,8 @@
 #text2unix ~/tmp/tweet_live_V$1
 if [ -d ~/tmp/tweet_live_V$1 ]; then
-    cat <' % self.reason

diff -r 51072e5e6ea9 -r 2ef837069108 script/lib/iri_tweet/iri_tweet/stream.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/stream.py Mon Oct 15 17:01:50 2012 +0200
@@ -0,0 +1,364 @@
+# -*- coding: utf-8 -*-
+'''
+Created on Mar 22, 2012
+
+@author: ymh
+
+Module directly inspired by tweetstream
+
+'''
+import time
+import requests
+from requests.utils import stream_untransfer, stream_decode_response_unicode
+import anyjson
+import select
+from ssl import SSLError
+
+from . import USER_AGENT, ConnectionError, AuthenticationError
+
+
+def iter_content_non_blocking(req, max_chunk_size=4096, decode_unicode=False, timeout=-1):
+
+    if req._content_consumed:
+        raise RuntimeError(
+            'The content for this response was already consumed'
+        )
+
+    req.raw._fp.fp._sock.setblocking(False)
+
+    def generate():
+        chunk_size = 1
+        while True:
+            if timeout < 0:
+                rlist, _, _ = select.select([req.raw._fp.fp._sock], [], [])
+            else:
+                rlist, _, _ = select.select([req.raw._fp.fp._sock], [], [], timeout)
+
+            if not rlist:
+                continue
+
+            try:
+                chunk = req.raw.read(chunk_size)
+                if not chunk:
+                    break
+                # adapt the read size: grow towards max_chunk_size while the
+                # socket keeps the buffer full, shrink again when reads come up short
+                if len(chunk) >= chunk_size and chunk_size < max_chunk_size:
+                    chunk_size = min(chunk_size*2, max_chunk_size)
+                elif len(chunk) < chunk_size/2 and chunk_size > 1:
+                    chunk_size = max(chunk_size/2, 1)
+                yield chunk
+            except SSLError as e:
+                if e.errno == 2:
+                    # Apparently this means there was nothing in the socket buffer
+                    pass
+                else:
+                    raise
+
+        req._content_consumed = True
+
+    gen = stream_untransfer(generate(), req)
+
+    if decode_unicode:
+        gen = stream_decode_response_unicode(gen, req)
+
+    return gen
+
+
+class BaseStream(object):
+
+    """A network connection to Twitter's streaming API.
+
+    :param auth: tweepy auth object.
+    :keyword catchup: Number of tweets from the past to get before switching to
+        the live stream.
+    :keyword raw: If True, return each tweet's raw data direct from the socket,
+        without UTF8 decoding or parsing, rather than a parsed object. The
+        default is False.
+    :keyword timeout: If non-None, set a timeout in seconds on the receiving
+        socket. Certain types of network problems (e.g., disconnecting a VPN)
+        can cause the connection to hang, leading to indefinite blocking that
+        requires kill -9 to resolve. Setting a timeout leads to an orderly
+        shutdown in these cases. The default is None (i.e., no timeout).
+    :keyword url: Endpoint URL for the object. Note: you should not
+        need to edit this. It's present to make testing easier.
+
+    .. attribute:: connected
+
+        True if the object is currently connected to the stream.
+
+    .. attribute:: url
+
+        The URL to which the object is connected.
+
+    .. attribute:: starttime
+
+        The timestamp, in seconds since the epoch, at which the object
+        connected to the streaming API.
+
+    .. attribute:: count
+
+        The number of tweets that have been returned by the object.
+
+    .. attribute:: rate
+
+        The rate at which tweets have been returned from the object, as a
+        float. See also :attr:`rate_period`.
+    .. attribute:: rate_period
+
+        The amount of time over which tweets are sampled to calculate the
+        tweet rate. By default 10 seconds. Changes to this attribute will not
+        be reflected until the next time the rate is calculated. The rate of
+        tweets varies with the time of day etc., so it is useful to set this
+        to something sensible.
+
+    .. attribute:: user_agent
+
+        User agent string that will be included in the request. NOTE: This
+        cannot be changed after the connection has been made. This property
+        must thus be set before accessing the iterator. The default is set in
+        :attr:`USER_AGENT`.
+    """
+
+    def __init__(self, auth,
+                 catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096):
+        self._conn = None
+        self._rate_ts = None
+        self._rate_cnt = 0
+        self._auth = auth
+        self._catchup_count = catchup
+        self.raw_mode = raw
+        self.timeout = timeout
+        self._compressed = compressed
+
+        self.rate_period = 10  # in seconds
+        self.connected = False
+        self.starttime = None
+        self.count = 0
+        self.rate = 0
+        self.user_agent = USER_AGENT
+        self.chunk_size = chunk_size
+        if url: self.url = url
+
+        self.muststop = False
+
+        self._iter = self.__iter__()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *params):
+        self.close()
+        return False
+
+    def _init_conn(self):
+        """Open the connection to the twitter server"""
+        headers = {'User-Agent': self.user_agent}
+
+        if self._compressed:
+            headers['Accept-Encoding'] = "deflate, gzip"
+
+        postdata = self._get_post_data() or {}
+        postdata['stall_warnings'] = 'true'
+        if self._catchup_count:
+            postdata["count"] = self._catchup_count
+
+        if self._auth:
+            self._auth.apply_auth(self.url, "POST", headers, postdata)
+
+        self._resp = requests.post(self.url, headers=headers, data=postdata)
+        self._resp.raise_for_status()
+        self.connected = True
+        if not self._rate_ts:
+            self._rate_ts = time.time()
+        if not self.starttime:
+            self.starttime = time.time()
+
+    def _get_post_data(self):
+        """Subclasses that need to add post data to the request can override
+        this method and return post data. The data should be in the format
+        returned by urllib.urlencode."""
+        return None
+
+    def testmuststop(self):
+        if callable(self.muststop):
+            return self.muststop()
+        else:
+            return self.muststop
+
+    def _update_rate(self):
+        # guard against the first call: _rate_ts may still be unset
+        if not self._rate_ts:
+            self._rate_ts = time.time()
+            return
+        rate_time = time.time() - self._rate_ts
+        if rate_time > self.rate_period:
+            self.rate = self._rate_cnt / rate_time
+            self._rate_cnt = 0
+            self._rate_ts = time.time()
+
+    def _iter_object(self):
+        pending = None
+        has_stopped = False
+
+#        for chunk in iter_content_non_blocking(self._resp,
+#                                               max_chunk_size=self.chunk_size,
+#                                               decode_unicode=False,
+#                                               timeout=self.timeout):
+        for chunk in self._resp.iter_content(
+                chunk_size=self.chunk_size,
+                decode_unicode=False):
+
+            if self.testmuststop():
+                has_stopped = True
+                break
+
+            if pending is not None:
+                chunk = pending + chunk
+            lines = chunk.split('\r')
+
+            if chunk and lines[-1] and lines[-1][-1] == chunk[-1]:
+                pending = lines.pop()
+            else:
+                pending = None
+
+            for line in lines:
+                yield line.strip('\n')
+
+            if self.testmuststop():
+                has_stopped = True
+                break
+
+        if pending is not None:
+            yield pending
+        if has_stopped:
+            raise StopIteration()
+
+    def __iter__(self):
+
+        if not self.connected:
+            self._init_conn()
+
+        for line in self._iter_object():
+
+            if not line:
+                continue
+
+            if self.raw_mode:
+                tweet = line
+            else:
+                line = line.decode("utf8")
+                try:
+                    tweet = anyjson.deserialize(line)
+                except ValueError:
+                    self.close()
+                    raise ConnectionError("Got invalid data from twitter", details=line)
+            if 'text' in tweet:
+                self.count += 1
+                self._rate_cnt += 1
+            self._update_rate()
+            yield tweet
+
+    def next(self):
+        """Return the next available tweet. This call is blocking!"""
+        return self._iter.next()
+
+    def close(self):
+        """
+        Close the connection to the streaming server.
+ """ + self.connected = False + + +class FilterStream(BaseStream): + url = "https://stream.twitter.com/1/statuses/filter.json" + + def __init__(self, auth, follow=None, locations=None, + track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096): + self._follow = follow + self._locations = locations + self._track = track + # remove follow, locations, track + BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size) + + def _get_post_data(self): + postdata = {} + if self._follow: postdata["follow"] = ",".join([str(e) for e in self._follow]) + if self._locations: postdata["locations"] = ",".join(self._locations) + if self._track: postdata["track"] = ",".join(self._track) + return postdata + + +class SafeStreamWrapper(object): + + def __init__(self, base_stream, logger=None, error_cb=None, max_reconnects=-1, initial_tcp_wait=250, initial_http_wait=5000, max_wait=240000): + self._stream = base_stream + self._logger = logger + self._error_cb = error_cb + self._max_reconnects = max_reconnects + self._initial_tcp_wait = initial_tcp_wait + self._initial_http_wait = initial_http_wait + self._max_wait = max_wait + self._retry_wait = 0 + self._retry_nb = 0 + + def __post_process_error(self,e): + # Note: error_cb is not called on the last error since we + # raise a ConnectionError instead + if callable(self._error_cb): + self._error_cb(e) + self._logger.info("stream sleeping for %d ms " % self._retry_wait) + time.sleep(float(self._retry_wait)/1000.0) + + + def __process_tcp_error(self,e): + if self._logger: self._logger.debug("connection error :" + str(e)) + self._reconnects += 1 + if self._max_reconnects >= 0 and self._reconnects > self._max_reconnects: + raise ConnectionError("Too many retries") + if self._retry_wait < self._max_wait: + self._retry_wait += self._initial_tcp_wait + if self._retry_wait > self._max_wait: + self._retry_wait = self._max_wait + + self.__post_process_error(e) + + + def __process_http_error(self,e): + if self._logger: self._logger.debug("http error on %s : %s" % (e.response.url,e.message)) + if self._retry_wait < self._max_wait: + self._retry_wait = 2*self._retry_wait if self._retry_wait > 0 else self._initial_http_wait + if self._retry_wait > self._max_wait: + self._retry_wait = self._max_wait + + self.__post_process_error(e) + + def __iter__(self): + while not self._stream.testmuststop(): + self._retry_nb += 1 + try: + if self._logger: self._logger.debug("inner loop") + for tweet in self._stream: + self._reconnects = 0 + self._retry_wait = 0 + if "warning" in tweet: + self._logger.warning("Tweet warning received : %s" % repr(tweet)) + continue + if not tweet.strip(): + self._logger.debug("Empty Tweet received : PING") + continue + yield tweet + except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e: + self.__process_tcp_error(e) + except requests.exceptions.HTTPError as e: + if e.response.status_code == 401 and self._retry_nb <= 1: + raise AuthenticationError("Error connecting to %s : %s" % (e.response.url,e.message)) + if e.response.status_code > 200: + self.__process_http_error(e) + else: + self.__process_tcp_error(e) + + + + \ No newline at end of file diff -r 51072e5e6ea9 -r 2ef837069108 script/lib/iri_tweet/iri_tweet/utils.py --- a/script/lib/iri_tweet/iri_tweet/utils.py Mon Oct 15 16:56:57 2012 +0200 +++ b/script/lib/iri_tweet/iri_tweet/utils.py Mon Oct 15 17:01:50 2012 +0200 @@ 
     if res is None:
         get_logger().debug("get_oauth_token : doing the oauth dance")
-        res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
+        res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
         CACHE_ACCESS_TOKEN[application_name] = res
@@ -615,9 +615,9 @@
     spaces = math.floor(width - marks)
     loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']'
-    
+
     s = u"%s %3d%% %*d/%d - %*s\r" % (loader, percent, len(str(total_line)), current_line, total_line, width, label[:width])
-    
+
     writer.write(s) #takes the header into account
     if percent >= 100:
         writer.write("\n")

diff -r 51072e5e6ea9 -r 2ef837069108 script/stream/recorder_tweetstream.py
--- a/script/stream/recorder_tweetstream.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/stream/recorder_tweetstream.py Mon Oct 15 17:01:50 2012 +0200
@@ -23,7 +23,7 @@
 import time
 import traceback
 import tweepy.auth
-import tweetstream
+import iri_tweet.stream as tweetstream
 import urllib2
 
 socket._fileobject.default_bufsize = 0
@@ -35,6 +35,7 @@
 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
 #just put it in a sqlite3 table
+DEFAULT_TIMEOUT = 5
 
 def set_logging(options):
     loggers = []
@@ -124,32 +125,36 @@
         self.token_filename = options.token_filename
         self.catchup = options.catchup
         self.timeout = options.timeout
+        self.stream = None
         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
-    def do_run(self):
+
+    def __source_stream_iter(self):
-        #import pydevd
-        #pydevd.settrace(suspend=True)
-        self.logger = set_logging_process(self.options, self.logger_queue)
-        self.auth = get_auth(self.options, self.access_token)
+        self.logger.debug("SourceProcess : run ")
-        self.logger.debug("SourceProcess : run ")
+        self.auth = get_auth(self.options, self.access_token)
+        self.logger.debug("SourceProcess : auth set ")
+
         track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
         self.logger.debug("SourceProcess : track list " + track_list)
         track_list = [k.strip() for k in track_list.split(',')]
 
         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
-        stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
+        self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
         self.logger.debug("SourceProcess : after connecting to stream")
-        stream.muststop = lambda: self.stop_event.is_set()
+        self.stream.muststop = lambda: self.stop_event.is_set()
+
+        stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
 
         session = self.session_maker()
 
         try:
-            for tweet in stream:
+            for tweet in stream_wrapper:
                 if not self.parent_is_alive():
+                    self.stop_event.set()
+                    stop_thread.join(5)
                     sys.exit()
                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
@@ -165,26 +170,56 @@
                         session.rollback()
                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                         if add_retries == 10:
-                            raise e
+                            raise
 
                 source_id = source.id
                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
-                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
                 session.commit()
                 self.queue.put((source_id, tweet), False)
         except Exception as e:
             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
+            raise
         finally:
             session.rollback()
-            stream.close()
             session.close()
-            self.queue.close()
-            self.stop_event.set()
+            # multiprocessing queues have no mutex/clear(); drain them instead
+            try:
+                while not self.logger_queue.empty():
+                    self.logger_queue.get_nowait()
+            except Exception:
+                pass
+            self.logger_queue.close()
+            try:
+                while not self.queue.empty():
+                    self.queue.get_nowait()
+            except Exception:
+                pass
+            self.queue.close()
+            self.stream.close()
+            self.stream = None
+            if not self.stop_event.is_set():
+                self.stop_event.set()
+
+    def do_run(self):
+
+        import pydevd
+        pydevd.settrace(suspend=False)
+
+        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter, name="SourceStreamIterThread")
+
+        source_stream_iter_thread.start()
+
+        while not self.stop_event.is_set():
+            self.stop_event.wait(DEFAULT_TIMEOUT)
+            if self.stop_event.is_set() and self.stream:
+                self.stream.close()
+            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+                self.stop_event.set()
+
+        source_stream_iter_thread.join(30)
 
 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
     try:
+        if not tweet.strip():
+            return
         tweet_obj = anyjson.deserialize(tweet)
         if 'text' not in tweet_obj:
             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
@@ -197,6 +232,17 @@
         logger.debug(u"Process_tweet :" + repr(tweet))
         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
         processor.process()
+    except ValueError as e:
+        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
+        output = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
         logger.exception(message)
@@ -237,6 +283,10 @@
     finally:
         session.rollback()
         self.stop_event.set()
+        # drain the queues rather than calling the non-existent clear()
+        try:
+            while not self.logger_queue.empty():
+                self.logger_queue.get_nowait()
+            while not self.queue.empty():
+                self.queue.get_nowait()
+        except Exception:
+            pass
         session.close()
@@ -353,7 +403,7 @@
     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
     process_engines.append(engine_process)
-    lqueue = mQueue(1)
+    lqueue = mQueue(50)
     logger_queues.append(lqueue)
     pid = os.getpid()
     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
@@ -363,7 +413,7 @@
     for i in range(options.process_nb - 1):
         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
         process_engines.append(engine_process)
-        lqueue = mQueue(1)
+        lqueue = mQueue(50)
         logger_queues.append(lqueue)
         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
         tweet_processes.append(cprocess)

diff -r 51072e5e6ea9 -r 2ef837069108 script/utils/export_pad.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/utils/export_pad.py Mon Oct 15 17:01:50 2012 +0200
@@ -0,0 +1,324 @@
+#!/usr/bin/env python
+# coding=utf-8
+
+from lxml import etree
+from iri_tweet.models import setup_database
+from optparse import OptionParser #@UnresolvedImport
+from sqlalchemy import Table, Column, BigInteger
+from iri_tweet.utils import (set_logging_options, set_logging, get_filter_query,
+    get_logger)
+import anyjson
+import datetime
+import httplib2
+import os.path
+import re
+import requests
+import sys
+import time
+import uuid #@UnresolvedImport
+from dateutil.parser import parse as parse_date
+import json
+import functools
+
+
+class EtherpadRequestException(Exception):
+    def __init__(self, original_resp):
+        super(EtherpadRequestException, self).__init__(original_resp["message"])
+        self.status = original_resp["status"]
+        self.original_resp = original_resp
+
+
+class EtherpadRequest(object):
+
+    def __init__(self, base_url, api_key):
+        self.base_url = base_url
+        self.api_key = api_key
+        self.__request = None
+
+    def __getattr__(self, name):
+        return functools.partial(self.__action, name)
+
+    def __action(self, action, **kwargs):
+        url = "%s/%s" % (self.base_url, action)
+        params = dict(kwargs)
+        params['apikey'] = self.api_key
+
+        r = requests.get(url, params=params)
+
+        resp = anyjson.deserialize(r.text)
+
+        if resp["code"] == 0:
+            return resp["data"]
+        else:
+            raise EtherpadRequestException(resp)
+
+    def getRevisionsCount(self, padID):
+        f = self.__getattr__("getRevisionsCount")
+        res = f(padID=padID)
+
+        return res["revisions"]
+
+    def getPadUrl(self, padID):
+
+        return "%s/%s" % (self.base_url, padID)
+
+
+def abort(message, parser):
+    if message is not None:
+        sys.stderr.write(message + "\n")
+    parser.print_help()
+    sys.exit(1)
+
+
+def get_options():
+
+    parser = OptionParser()
+    parser.add_option("-u", "--api-url", dest="api_url",
+        help="Base etherpad-lite api url", metavar="API_URL", default=None)
+    parser.add_option("-k", "--api-key", dest="api_key",
+        help="Etherpad-lite api key", metavar="API_KEY", default=None)
+    parser.add_option("-p", "--pad-id", dest="pad_id",
+        help="pad id", metavar="PADID")
+    parser.add_option("-s", "--start-date", dest="start_date",
+        help="start date", metavar="START_DATE", default=None)
+    parser.add_option("-e", "--end-date", dest="end_date",
+        help="end date", metavar="END_DATE", default=None)
+    parser.add_option("-f", "--format", dest="format", type="choice",
+        help="format", metavar="FORMAT", choices=['html', 'text'], default='html')
+    parser.add_option("-I", "--content-file", dest="content_file",
+        help="Content file", metavar="CONTENT_FILE")
+    parser.add_option("-C", "--color", dest="color",
+        help="Color code", metavar="COLOR", default="16763904")
+    parser.add_option("-D", "--duration", dest="duration", type="int",
+        help="Duration", metavar="DURATION", default=None)
+    parser.add_option("-n", "--name", dest="name",
+        help="Cutting name", metavar="NAME", default=u"pads")
+    parser.add_option("-R", "--replace", dest="replace", action="store_true",
+        help="Replace tweet ensemble", metavar="REPLACE", default=False)
+    parser.add_option("-m", "--merge", dest="merge", action="store_true",
+        help="merge tweet ensemble, choose the first ensemble", metavar="MERGE", default=False)
"--extended", dest="extended_mode", action="store_true", + help="Trigger polemic extended mode", metavar="EXTENDED", default=False) + parser.add_option("-S", "--step", dest="step", type=1, + help="step for version", metavar="STEP", default=False) + + + + set_logging_options(parser) + + + return parser.parse_args() + (parser,) + + +if __name__ == "__main__" : + + (options, args, parser) = get_options() + + set_logging(options) + get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable + + if len(sys.argv) == 1: + abort(None) + + base_url = options.get("api_url", None) + if not base_url: + abort("No base url") + + api_key = options.get("api_key", None) + if not api_key: + abort("api key missing") + + pad_id = options.get("pad_id", None) + if not pad_id: + abort("No pad id") + + start_date_str = options.get("start_date",None) + end_date_str = options.get("end_date", None) + duration = options.get("duration", None) + + start_date = None + start_ts = None + if start_date_str: + start_date = parse_date(start_date_str) + start_ts = time.mktime(start_date.timetuple())*1000 + + end_date = None + if end_date_str: + end_date = parse_date(end_date_str) + elif start_date and duration: + end_date = start_date + datetime.timedelta(seconds=duration) + + if start_date is None or ts is None: + abort("No start date found") + + end_ts = None + if end_date is not None: + end_ts = time.mktime(end_date.timetuple())*1000 + + content_file = options.get("content_file", None) + + if not content_file: + abort("No content file") + + root = None + + if content_file.find("http") == 0: + + get_logger().debug("url : " + content_file) #@UndefinedVariable + + h = httplib2.Http() + resp, content = h.request(content_file) + + get_logger().debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable + + project = anyjson.deserialize(content) + root = etree.fromstring(project["ldt"]) + + elif os.path.exists(content_file): + + doc = etree.parse(content_file) + root = doc.getroot() + + if root is None: + abort("No content file, file not found") + + cutting_name = options.get("name", None) + if cutting_name is None: + cutting_name = "pad_%s" % pad_id + + format = options.get('format','html') + ensemble_parent = None + + file_type = None + for node in root: + if node.tag == "project": + file_type = "ldt" + break + elif node.tag == "head": + file_type = "iri" + break + if file_type is None: + abort("Unknown file type") + + if file_type == "ldt": + media_nodes = root.xpath("//media") + if len(media_nodes) > 0: + media = media_nodes[0] + annotations_node = root.find(u"annotations") + if annotations_node is None: + annotations_node = etree.SubElement(root, u"annotations") + content_node = annotations_node.find(u"content") + if content_node is None: + content_node = etree.SubElement(annotations_node,u"content", id=media.get(u"id")) + ensemble_parent = content_node + elif file_type == "iri": + body_node = root.find(u"body") + if body_node is None: + body_node = etree.SubElement(root, u"body") + ensembles_node = body_node.find(u"ensembles") + if ensembles_node is None: + ensembles_node = etree.SubElement(body_node, u"ensembles") + ensemble_parent = ensembles_node + + if ensemble_parent is None: + abort("Can not add cutting") + + if options.replace: + for ens in ensemble_parent.iterchildren(tag=u"ensemble"): + if ens.get("id","").startswith(cutting_name): + ensemble_parent.remove(ens) + + ensemble = None + elements = None + + if options.merge: + ensemble = ensemble_parent.find(u"ensemble") + if ensemble 
+        if ensemble is not None:
+            elements = ensemble.find(u".//elements")
+
+    if ensemble is None or elements is None:
+        ensemble = etree.SubElement(ensemble_parent, u"ensemble", {u"id": u"tweet_" + unicode(uuid.uuid4()), u"title": u"Ensemble pad", u"author": u"IRI Web", u"abstract": u"Ensemble Pad"})
+        decoupage = etree.SubElement(ensemble, u"decoupage", {u"id": unicode(uuid.uuid4()), u"author": u"IRI Web"})
+
+        etree.SubElement(decoupage, u"title").text = unicode(cutting_name)
+        etree.SubElement(decoupage, u"abstract").text = unicode(cutting_name)
+
+        elements = etree.SubElement(decoupage, u"elements")
+
+    etp_req = EtherpadRequest(base_url, api_key)
+    rev_count = etp_req.getRevisionsCount(pad_id)
+
+    step = options.step
+    version_range = range(1, rev_count+1, step)
+    # make sure that the last version is exported
+    if rev_count not in version_range:
+        version_range.append(rev_count)
+
+    for rev in version_range:
+
+        data = None
+        text = ""
+
+        if format == "html":
+            data = etp_req.getHTML(padID=pad_id, rev=rev)
+            text = data.get("html", "")
+        else:
+            data = etp_req.getText(padID=pad_id, rev=rev)
+            text = data.get("text", "")
+
+        pad_ts = data['timestamp']
+
+        if pad_ts < start_ts:
+            continue
+
+        if end_ts is not None and pad_ts > end_ts:
+            break
+
+        pad_dt = datetime.datetime.fromtimestamp(float(pad_ts)/1000.0)
+        pad_ts_rel = pad_ts - start_ts
+
+        username = None
+        color = ""
+        if 'author' in data:
+            username = data['author']['name'] if ('name' in data['author'] and data['author']['name']) else data['author']['id']
+            color = data['author']['color'] if ('color' in data['author'] and data['author']['color']) else ""
+
+        if not username:
+            username = "anon."
+
+        element = etree.SubElement(elements, u"element", {u"id": "%s-%s-%d" % (unicode(uuid.uuid4()), unicode(pad_id), rev), u"color": unicode(color), u"author": unicode(username), u"date": unicode(pad_dt.strftime("%Y/%m/%d")), u"begin": unicode(pad_ts_rel), u"dur": u"0", u"src": ""})
+        etree.SubElement(element, u"title").text = "%s: %s - rev %d" % (unicode(username), unicode(pad_id), rev)
+        etree.SubElement(element, u"abstract").text = unicode(text)
+
+        meta_element = etree.SubElement(element, u'meta')
+        etree.SubElement(meta_element, "pad_url").text = etree.CDATA(unicode(etp_req.getPadUrl(pad_id)))
+        etree.SubElement(meta_element, "revision").text = etree.CDATA(unicode(rev))
+
+    # sort by tc in
+    if options.merge:
+        elements[:] = sorted(elements, key=lambda n: int(n.get('begin')))
+
+    output_data = etree.tostring(root, encoding="utf-8", method="xml", pretty_print=False, xml_declaration=True)
+
+    if content_file and content_file.find("http") == 0:
+
+        project["ldt"] = output_data
+        body = anyjson.serialize(project)
+        h = httplib2.Http()
+        resp, content = h.request(content_file, "PUT", headers={'content-type': 'application/json'}, body=body)
+        if resp.status != 200:
+            raise Exception("Error writing content : %d : %s" % (resp.status, resp.reason))
+    else:
+        if content_file and os.path.exists(content_file):
+            dest_file_name = content_file
+        else:
+            dest_file_name = options.filename
+
+        output = open(dest_file_name, "w")
+        output.write(output_data)
+        output.flush()
+        output.close()

diff -r 51072e5e6ea9 -r 2ef837069108 script/utils/export_twitter_alchemy.py
--- a/script/utils/export_twitter_alchemy.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/utils/export_twitter_alchemy.py Mon Oct 15 17:01:50 2012 +0200
@@ -295,7 +295,7 @@
                 username = None
                 profile_url = ""
                 if tw.user is not None:
-                    username = tw.user.name
+                    username = tw.user.screen_name
                     profile_url = tw.user.profile_image_url if tw.user.profile_image_url is not None else ""
                 if not username:
                     username = "anon."

diff -r 51072e5e6ea9 -r 2ef837069108 script/utils/merge_tweets.py
--- a/script/utils/merge_tweets.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/utils/merge_tweets.py Mon Oct 15 17:01:50 2012 +0200
@@ -31,9 +31,7 @@
     return parser.parse_args()
 
 if __name__ == "__main__":
-
-    sys.stdout = codecs.getwriter(sys.stdout.encoding)(sys.stdout)
-
+
     options = get_option()
 
     access_token = None
@@ -91,7 +89,8 @@
 
             session_tgt.flush()
 
-            show_progress(i+1, count_tw, progress_text+tweet.text, 70)
+            ptext = progress_text + tweet.text
+            show_progress(i+1, count_tw, ptext.replace("\n", ""), 70)
 
     session_tgt.commit()
     print u"%d new tweet added" % (added)

diff -r 51072e5e6ea9 -r 2ef837069108 script/utils/search_topsy.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/utils/search_topsy.py Mon Oct 15 17:01:50 2012 +0200
@@ -0,0 +1,170 @@
+from iri_tweet import models, utils
+from sqlalchemy.orm import sessionmaker
+import anyjson
+import sqlite3
+import twitter
+import re
+import requests
+from optparse import OptionParser
+import simplejson
+import time
+from blessings import Terminal
+import sys
+import math
+
+APPLICATION_NAME = "Tweet recorder user"
+CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
+CONSUMER_SECRET = "LMhNrY99R6a7E0YbZZkRFpUZpX5EfB1qATbDk1sIVLs"
+
+
+class TopsyResource(object):
+
+    def __init__(self, query, **kwargs):
+
+        self.options = kwargs
+        self.options['q'] = query
+        self.url = kwargs.get("url", "http://otter.topsy.com/search.json")
+        self.page = 0
+        self.req = None
+        self.res = {}
+
+    def __initialize(self):
+
+        params = {}
+        params.update(self.options)
+        self.req = requests.get(self.url, params=params)
+        self.res = self.req.json
+
+    def __next_page(self):
+        page = self.res.get("response").get("page") + 1
+        params = {}
+        params.update(self.options)
+        params['page'] = page
+        self.req = requests.get(self.url, params=params)
+        self.res = self.req.json
+
+    def __iter__(self):
+        if not self.req:
+            self.__initialize()
+        while "response" in self.res and "list" in self.res.get("response") and self.res.get("response").get("list"):
+            for item in self.res.get("response").get("list"):
+                yield item
+            self.__next_page()
+
+    def total(self):
+        if not self.res:
+            return 0
+        else:
+            return self.res.get("response", {}).get("total", 0)
+
+
+def get_option():
+
+    parser = OptionParser()
+
+    parser.add_option("-d", "--database", dest="database",
+        help="Input database", metavar="DATABASE")
+    parser.add_option("-Q", dest="query",
+        help="query", metavar="QUERY")
+    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+        help="Token file name")
+    parser.add_option("-T", dest="topsy_apikey", metavar="TOPSY_APIKEY", default=None,
+        help="Topsy apikey")
+
+    utils.set_logging_options(parser)
+
+    return parser.parse_args()
+
+
+if __name__ == "__main__":
+
+    (options, args) = get_option()
+
+    utils.set_logging(options)
+
+    access_token_key, access_token_secret = utils.get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
+
+    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET), secure=True)
+    t.secure = True
+
+    conn_str = options.database.strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + conn_str
+
+    engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
+    session = None
+
+    topsy_parameters = {
+        'apikey': options.topsy_apikey,
+        'perpage': 100,
+        'window': 'a',
+        'type': 'tweet',
+        'hidden': True,
+    }
+
+    term = Terminal()
+
+    try:
+        session = Session()
+
+        results = None
+        page = 1
+        print options.query
+
+        tr = TopsyResource(options.query, **topsy_parameters)
+
+        move_up = 0
+
+        for i, item in enumerate(tr):
+            # get id
+            url = item.get("url")
+            tweet_id = url.split("/")[-1]
+
+            if move_up > 0:
+                print((move_up+1)*term.move_up())
+                move_up = 0
+
+            print("%d/%d:%03d%% - %s - %r" % (i+1, tr.total(), int(float(i+1)/float(tr.total())*100.0), tweet_id, item.get("content")) + term.clear_eol())
+            move_up += 1
+
+            count_tweet = session.query(models.Tweet).filter_by(id_str=tweet_id).count()
+
+            if count_tweet:
+                continue
+            try:
+                tweet = t.statuses.show(id=tweet_id, include_entities=True)
+            except twitter.api.TwitterHTTPError as e:
+                if e.e.code == 404 or e.e.code == 403:
+                    continue
+                else:
+                    raise
+
+            processor = utils.TwitterProcessor(tweet, None, None, session, None, options.token_filename)
+            processor.process()
+            session.flush()
+            session.commit()
+
+            time_to_sleep = int(math.ceil((tweet.rate_limit_reset - time.mktime(time.gmtime())) / tweet.rate_limit_remaining))
+
+            print "rate limit remaining %s of %s" % (str(tweet.rate_limit_remaining), str(tweet.headers.getheader('x-ratelimit-limit'))) + term.clear_eol()
+            move_up += 1
+            for i in xrange(time_to_sleep):
+                if i:
+                    print(2*term.move_up())
+                else:
+                    move_up += 1
+                print(("Sleeping for %d seconds, %d remaining" % (time_to_sleep, time_to_sleep-i)) + term.clear_eol())
+                time.sleep(1)
+
+    except twitter.api.TwitterHTTPError as e:
+        fmt = ("." + e.format) if e.format else ""
+        print "Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data))
+
+    finally:
+        if session:
+            session.close()

diff -r 51072e5e6ea9 -r 2ef837069108 script/utils/tweet_twitter_user.py
--- a/script/utils/tweet_twitter_user.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/utils/tweet_twitter_user.py Mon Oct 15 17:01:50 2012 +0200
@@ -38,6 +38,7 @@
     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
         help="Token file name")
     parser.add_option("-S", dest="simulate", metavar="SIMULATE", default=False, action="store_true",
        help="Simulate call to twitter. Do not change the database")
+    parser.add_option("--direct-message", dest="direct_message", metavar="DIRECT_MESSAGE", default=False, action="store_true",
+        help="send a direct message to the user, else create a status update mentioning the user (@username)")
     parser.add_option("-f", dest="force", metavar="FORCE", default=False, action="store_true",
        help="force sending message to all user even if it has already been sent")
@@ -109,10 +110,19 @@
     for user in query_res:
         screen_name = user.screen_name
-        message = u"@%s: %s" % (screen_name, base_message)
-        get_logger().debug("new status : " + message) #@UndefinedVariable
+        if options.direct_message:
+            message = base_message
+        else:
+            message = u"@%s: %s" % (screen_name, base_message)
+
+        print("new message : " + message)
+        get_logger().debug("new message : " + message) #@UndefinedVariable
+
         if not options.simulate:
-            t.statuses.update(status=message)
+            if options.direct_message:
+                t.direct_messages.new(user_id=user.id, screen_name=screen_name, text=message)
+            else:
+                t.statuses.update(status=message)
             user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
             session.add(user_message)
             session.flush()

diff -r 51072e5e6ea9 -r 2ef837069108 script/virtualenv/res/SQLAlchemy-0.7.3.tar.gz
Binary file script/virtualenv/res/SQLAlchemy-0.7.3.tar.gz has changed
diff -r 51072e5e6ea9 -r 2ef837069108 script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz
Binary file script/virtualenv/res/SQLAlchemy-0.7.8.tar.gz has changed
diff -r 51072e5e6ea9 -r 2ef837069108 script/virtualenv/res/blessings-1.5.tar.gz
Binary file script/virtualenv/res/blessings-1.5.tar.gz has changed
diff -r 51072e5e6ea9 -r 2ef837069108 script/virtualenv/res/requests-0.13.1.tar.gz
Binary file script/virtualenv/res/requests-0.13.1.tar.gz has changed
diff -r 51072e5e6ea9 -r 2ef837069108 script/virtualenv/res/twitter-1.4.2.tar.gz
Binary file script/virtualenv/res/twitter-1.4.2.tar.gz has changed
diff -r 51072e5e6ea9 -r 2ef837069108 script/virtualenv/res/twitter-1.9.0.tar.gz
Binary file script/virtualenv/res/twitter-1.9.0.tar.gz has changed
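
Usage note: a minimal sketch of how the new iri_tweet.stream module is meant to be driven, pieced together from how recorder_tweetstream.py uses it. This is illustrative, not part of the patch: the tweepy handler, credentials and keywords are placeholders, and the import assumes script/lib/iri_tweet is on PYTHONPATH.

# -*- coding: utf-8 -*-
import logging

import tweepy
from iri_tweet import stream as tweetstream

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("stream-demo")

# placeholder credentials; BaseStream only needs auth.apply_auth(...)
auth = tweepy.OAuthHandler("CONSUMER_KEY", "CONSUMER_SECRET")
auth.set_access_token("ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")

s = tweetstream.FilterStream(auth, track=["keyword1", "keyword2"], raw=True, chunk_size=1)
s.muststop = lambda: False  # the recorder wires this to a multiprocessing.Event

# SafeStreamWrapper reconnects on TCP/HTTP errors and filters warnings/pings
for tweet in tweetstream.SafeStreamWrapper(s, logger=logger):
    print tweet  # raw=True yields undecoded JSON lines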
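The '\r'-delimited framing in BaseStream._iter_object is the subtle part of the stream reader: a chunk may end in the middle of a JSON payload, so the trailing partial line is carried over in `pending` until the next chunk completes it. A small self-contained demonstration of that buffering logic (the sample payloads are made up):

# standalone re-statement of the pending-buffer logic in _iter_object
chunks = ['{"text": "he', 'llo"}\r{"text"', ': "world"}\r']
pending = None
out = []
for chunk in chunks:
    if pending is not None:
        chunk = pending + chunk
    lines = chunk.split('\r')
    if chunk and lines[-1] and lines[-1][-1] == chunk[-1]:
        pending = lines.pop()  # last line is incomplete; keep it for the next chunk
    else:
        pending = None
    out.extend(line.strip('\n') for line in lines if line)
print out  # ['{"text": "hello"}', '{"text": "world"}']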
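For reference, the reconnect policy implemented by SafeStreamWrapper can be restated as two wait schedules: a linear one for TCP-level errors and a doubling one for HTTP-level errors, both capped at max_wait. The sketch below re-derives that arithmetic with the patch's defaults; it is a standalone illustration, not code from the patch.

# back-off schedules as implemented by __process_tcp_error / __process_http_error
def tcp_schedule(initial_tcp_wait=250, max_wait=240000, attempts=10):
    """Linear back-off: each TCP failure adds initial_tcp_wait ms, capped at max_wait."""
    wait, waits = 0, []
    for _ in range(attempts):
        if wait < max_wait:
            wait = min(wait + initial_tcp_wait, max_wait)
        waits.append(wait)
    return waits

def http_schedule(initial_http_wait=5000, max_wait=240000, attempts=10):
    """Exponential back-off: the wait doubles on each HTTP failure, capped at max_wait."""
    wait, waits = 0, []
    for _ in range(attempts):
        if wait < max_wait:
            wait = min(wait * 2 if wait else initial_http_wait, max_wait)
        waits.append(wait)
    return waits

print tcp_schedule()   # [250, 500, 750, ..., 2500]
print http_schedule()  # [5000, 10000, 20000, ..., 240000, 240000]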
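In export_pad.py, EtherpadRequest resolves undefined attributes through __getattr__, so any Etherpad Lite API method name becomes an HTTP GET built by __action. A hypothetical call, assuming export_pad.py is importable and with a placeholder base url and key:

from export_pad import EtherpadRequest  # assuming the script is on PYTHONPATH

req = EtherpadRequest("http://pads.example.org/api/1", "APIKEY")

# getText is not defined on the class, so __getattr__ returns
# functools.partial(__action, "getText"); calling it issues
#   GET http://pads.example.org/api/1/getText?apikey=APIKEY&padID=mypad&rev=3
# and returns resp["data"] when the server answers with code == 0
data = req.getText(padID="mypad", rev=3)
print data.get("text", "")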