Starting 'parallel_twitter' branch
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Wed, 27 Jul 2011 00:04:55 +0200
changeset 242 cdd7d3c0549c
parent 240 ee6305b4a7dc
child 243 9213a63fa34a
Starting 'parallel_twitter' branch
script/lib/iri_tweet/export_tweet_db.py
script/lib/iri_tweet/models.py
script/lib/iri_tweet/utils.py
script/lib/tweetstream/tweetstream/streamclasses.py
script/rest/search_twitter.py
script/stream/recorder_tweetstream.py
--- a/script/lib/iri_tweet/export_tweet_db.py	Tue Jul 26 23:57:09 2011 +0200
+++ b/script/lib/iri_tweet/export_tweet_db.py	Wed Jul 27 00:04:55 2011 +0200
@@ -35,7 +35,7 @@
             fields_mapping = {}
             for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
                 logging.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
-                processor = TwitterProcessor(eval(res[0]), res[0], session, options.token_filename)
+                processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
                 processor.process()
                 session.commit()
             logging.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
--- a/script/lib/iri_tweet/models.py	Tue Jul 26 23:57:09 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Wed Jul 27 00:04:55 2011 +0200
@@ -42,7 +42,34 @@
             if hasattr(self,key):
                 setattr(self,key,value)
 
+class TweetSource(Base):
+    __tablename__ = 'tweet_tweet_source'
+    id = Column(Integer, primary_key = True, autoincrement=True)
+    original_json = Column(String)
+    received_at = Column(DateTime, default=datetime.datetime.now())
+    
+    def __init__(self, **kwargs):
+        for key, value in kwargs.items():
+            if hasattr(self,key):
+                setattr(self,key,value)
 
+
+class TweetLog(Base):
+    
+    TWEET_STATUS = {
+        'OK' : 1,
+        'ERROR' : 2,
+    }
+    
+    __tablename__ = 'tweet_tweet_log'
+    id = Column(Integer, primary_key = True, autoincrement=True)
+    tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+    tweet_source = relationship("TweetSource", backref="logs")
+    status = Column(Integer)
+    error = Column(String)
+    error_stack = Column(String)
+ 
+    
 class Tweet(Base):
     __tablename__ = 'tweet_tweet'
 
@@ -65,7 +92,10 @@
     text = Column(String)
     truncated = Column(Boolean)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
-    original_json = Column(String)
+    user = relationship("User", backref="tweets")
+#    original_json = Column(String)
+    tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+    tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet')
     received_at = Column(DateTime, default=datetime.datetime.now())
     
@@ -81,11 +111,11 @@
 
     id = Column(Integer, primary_key = True)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
+    user = relationship("User", backref="messages")
     created_at = Column(DateTime, default=datetime.datetime.now())
     message_id = Column(Integer, ForeignKey('tweet_message.id'))
 
 class Message(Base):
-    
     __tablename__ = "tweet_message"
     
     id = Column(Integer, primary_key = True)
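The raw payload now lives once in tweet_tweet_source, and each processing attempt can be traced in tweet_tweet_log. A minimal sketch of the intended flow, assuming setup_database() also creates the tables (as recorder_tweetstream.py relies on when pointed at a fresh file):

    from iri_tweet import models
    from iri_tweet.models import TweetSource, TweetLog
    from sqlalchemy.orm import sessionmaker

    engine, metadata = models.setup_database('sqlite:///:memory:', echo=False)
    Session = sessionmaker(bind=engine)
    session = Session()

    # 1. store the raw JSON exactly as received
    source = TweetSource(original_json='{"id": 1, "text": "hello"}')
    session.add(source)
    session.flush()                      # populates source.id

    # 2. record the outcome of processing that payload
    log = TweetLog(tweet_source_id=source.id,
                   status=TweetLog.TWEET_STATUS['OK'])
    session.add(log)
    session.commit()
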
--- a/script/lib/iri_tweet/utils.py	Tue Jul 26 23:57:09 2011 +0200
+++ b/script/lib/iri_tweet/utils.py	Wed Jul 27 00:04:55 2011 +0200
@@ -1,6 +1,6 @@
-from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \
-    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \
-    ACCESS_TOKEN_SECRET, adapt_date, adapt_json
+from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, 
+    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, 
+    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
 from sqlalchemy.sql import select, or_ #@UnresolvedImport
 import anyjson #@UnresolvedImport
 import datetime
@@ -83,7 +83,7 @@
 
 class TwitterProcessor(object):
     
-    def __init__(self, json_dict, json_txt, session, token_filename=None):
+    def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
 
         if json_dict is None and json_txt is None:
             raise TwitterProcessorException("No json")
@@ -101,6 +101,7 @@
         if "id" not in self.json_dict:
             raise TwitterProcessorException("No id in json")
         
+        self.source_id = source_id
         self.session = session
         self.token_filename = token_filename
 
@@ -225,7 +226,8 @@
         else:
             ts_copy["user"] = user
             ts_copy["user_id"] = ts_copy["user"].id
-        ts_copy["original_json"] = self.json_txt
+            
+        ts_copy["tweet_source_id"] = self.source_id
         
         self.tweet = Tweet(**ts_copy)
         self.session.add(self.tweet)
@@ -241,7 +243,8 @@
         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
         if tweet_nb > 0:
             return
-            
+        
+        
         tweet_fields = {
             'created_at': self.json_dict["created_at"], 
             'favorited': False,
@@ -253,8 +256,8 @@
             #'place': ts["place"],
             'source': self.json_dict["source"],
             'text': self.json_dict["text"],
-            'truncated': False,
-            'original_json' : self.json_txt,
+            'truncated': False,            
+            'tweet_source_id' : self.source_id,
         }
         
         #user
@@ -295,10 +298,23 @@
 
 
     def process(self):
-        if "metadata" in self.json_dict:
-            self.__process_twitter_rest()
-        else:
-            self.__process_twitter_stream()
+        
+        if self.source_id is None:
+            tweet_source = TweetSource(original_json=self.json_txt)
+            self.session.add(tweet_source)
+            self.session.flush()
+            self.source_id = tweet_source.id
+        
+        try:
+            if "metadata" in self.json_dict:
+                self.__process_twitter_rest()
+            else:
+                self.__process_twitter_stream()
+                
+            tweet_log = TweetLog(tweet_source_id=self.source_id, status=TweetLog.TWEET_STATUS['OK'])
+            self.session.add(tweet_log)
+        except:
+            raise
         
 
 def set_logging(options):
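process() now has two entry modes: the REST/export scripts pass source_id=None and the processor inserts its own TweetSource row from json_txt, while the stream recorder passes the id of a row already written by the producer. A minimal wrapper sketch of the second mode, assuming an open session and a flushed TweetSource row:

    import anyjson
    from iri_tweet.utils import TwitterProcessor

    def store_from_source(source, session, token_filename):
        # `source` is a TweetSource row already flushed by the producer process.
        json_txt = source.original_json
        processor = TwitterProcessor(anyjson.deserialize(json_txt), json_txt,
                                     source.id, session, token_filename)
        processor.process()              # also records an 'OK' TweetLog row
        session.commit()
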
--- a/script/lib/tweetstream/tweetstream/streamclasses.py	Tue Jul 26 23:57:09 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/streamclasses.py	Wed Jul 27 00:04:55 2011 +0200
@@ -54,7 +54,7 @@
         :attr: `USER_AGENT`.
     """
 
-    def __init__(self, auth, catchup=None, url=None):
+    def __init__(self, auth, catchup=None, url=None, as_text=False):
         self._conn = None
         self._rate_ts = None
         self._rate_cnt = 0
@@ -68,6 +68,7 @@
         self.rate = 0
         self.user_agent = USER_AGENT
         if url: self.url = url
+        self._as_text = as_text
         
         self.muststop = False
 
@@ -119,12 +120,18 @@
         this method and return post data. The data should be in the format
         returned by urllib.urlencode."""
         return None
+    
+    def __muststop(self):
+        if callable(self.muststop):
+            return self.muststop()
+        else:
+            return self.muststop
 
     def next(self):
         """Return the next available tweet. This call is blocking!"""
         while True:
             try:
-                if self.muststop:
+                if self.__muststop():
                     raise StopIteration()
                 
                 if not self.connected:
@@ -143,10 +150,15 @@
                 elif data.isspace():
                     continue
 
-                data = anyjson.deserialize(data)
-                if 'text' in data:
+                if not self._as_text: 
+                    data = anyjson.deserialize(data)
+                    if 'text' in data:
+                        self.count += 1
+                        self._rate_cnt += 1
+                else: # count and rate may be off, but we count everything
                     self.count += 1
                     self._rate_cnt += 1
+                    
                 return data
 
             except ValueError, e:
@@ -175,12 +187,12 @@
     url = "http://stream.twitter.com/1/statuses/filter.json"
 
     def __init__(self, auth, follow=None, locations=None,
-                 track=None, catchup=None, url=None):
+                 track=None, catchup=None, url=None, as_text=False):
         self._follow = follow
         self._locations = locations
         self._track = track
         # remove follow, locations, track
-        BaseStream.__init__(self, auth, url=url)
+        BaseStream.__init__(self, auth, url=url, as_text=as_text)
 
     def _get_post_data(self):
         postdata = {}
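Two behaviours are added here: as_text=True skips the anyjson.deserialize() call and yields raw JSON strings, and muststop may now be a callable that is re-checked before every read. A usage sketch, assuming the filter class patched above is exported as tweetstream.FilterStream and that the placeholder credentials are replaced:

    import threading
    import tweepy.auth
    import tweetstream

    auth = tweepy.auth.BasicAuthHandler('user', 'password')   # placeholders
    stop = threading.Event()

    stream = tweetstream.FilterStream(auth, track=['python'], as_text=True)
    stream.muststop = stop.is_set        # callable, evaluated on each next()

    try:
        for raw_json in stream:
            # with as_text=True the consumer decides when (and whether) to parse
            print raw_json[:80]
            if stream.count >= 10:
                stop.set()
    finally:
        stream.close()
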
--- a/script/rest/search_twitter.py	Tue Jul 26 23:57:09 2011 +0200
+++ b/script/rest/search_twitter.py	Wed Jul 27 00:04:55 2011 +0200
@@ -49,12 +49,12 @@
         page = 1
         
         while page <= int(1500/int(options.rpp)) and  ( results is None  or len(results) > 0):
-            results = twitter. search(q=options.query, rpp=options.rpp, page=page)
+            results = twitter.search(q=options.query, rpp=options.rpp, page=page)
             for tweet in results["results"]:
                 print tweet
                 tweet_str = anyjson.serialize(tweet)
                 #invalidate user id
-                processor = utils.TwitterProcessor(tweet, tweet_str, session, options.token_filename)
+                processor = utils.TwitterProcessor(tweet, tweet_str, None, session, options.token_filename)
                 processor.process()
                 session.flush()
                 session.commit()
--- a/script/stream/recorder_tweetstream.py	Tue Jul 26 23:57:09 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Wed Jul 27 00:04:55 2011 +0200
@@ -1,16 +1,23 @@
 from getpass import getpass
 from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog
+from multiprocessing import Queue, JoinableQueue, Process, Event
 from optparse import OptionParser
 from sqlalchemy.orm import sessionmaker
 from sqlite3 import *
+import StringIO
+import anyjson
 import datetime
 import logging
 import os
+import shutil
+import signal
 import socket
 import sys
 import time
+import traceback
+import tweepy.auth
 import tweetstream
-import tweepy.auth
 socket._fileobject.default_bufsize = 0
 
 
@@ -44,12 +51,12 @@
 
     """
 
-    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs):
+    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
         self.max_reconnects = reconnects
         self.retry_wait = retry_wait
         self._reconnects = 0
         self._error_cb = error_cb
-        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs)
+        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
 
     def next(self):
         while True:
@@ -72,45 +79,134 @@
 
 
 
-def process_tweet(tweet, session, debug, token_filename):
-    screen_name = ""
-    if 'user' in tweet and 'screen_name' in tweet['user']:
-        screen_name = tweet['user']['screen_name']
-    logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text']))
-    logging.debug("Process_tweet :" + repr(tweet))
-    processor = utils.TwitterProcessor(tweet, None, session, token_filename)
-    processor.process()
+class SourceProcess(Process):
+    
+    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+        super(SourceProcess, self).__init__()
+        self.session_maker = session_maker
+        self.queue = queue
+        self.auth = auth
+        self.track = track
+        self.debug = debug
+        self.reconnects = reconnects
+        self.token_filename = token_filename
+        self.stop_event = stop_event
+#        self.stop_event = 
+    
+    def run(self):
+        
+        track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
+        track_list = [k for k in track_list.split(',')]
+                        
+        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+        stream.muststop = lambda: self.stop_event.is_set()
+        
+        session = self.session_maker()
+        
+        try:
+            for tweet in stream:
+                source = TweetSource(original_json=tweet)
+                session.add(source)
+                session.flush()
+                source_id = source.id
+                self.queue.put((source_id, tweet), False)
+                #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
+                logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                session.commit()
+#                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+#                    print "Stop recording after %d seconds." % (duration)
+#                    break
+        except:
+            session.rollback()
+        finally:
+            stream.close()
+            session.close()
+            
+        
+class TweetProcess(Process):
+    
+    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+        super(TweetProcess, self).__init__()
+        self.session_maker = session_maker
+        self.queue = queue
+        self.debug = debug
+        self.stop_event = stop_event
+        self.token_filename = token_filename
 
-def main(username, password, track, session, debug, reconnects, token_filename, duration):
+    def run(self):
+        
+        session = self.session_maker()
+        try:
+            while not self.stop_event.is_set():
+                try:
+                    source_id, tweet_txt = self.queue.get(True, 30)
+                except:
+                    continue
+                self.process_tweet(tweet_txt, source_id, session)
+                session.commit()
+                self.queue.task_done()
+        except:
+            session.rollback()
+            raise
+        finally:
+            session.close()
+            
+        
+    def process_tweet(self, tweet, source_id, session):
+        
+        try:
+            tweet_obj = anyjson.deserialize(tweet)
+            screen_name = ""
+            if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+                screen_name = tweet_obj['user']['screen_name']
+            logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+            logging.debug(u"Process_tweet :" + repr(tweet))
+            processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
+            processor.process()
+        except Exception, e:
+            message = u"Error %s processing tweet %s" % (unicode(e), tweet)
+            logging.error(message)
+            output = StringIO.StringIO()
+            traceback.print_exception(Exception, e, None, None, output)
+            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
+            session.add(tweet_log)
+            output.close()
+
+
+
+#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
 
     #username = username or raw_input('Twitter username: ')
     #password = password or getpass('Twitter password: ')
 
-    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-    track_list = [k for k in track_list.split(',')]
+#    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
+#    track_list = [k for k in track_list.split(',')]
     
-    if username and password:
-        auth = tweepy.auth.BasicAuthHandler(username, password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
+#    if username and password:
+#        auth = tweepy.auth.BasicAuthHandler(username, password)        
+#    else:
+#        consumer_key = models.CONSUMER_KEY
+#        consumer_secret = models.CONSUMER_SECRET
+#        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+#        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
+    
+#    if duration >= 0:
+#        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
     
-    if duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
-    
-    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
-    try:
-        for tweet in stream:            
-            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-                print "Stop recording after %d seconds." % (duration)
-                break
-            process_tweet(tweet, session, debug, token_filename)
-            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-            session.commit()
-    finally:
-        stream.close()
+#    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
+#    try:
+#        for tweet in stream:
+#            source = TweetSource(original_json=tweet)
+#            session.add(source)
+#            session.flush()            
+#            source_id = source.id
+#            process_tweet(tweet, source_id, session, debug, token_filename)
+#            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
+#            session.commit()
+#            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+#                print "Stop recording after %d seconds." % (duration)
+#                break
+#    finally:
+#        stream.close()
         
 def get_options():
     parser = OptionParser()
@@ -130,6 +226,9 @@
                       help="Token file name")
     parser.add_option("-d", "--duration", dest="duration",
                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
+    parser.add_option("-N", "--consumer", dest="consumer_nb",
+                      help="number of consumer", metavar="CONSUMER", default=1, type='int')
+
 
 
     utils.set_logging_options(parser)
@@ -139,7 +238,6 @@
 
 if __name__ == '__main__':
     
-
     (options, args) = get_options()
     
     utils.set_logging(options)
@@ -149,17 +247,62 @@
         print repr(options)
     
     if options.new and os.path.exists(options.filename):
-        os.remove(options.filename)
+        i = 1
+        basename, extension = os.path.splitext(options.filename)
+        new_path = '%s.%d%s' % (basename, i, extension)  # splitext keeps the leading dot in extension
+        while i < 1000000 and os.path.exists(new_path):
+            i += 1
+            new_path = '%s.%d%s' % (basename, i, extension)
+        if i >= 1000000:
+            raise Exception("Unable to find new filename for " + options.filename)
+        else:
+            shutil.move(options.filename, new_path)
+
     
-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2))
+    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
     Session = sessionmaker(bind=engine)
-    session = Session()
+    queue = JoinableQueue()
+    stop_event = Event()
+
+    if options.username and options.password:
+        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
 
-    try:
-        try:
-            main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration)
-        except KeyboardInterrupt:
-            print '\nGoodbye!'
-        session.commit()
-    finally:
-        session.close()
+     
+    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
+    
+    tweet_processes = []
+    
+    for i in range(options.consumer_nb):
+        cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+        tweet_processes.append(cprocess)
+
+    def interrupt_handler(signum, frame):
+        stop_event.set()
+
+    signal.signal(signal.SIGINT, interrupt_handler)
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+    while not stop_event.is_set():
+        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+            stop_event.set()
+            break
+        if sprocess.is_alive():
+            time.sleep(0.1)
+        else:
+            break
+    
+    sprocess.join()
+    queue.join()
+    for cprocess in tweet_processes:
+        cprocess.join()
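
The recorder now splits work between a SourceProcess that only archives raw payloads and N TweetProcess consumers that parse them, joined by a JoinableQueue and a shared stop Event. A distilled, self-contained sketch of that shutdown handshake (join the producer first, then drain the queue, then release the consumers):

    from multiprocessing import JoinableQueue, Process, Event
    import time

    def producer(queue, stop_event):
        for i in range(100):
            if stop_event.is_set():
                break
            queue.put('payload-%d' % i)
            time.sleep(0.01)

    def consumer(queue, stop_event):
        while not stop_event.is_set():
            try:
                item = queue.get(True, 1)    # like queue.get(True, 30) above
            except Exception:                # Queue.Empty on timeout
                continue
            # ... TwitterProcessor would run here ...
            queue.task_done()

    if __name__ == '__main__':
        queue = JoinableQueue()
        stop_event = Event()
        source = Process(target=producer, args=(queue, stop_event))
        consumers = [Process(target=consumer, args=(queue, stop_event))
                     for _ in range(2)]
        source.start()
        for c in consumers:
            c.start()
        source.join()        # wait for the producer to finish
        queue.join()         # then wait until every queued item is task_done()
        stop_event.set()     # finally let the consumers exit their loop
        for c in consumers:
            c.join()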