- Debug multithreading (database lock problem still present)
- Add object buffer (ObjectsBuffer/ObjectBufferProxy in utils)
- Add unit tests for the object buffer
- Improve logging (shared module logger, configurable via set_logging)
--- a/script/lib/iri_tweet/export_tweet_db.py Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/export_tweet_db.py Mon Aug 08 09:01:40 2011 +0200
@@ -1,8 +1,7 @@
from models import setup_database
from optparse import OptionParser #@UnresolvedImport
from sqlalchemy.orm import sessionmaker
-from utils import set_logging_options, set_logging, TwitterProcessor
-import logging
+from utils import set_logging_options, set_logging, TwitterProcessor, logger
import sqlite3 #@UnresolvedImport
@@ -34,11 +33,11 @@
curs_in = conn_in.cursor()
fields_mapping = {}
for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
- logging.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
+ logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
processor.process()
session.commit()
- logging.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
+ logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
except Exception, e:
session.rollback()
raise e
--- a/script/lib/iri_tweet/export_twitter_alchemy.py Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py Mon Aug 08 09:01:40 2011 +0200
@@ -5,10 +5,9 @@
from optparse import OptionParser #@UnresolvedImport
from sqlalchemy import Table, Column, BigInteger, MetaData
from sqlalchemy.orm import sessionmaker
-from utils import parse_date, set_logging_options, set_logging, get_filter_query
+from utils import parse_date, set_logging_options, set_logging, get_filter_query, logger
from models import setup_database
import datetime
-import logging
import os.path
import re
import sys
@@ -101,7 +100,7 @@
set_logging(options)
- logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+ logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
if len(sys.argv) == 1 or options.database is None:
parser.print_help()
@@ -159,7 +158,7 @@
for params in parameters:
- logging.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
+ logger.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
start_date_str = params.get("start_date",None)
end_date_str = params.get("end_date", None)
@@ -192,12 +191,12 @@
if content_file and content_file.find("http") == 0:
- logging.debug("url : " + content_file) #@UndefinedVariable
+ logger.debug("url : " + content_file) #@UndefinedVariable
h = httplib2.Http()
resp, content = h.request(content_file)
- logging.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
+ logger.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
project = anyjson.deserialize(content)
root = etree.fromstring(project["ldt"])
@@ -254,7 +253,7 @@
if ensemble_parent is None:
- logging.error("Can not process file") #@UndefinedVariable
+ logger.error("Can not process file") #@UndefinedVariable
sys.exit()
if options.replace:
@@ -309,18 +308,18 @@
project["ldt"] = output_data
body = anyjson.serialize(project)
- logging.debug("write http " + content_file) #@UndefinedVariable
- logging.debug("write http " + repr(body)) #@UndefinedVariable
+ logger.debug("write http " + content_file) #@UndefinedVariable
+ logger.debug("write http " + repr(body)) #@UndefinedVariable
h = httplib2.Http()
resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body)
- logging.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
+ logger.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
else:
if content_file and os.path.exists(content_file):
dest_file_name = content_file
else:
dest_file_name = options.filename
- logging.debug("WRITE : " + dest_file_name) #@UndefinedVariable
+ logger.debug("WRITE : " + dest_file_name) #@UndefinedVariable
output = open(dest_file_name, "w")
output.write(output_data)
output.flush()
--- a/script/lib/iri_tweet/models.py Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/models.py Mon Aug 08 09:01:40 2011 +0200
@@ -92,15 +92,12 @@
text = Column(String)
truncated = Column(Boolean)
user_id = Column(Integer, ForeignKey('tweet_user.id'))
- user = relationship("TweetUser", backref="tweets")
-# original_json = Column(String)
+ user = relationship("User", backref="tweets")
tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
tweet_source = relationship("TweetSource", backref="tweet")
entity_list = relationship(Entity, backref='tweet')
received_at = Column(DateTime, default=datetime.datetime.now())
-
- #user = relationship(User, primaryjoin=user_id == User.id)
-
+
def __init__(self, **kwargs):
for key, value in kwargs.items():
if hasattr(self,key):
@@ -111,7 +108,7 @@
id = Column(Integer, primary_key = True)
user_id = Column(Integer, ForeignKey('tweet_user.id'))
- user = relationship("TweetUser", backref="messages")
+ user = relationship("User", backref="messages")
created_at = Column(DateTime, default=datetime.datetime.now())
message_id = Column(Integer, ForeignKey('tweet_message.id'))
@@ -161,8 +158,8 @@
url= Column(String)
utc_offset = Column(Integer)
verified= Column(Boolean)
- tweets = relationship(Tweet, backref='user')
- messages = relationship(UserMessage, backref='user')
+ #tweets = relationship(Tweet, backref='user')
+ #messages = relationship(UserMessage, backref='user')
def __init__(self, **kwargs):
for key, value in kwargs.items():
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/tests.py Mon Aug 08 09:01:40 2011 +0200
@@ -0,0 +1,97 @@
+from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, backref
+import unittest #@UnresolvedImport
+from sqlalchemy.orm import sessionmaker
+from iri_tweet.utils import ObjectsBuffer
+
+Base = declarative_base()
+
+class User(Base):
+ __tablename__ = 'users'
+
+ id = Column(Integer, primary_key=True)
+ name = Column(String)
+ fullname = Column(String)
+ password = Column(String)
+
+ def __init__(self, name, fullname, password):
+ self.name = name
+ self.fullname = fullname
+ self.password = password
+
+ def __repr__(self):
+ return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)
+
+
+class Address(Base):
+ __tablename__ = 'addresses'
+ id = Column(Integer, primary_key=True)
+ email_address = Column(String, nullable=False)
+ user_id = Column(Integer, ForeignKey('users.id'))
+
+ user = relationship("User", backref=backref('addresses', order_by=id))
+
+ def __init__(self, user_id, email_address):
+ self.email_address = email_address
+ self.user_id = user_id
+
+ def __repr__(self):
+ return "<Address('%s')>" % self.email_address
+
+
+
+class TestObjectBuffer(unittest.TestCase):
+
+ def setUp(self):
+ self.engine = create_engine('sqlite:///:memory:', echo=False)
+ Base.metadata.create_all(self.engine)
+ sessionMaker = sessionmaker(bind=self.engine)
+ self.session = sessionMaker()
+
+ def testCreateUser(self):
+ ed_user = User('ed', 'Ed Jones', 'edspassword')
+ self.session.add(ed_user)
+ self.assertTrue(ed_user.id is None)
+ self.session.commit()
+ self.assertTrue(ed_user.id is not None)
+
+ def testSimpleBuffer(self):
+ obj_buffer = ObjectsBuffer()
+ obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False)
+ self.assertTrue(obj_proxy.id() is None)
+ obj_buffer.persists(self.session)
+ self.assertTrue(obj_proxy.id() is None)
+ self.session.commit()
+ self.assertTrue(obj_proxy.id() is not None)
+
+ def testSimpleBufferFlush(self):
+ obj_buffer = ObjectsBuffer()
+ obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True)
+ self.assertTrue(obj_proxy.id() is None)
+ obj_buffer.persists(self.session)
+ self.assertTrue(obj_proxy.id() is not None)
+ self.session.commit()
+ self.assertTrue(obj_proxy.id() is not None)
+
+ def testRelationBuffer(self):
+ obj_buffer = ObjectsBuffer()
+ user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True)
+ obj_buffer.add_object(Address, [user1_proxy.id,'ed3@mail.com'], None, False)
+ obj_buffer.add_object(Address, [user1_proxy.id,'ed3@other.com'], None, False)
+ user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True)
+ obj_buffer.add_object(Address, [user2_proxy.id,'ed4@mail.com'], None, False)
+ obj_buffer.persists(self.session)
+ self.session.commit()
+ ed_user = self.session.query(User).filter_by(name='ed3').first()
+ self.assertEquals(2, len(ed_user.addresses))
+ ed_user = self.session.query(User).filter_by(name='ed4').first()
+ self.assertEquals(1, len(ed_user.addresses))
+
+
+ def tearDown(self):
+ self.session.close()
+
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
--- a/script/lib/iri_tweet/tweet_twitter_user.py Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/tweet_twitter_user.py Mon Aug 08 09:01:40 2011 +0200
@@ -1,13 +1,12 @@
from iri_tweet.models import setup_database, Message, UserMessage, User
from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options,
- set_logging, parse_date)
+ set_logging, parse_date, logger)
from optparse import OptionParser #@UnresolvedImport
from sqlalchemy import BigInteger
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import MetaData, Table, Column
from sqlalchemy.sql import and_
import datetime
-import logging #@UnresolvedImport
import sys
import time
import twitter
@@ -54,7 +53,7 @@
set_logging(options)
- logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+ logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
if not options.message or len(options.message) == 0:
sys.exit()
@@ -108,7 +107,7 @@
screen_name = user.screen_name
message = u"@%s: %s" % (screen_name, base_message)
- logging.debug("new status : " + message) #@UndefinedVariable
+ logger.debug("new status : " + message) #@UndefinedVariable
if not options.simulate:
t.statuses.update(status=message)
user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
--- a/script/lib/iri_tweet/utils.py Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/utils.py Mon Aug 08 09:01:40 2011 +0200
@@ -77,6 +77,45 @@
return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
+class ObjectBufferProxy(object):
+ def __init__(self, klass, args, kwargs, must_flush):
+ self.klass= klass
+ self.args = args
+ self.kwargs = kwargs
+ self.must_flush = must_flush
+ self.instance = None
+
+ def persists(self, session):
+ new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
+ new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
+
+ self.instance = self.klass(*new_args, **new_kwargs)
+ session.add(self.instance)
+ if self.must_flush:
+ session.flush()
+
+ def __getattr__(self, name):
+ return lambda : getattr(self.instance, name) if self.instance else None
+
+
+
+
+class ObjectsBuffer(object):
+
+ def __init__(self):
+ self.__bufferlist = []
+
+ def persists(self, session):
+ for object_proxy in self.__bufferlist:
+ object_proxy.persists(session)
+
+ def add_object(self, klass, args, kwargs, must_flush):
+ new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
+ self.__bufferlist.append(new_proxy)
+ return new_proxy
+
+
+
class TwitterProcessorException(Exception):
pass
@@ -104,9 +143,10 @@
self.source_id = source_id
self.session = session
self.token_filename = token_filename
+ self.obj_buffer = ObjectsBuffer()
def __get_user(self, user_dict):
- logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+ logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
user_id = user_dict.get("id",None)
user_name = user_dict.get("screen_name", user_dict.get("name", None))
@@ -133,8 +173,8 @@
else:
user_dict = t.users.show(screen_name=user_name)
except Exception as e:
- logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
- logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+ logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+ logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
if "id" not in user_dict:
@@ -148,7 +188,7 @@
return user
def __process_entity(self, ind, ind_type):
- logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+ logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
ind = clean_keys(ind)
@@ -203,7 +243,7 @@
'urls' : process_urls
}[ind_type]()
- logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+ logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
if entity:
self.session.add(entity)
self.session.flush()
@@ -220,7 +260,7 @@
# get or create user
user = self.__get_user(self.json_dict["user"])
if user is None:
- logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+ logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
ts_copy["user"] = None
ts_copy["user_id"] = None
else:
@@ -270,7 +310,7 @@
user = self.__get_user(user_fields)
if user is None:
- logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
+ logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
tweet_fields["user"] = None
tweet_fields["user_id"] = None
else:
@@ -287,14 +327,13 @@
for ind in extractor.extract_hashtags_with_indices():
self.__process_entity(ind, "hashtags")
-
- for ind in extractor.extract_mentioned_screen_names_with_indices():
- self.__process_entity(ind, "user_mentions")
-
+
for ind in extractor.extract_urls_with_indices():
self.__process_entity(ind, "urls")
- self.session.flush()
+ for ind in extractor.extract_mentioned_screen_names_with_indices():
+ self.__process_entity(ind, "user_mentions")
+
def process(self):
@@ -305,21 +344,21 @@
self.session.flush()
self.source_id = tweet_source.id
- try:
- if "metadata" in self.json_dict:
- self.__process_twitter_rest()
- else:
- self.__process_twitter_stream()
-
- tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
- except:
-
- raise
+ if "metadata" in self.json_dict:
+ self.__process_twitter_rest()
+ else:
+ self.__process_twitter_stream()
+
+ tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
+ self.session.add(tweet_log)
-def set_logging(options):
+def set_logging(options, plogger=None):
- logging_config = {}
+ logging_config = {
+ "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
+ "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
+ }
if options.logfile == "stdout":
logging_config["stream"] = sys.stdout
@@ -327,9 +366,27 @@
logging_config["stream"] = sys.stderr
else:
logging_config["filename"] = options.logfile
-
- logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable
- logging.basicConfig(**logging_config) #@UndefinedVariable
+
+ logger = plogger
+ if logger is None:
+ logger = logging.getLogger() #@UndefinedVariable
+
+ if len(logger.handlers) == 0:
+ filename = logging_config.get("filename")
+ if filename:
+ mode = logging_config.get("filemode", 'a')
+ hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
+ else:
+ stream = logging_config.get("stream")
+ hdlr = logging.StreamHandler(stream) #@UndefinedVariable
+ fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
+ dfs = logging_config.get("datefmt", None)
+ fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
+ hdlr.setFormatter(fmt)
+ logger.addHandler(hdlr)
+ level = logging_config.get("level")
+ if level is not None:
+ logger.setLevel(level)
options.debug = (options.verbose-options.quiet > 0)
@@ -384,4 +441,4 @@
return query.distinct()
-
+logger = logging.getLogger() #@UndefinedVariable
--- a/script/stream/recorder_tweetstream.py Wed Jul 27 00:04:55 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Mon Aug 08 09:01:40 2011 +0200
@@ -1,14 +1,13 @@
from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event
+from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
from optparse import OptionParser
from sqlalchemy.orm import sessionmaker
-from sqlite3 import *
import StringIO
+import logging
import anyjson
import datetime
-import logging
import os
import shutil
import signal
@@ -18,6 +17,8 @@
import traceback
import tweepy.auth
import tweetstream
+from iri_tweet.utils import logger
+from sqlalchemy.exc import OperationalError
socket._fileobject.default_bufsize = 0
@@ -82,7 +83,6 @@
class SourceProcess(Process):
def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
- super(SourceProcess, self).__init__()
self.session_maker = session_maker
self.queue = queue
self.auth = auth
@@ -91,47 +91,93 @@
self.reconnects = reconnects
self.token_filename = token_filename
self.stop_event = stop_event
+ super(SourceProcess, self).__init__()
# self.stop_event =
def run(self):
+ get_logger().debug("SourceProcess : run")
track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
track_list = [k for k in track_list.split(',')]
-
+
+ get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+ get_logger().debug("SourceProcess : after connecting to stream")
stream.muststop = lambda: self.stop_event.is_set()
session = self.session_maker()
try:
for tweet in stream:
+ get_logger().debug("tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
- session.add(source)
- session.flush()
+ get_logger().debug("source created")
+ add_retries = 0
+ while add_retries < 10:
+ try:
+ add_retries += 1
+ session.add(source)
+ session.flush()
+ break
+ except OperationalError as e:
+ session.rollback()
+ get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+ if add_retries==10:
+ raise e
+
source_id = source.id
- queue.put((source_id, tweet), False)
+ get_logger().debug("before queue + source id " + repr(source_id))
+ self.queue.put((source_id, tweet), False)
#process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
- logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
session.commit()
# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
# print "Stop recording after %d seconds." % (duration)
# break
- except:
+ except Exception as e:
+ get_logger().error("Error when processing tweet " + repr(e))
+ finally:
session.rollback()
- finally:
stream.close()
session.close()
-
+ self.queue.close()
+ self.stop_event.set()
+
+
+def process_tweet(tweet, source_id, session, token_filename):
+ try:
+ tweet_obj = anyjson.deserialize(tweet)
+ screen_name = ""
+ if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+ screen_name = tweet_obj['user']['screen_name']
+ get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ get_logger().debug(u"Process_tweet :" + repr(tweet))
+ processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+ processor.process()
+ except Exception as e:
+ message = u"Error %s processing tweet %s" % (repr(e), tweet)
+ get_logger().error(message)
+ output = StringIO.StringIO()
+ traceback.print_exception(Exception, e, None, None, output)
+ error_stack = output.getvalue()
+ output.close()
+ session.rollback()
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+ session.add(tweet_log)
+ session.commit()
+
+
class TweetProcess(Process):
def __init__(self, session_maker, queue, debug, token_filename, stop_event):
- super(TweetProcess, self).__init__()
self.session_maker = session_maker
self.queue = queue
self.debug = debug
self.stop_event = stop_event
self.token_filename = token_filename
+ super(TweetProcess, self).__init__()
+
def run(self):
@@ -139,74 +185,33 @@
try:
while not self.stop_event.is_set():
try:
- source_id, tweet_txt = queue.get(True, 30)
- except:
+ source_id, tweet_txt = queue.get(True, 10)
+ get_logger().debug("Processing source id " + repr(source_id))
+ except Exception as e:
+ get_logger().debug('Process tweet exception in loop : ' + repr(e))
continue
- process_tweet(tweet_txt, source_id, session)
+ process_tweet(tweet_txt, source_id, session, self.token_filename)
session.commit()
- self.queue.task_done()
except:
- session.rollback()
raise
finally:
+ session.rollback()
+ self.stop_event.set()
session.close()
-
- def process_tweet(tweet, source_id, session):
-
- try:
- tweet_obj = anyjson.deserialize(tweet)
- screen_name = ""
- if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
- screen_name = tweet_obj['user']['screen_name']
- logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
- logging.debug(u"Process_tweet :" + repr(tweet))
- processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
- processor.process()
- except Exception, e:
- message = u"Error %e processing tweet %s" % (unicode(e), tweet)
- logging.error(message)
- output = StringIO.StringIO()
- traceback.print_exception(Exception, e, None, None, output)
- tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
- output.close()
-
-
-
-#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
-
- #username = username or raw_input('Twitter username: ')
- #password = password or getpass('Twitter password: ')
+def process_leftovers(session, token_filename):
+
+ sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+
+ for src in sources:
+ tweet_txt = src.original_json
+ process_tweet(tweet_txt, src.id, session, token_filename)
-# track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-# track_list = [k for k in track_list.split(',')]
-
-# if username and password:
-# auth = tweepy.auth.BasicAuthHandler(username, password)
-# else:
-# consumer_key = models.CONSUMER_KEY
-# consumer_secret = models.CONSUMER_SECRET
-# auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-# auth.set_access_token(*(utils.get_oauth_token(token_filename)))
-
-# if duration >= 0:
-# end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
+
-# stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
-# try:
-# for tweet in stream:
-# source = TweetSource(original_json=tweet)
-# session.add(source)
-# session.flush()
-# source_id = source.id
-# process_tweet(tweet, source_id, session, debug, token_filename)
-# logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-# session.commit()
-# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
-# print "Stop recording after %d seconds." % (duration)
-# break
-# finally:
-# stream.close()
+ #get tweet source that do not match any message
+ #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+
def get_options():
parser = OptionParser()
@@ -240,7 +245,7 @@
(options, args) = get_options()
- utils.set_logging(options)
+ utils.set_logging(options, get_logger())
if options.debug:
print "OPTIONS : "
@@ -249,18 +254,16 @@
if options.new and os.path.exists(options.filename):
i = 1
basename, extension = os.path.splitext(options.filename)
- new_path = '$s.%d.%s' % (basename, i, extension)
+ new_path = '%s.%d%s' % (basename, i, extension)
while i < 1000000 and os.path.exists(new_path):
i += 1
- new_path = '$s.%d.%s' % (basename, i, extension)
+ new_path = '%s.%d%s' % (basename, i, extension)
if i >= 1000000:
raise Exception("Unable to find new filename for " + options.filename)
else:
shutil.move(options.filename, new_path)
- engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
- Session = sessionmaker(bind=engine)
queue = JoinableQueue()
stop_event = Event()
@@ -272,12 +275,22 @@
auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
-
+
+ engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+ Session = sessionmaker(bind=engine)
+
+ session = Session()
+ process_leftovers(session, options.token_filename)
+ session.commit()
+ session.close()
+
sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)
tweet_processes = []
for i in range(options.consumer_nb):
+ engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+ Session = sessionmaker(bind=engine)
cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
tweet_processes.append(cprocess)
@@ -302,7 +315,19 @@
else:
break
+ get_logger().debug("Joining Source Process")
sprocess.join()
- queue.join()
- for cprocess in tweet_processes:
+ get_logger().debug("Joining Queue")
+ #queue.join()
+ for i,cprocess in enumerate(tweet_processes):
+ get_logger().debug("Joining consumer process Nb %d" % (i+1))
cprocess.join()
+
+ get_logger().debug("Processing leftovers")
+ session = Session()
+ process_leftovers(session, options.token_filename)
+ session.commit()
+ session.close()
+
+ get_logger().debug("Done. Exiting.")
+
\ No newline at end of file