--- a/script/stream/recorder_tweetstream.py Tue Aug 09 13:07:23 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Fri Aug 12 18:17:27 2011 +0200
@@ -3,22 +3,25 @@
from iri_tweet.models import TweetSource, TweetLog
from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
from optparse import OptionParser
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session, sessionmaker
import StringIO
-import logging
import anyjson
import datetime
+import logging
import os
+import re
import shutil
import signal
import socket
+import sqlalchemy.schema
import sys
import time
import traceback
import tweepy.auth
import tweetstream
-from iri_tweet.utils import logger
-from sqlalchemy.exc import OperationalError
+import urllib2
+from iri_tweet import models, utils
socket._fileobject.default_bufsize = 0
@@ -30,6 +33,26 @@
#just put it in a sqlite3 table
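+# set_logging is also invoked at the top of each worker's run() so that every
+# child process applies the command-line logging options itself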
+def set_logging(options):
+ utils.set_logging(options, logging.getLogger('iri_tweet'))
+ utils.set_logging(options, logging.getLogger('multiprocessing'))
+ if options.debug >= 2:
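+        # debug level 2 and above also echoes the SQL emitted by SQLAlchemy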
+ utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
+ #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+ #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+ #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+
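+# build the tweepy auth handler: HTTP basic auth when username/password are
+# supplied, otherwise OAuth using the consumer keys from models plus the
+# access token fetched earlier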
+def get_auth(options, access_token):
+ if options.username and options.password:
+ auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
+ else:
+ consumer_key = models.CONSUMER_KEY
+ consumer_secret = models.CONSUMER_SECRET
+ auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+ auth.set_access_token(*access_token)
+ return auth
+
+
class ReconnectingTweetStream(tweetstream.FilterStream):
"""TweetStream class that automatically tries to reconnect if the
connecting goes down. Reconnecting, and waiting for reconnecting, is
@@ -62,9 +85,10 @@
def next(self):
while True:
try:
+ utils.get_logger().debug("return super.next")
return super(ReconnectingTweetStream, self).next()
except tweetstream.ConnectionError, e:
- logging.debug("connection error :" + str(e))
+ utils.get_logger().debug("connection error :" + str(e))
self._reconnects += 1
if self._reconnects > self.max_reconnects:
raise tweetstream.ConnectionError("Too many retries")
@@ -80,38 +104,43 @@
+
class SourceProcess(Process):
- def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+ def __init__(self, session_maker, queue, options, access_token, stop_event):
self.session_maker = session_maker
self.queue = queue
- self.auth = auth
- self.track = track
- self.debug = debug
- self.reconnects = reconnects
- self.token_filename = token_filename
+ self.track = options.track
+ self.reconnects = options.reconnects
+ self.token_filename = options.token_filename
self.stop_event = stop_event
+ self.options = options
+ self.access_token = access_token
super(SourceProcess, self).__init__()
-# self.stop_event =
def run(self):
+ #import pydevd
+ #pydevd.settrace(suspend=False)
+
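+        # logging and auth are set up here rather than in __init__: run()
+        # executes in the child process while __init__ runs in the parent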
+ set_logging(self.options)
+ self.auth = get_auth(self.options, self.access_token)
- get_logger().debug("SourceProcess : run")
+ utils.get_logger().debug("SourceProcess : run")
        track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
track_list = [k for k in track_list.split(',')]
- get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
+ utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
- get_logger().debug("SourceProcess : after connecting to stream")
+ utils.get_logger().debug("SourceProcess : after connecting to stream")
stream.muststop = lambda: self.stop_event.is_set()
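+        # (assumption: this tweetstream build checks muststop between reads,
+        # so setting it lets the shared stop_event end the stream cleanly)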
session = self.session_maker()
try:
for tweet in stream:
- get_logger().debug("tweet " + repr(tweet))
+ utils.get_logger().debug("tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
- get_logger().debug("source created")
+ utils.get_logger().debug("source created")
add_retries = 0
while add_retries < 10:
try:
@@ -121,21 +150,18 @@
break
except OperationalError as e:
session.rollback()
- get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
- if add_retries==10:
+ utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+ if add_retries == 10:
raise e
source_id = source.id
- get_logger().debug("before queue + source id " + repr(source_id))
- self.queue.put((source_id, tweet), False)
- #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
- get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ utils.get_logger().debug("before queue + source id " + repr(source_id))
+ utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
session.commit()
-# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
-# print "Stop recording after %d seconds." % (duration)
-# break
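+            # the put is moved after the commit, presumably so a consumer can
+            # never dequeue a source id that is not yet visible in the database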
+ self.queue.put((source_id, tweet), False)
+
except Exception as e:
- get_logger().error("Error when processing tweet " + repr(e))
+ utils.get_logger().error("Error when processing tweet " + repr(e))
finally:
session.rollback()
stream.close()
@@ -144,19 +170,19 @@
self.stop_event.set()
-def process_tweet(tweet, source_id, session, token_filename):
+def process_tweet(tweet, source_id, session, access_token):
try:
tweet_obj = anyjson.deserialize(tweet)
screen_name = ""
if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
screen_name = tweet_obj['user']['screen_name']
- get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
- get_logger().debug(u"Process_tweet :" + repr(tweet))
- processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+ utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
+ processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
processor.process()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
- get_logger().error(message)
+ utils.get_logger().error(message)
output = StringIO.StringIO()
traceback.print_exception(Exception, e, None, None, output)
error_stack = output.getvalue()
@@ -170,42 +196,49 @@
class TweetProcess(Process):
- def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+ def __init__(self, session_maker, queue, options, access_token, stop_event):
self.session_maker = session_maker
self.queue = queue
- self.debug = debug
self.stop_event = stop_event
- self.token_filename = token_filename
+ self.options = options
+ self.access_token = access_token
super(TweetProcess, self).__init__()
def run(self):
+ set_logging(self.options)
session = self.session_maker()
try:
while not self.stop_event.is_set():
try:
- source_id, tweet_txt = queue.get(True, 10)
- get_logger().debug("Processing source id " + repr(source_id))
+                    source_id, tweet_txt = self.queue.get(True, 3)
+ utils.get_logger().debug("Processing source id " + repr(source_id))
except Exception as e:
- get_logger().debug('Process tweet exception in loop : ' + repr(e))
+ utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
continue
- process_tweet(tweet_txt, source_id, session, self.token_filename)
+ process_tweet(tweet_txt, source_id, session, self.access_token)
session.commit()
- except:
- raise
finally:
session.rollback()
self.stop_event.set()
session.close()
+
+
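+# each consumer process receives its own engine and scoped session factory
+# from get_sessionmaker, so no database connection is shared across processes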
+def get_sessionmaker(conn_str):
+ engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
+ Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
+ return Session, engine, metadata
+
-def process_leftovers(session, token_filename):
+def process_leftovers(session, access_token):
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
for src in sources:
tweet_txt = src.original_json
- process_tweet(tweet_txt, src.id, session, token_filename)
+ process_tweet(tweet_txt, src.id, session, access_token)
+ session.commit()
@@ -215,8 +248,8 @@
def get_options():
parser = OptionParser()
- parser.add_option("-f", "--file", dest="filename",
- help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
+ parser.add_option("-f", "--file", dest="conn_str",
+                      help="write tweets to the database identified by CONNECTION_STR, a SQLAlchemy connection string or sqlite file path", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
parser.add_option("-u", "--user", dest="username",
help="Twitter user", metavar="USER", default=None)
parser.add_option("-w", "--password", dest="password",
@@ -231,8 +264,8 @@
help="Token file name")
parser.add_option("-d", "--duration", dest="duration",
help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
- parser.add_option("-N", "--consumer", dest="consumer_nb",
- help="number of consumer", metavar="CONSUMER", default=1, type='int')
+ parser.add_option("-N", "--nb-process", dest="process_nb",
+                      help="number of processes.\nIf 0, only the leftovers in the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
@@ -245,53 +278,81 @@
(options, args) = get_options()
- utils.set_logging(options, get_logger())
+ set_logging(options)
if options.debug:
print "OPTIONS : "
print repr(options)
- if options.new and os.path.exists(options.filename):
- i = 1
- basename, extension = os.path.splitext(options.filename)
- new_path = '%s.%d%s' % (basename, i, extension)
- while i < 1000000 and os.path.exists(new_path):
- i += 1
+
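+    # accept either a full SQLAlchemy connection string or a bare sqlite file
+    # path, which gets promoted to a sqlite:/// URL below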
+ conn_str = options.conn_str.strip()
+    if not re.match(r"^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + conn_str
+
+ if conn_str.startswith("sqlite") and options.new:
+        filepath = conn_str[conn_str.find("://") + 3:]  # skip past '://' to get the file path
+ if os.path.exists(filepath):
+ i = 1
+ basename, extension = os.path.splitext(filepath)
new_path = '%s.%d%s' % (basename, i, extension)
- if i >= 1000000:
- raise Exception("Unable to find new filename for " + options.filename)
- else:
- shutil.move(options.filename, new_path)
+ while i < 1000000 and os.path.exists(new_path):
+ i += 1
+ new_path = '%s.%d%s' % (basename, i, extension)
+ if i >= 1000000:
+ raise Exception("Unable to find new filename for " + filepath)
+ else:
+ shutil.move(filepath, new_path)
+ Session, engine, metadata = get_sessionmaker(conn_str)
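+    # with --new, refuse to run against a database that already has tables;
+    # reflecting the metadata exposes any existing schema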
+ if options.new:
+ check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+ if len(check_metadata.sorted_tables) > 0:
+ message = "Database %s not empty exiting" % conn_str
+ utils.get_logger().error(message)
+ sys.exit(message)
+
+ metadata.create_all(engine)
+
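+    # fetch the OAuth access token once in the parent and hand the bare token
+    # to the workers (presumably because auth handler objects do not pickle)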
+ access_token = None
+ if not options.username or not options.password:
+ access_token = utils.get_oauth_token(options.token_filename)
+
+ session = Session()
+ try:
+ process_leftovers(session, access_token)
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
+
+ if options.process_nb <= 0:
+ utils.get_logger().debug("Leftovers processed. Exiting.")
+ sys.exit()
+
queue = JoinableQueue()
stop_event = Event()
-
- if options.username and options.password:
- auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
- else:
- consumer_key = models.CONSUMER_KEY
- consumer_secret = models.CONSUMER_SECRET
- auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
- auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
-
-
- engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
- Session = sessionmaker(bind=engine)
- session = Session()
- process_leftovers(session, options.token_filename)
- session.commit()
- session.close()
-
- sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)
+    # workaround for a bug when using urllib2 together with multiprocessing
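+    # (assumption: the first urlopen primes urllib2's module-level opener state
+    # in the parent, so forked children do not hang on their first request)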
+ req = urllib2.Request('http://localhost')
+ conn = None
+ try:
+ conn = urllib2.urlopen(req)
+    except Exception:
+        # the request itself is irrelevant; ignore any failure
+        pass
+ finally:
+ if conn is not None:
+ conn.close()
+
+
+ sprocess = SourceProcess(Session, queue, options, access_token, stop_event)
tweet_processes = []
- for i in range(options.consumer_nb):
- engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
- Session = sessionmaker(bind=engine)
- cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+ for i in range(options.process_nb - 1):
+ Session, engine, metadata = get_sessionmaker(conn_str)
+ cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
tweet_processes.append(cprocess)
def interupt_handler(signum, frame):
@@ -311,23 +372,36 @@
stop_event.set()
break
if sprocess.is_alive():
- time.sleep(0.1)
+ time.sleep(1)
else:
+ stop_event.set()
break
- get_logger().debug("Joining Source Process")
- sprocess.join()
- get_logger().debug("Joining Queue")
+ utils.get_logger().debug("Joining Source Process")
+    sprocess.join(10)
+    if sprocess.is_alive():
+        # join(timeout) never raises on timeout; check is_alive() instead
+        utils.get_logger().debug("Source process still alive after join timeout - terminating")
+        sprocess.terminate()
+
+ utils.get_logger().debug("Joining Queue")
#queue.join()
- for i,cprocess in enumerate(tweet_processes):
- get_logger().debug("Joining consumer process Nb %d" % (i+1))
- cprocess.join()
+ for i, cprocess in enumerate(tweet_processes):
+ utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+        cprocess.join(3)
+        if cprocess.is_alive():
+            utils.get_logger().debug("Consumer process Nb %d still alive after join timeout - terminating" % (i + 1))
+            cprocess.terminate()
- get_logger().debug("Processing leftovers")
+ utils.get_logger().debug("Processing leftovers")
session = Session()
- process_leftovers(session, options.token_filename)
- session.commit()
- session.close()
+ try:
+ process_leftovers(session, access_token)
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
- get_logger().debug("Done. Exiting.")
-
\ No newline at end of file
+ utils.get_logger().debug("Done. Exiting.")
+