# HG changeset patch
# User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
# Date 1312786900 -7200
# Node ID 9213a63fa34a8edcaa905afdd9f709c47694886f
# Parent cdd7d3c0549ce8132f0911e10b7d3b4215a7be18
- debug multithreading (database lock problem still present)
- add object buffer
- add tests
- improve logging

diff -r cdd7d3c0549c -r 9213a63fa34a script/lib/iri_tweet/export_tweet_db.py
--- a/script/lib/iri_tweet/export_tweet_db.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/export_tweet_db.py	Mon Aug 08 09:01:40 2011 +0200
@@ -1,8 +1,7 @@
 from models import setup_database
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy.orm import sessionmaker
-from utils import set_logging_options, set_logging, TwitterProcessor
-import logging
+from utils import set_logging_options, set_logging, TwitterProcessor, logger
 import sqlite3 #@UnresolvedImport
@@ -34,11 +33,11 @@
         curs_in = conn_in.cursor()
         fields_mapping = {}
         for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
-            logging.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
+            logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
             processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
             processor.process()
         session.commit()
-        logging.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
+        logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
     except Exception, e:
         session.rollback()
         raise e
diff -r cdd7d3c0549c -r 9213a63fa34a script/lib/iri_tweet/export_twitter_alchemy.py
--- a/script/lib/iri_tweet/export_twitter_alchemy.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py	Mon Aug 08 09:01:40 2011 +0200
@@ -5,10 +5,9 @@
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy import Table, Column, BigInteger, MetaData
 from sqlalchemy.orm import sessionmaker
-from utils import parse_date, set_logging_options, set_logging, get_filter_query
+from utils import parse_date, set_logging_options, set_logging, get_filter_query, logger
 from models import setup_database
 import datetime
-import logging
 import os.path
 import re
 import sys
@@ -101,7 +100,7 @@
 
     set_logging(options)
 
-    logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+    logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
 
     if len(sys.argv) == 1 or options.database is None:
         parser.print_help()
@@ -159,7 +158,7 @@
 
     for params in parameters:
 
-        logging.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
+        logger.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
 
         start_date_str = params.get("start_date",None)
         end_date_str = params.get("end_date", None)
@@ -192,12 +191,12 @@
 
         if content_file and content_file.find("http") == 0:
 
-            logging.debug("url : " + content_file) #@UndefinedVariable
+            logger.debug("url : " + content_file) #@UndefinedVariable
 
             h = httplib2.Http()
             resp, content = h.request(content_file)
 
-            logging.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
+            logger.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
 
             project = anyjson.deserialize(content)
             root = etree.fromstring(project["ldt"])
@@ -254,7 +253,7 @@
 
         if ensemble_parent is None:
-            logging.error("Can not process file") #@UndefinedVariable
+            logger.error("Can not process file") #@UndefinedVariable
            sys.exit()
 
        if options.replace:
@@ -309,18 +308,18 @@
            project["ldt"] = output_data
            body = anyjson.serialize(project)
-            logging.debug("write http " + content_file) #@UndefinedVariable
-            logging.debug("write http " + repr(body)) #@UndefinedVariable
+            logger.debug("write http " + content_file) #@UndefinedVariable
+            logger.debug("write http " + repr(body)) #@UndefinedVariable
            h = httplib2.Http()
            resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body)
-            logging.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
+            logger.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
        else:
            if content_file and os.path.exists(content_file):
                dest_file_name = content_file
            else:
                dest_file_name = options.filename
 
-            logging.debug("WRITE : " + dest_file_name) #@UndefinedVariable
+            logger.debug("WRITE : " + dest_file_name) #@UndefinedVariable
            output = open(dest_file_name, "w")
            output.write(output_data)
            output.flush()
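Note on the hunks above: the changeset's recurring edit replaces calls on the root `logging` module with a shared `logger` object exported by `iri_tweet.utils`, so all scripts write through one logger that a single `set_logging` call can configure. A minimal standalone sketch of the pattern (module contents illustrative, not part of the changeset):

    import logging

    # What utils.py now exports: getLogger() with no name returns the root
    # logger, so every module importing `logger` shares one logging pipeline.
    logger = logging.getLogger()

    def main():
        # Call sites change mechanically: logging.debug(...) -> logger.debug(...)
        logger.debug("OPTIONS : " + repr({"database": "tweets.db"}))

    if __name__ == "__main__":
        logging.basicConfig(level=logging.DEBUG)
        main()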
diff -r cdd7d3c0549c -r 9213a63fa34a script/lib/iri_tweet/models.py
--- a/script/lib/iri_tweet/models.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Mon Aug 08 09:01:40 2011 +0200
@@ -92,15 +92,12 @@
     text = Column(String)
     truncated = Column(Boolean)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
-    user = relationship("TweetUser", backref="tweets")
-#    original_json = Column(String)
+    user = relationship("User", backref="tweets")
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet')
     received_at = Column(DateTime, default=datetime.datetime.now())
-
-    #user = relationship(User, primaryjoin=user_id == User.id)
-
+
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
             if hasattr(self,key):
@@ -111,7 +108,7 @@
     id = Column(Integer, primary_key = True)
 
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
-    user = relationship("TweetUser", backref="messages")
+    user = relationship("User", backref="messages")
     created_at = Column(DateTime, default=datetime.datetime.now())
 
     message_id = Column(Integer, ForeignKey('tweet_message.id'))
@@ -161,8 +158,8 @@
     url = Column(String)
     utc_offset = Column(Integer)
     verified = Column(Boolean)
-    tweets = relationship(Tweet, backref='user')
-    messages = relationship(UserMessage, backref='user')
+    #tweets = relationship(Tweet, backref='user')
+    #messages = relationship(UserMessage, backref='user')
 
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
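The models.py hunks fix two declarative-mapping mistakes: relationship() strings must name the mapped class ("User", not the nonexistent "TweetUser" or the table name), and declaring the same pair from both sides while also using backref would define the 'tweets'/'messages' properties twice, which is why the User-side relationships are commented out. A self-contained sketch of the surviving one-sided pattern (schema reduced to two columns; not the repository's full models):

    from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import relationship, sessionmaker

    Base = declarative_base()

    class User(Base):
        __tablename__ = 'tweet_user'
        id = Column(Integer, primary_key=True)
        name = Column(String)
        # no tweets = relationship(...) here: the backref below provides it

    class Tweet(Base):
        __tablename__ = 'tweet_tweet'
        id = Column(Integer, primary_key=True)
        user_id = Column(Integer, ForeignKey('tweet_user.id'))
        user = relationship("User", backref="tweets")  # class name, not table name

    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    u = User(name='ed')
    session.add_all([u, Tweet(user=u)])
    session.commit()
    assert len(u.tweets) == 1  # reverse side supplied by the backref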
diff -r cdd7d3c0549c -r 9213a63fa34a script/lib/iri_tweet/tests.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/tests.py	Mon Aug 08 09:01:40 2011 +0200
@@ -0,0 +1,97 @@
+from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, backref
+import unittest #@UnresolvedImport
+from sqlalchemy.orm import sessionmaker
+from iri_tweet.utils import ObjectsBuffer
+
+Base = declarative_base()
+
+class User(Base):
+    __tablename__ = 'users'
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String)
+    fullname = Column(String)
+    password = Column(String)
+
+    def __init__(self, name, fullname, password):
+        self.name = name
+        self.fullname = fullname
+        self.password = password
+
+    def __repr__(self):
+        return "<User('%s','%s','%s')>" % (self.name, self.fullname, self.password)
+
+
+class Address(Base):
+    __tablename__ = 'addresses'
+    id = Column(Integer, primary_key=True)
+    email_address = Column(String, nullable=False)
+    user_id = Column(Integer, ForeignKey('users.id'))
+
+    user = relationship("User", backref=backref('addresses', order_by=id))
+
+    def __init__(self, user_id, email_address):
+        self.email_address = email_address
+        self.user_id = user_id
+
+    def __repr__(self):
+        return "<Address('%s')>" % self.email_address
+
+
+
+class TestObjectBuffer(unittest.TestCase):
+
+    def setUp(self):
+        self.engine = create_engine('sqlite:///:memory:', echo=False)
+        Base.metadata.create_all(self.engine)
+        sessionMaker = sessionmaker(bind=self.engine)
+        self.session = sessionMaker()
+
+    def testCreateUser(self):
+        ed_user = User('ed', 'Ed Jones', 'edspassword')
+        self.session.add(ed_user)
+        self.assertTrue(ed_user.id is None)
+        self.session.commit()
+        self.assertTrue(ed_user.id is not None)
+
+    def testSimpleBuffer(self):
+        obj_buffer = ObjectsBuffer()
+        obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False)
+        self.assertTrue(obj_proxy.id() is None)
+        obj_buffer.persists(self.session)
+        self.assertTrue(obj_proxy.id() is None)
+        self.session.commit()
+        self.assertTrue(obj_proxy.id() is not None)
+
+    def testSimpleBufferFlush(self):
+        obj_buffer = ObjectsBuffer()
+        obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True)
+        self.assertTrue(obj_proxy.id() is None)
+        obj_buffer.persists(self.session)
+        self.assertTrue(obj_proxy.id() is not None)
+        self.session.commit()
+        self.assertTrue(obj_proxy.id() is not None)
+
+    def testRelationBuffer(self):
+        obj_buffer = ObjectsBuffer()
+        user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True)
+        obj_buffer.add_object(Address, [user1_proxy.id, 'ed3@mail.com'], None, False)
+        obj_buffer.add_object(Address, [user1_proxy.id, 'ed3@other.com'], None, False)
+        user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True)
+        obj_buffer.add_object(Address, [user2_proxy.id, 'ed4@mail.com'], None, False)
+        obj_buffer.persists(self.session)
+        self.session.commit()
+        ed_user = self.session.query(User).filter_by(name='ed3').first()
+        self.assertEquals(2, len(ed_user.addresses))
+        ed_user = self.session.query(User).filter_by(name='ed4').first()
+        self.assertEquals(1, len(ed_user.addresses))
+
+
+    def tearDown(self):
+        self.session.close()
+
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file
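The new module doubles as a usage spec for the buffer: a proxy answers `id()` with None until the session is flushed, and `must_flush=True` forces that flush inside persists(). If the `iri_tweet` package is importable, the suite can also be driven programmatically; a sketch (not part of the changeset):

    import unittest

    from iri_tweet import tests  # the test module added by this changeset

    suite = unittest.TestLoader().loadTestsFromModule(tests)
    unittest.TextTestRunner(verbosity=2).run(suite)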
diff -r cdd7d3c0549c -r 9213a63fa34a script/lib/iri_tweet/tweet_twitter_user.py
--- a/script/lib/iri_tweet/tweet_twitter_user.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/tweet_twitter_user.py	Mon Aug 08 09:01:40 2011 +0200
@@ -1,13 +1,12 @@
 from iri_tweet.models import setup_database, Message, UserMessage, User
 from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options,
-    set_logging, parse_date)
+    set_logging, parse_date, logger)
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy import BigInteger
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.schema import MetaData, Table, Column
 from sqlalchemy.sql import and_
 import datetime
-import logging #@UnresolvedImport
 import sys
 import time
 import twitter
@@ -54,7 +53,7 @@
 
     set_logging(options)
 
-    logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+    logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
 
     if not options.message or len(options.message) == 0:
         sys.exit()
@@ -108,7 +107,7 @@
         screen_name = user.screen_name
         message = u"@%s: %s" % (screen_name, base_message)
-        logging.debug("new status : " + message) #@UndefinedVariable
+        logger.debug("new status : " + message) #@UndefinedVariable
         if not options.simulate:
             t.statuses.update(status=message)
             user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
diff -r cdd7d3c0549c -r 9213a63fa34a script/lib/iri_tweet/utils.py
--- a/script/lib/iri_tweet/utils.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/utils.py	Mon Aug 08 09:01:40 2011 +0200
@@ -77,6 +77,45 @@
     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
 
+class ObjectBufferProxy(object):
+    def __init__(self, klass, args, kwargs, must_flush):
+        self.klass = klass
+        self.args = args
+        self.kwargs = kwargs
+        self.must_flush = must_flush
+        self.instance = None
+
+    def persists(self, session):
+        new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
+        new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
+
+        self.instance = self.klass(*new_args, **new_kwargs)
+        session.add(self.instance)
+        if self.must_flush:
+            session.flush()
+
+    def __getattr__(self, name):
+        return lambda : getattr(self.instance, name) if self.instance else None
+
+
+
+
+class ObjectsBuffer(object):
+
+    def __init__(self):
+        self.__bufferlist = []
+
+    def persists(self, session):
+        for object_proxy in self.__bufferlist:
+            object_proxy.persists(session)
+
+    def add_object(self, klass, args, kwargs, must_flush):
+        new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
+        self.__bufferlist.append(new_proxy)
+        return new_proxy
+
+
+
 class TwitterProcessorException(Exception):
     pass
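The buffer records (class, args, kwargs) triples and only instantiates them in persists(); because attribute access on a proxy returns a callable that persists() resolves at flush time, a buffered child row can reference its parent's not-yet-assigned primary key. A hedged usage sketch reusing the models from the new tests.py (engine and data illustrative):

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    from iri_tweet.utils import ObjectsBuffer
    from iri_tweet.tests import Base, User, Address  # models defined above

    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    buf = ObjectsBuffer()
    # must_flush=True: flush right after the INSERT so user.id gets assigned
    user_proxy = buf.add_object(User, ['ed', 'Ed Jones', 'secret'], None, True)
    # user_proxy.id is a thunk here; it is only called inside persists()
    buf.add_object(Address, [user_proxy.id, 'ed@mail.com'], None, False)

    buf.persists(session)  # one pass: construct, add, optionally flush
    session.commit()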
@@ -104,9 +143,10 @@
         self.source_id = source_id
         self.session = session
         self.token_filename = token_filename
+        self.obj_buffer = ObjectsBuffer()
 
     def __get_user(self, user_dict):
-        logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+        logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
 
         user_id = user_dict.get("id",None)
         user_name = user_dict.get("screen_name", user_dict.get("name", None))
@@ -133,8 +173,8 @@
             else:
                 user_dict = t.users.show(screen_name=user_name)
         except Exception as e:
-            logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
-            logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+            logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+            logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
 
         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
         if "id" not in user_dict:
@@ -148,7 +188,7 @@
         return user
 
     def __process_entity(self, ind, ind_type):
-        logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+        logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
 
         ind = clean_keys(ind)
@@ -203,7 +243,7 @@
             'urls' : process_urls
         }[ind_type]()
 
-        logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+        logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
         if entity:
             self.session.add(entity)
             self.session.flush()
@@ -220,7 +260,7 @@
         # get or create user
         user = self.__get_user(self.json_dict["user"])
         if user is None:
-            logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+            logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
             ts_copy["user"] = None
             ts_copy["user_id"] = None
         else:
@@ -270,7 +310,7 @@
 
         user = self.__get_user(user_fields)
         if user is None:
-            logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
+            logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
             tweet_fields["user"] = None
             tweet_fields["user_id"] = None
         else:
@@ -287,14 +327,13 @@
 
         for ind in extractor.extract_hashtags_with_indices():
             self.__process_entity(ind, "hashtags")
-
-        for ind in extractor.extract_mentioned_screen_names_with_indices():
-            self.__process_entity(ind, "user_mentions")
-
+
         for ind in extractor.extract_urls_with_indices():
             self.__process_entity(ind, "urls")
 
-        self.session.flush()
+        for ind in extractor.extract_mentioned_screen_names_with_indices():
+            self.__process_entity(ind, "user_mentions")
+
 
     def process(self):
@@ -305,21 +344,21 @@
             self.session.flush()
             self.source_id = tweet_source.id
 
-        try:
-            if "metadata" in self.json_dict:
-                self.__process_twitter_rest()
-            else:
-                self.__process_twitter_stream()
-
-            tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
-        except:
-
-            raise
+        if "metadata" in self.json_dict:
+            self.__process_twitter_rest()
+        else:
+            self.__process_twitter_stream()
+
+        tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
+        self.session.add(tweet_log)
 
-def set_logging(options):
+def set_logging(options, plogger=None):
 
-    logging_config = {}
+    logging_config = {
+        "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
+        "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
+    }
 
     if options.logfile == "stdout":
         logging_config["stream"] = sys.stdout
@@ -327,9 +366,27 @@
         logging_config["stream"] = sys.stderr
     else:
         logging_config["filename"] = options.logfile
-
-    logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable
-    logging.basicConfig(**logging_config) #@UndefinedVariable
+
+    logger = plogger
+    if logger is None:
+        logger = logging.getLogger() #@UndefinedVariable
+
+    if len(logger.handlers) == 0:
+        filename = logging_config.get("filename")
+        if filename:
+            mode = logging_config.get("filemode", 'a')
+            hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
+        else:
+            stream = logging_config.get("stream")
+            hdlr = logging.StreamHandler(stream) #@UndefinedVariable
+        fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
+        dfs = logging_config.get("datefmt", None)
+        fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
+        hdlr.setFormatter(fmt)
+        logger.addHandler(hdlr)
+    level = logging_config.get("level")
+    if level is not None:
+        logger.setLevel(level)
 
     options.debug = (options.verbose-options.quiet > 0)
@@ -384,4 +441,4 @@
 
     return query.distinct()
 
-
+logger = logging.getLogger() #@UndefinedVariable
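set_logging no longer calls logging.basicConfig (which only configures the root logger of the calling process); it attaches a handler to whichever logger is passed in, falling back to the root logger. That is what lets recorder_tweetstream.py configure multiprocessing's own logger. A sketch of the call, with a hypothetical stand-in for the OptionParser result (set_logging reads logfile/verbose/quiet):

    import multiprocessing

    from iri_tweet.utils import set_logging

    class Options(object):
        # minimal stand-in for the parsed options object
        logfile = "stderr"
        verbose = 2   # WARNING - 10*2 => DEBUG level
        quiet = 0

    set_logging(Options(), multiprocessing.get_logger())
    multiprocessing.get_logger().debug("handler attached in this process")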
diff -r cdd7d3c0549c -r 9213a63fa34a script/stream/recorder_tweetstream.py
--- a/script/stream/recorder_tweetstream.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Mon Aug 08 09:01:40 2011 +0200
@@ -1,14 +1,13 @@
 from getpass import getpass
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event
+from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
 from optparse import OptionParser
 from sqlalchemy.orm import sessionmaker
-from sqlite3 import *
 import StringIO
+import logging
 import anyjson
 import datetime
-import logging
 import os
 import shutil
 import signal
@@ -18,6 +17,8 @@
 import traceback
 import tweepy.auth
 import tweetstream
+from iri_tweet.utils import logger
+from sqlalchemy.exc import OperationalError
 
 socket._fileobject.default_bufsize = 0
@@ -82,7 +83,6 @@
 
 class SourceProcess(Process):
     def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
-        super(SourceProcess, self).__init__()
         self.session_maker = session_maker
         self.queue = queue
         self.auth = auth
@@ -91,47 +91,93 @@
         self.reconnects = reconnects
         self.token_filename = token_filename
         self.stop_event = stop_event
+        super(SourceProcess, self).__init__()
 #        self.stop_event = 
 
     def run(self):
+        get_logger().debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
         track_list = [k for k in track_list.split(',')]
-
+
+        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+        get_logger().debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
 
         session = self.session_maker()
 
         try:
             for tweet in stream:
+                get_logger().debug("tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                session.add(source)
-                session.flush()
+                get_logger().debug("source created")
+                add_retries = 0
+                while add_retries < 10:
+                    try:
+                        add_retries += 1
+                        session.add(source)
+                        session.flush()
+                        break
+                    except OperationalError as e:
+                        session.rollback()
+                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries == 10:
+                            raise e
+
                 source_id = source.id
-                queue.put((source_id, tweet), False)
+                get_logger().debug("before queue + source id " + repr(source_id))
+                self.queue.put((source_id, tweet), False)
                 #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
-                logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
 #                if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
 #                    print "Stop recording after %d seconds." % (duration)
 #                    break
-        except:
+        except Exception as e:
+            get_logger().error("Error when processing tweet " + repr(e))
+        finally:
             session.rollback()
-        finally:
             stream.close()
             session.close()
-
+            self.queue.close()
+            self.stop_event.set()
+
+
+def process_tweet(tweet, source_id, session, token_filename):
+    try:
+        tweet_obj = anyjson.deserialize(tweet)
+        screen_name = ""
+        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+            screen_name = tweet_obj['user']['screen_name']
+        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        get_logger().debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+        processor.process()
+    except Exception as e:
+        message = u"Error %s processing tweet %s" % (repr(e), tweet)
+        get_logger().error(message)
+        output = StringIO.StringIO()
+        traceback.print_exception(Exception, e, None, None, output)
+        error_stack = output.getvalue()
+        output.close()
+        session.rollback()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+
+
 class TweetProcess(Process):
     def __init__(self, session_maker, queue, debug, token_filename, stop_event):
-        super(TweetProcess, self).__init__()
         self.session_maker = session_maker
         self.queue = queue
         self.debug = debug
         self.stop_event = stop_event
         self.token_filename = token_filename
+        super(TweetProcess, self).__init__()
+
 
     def run(self):
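The retry loop above papers over SQLite's "database is locked" OperationalError when several processes write to the same file, which the commit message flags as still unresolved. A complementary knob, not used by this changeset, is SQLite's busy timeout, which makes a connection wait for the lock instead of failing immediately:

    import sqlite3
    from sqlalchemy import create_engine

    # plain sqlite3: wait up to 30s for a lock instead of raising at once
    conn = sqlite3.connect("tweets.db", timeout=30.0)  # illustrative path

    # the same setting passed through SQLAlchemy's engine, as it could be
    # applied to the recorder's engines
    engine = create_engine("sqlite:///tweets.db", connect_args={"timeout": 30})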
@@ -139,74 +185,33 @@
         try:
             while not self.stop_event.is_set():
                 try:
-                    source_id, tweet_txt = queue.get(True, 30)
-                except:
+                    source_id, tweet_txt = self.queue.get(True, 10)
+                    get_logger().debug("Processing source id " + repr(source_id))
+                except Exception as e:
+                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session)
+                process_tweet(tweet_txt, source_id, session, self.token_filename)
                 session.commit()
-                self.queue.task_done()
         except:
-            session.rollback()
             raise
         finally:
+            session.rollback()
+            self.stop_event.set()
             session.close()
-
-
-def process_tweet(tweet, source_id, session):
-
-    try:
-        tweet_obj = anyjson.deserialize(tweet)
-        screen_name = ""
-        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
-            screen_name = tweet_obj['user']['screen_name']
-        logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        logging.debug(u"Process_tweet :" + repr(tweet))
-        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
-        processor.process()
-    except Exception, e:
-        message = u"Error %e processing tweet %s" % (unicode(e), tweet)
-        logging.error(message)
-        output = StringIO.StringIO()
-        traceback.print_exception(Exception, e, None, None, output)
-        tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
-        output.close()
-
-
-
-#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
-
-    #username = username or raw_input('Twitter username: ')
-    #password = password or getpass('Twitter password: ')
+def process_leftovers(session, token_filename):
+
+    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+
+    for src in sources:
+        tweet_txt = src.original_json
+        process_tweet(tweet_txt, src.id, session, token_filename)
 
-#    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-#    track_list = [k for k in track_list.split(',')]
-
-#    if username and password:
-#        auth = tweepy.auth.BasicAuthHandler(username, password)
-#    else:
-#        consumer_key = models.CONSUMER_KEY
-#        consumer_secret = models.CONSUMER_SECRET
-#        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-#        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
-
-#    if duration >= 0:
-#        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
-
-#    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
-#    try:
-#        for tweet in stream:
-#            source = TweetSource(original_json=tweet)
-#            session.add(source)
-#            session.flush()
-#            source_id = source.id
-#            process_tweet(tweet, source_id, session, debug, token_filename)
-#            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-#            session.commit()
-#            if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
-#                print "Stop recording after %d seconds." % (duration)
-#                break
-#    finally:
-#        stream.close()
+    #get tweet sources that do not match any log entry
+    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+
 
 def get_options():
     parser = OptionParser()
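process_leftovers implements the SQL in the comment above through the ORM: an outer join from tweet_tweet_source to tweet_tweet_log, keeping only sources with no log row, so tweets stranded by a crash get reprocessed. The raw-SQL equivalent can be run against the recorder's database file to count unprocessed sources (filename illustrative, not from the changeset):

    import sqlite3

    conn = sqlite3.connect("tweets.db")
    count, = conn.execute(
        "select count(*) from tweet_tweet_source ts "
        "left join tweet_tweet_log tl on ts.id = tl.tweet_source_id "
        "where tl.id is null").fetchone()
    print "%d tweet sources still lack a log entry" % count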
@@ -240,7 +245,7 @@
 
     (options, args) = get_options()
 
-    utils.set_logging(options)
+    utils.set_logging(options, get_logger())
 
     if options.debug:
         print "OPTIONS : "
@@ -249,18 +254,16 @@
     if options.new and os.path.exists(options.filename):
         i = 1
         basename, extension = os.path.splitext(options.filename)
-        new_path = '$s.%d.%s' % (basename, i, extension)
+        new_path = '%s.%d%s' % (basename, i, extension)
         while i < 1000000 and os.path.exists(new_path):
             i += 1
-            new_path = '$s.%d.%s' % (basename, i, extension)
+            new_path = '%s.%d%s' % (basename, i, extension)
         if i >= 1000000:
             raise Exception("Unable to find new filename for " + options.filename)
         else:
             shutil.move(options.filename, new_path)
 
-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
-    Session = sessionmaker(bind=engine)
 
     queue = JoinableQueue()
     stop_event = Event()
@@ -272,12 +275,22 @@
 
         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
         auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
-
+
+    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+    Session = sessionmaker(bind=engine)
+
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+
     sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)
 
     tweet_processes = []
 
     for i in range(options.consumer_nb):
+        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+        Session = sessionmaker(bind=engine)
         cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
         tweet_processes.append(cprocess)
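In the hunk above, each consumer gets its own engine and Session class instead of sharing the parent's single engine. Since SQLAlchemy engines connect lazily, no SQLite handle exists yet when the workers fork, so each child ends up opening its own connection. A condensed sketch of the pattern (filename and worker count illustrative):

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    def make_session_factory(db_path):
        # a fresh, lazily-connecting engine: nothing is open at fork time
        engine = create_engine('sqlite:///' + db_path, echo=False)
        return sessionmaker(bind=engine)

    # one factory per consumer process (options.consumer_nb in the script)
    factories = [make_session_factory("tweets.db") for _ in range(4)]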
get_logger().debug("Done. Exiting.") + \ No newline at end of file