--- a/script/lib/iri_tweet/export_tweet_db.py Tue Jul 26 23:57:09 2011 +0200
+++ b/script/lib/iri_tweet/export_tweet_db.py Wed Jul 27 00:04:55 2011 +0200
@@ -35,7 +35,7 @@
fields_mapping = {}
for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
logging.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
- processor = TwitterProcessor(eval(res[0]), res[0], session, options.token_filename)
+ processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
processor.process()
session.commit()
logging.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
--- a/script/lib/iri_tweet/models.py Tue Jul 26 23:57:09 2011 +0200
+++ b/script/lib/iri_tweet/models.py Wed Jul 27 00:04:55 2011 +0200
@@ -42,7 +42,34 @@
if hasattr(self,key):
setattr(self,key,value)
+class TweetSource(Base):
+ __tablename__ = 'tweet_tweet_source'
+ id = Column(Integer, primary_key = True, autoincrement=True)
+ original_json = Column(String)
+ received_at = Column(DateTime, default=datetime.datetime.now)
+
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+class TweetLog(Base):
+
+ TWEET_STATUS = {
+ 'OK' : 1,
+ 'ERROR' : 2,
+ }
+
+ __tablename__ = 'tweet_tweet_log'
+ id = Column(Integer, primary_key = True, autoincrement=True)
+ tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+ tweet_source = relationship("TweetSource", backref="logs")
+ status = Column(Integer)
+ error = Column(String)
+ error_stack = Column(String)
+
+
class Tweet(Base):
__tablename__ = 'tweet_tweet'
@@ -65,7 +92,10 @@
text = Column(String)
truncated = Column(Boolean)
user_id = Column(Integer, ForeignKey('tweet_user.id'))
- original_json = Column(String)
+ user = relationship("TweetUser", backref="tweets")
+# original_json = Column(String)
+ tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+ tweet_source = relationship("TweetSource", backref="tweet")
entity_list = relationship(Entity, backref='tweet')
received_at = Column(DateTime, default=datetime.datetime.now())
@@ -81,11 +111,11 @@
id = Column(Integer, primary_key = True)
user_id = Column(Integer, ForeignKey('tweet_user.id'))
+ user = relationship("TweetUser", backref="messages")
created_at = Column(DateTime, default=datetime.datetime.now())
message_id = Column(Integer, ForeignKey('tweet_message.id'))
class Message(Base):
-
__tablename__ = "tweet_message"
id = Column(Integer, primary_key = True)
--- a/script/lib/iri_tweet/utils.py Tue Jul 26 23:57:09 2011 +0200
+++ b/script/lib/iri_tweet/utils.py Wed Jul 27 00:04:55 2011 +0200
@@ -1,6 +1,6 @@
-from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \
- EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \
- ACCESS_TOKEN_SECRET, adapt_date, adapt_json
+from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
+ EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
+ ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
from sqlalchemy.sql import select, or_ #@UnresolvedImport
import anyjson #@UnresolvedImport
import datetime
@@ -83,7 +83,7 @@
class TwitterProcessor(object):
- def __init__(self, json_dict, json_txt, session, token_filename=None):
+ def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
if json_dict is None and json_txt is None:
raise TwitterProcessorException("No json")
@@ -101,6 +101,7 @@
if "id" not in self.json_dict:
raise TwitterProcessorException("No id in json")
+ self.source_id = source_id
self.session = session
self.token_filename = token_filename
@@ -225,7 +226,8 @@
else:
ts_copy["user"] = user
ts_copy["user_id"] = ts_copy["user"].id
- ts_copy["original_json"] = self.json_txt
+
+ ts_copy["tweet_source_id"] = self.source_id
self.tweet = Tweet(**ts_copy)
self.session.add(self.tweet)
@@ -241,7 +243,8 @@
tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
if tweet_nb > 0:
return
-
+
+
tweet_fields = {
'created_at': self.json_dict["created_at"],
'favorited': False,
@@ -253,8 +256,8 @@
#'place': ts["place"],
'source': self.json_dict["source"],
'text': self.json_dict["text"],
- 'truncated': False,
- 'original_json' : self.json_txt,
+ 'truncated': False,
+ 'tweet_source_id' : self.source_id,
}
#user
@@ -295,10 +298,23 @@
def process(self):
- if "metadata" in self.json_dict:
- self.__process_twitter_rest()
- else:
- self.__process_twitter_stream()
+
+ if self.source_id is None:
+ tweet_source = TweetSource(original_json=self.json_txt);
+ self.session.add(tweet_source)
+ self.session.flush()
+ self.source_id = tweet_source.id
+
+ try:
+ if "metadata" in self.json_dict:
+ self.__process_twitter_rest()
+ else:
+ self.__process_twitter_stream()
+
+ self.session.add(TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK']))
+ except:
+
+ raise
def set_logging(options):
--- a/script/lib/tweetstream/tweetstream/streamclasses.py Tue Jul 26 23:57:09 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/streamclasses.py Wed Jul 27 00:04:55 2011 +0200
@@ -54,7 +54,7 @@
:attr: `USER_AGENT`.
"""
- def __init__(self, auth, catchup=None, url=None):
+ def __init__(self, auth, catchup=None, url=None, as_text=False):
self._conn = None
self._rate_ts = None
self._rate_cnt = 0
@@ -68,6 +68,7 @@
self.rate = 0
self.user_agent = USER_AGENT
if url: self.url = url
+ self._as_text = as_text
self.muststop = False
@@ -119,12 +120,18 @@
this method and return post data. The data should be in the format
returned by urllib.urlencode."""
return None
+
+ def __muststop(self):
+ if callable(self.muststop):
+ return self.muststop()
+ else:
+ return self.muststop
def next(self):
"""Return the next available tweet. This call is blocking!"""
while True:
try:
- if self.muststop:
+ if self.__muststop():
raise StopIteration()
if not self.connected:
@@ -143,10 +150,15 @@
elif data.isspace():
continue
- data = anyjson.deserialize(data)
- if 'text' in data:
+ if not self._as_text:
+ data = anyjson.deserialize(data)
+ if 'text' in data:
+ self.count += 1
+ self._rate_cnt += 1
+ else: # count and rate may be off, but we count everything
self.count += 1
self._rate_cnt += 1
+
return data
except ValueError, e:
@@ -175,12 +187,12 @@
url = "http://stream.twitter.com/1/statuses/filter.json"
def __init__(self, auth, follow=None, locations=None,
- track=None, catchup=None, url=None):
+ track=None, catchup=None, url=None, as_text=False):
self._follow = follow
self._locations = locations
self._track = track
# remove follow, locations, track
- BaseStream.__init__(self, auth, url=url)
+ BaseStream.__init__(self, auth, url=url, as_text=as_text)
def _get_post_data(self):
postdata = {}
--- a/script/rest/search_twitter.py Tue Jul 26 23:57:09 2011 +0200
+++ b/script/rest/search_twitter.py Wed Jul 27 00:04:55 2011 +0200
@@ -49,12 +49,12 @@
page = 1
while page <= int(1500/int(options.rpp)) and ( results is None or len(results) > 0):
- results = twitter. search(q=options.query, rpp=options.rpp, page=page)
+ results = twitter.search(q=options.query, rpp=options.rpp, page=page)
for tweet in results["results"]:
print tweet
tweet_str = anyjson.serialize(tweet)
#invalidate user id
- processor = utils.TwitterProcessor(tweet, tweet_str, session, options.token_filename)
+ processor = utils.TwitterProcessor(tweet, tweet_str, None, session, options.token_filename)
processor.process()
session.flush()
session.commit()
--- a/script/stream/recorder_tweetstream.py Tue Jul 26 23:57:09 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Wed Jul 27 00:04:55 2011 +0200
@@ -1,16 +1,23 @@
from getpass import getpass
from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog
+from multiprocessing import Queue, JoinableQueue, Process, Event
from optparse import OptionParser
from sqlalchemy.orm import sessionmaker
from sqlite3 import *
+import StringIO
+import anyjson
import datetime
import logging
import os
+import shutil
+import signal
import socket
import sys
import time
+import traceback
+import tweepy.auth
import tweetstream
-import tweepy.auth
socket._fileobject.default_bufsize = 0
@@ -44,12 +51,12 @@
"""
- def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs):
+ def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
self.max_reconnects = reconnects
self.retry_wait = retry_wait
self._reconnects = 0
self._error_cb = error_cb
- super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs)
+ super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
def next(self):
while True:
@@ -72,45 +79,134 @@
-def process_tweet(tweet, session, debug, token_filename):
- screen_name = ""
- if 'user' in tweet and 'screen_name' in tweet['user']:
- screen_name = tweet['user']['screen_name']
- logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text']))
- logging.debug("Process_tweet :" + repr(tweet))
- processor = utils.TwitterProcessor(tweet, None, session, token_filename)
- processor.process()
+class SourceProcess(Process):
+
+ def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+ super(SourceProcess, self).__init__()
+ self.session_maker = session_maker
+ self.queue = queue
+ self.auth = auth
+ self.track = track
+ self.debug = debug
+ self.reconnects = reconnects
+ self.token_filename = token_filename
+ self.stop_event = stop_event
+# self.stop_event =
+
+ def run(self):
+
+ track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
+ track_list = [k for k in track_list.split(',')]
+
+ stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+ stream.muststop = lambda: self.stop_event.is_set()
+
+ session = self.session_maker()
+
+ try:
+ for tweet in stream:
+ source = TweetSource(original_json=tweet)
+ session.add(source)
+ session.flush()
+ source_id = source.id
+ self.queue.put((source_id, tweet), False)
+ #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
+ logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ session.commit()
+# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+# print "Stop recording after %d seconds." % (duration)
+# break
+ except:
+ session.rollback()
+ finally:
+ stream.close()
+ session.close()
+
+
+class TweetProcess(Process):
+
+ def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+ super(TweetProcess, self).__init__()
+ self.session_maker = session_maker
+ self.queue = queue
+ self.debug = debug
+ self.stop_event = stop_event
+ self.token_filename = token_filename
-def main(username, password, track, session, debug, reconnects, token_filename, duration):
+ def run(self):
+
+ session = self.session_maker()
+ try:
+ while not self.stop_event.is_set():
+ try:
+ source_id, tweet_txt = self.queue.get(True, 30)
+ except:
+ continue
+ self.process_tweet(tweet_txt, source_id, session)
+ session.commit()
+ self.queue.task_done()
+ except:
+ session.rollback()
+ raise
+ finally:
+ session.close()
+
+
+ def process_tweet(self, tweet, source_id, session):
+
+ try:
+ tweet_obj = anyjson.deserialize(tweet)
+ screen_name = ""
+ if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+ screen_name = tweet_obj['user']['screen_name']
+ logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ logging.debug(u"Process_tweet :" + repr(tweet))
+ processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
+ processor.process()
+ except Exception, e:
+ message = u"Error %s processing tweet %s" % (unicode(e), tweet)
+ logging.error(message)
+ output = StringIO.StringIO()
+ traceback.print_exception(Exception, e, None, None, output)
+ session.add(TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue()))
+ output.close()
+
+
+
+#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
#username = username or raw_input('Twitter username: ')
#password = password or getpass('Twitter password: ')
- track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
- track_list = [k for k in track_list.split(',')]
+# track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
+# track_list = [k for k in track_list.split(',')]
- if username and password:
- auth = tweepy.auth.BasicAuthHandler(username, password)
- else:
- consumer_key = models.CONSUMER_KEY
- consumer_secret = models.CONSUMER_SECRET
- auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
- auth.set_access_token(*(utils.get_oauth_token(token_filename)))
+# if username and password:
+# auth = tweepy.auth.BasicAuthHandler(username, password)
+# else:
+# consumer_key = models.CONSUMER_KEY
+# consumer_secret = models.CONSUMER_SECRET
+# auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+# auth.set_access_token(*(utils.get_oauth_token(token_filename)))
+
+# if duration >= 0:
+# end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
- if duration >= 0:
- end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
-
- stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
- try:
- for tweet in stream:
- if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
- print "Stop recording after %d seconds." % (duration)
- break
- process_tweet(tweet, session, debug, token_filename)
- logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
- session.commit()
- finally:
- stream.close()
+# stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
+# try:
+# for tweet in stream:
+# source = TweetSource(original_json=tweet)
+# session.add(source)
+# session.flush()
+# source_id = source.id
+# process_tweet(tweet, source_id, session, debug, token_filename)
+# logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
+# session.commit()
+# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+# print "Stop recording after %d seconds." % (duration)
+# break
+# finally:
+# stream.close()
def get_options():
parser = OptionParser()
@@ -130,6 +226,9 @@
help="Token file name")
parser.add_option("-d", "--duration", dest="duration",
help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
+ parser.add_option("-N", "--consumer", dest="consumer_nb",
+ help="number of consumer", metavar="CONSUMER", default=1, type='int')
+
utils.set_logging_options(parser)
@@ -139,7 +238,6 @@
if __name__ == '__main__':
-
(options, args) = get_options()
utils.set_logging(options)
@@ -149,17 +247,62 @@
print repr(options)
if options.new and os.path.exists(options.filename):
- os.remove(options.filename)
+ i = 1
+ basename, extension = os.path.splitext(options.filename)
+ new_path = '%s.%d%s' % (basename, i, extension)
+ while i < 1000000 and os.path.exists(new_path):
+ i += 1
+ new_path = '%s.%d%s' % (basename, i, extension)
+ if i >= 1000000:
+ raise Exception("Unable to find new filename for " + options.filename)
+ else:
+ shutil.move(options.filename, new_path)
+
- engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2))
+ engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
Session = sessionmaker(bind=engine)
- session = Session()
+ queue = JoinableQueue()
+ stop_event = Event()
+
+ if options.username and options.password:
+ auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
+ else:
+ consumer_key = models.CONSUMER_KEY
+ consumer_secret = models.CONSUMER_SECRET
+ auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+ auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
- try:
- try:
- main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration)
- except KeyboardInterrupt:
- print '\nGoodbye!'
- session.commit()
- finally:
- session.close()
+
+ sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)
+
+ tweet_processes = []
+
+ for i in range(options.consumer_nb):
+ cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+ tweet_processes.append(cprocess)
+
+ def interupt_handler(signum, frame):
+ stop_event.set()
+
+ signal.signal(signal.SIGINT, interupt_handler)
+
+ sprocess.start()
+ for cprocess in tweet_processes:
+ cprocess.start()
+
+ if options.duration >= 0:
+ end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+ while not stop_event.is_set():
+ if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+ stop_event.set()
+ break
+ if sprocess.is_alive():
+ time.sleep(0.1)
+ else:
+ break
+
+ sprocess.join()
+ queue.join()
+ for cprocess in tweet_processes:
+ cprocess.join()