--- a/script/stream/recorder_tweetstream.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Thu Sep 22 12:37:53 2011 +0200
@@ -1,11 +1,11 @@
from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
get_logger)
from optparse import OptionParser
from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session, sessionmaker
+from sqlalchemy.orm import scoped_session
import Queue
import StringIO
import anyjson
@@ -25,7 +25,6 @@
import tweepy.auth
import tweetstream
import urllib2
-#from iri_tweet.utils import get_logger
socket._fileobject.default_bufsize = 0
@@ -160,7 +159,7 @@
add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
self.do_run()
finally:
- add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+ add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
def do_run(self):
raise NotImplementedError()
@@ -287,8 +286,8 @@
def get_sessionmaker(conn_str):
- engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
- Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
+ engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
+ Session = scoped_session(Session)
return Session, engine, metadata
@@ -424,7 +423,7 @@
for cprocess in tweet_processes:
cprocess.start()
- add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
+ add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
if options.duration >= 0:
end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
@@ -515,6 +514,12 @@
sys.exit(message)
metadata.create_all(engine)
+ session = Session()
+ try:
+ models.add_model_version(session)
+ finally:
+ session.close()
+
stop_args = {}
try:
add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)