script/stream/recorder_tweetstream.py
changeset 289 a5eff8f2b81d
parent 272 fe2efe3600ea
child 290 2ddd11ec2da2
--- a/script/stream/recorder_tweetstream.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Thu Sep 22 12:37:53 2011 +0200
@@ -1,11 +1,11 @@
 from getpass import getpass
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
     get_logger)
 from optparse import OptionParser
 from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session, sessionmaker
+from sqlalchemy.orm import scoped_session
 import Queue
 import StringIO
 import anyjson
@@ -25,7 +25,6 @@
 import tweepy.auth
 import tweetstream
 import urllib2
-#from iri_tweet.utils import get_logger
 socket._fileobject.default_bufsize = 0
 
 
@@ -160,7 +159,7 @@
             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
             self.do_run()
         finally:
-            add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
         
     def do_run(self):
         raise NotImplementedError()
@@ -287,8 +286,8 @@
 
 
 def get_sessionmaker(conn_str):
-    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
-    Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
+    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
+    Session = scoped_session(Session)
     return Session, engine, metadata
 
             
@@ -424,7 +423,7 @@
     for cprocess in tweet_processes:
         cprocess.start()
 
-    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
+    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
 
     if options.duration >= 0:
         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
@@ -515,6 +514,12 @@
             sys.exit(message)
     
     metadata.create_all(engine)
+    session = Session()
+    try:
+        models.add_model_version(session)
+    finally:
+        session.close()
+    
     stop_args = {}
     try:
         add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)