--- a/script/lib/iri_tweet/export_twitter_alchemy.py Fri Aug 12 18:17:27 2011 +0200
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py Wed Aug 17 18:32:07 2011 +0200
@@ -5,7 +5,7 @@
from optparse import OptionParser #@UnresolvedImport
from sqlalchemy import Table, Column, BigInteger, MetaData
from sqlalchemy.orm import sessionmaker
-from utils import parse_date, set_logging_options, set_logging, get_filter_query, logger
+from utils import parse_date, set_logging_options, set_logging, get_filter_query, get_logger
from models import setup_database
import datetime
import os.path
@@ -100,13 +100,17 @@
set_logging(options)
- logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+ get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable
if len(sys.argv) == 1 or options.database is None:
parser.print_help()
sys.exit(1)
- engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = False)
+ conn_str = options.database.strip()
+ if not re.match("^\w+://.+", conn_str):
+ conn_str = 'sqlite:///' + conn_str
+
+ engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
Session = sessionmaker()
conn = engine.connect()
@@ -158,7 +162,7 @@
for params in parameters:
- logger.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
+ get_logger().debug("PARAMETERS " + repr(params)) #@UndefinedVariable
start_date_str = params.get("start_date",None)
end_date_str = params.get("end_date", None)
@@ -191,12 +195,12 @@
if content_file and content_file.find("http") == 0:
- logger.debug("url : " + content_file) #@UndefinedVariable
+ get_logger().debug("url : " + content_file) #@UndefinedVariable
h = httplib2.Http()
resp, content = h.request(content_file)
- logger.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
+ get_logger().debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
project = anyjson.deserialize(content)
root = etree.fromstring(project["ldt"])
@@ -253,7 +257,7 @@
if ensemble_parent is None:
- logger.error("Can not process file") #@UndefinedVariable
+ get_logger().error("Can not process file") #@UndefinedVariable
sys.exit()
if options.replace:
@@ -308,18 +312,18 @@
project["ldt"] = output_data
body = anyjson.serialize(project)
- logger.debug("write http " + content_file) #@UndefinedVariable
- logger.debug("write http " + repr(body)) #@UndefinedVariable
+ get_logger().debug("write http " + content_file) #@UndefinedVariable
+ get_logger().debug("write http " + repr(body)) #@UndefinedVariable
h = httplib2.Http()
resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body)
- logger.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
+ get_logger().debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
else:
if content_file and os.path.exists(content_file):
dest_file_name = content_file
else:
dest_file_name = options.filename
- logger.debug("WRITE : " + dest_file_name) #@UndefinedVariable
+ get_logger().debug("WRITE : " + dest_file_name) #@UndefinedVariable
output = open(dest_file_name, "w")
output.write(output_data)
output.flush()
--- a/script/lib/iri_tweet/models.py Fri Aug 12 18:17:27 2011 +0200
+++ b/script/lib/iri_tweet/models.py Wed Aug 17 18:32:07 2011 +0200
@@ -66,6 +66,7 @@
TWEET_STATUS = {
'OK' : 1,
'ERROR' : 2,
+ 'NOT_TWEET': 3,
}
__tablename__ = 'tweet_tweet_log'
@@ -158,7 +159,7 @@
profile_text_color = Column(String)
profile_use_background_image = Column(Boolean)
protected = Column(Boolean)
- screen_name = Column(String, index=True, unique=True)
+ screen_name = Column(String, index=True)
show_all_inline_media = Column(Boolean)
statuses_count = Column(Integer)
time_zone = Column(String)
--- a/script/lib/iri_tweet/tweet_twitter_user.py Fri Aug 12 18:17:27 2011 +0200
+++ b/script/lib/iri_tweet/tweet_twitter_user.py Wed Aug 17 18:32:07 2011 +0200
@@ -1,6 +1,6 @@
from iri_tweet.models import setup_database, Message, UserMessage, User
from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options,
- set_logging, parse_date, logger)
+ set_logging, parse_date, get_logger)
from optparse import OptionParser #@UnresolvedImport
from sqlalchemy import BigInteger
from sqlalchemy.orm import sessionmaker
@@ -10,6 +10,7 @@
import sys
import time
import twitter
+import re
APPLICATION_NAME = "Tweet recorder user"
CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
@@ -58,7 +59,11 @@
if not options.message or len(options.message) == 0:
sys.exit()
- engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = True)
+ conn_str = options.database.strip()
+ if not re.match("^\w+://.+", conn_str):
+ conn_str = 'sqlite:///' + conn_str
+
+ engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
Session = sessionmaker()
conn = engine.connect()
@@ -107,7 +112,7 @@
screen_name = user.screen_name
message = u"@%s: %s" % (screen_name, base_message)
- logger.debug("new status : " + message) #@UndefinedVariable
+ get_logger().debug("new status : " + message) #@UndefinedVariable
if not options.simulate:
t.statuses.update(status=message)
user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
--- a/script/lib/iri_tweet/utils.py Fri Aug 12 18:17:27 2011 +0200
+++ b/script/lib/iri_tweet/utils.py Wed Aug 17 18:32:07 2011 +0200
@@ -3,10 +3,11 @@
ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType,
Media, EntityMedia, Entity, EntityType)
from sqlalchemy.sql import select, or_ #@UnresolvedImport
+import Queue #@UnresolvedImport
import anyjson #@UnresolvedImport
import datetime
import email.utils
-import logging #@UnresolvedImport
+import logging
import os.path
import sys
import twitter.oauth #@UnresolvedImport
@@ -14,7 +15,6 @@
import twitter_text #@UnresolvedImport
-
CACHE_ACCESS_TOKEN = {}
def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
@@ -448,7 +448,7 @@
self.obj_buffer.persists(self.session)
-def set_logging(options, plogger=None):
+def set_logging(options, plogger=None, queue=None):
logging_config = {
"format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
@@ -466,14 +466,17 @@
if logger is None:
logger = get_logger() #@UndefinedVariable
- if len(logger.handlers) == 0:
+ if len(logger.handlers) == 0:
filename = logging_config.get("filename")
- if filename:
+ if queue is not None:
+ hdlr = QueueHandler(queue, True)
+ elif filename:
mode = logging_config.get("filemode", 'a')
hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
else:
stream = logging_config.get("stream")
hdlr = logging.StreamHandler(stream) #@UndefinedVariable
+
fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
dfs = logging_config.get("datefmt", None)
fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
@@ -484,6 +487,7 @@
logger.setLevel(level)
options.debug = (options.verbose-options.quiet > 0)
+ return logger
def set_logging_options(parser):
parser.add_option("-l", "--log", dest="logfile",
@@ -536,5 +540,47 @@
return query.distinct()
+logger_name = "iri.tweet"
+
def get_logger():
- return logging.getLogger("iri_tweet") #@UndefinedVariable
+ global logger_name
+ return logging.getLogger(logger_name) #@UndefinedVariable
+
+
+# Next two import lines for this demo only
+
+class QueueHandler(logging.Handler): #@UndefinedVariable
+ """
+ This is a logging handler which sends events to a multiprocessing queue.
+ """
+
+ def __init__(self, queue, ignore_full):
+ """
+ Initialise an instance, using the passed queue.
+ """
+ logging.Handler.__init__(self) #@UndefinedVariable
+ self.queue = queue
+ self.ignore_full = ignore_full
+
+ def emit(self, record):
+ """
+ Emit a record.
+
+ Writes the LogRecord to the queue.
+ """
+ try:
+ ei = record.exc_info
+ if ei:
+ dummy = self.format(record) # just to get traceback text into record.exc_text
+ record.exc_info = None # not needed any more
+ if not self.ignore_full or not self.queue.full():
+ self.queue.put_nowait(record)
+ except Queue.Full:
+ if self.ignore_full:
+ pass
+ else:
+ raise
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ self.handleError(record)
--- a/script/rest/search_twitter.py Fri Aug 12 18:17:27 2011 +0200
+++ b/script/rest/search_twitter.py Wed Aug 17 18:32:07 2011 +0200
@@ -36,7 +36,13 @@
(options, args) = get_option()
twitter = twitter.Twitter(domain="search.twitter.com")
- engine, metadata = models.setup_database('sqlite:///'+args[0], echo=((options.verbose-options.quiet)>0))
+
+ conn_str = args[0].strip()
+ if not re.match("^\w+://.+", conn_str):
+ conn_str = 'sqlite:///' + conn_str
+
+
+ engine, metadata = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
Session = sessionmaker(bind=engine)
session = Session()
try:
@@ -54,7 +60,7 @@
print tweet
tweet_str = anyjson.serialize(tweet)
#invalidate user id
- processor = utils.TwitterProcessor(tweet, tweet_str, None, session, options.token_filename)
+ processor = utils.TwitterProcessor(tweet, tweet_str, None, session, None, options.token_filename)
processor.process()
session.flush()
session.commit()
--- a/script/stream/recorder_tweetstream.py Fri Aug 12 18:17:27 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Wed Aug 17 18:32:07 2011 +0200
@@ -1,10 +1,12 @@
from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
+ get_logger)
from optparse import OptionParser
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session, sessionmaker
+import Queue
import StringIO
import anyjson
import datetime
@@ -16,6 +18,7 @@
import socket
import sqlalchemy.schema
import sys
+import threading
import time
import traceback
import tweepy.auth
@@ -34,7 +37,7 @@
def set_logging(options):
- utils.set_logging(options, logging.getLogger('iri_tweet'))
+ utils.set_logging(options, logging.getLogger('iri.tweet'))
utils.set_logging(options, logging.getLogger('multiprocessing'))
if options.debug >= 2:
utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
@@ -42,6 +45,11 @@
#utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
#utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+def set_logging_process(options, queue):
+ qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
+ qlogger.propagate = 0
+ return qlogger
+
def get_auth(options, access_token):
if options.username and options.password:
auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
@@ -107,7 +115,7 @@
class SourceProcess(Process):
- def __init__(self, session_maker, queue, options, access_token, stop_event):
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
self.session_maker = session_maker
self.queue = queue
self.track = options.track
@@ -116,31 +124,33 @@
self.stop_event = stop_event
self.options = options
self.access_token = access_token
+ self.logger_queue = logger_queue
super(SourceProcess, self).__init__()
def run(self):
+
#import pydevd
#pydevd.settrace(suspend=False)
- set_logging(self.options)
+ self.logger = set_logging_process(self.options, self.logger_queue)
self.auth = get_auth(self.options, self.access_token)
- utils.get_logger().debug("SourceProcess : run")
+ self.logger.debug("SourceProcess : run")
track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
track_list = [k for k in track_list.split(',')]
- utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
+ self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
- utils.get_logger().debug("SourceProcess : after connecting to stream")
+ self.logger.debug("SourceProcess : after connecting to stream")
stream.muststop = lambda: self.stop_event.is_set()
session = self.session_maker()
try:
for tweet in stream:
- utils.get_logger().debug("tweet " + repr(tweet))
+ self.logger.debug("tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
- utils.get_logger().debug("source created")
+ self.logger.debug("source created")
add_retries = 0
while add_retries < 10:
try:
@@ -150,18 +160,18 @@
break
except OperationalError as e:
session.rollback()
- utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+ self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
raise e
source_id = source.id
- utils.get_logger().debug("before queue + source id " + repr(source_id))
- utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ self.logger.debug("before queue + source id " + repr(source_id))
+ self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
session.commit()
self.queue.put((source_id, tweet), False)
except Exception as e:
- utils.get_logger().error("Error when processing tweet " + repr(e))
+ self.logger.error("Error when processing tweet " + repr(e))
finally:
session.rollback()
stream.close()
@@ -170,19 +180,23 @@
self.stop_event.set()
-def process_tweet(tweet, source_id, session, access_token):
+def process_tweet(tweet, source_id, session, access_token, logger):
try:
tweet_obj = anyjson.deserialize(tweet)
+ if 'text' not in tweet_obj:
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+ session.add(tweet_log)
+ return
screen_name = ""
if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
screen_name = tweet_obj['user']['screen_name']
- utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
- utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
+ logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ logger.debug(u"Process_tweet :" + repr(tweet))
processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
processor.process()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
- utils.get_logger().error(message)
+ logger.error(message)
output = StringIO.StringIO()
traceback.print_exception(Exception, e, None, None, output)
error_stack = output.getvalue()
@@ -196,28 +210,29 @@
class TweetProcess(Process):
- def __init__(self, session_maker, queue, options, access_token, stop_event):
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
self.session_maker = session_maker
self.queue = queue
self.stop_event = stop_event
self.options = options
self.access_token = access_token
+ self.logger_queue = logger_queue
super(TweetProcess, self).__init__()
def run(self):
- set_logging(self.options)
+ self.logger = set_logging_process(self.options, self.logger_queue)
session = self.session_maker()
try:
while not self.stop_event.is_set():
try:
source_id, tweet_txt = queue.get(True, 3)
- utils.get_logger().debug("Processing source id " + repr(source_id))
+ self.logger.debug("Processing source id " + repr(source_id))
except Exception as e:
- utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
+ self.logger.debug('Process tweet exception in loop : ' + repr(e))
continue
- process_tweet(tweet_txt, source_id, session, self.access_token)
+ process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
session.commit()
finally:
session.rollback()
@@ -231,19 +246,28 @@
return Session, engine, metadata
-def process_leftovers(session, access_token):
+def process_leftovers(session, access_token, logger):
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
for src in sources:
tweet_txt = src.original_json
- process_tweet(tweet_txt, src.id, session, access_token)
+ process_tweet(tweet_txt, src.id, session, access_token, logger)
session.commit()
#get tweet source that do not match any message
#select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+def process_log(logger_queues, stop_event):
+ while not stop_event.is_set():
+ for lqueue in logger_queues:
+ try:
+ record = lqueue.get_nowait()
+ logging.getLogger(record.name).handle(record)
+ except Queue.Empty:
+ continue
+ time.sleep(0.1)
def get_options():
@@ -275,7 +299,7 @@
if __name__ == '__main__':
-
+
(options, args) = get_options()
set_logging(options)
@@ -287,10 +311,10 @@
conn_str = options.conn_str.strip()
if not re.match("^\w+://.+", conn_str):
- conn_str = 'sqlite://' + options.conn_str
+ conn_str = 'sqlite:///' + options.conn_str
if conn_str.startswith("sqlite") and options.new:
- filepath = conn_str[conn_str.find("://"):]
+ filepath = conn_str[conn_str.find(":///")+4:]
if os.path.exists(filepath):
i = 1
basename, extension = os.path.splitext(filepath)
@@ -320,7 +344,7 @@
session = Session()
try:
- process_leftovers(session, access_token)
+ process_leftovers(session, access_token, utils.get_logger())
session.commit()
finally:
session.rollback()
@@ -330,7 +354,7 @@
utils.get_logger().debug("Leftovers processed. Exiting.")
sys.exit()
- queue = JoinableQueue()
+ queue = mQueue()
stop_event = Event()
#workaround for bug on using urllib2 and multiprocessing
@@ -344,15 +368,24 @@
finally:
if conn is not None:
conn.close()
-
+
+ process_engines = []
+ logger_queues = []
- sprocess = SourceProcess(Session, queue, options, access_token, stop_event)
+ SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ process_engines.append(engine_process)
+ lqueue = mQueue(1)
+ logger_queues.append(lqueue)
+ sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
tweet_processes = []
for i in range(options.process_nb - 1):
- Session, engine, metadata = get_sessionmaker(conn_str)
- cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
+ SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ process_engines.append(engine_process)
+ lqueue = mQueue(1)
+ logger_queues.append(lqueue)
+ cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
tweet_processes.append(cprocess)
def interupt_handler(signum, frame):
@@ -360,23 +393,28 @@
signal.signal(signal.SIGINT, interupt_handler)
+ log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
+ log_thread.daemon = True
+
sprocess.start()
for cprocess in tweet_processes:
cprocess.start()
+ log_thread.start()
+
if options.duration >= 0:
- end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+ end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
while not stop_event.is_set():
if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
stop_event.set()
break
- if sprocess.is_alive():
+ if sprocess.is_alive():
time.sleep(1)
else:
stop_event.set()
break
-
utils.get_logger().debug("Joining Source Process")
try:
sprocess.join(10)
@@ -384,8 +422,6 @@
utils.get_logger().debug("Pb joining Source Process - terminating")
sprocess.terminate()
- utils.get_logger().debug("Joining Queue")
- #queue.join()
for i, cprocess in enumerate(tweet_processes):
utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
try:
@@ -393,15 +429,30 @@
except:
utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
cprocess.terminate()
+
- utils.get_logger().debug("Processing leftovers")
- session = Session()
+ utils.get_logger().debug("Close queues")
try:
- process_leftovers(session, access_token)
- session.commit()
- finally:
- session.rollback()
- session.close()
+ queue.close()
+ for lqueue in logger_queues:
+ lqueue.close()
+ except Exception as e:
+ utils.get_logger().error("error when closing queues %s", repr(e))
+ #do nothing
+
+
+ if options.process_nb > 1:
+ utils.get_logger().debug("Processing leftovers")
+ session = Session()
+ try:
+ process_leftovers(session, access_token, utils.get_logger())
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
+ for pengine in process_engines:
+ pengine.dispose()
+
utils.get_logger().debug("Done. Exiting.")
Binary file script/virtualenv/res/psycopg2-2.4.2.tar.gz has changed