diff -r 4c870c767d3e -r a5eff8f2b81d script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Thu Sep 22 12:37:53 2011 +0200 @@ -1,11 +1,11 @@ from getpass import getpass from iri_tweet import models, utils from iri_tweet.models import TweetSource, TweetLog, ProcessEvent -from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, +from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, get_logger) from optparse import OptionParser from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.orm import scoped_session import Queue import StringIO import anyjson @@ -25,7 +25,6 @@ import tweepy.auth import tweetstream import urllib2 -#from iri_tweet.utils import get_logger socket._fileobject.default_bufsize = 0 @@ -160,7 +159,7 @@ add_process_event("start_worker", self.__get_process_event_args(), self.session_maker) self.do_run() finally: - add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker) + add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker) def do_run(self): raise NotImplementedError() @@ -287,8 +286,8 @@ def get_sessionmaker(conn_str): - engine, metadata = models.setup_database(conn_str, echo=False, create_all=False) - Session = scoped_session(sessionmaker(bind=engine, autocommit=False)) + engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) + Session = scoped_session(Session) return Session, engine, metadata @@ -424,7 +423,7 @@ for cprocess in tweet_processes: cprocess.start() - add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker) + add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) if options.duration >= 0: end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) @@ -515,6 +514,12 @@ sys.exit(message) metadata.create_all(engine) + session = Session() + try: + models.add_model_version(session) + finally: + session.close() + stop_args = {} try: add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)