class TweetSource(Base):
    """Raw tweet payload exactly as received from the wire.

    Stored before any parsing so every processed Tweet (and every
    processing error) can be traced back to the original json text.
    """
    __tablename__ = 'tweet_tweet_source'
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Verbatim json text of the tweet.
    original_json = Column(String)
    # BUG FIX: pass the callable, not its result. 'default=datetime.datetime.now()'
    # evaluates once at class-definition time, stamping every row with the
    # process start time instead of the actual insertion time.
    received_at = Column(DateTime, default=datetime.datetime.now)

    def __init__(self, **kwargs):
        # Accept only keyword args matching declared columns; silently
        # ignore unknown keys (matches the other model classes in this file).
        for key, value in kwargs.items():
            if hasattr(self, key):
                setattr(self, key, value)


class TweetLog(Base):
    """Processing outcome (OK / ERROR plus details) for one TweetSource row."""

    # Closed set of status codes stored in the 'status' column.
    TWEET_STATUS = {
        'OK': 1,
        'ERROR': 2,
    }

    __tablename__ = 'tweet_tweet_log'
    id = Column(Integer, primary_key=True, autoincrement=True)
    tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
    tweet_source = relationship("TweetSource", backref="logs")
    status = Column(Integer)
    # Human-readable error message and full traceback text, set only on ERROR.
    error = Column(String)
    error_stack = Column(String)
'tweet_tweet' @@ -65,7 +92,10 @@ text = Column(String) truncated = Column(Boolean) user_id = Column(Integer, ForeignKey('tweet_user.id')) - original_json = Column(String) + user = relationship("TweetUser", backref="tweets") +# original_json = Column(String) + tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) + tweet_source = relationship("TweetSource", backref="tweet") entity_list = relationship(Entity, backref='tweet') received_at = Column(DateTime, default=datetime.datetime.now()) @@ -81,11 +111,11 @@ id = Column(Integer, primary_key = True) user_id = Column(Integer, ForeignKey('tweet_user.id')) + user = relationship("TweetUser", backref="messages") created_at = Column(DateTime, default=datetime.datetime.now()) message_id = Column(Integer, ForeignKey('tweet_message.id')) class Message(Base): - __tablename__ = "tweet_message" id = Column(Integer, primary_key = True) diff -r ee6305b4a7dc -r cdd7d3c0549c script/lib/iri_tweet/utils.py --- a/script/lib/iri_tweet/utils.py Tue Jul 26 23:57:09 2011 +0200 +++ b/script/lib/iri_tweet/utils.py Wed Jul 27 00:04:55 2011 +0200 @@ -1,6 +1,6 @@ -from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \ - EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \ - ACCESS_TOKEN_SECRET, adapt_date, adapt_json +from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, + EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, + ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog) from sqlalchemy.sql import select, or_ #@UnresolvedImport import anyjson #@UnresolvedImport import datetime @@ -83,7 +83,7 @@ class TwitterProcessor(object): - def __init__(self, json_dict, json_txt, session, token_filename=None): + def __init__(self, json_dict, json_txt, source_id, session, token_filename=None): if json_dict is None and json_txt is None: raise TwitterProcessorException("No json") @@ -101,6 +101,7 @@ if "id" not in self.json_dict: 
raise TwitterProcessorException("No id in json") + self.source_id = source_id self.session = session self.token_filename = token_filename @@ -225,7 +226,8 @@ else: ts_copy["user"] = user ts_copy["user_id"] = ts_copy["user"].id - ts_copy["original_json"] = self.json_txt + + ts_copy["tweet_source_id"] = self.source_id self.tweet = Tweet(**ts_copy) self.session.add(self.tweet) @@ -241,7 +243,8 @@ tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() if tweet_nb > 0: return - + + tweet_fields = { 'created_at': self.json_dict["created_at"], 'favorited': False, @@ -253,8 +256,8 @@ #'place': ts["place"], 'source': self.json_dict["source"], 'text': self.json_dict["text"], - 'truncated': False, - 'original_json' : self.json_txt, + 'truncated': False, + 'tweet_source_id' : self.source_id, } #user @@ -295,10 +298,23 @@ def process(self): - if "metadata" in self.json_dict: - self.__process_twitter_rest() - else: - self.__process_twitter_stream() + + if self.source_id is None: + tweet_source = TweetSource(original_json=self.json_txt); + self.session.add(tweet_source) + self.session.flush() + self.source_id = tweet_source.id + + try: + if "metadata" in self.json_dict: + self.__process_twitter_rest() + else: + self.__process_twitter_stream() + + tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK']) + except: + + raise def set_logging(options): diff -r ee6305b4a7dc -r cdd7d3c0549c script/lib/tweetstream/tweetstream/streamclasses.py --- a/script/lib/tweetstream/tweetstream/streamclasses.py Tue Jul 26 23:57:09 2011 +0200 +++ b/script/lib/tweetstream/tweetstream/streamclasses.py Wed Jul 27 00:04:55 2011 +0200 @@ -54,7 +54,7 @@ :attr: `USER_AGENT`. 
""" - def __init__(self, auth, catchup=None, url=None): + def __init__(self, auth, catchup=None, url=None, as_text=False): self._conn = None self._rate_ts = None self._rate_cnt = 0 @@ -68,6 +68,7 @@ self.rate = 0 self.user_agent = USER_AGENT if url: self.url = url + self._as_text = as_text self.muststop = False @@ -119,12 +120,18 @@ this method and return post data. The data should be in the format returned by urllib.urlencode.""" return None + + def __muststop(self): + if callable(self.muststop): + return self.muststop() + else: + return self.muststop def next(self): """Return the next available tweet. This call is blocking!""" while True: try: - if self.muststop: + if self.__muststop(): raise StopIteration() if not self.connected: @@ -143,10 +150,15 @@ elif data.isspace(): continue - data = anyjson.deserialize(data) - if 'text' in data: + if not self._as_text: + data = anyjson.deserialize(data) + if 'text' in data: + self.count += 1 + self._rate_cnt += 1 + else: # count and rate may be off, but we count everything self.count += 1 self._rate_cnt += 1 + return data except ValueError, e: @@ -175,12 +187,12 @@ url = "http://stream.twitter.com/1/statuses/filter.json" def __init__(self, auth, follow=None, locations=None, - track=None, catchup=None, url=None): + track=None, catchup=None, url=None, as_text=False): self._follow = follow self._locations = locations self._track = track # remove follow, locations, track - BaseStream.__init__(self, auth, url=url) + BaseStream.__init__(self, auth, url=url, as_text=as_text) def _get_post_data(self): postdata = {} diff -r ee6305b4a7dc -r cdd7d3c0549c script/rest/search_twitter.py --- a/script/rest/search_twitter.py Tue Jul 26 23:57:09 2011 +0200 +++ b/script/rest/search_twitter.py Wed Jul 27 00:04:55 2011 +0200 @@ -49,12 +49,12 @@ page = 1 while page <= int(1500/int(options.rpp)) and ( results is None or len(results) > 0): - results = twitter. 
search(q=options.query, rpp=options.rpp, page=page) + results = twitter.search(q=options.query, rpp=options.rpp, page=page) for tweet in results["results"]: print tweet tweet_str = anyjson.serialize(tweet) #invalidate user id - processor = utils.TwitterProcessor(tweet, tweet_str, session, options.token_filename) + processor = utils.TwitterProcessor(tweet, tweet_str, None, session, options.token_filename) processor.process() session.flush() session.commit() diff -r ee6305b4a7dc -r cdd7d3c0549c script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Tue Jul 26 23:57:09 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Wed Jul 27 00:04:55 2011 +0200 @@ -1,16 +1,23 @@ from getpass import getpass from iri_tweet import models, utils +from iri_tweet.models import TweetSource, TweetLog +from multiprocessing import Queue, JoinableQueue, Process, Event from optparse import OptionParser from sqlalchemy.orm import sessionmaker from sqlite3 import * +import StringIO +import anyjson import datetime import logging import os +import shutil +import signal import socket import sys import time +import traceback +import tweepy.auth import tweetstream -import tweepy.auth socket._fileobject.default_bufsize = 0 @@ -44,12 +51,12 @@ """ - def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs): + def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs): self.max_reconnects = reconnects self.retry_wait = retry_wait self._reconnects = 0 self._error_cb = error_cb - super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs) + super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs) def next(self): while True: @@ -72,45 +79,134 @@ -def process_tweet(tweet, session, debug, token_filename): - screen_name = "" - if 'user' in tweet and 'screen_name' in tweet['user']: - screen_name = tweet['user']['screen_name'] - 
logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text'])) - logging.debug("Process_tweet :" + repr(tweet)) - processor = utils.TwitterProcessor(tweet, None, session, token_filename) - processor.process() +class SourceProcess(Process): + + def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event): + super(SourceProcess, self).__init__() + self.session_maker = session_maker + self.queue = queue + self.auth = auth + self.track = track + self.debug = debug + self.reconnects = reconnects + self.token_filename = token_filename + self.stop_event = stop_event +# self.stop_event = + + def run(self): + + track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() + track_list = [k for k in track_list.split(',')] + + stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True) + stream.muststop = lambda: self.stop_event.is_set() + + session = self.session_maker() + + try: + for tweet in stream: + source = TweetSource(original_json=tweet) + session.add(source) + session.flush() + source_id = source.id + queue.put((source_id, tweet), False) + #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename) + logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) + session.commit() +# if duration >= 0 and datetime.datetime.utcnow() >= end_ts: +# print "Stop recording after %d seconds." 
% (duration) +# break + except: + session.rollback() + finally: + stream.close() + session.close() + + +class TweetProcess(Process): + + def __init__(self, session_maker, queue, debug, token_filename, stop_event): + super(TweetProcess, self).__init__() + self.session_maker = session_maker + self.queue = queue + self.debug = debug + self.stop_event = stop_event + self.token_filename = token_filename -def main(username, password, track, session, debug, reconnects, token_filename, duration): + def run(self): + + session = self.session_maker() + try: + while not self.stop_event.is_set(): + try: + source_id, tweet_txt = queue.get(True, 30) + except: + continue + process_tweet(tweet_txt, source_id, session) + session.commit() + self.queue.task_done() + except: + session.rollback() + raise + finally: + session.close() + + + def process_tweet(tweet, source_id, session): + + try: + tweet_obj = anyjson.deserialize(tweet) + screen_name = "" + if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: + screen_name = tweet_obj['user']['screen_name'] + logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) + logging.debug(u"Process_tweet :" + repr(tweet)) + processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename) + processor.process() + except Exception, e: + message = u"Error %e processing tweet %s" % (unicode(e), tweet) + logging.error(message) + output = StringIO.StringIO() + traceback.print_exception(Exception, e, None, None, output) + tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue()) + output.close() + + + +#def main_source(username, password, track, session, debug, reconnects, token_filename, duration): #username = username or raw_input('Twitter username: ') #password = password or getpass('Twitter password: ') - track_list = track or raw_input('Keywords to track (comma seperated): ').strip() - track_list = [k for k in track_list.split(',')] +# 
track_list = track or raw_input('Keywords to track (comma seperated): ').strip() +# track_list = [k for k in track_list.split(',')] - if username and password: - auth = tweepy.auth.BasicAuthHandler(username, password) - else: - consumer_key = models.CONSUMER_KEY - consumer_secret = models.CONSUMER_SECRET - auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) - auth.set_access_token(*(utils.get_oauth_token(token_filename))) +# if username and password: +# auth = tweepy.auth.BasicAuthHandler(username, password) +# else: +# consumer_key = models.CONSUMER_KEY +# consumer_secret = models.CONSUMER_SECRET +# auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) +# auth.set_access_token(*(utils.get_oauth_token(token_filename))) + +# if duration >= 0: +# end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration) - if duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration) - - stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects) - try: - for tweet in stream: - if duration >= 0 and datetime.datetime.utcnow() >= end_ts: - print "Stop recording after %d seconds." 
% (duration) - break - process_tweet(tweet, session, debug, token_filename) - logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) - session.commit() - finally: - stream.close() +# stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True) +# try: +# for tweet in stream: +# source = TweetSource(original_json=tweet) +# session.add(source) +# session.flush() +# source_id = source.id +# process_tweet(tweet, source_id, session, debug, token_filename) +# logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) +# session.commit() +# if duration >= 0 and datetime.datetime.utcnow() >= end_ts: +# print "Stop recording after %d seconds." % (duration) +# break +# finally: +# stream.close() def get_options(): parser = OptionParser() @@ -130,6 +226,9 @@ help="Token file name") parser.add_option("-d", "--duration", dest="duration", help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') + parser.add_option("-N", "--consumer", dest="consumer_nb", + help="number of consumer", metavar="CONSUMER", default=1, type='int') + utils.set_logging_options(parser) @@ -139,7 +238,6 @@ if __name__ == '__main__': - (options, args) = get_options() utils.set_logging(options) @@ -149,17 +247,62 @@ print repr(options) if options.new and os.path.exists(options.filename): - os.remove(options.filename) + i = 1 + basename, extension = os.path.splitext(options.filename) + new_path = '$s.%d.%s' % (basename, i, extension) + while i < 1000000 and os.path.exists(new_path): + i += 1 + new_path = '$s.%d.%s' % (basename, i, extension) + if i >= 1000000: + raise Exception("Unable to find new filename for " + options.filename) + else: + shutil.move(options.filename, new_path) + - engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2)) + engine, metadata = 
models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) Session = sessionmaker(bind=engine) - session = Session() + queue = JoinableQueue() + stop_event = Event() + + if options.username and options.password: + auth = tweepy.auth.BasicAuthHandler(options.username, options.password) + else: + consumer_key = models.CONSUMER_KEY + consumer_secret = models.CONSUMER_SECRET + auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) + auth.set_access_token(*(utils.get_oauth_token(options.token_filename))) - try: - try: - main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration) - except KeyboardInterrupt: - print '\nGoodbye!' - session.commit() - finally: - session.close() + + sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event) + + tweet_processes = [] + + for i in range(options.consumer_nb): + cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event) + tweet_processes.append(cprocess) + + def interupt_handler(signum, frame): + stop_event.set() + + signal.signal(signal.SIGINT, interupt_handler) + + sprocess.start() + for cprocess in tweet_processes: + cprocess.start() + + if options.duration >= 0: + end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + + while not stop_event.is_set(): + if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: + stop_event.set() + break + if sprocess.is_alive(): + time.sleep(0.1) + else: + break + + sprocess.join() + queue.join() + for cprocess in tweet_processes: + cprocess.join()