improve session maker creation + models version + add model version in db
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Thu, 22 Sep 2011 12:37:53 +0200
changeset 289 a5eff8f2b81d
parent 288 4c870c767d3e
child 290 2ddd11ec2da2
improve session maker creation + models version + add model version in db
script/lib/iri_tweet/__init__.py
script/lib/iri_tweet/export_tweet_db.py
script/lib/iri_tweet/export_twitter_alchemy.py
script/lib/iri_tweet/models.py
script/lib/iri_tweet/tests.py
script/lib/iri_tweet/tweet_twitter_user.py
script/lib/iri_tweet/utils.py
script/rest/search_twitter.py
script/stream/recorder_tweetstream.py
--- a/script/lib/iri_tweet/__init__.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/__init__.py	Thu Sep 22 12:37:53 2011 +0200
@@ -0,0 +1,17 @@
+VERSION = (0, 82, 0, "final", 0)
+
+VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2])))
+
+
+def get_version():
+    version = '%s.%s' % (VERSION[0], VERSION[1])
+    if VERSION[2]:
+        version = '%s.%s' % (version, VERSION[2])
+    if VERSION[3:] == ('alpha', 0):
+        version = '%s pre-alpha' % version
+    else:
+        if VERSION[3] != 'final':
+            version = '%s %s %s' % (version, VERSION[3], VERSION[4])
+    return version
+
+__version__ = get_version()
--- a/script/lib/iri_tweet/export_tweet_db.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/export_tweet_db.py	Thu Sep 22 12:37:53 2011 +0200
@@ -26,8 +26,7 @@
     set_logging(options)
     
     with sqlite3.connect(args[0]) as conn_in:
-        engine, metadata = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0))
-        Session = sessionmaker(bind=engine)
+        engine, metadata, Session = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0))
         session = Session()
         try:
             curs_in = conn_in.cursor()
--- a/script/lib/iri_tweet/export_twitter_alchemy.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py	Thu Sep 22 12:37:53 2011 +0200
@@ -2,19 +2,19 @@
 # coding=utf-8
 
 from lxml import etree
+from models import setup_database
 from optparse import OptionParser #@UnresolvedImport
-from sqlalchemy import Table, Column, BigInteger, MetaData
-from sqlalchemy.orm import sessionmaker
-from utils import parse_date, set_logging_options, set_logging, get_filter_query, get_logger
-from models import setup_database
+from sqlalchemy import Table, Column, BigInteger
+from utils import (parse_date, set_logging_options, set_logging, get_filter_query, 
+    get_logger)
+import anyjson
 import datetime
+import httplib2
 import os.path
 import re
 import sys
 import time
 import uuid #@UnresolvedImport
-import httplib2
-import anyjson
 
 #class TweetExclude(object):
 #    def __init__(self, id):
@@ -57,9 +57,9 @@
     parser.add_option("-d", "--database", dest="database",
                       help="Input database", metavar="DATABASE")
     parser.add_option("-s", "--start-date", dest="start_date",
-                      help="start date", metavar="START_DATE")
+                      help="start date", metavar="START_DATE", default=None)
     parser.add_option("-e", "--end-date", dest="end_date",
-                      help="end date", metavar="END_DATE")
+                      help="end date", metavar="END_DATE", default=None)
     parser.add_option("-I", "--content-file", dest="content_file",
                       help="Content file", metavar="CONTENT_FILE")
     parser.add_option("-c", "--content", dest="content",
@@ -110,18 +110,16 @@
     if not re.match("^\w+://.+", conn_str):
         conn_str = 'sqlite:///' + conn_str
 
-    engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)        
-    
-    Session = sessionmaker()
-    conn = engine.connect()
+    engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)        
+    conn = None
     try :
-        session = Session(bind=conn)
-        try : 
-        
-            metadata = MetaData(bind=conn)
+        conn = engine.connect()    
+        session = None
+        try :
+            session = Session(bind=conn)         
             tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY'])
             #mapper(TweetExclude, tweet_exclude_table)
-            metadata.create_all()
+            metadata.create_all(bind=conn, tables=[tweet_exclude_table])
             
             if options.exclude and os.path.exists(options.exclude):
                 with open(options.exclude, 'r+') as f:
@@ -174,14 +172,16 @@
                     with open(user_whitelist_file, 'r+') as f:
                         user_whitelist = list(set([s.strip() for s in f]))
                 
-                start_date = parse_date(start_date_str) 
-                ts = time.mktime(start_date.timetuple())
+                start_date = None
+                ts = None
+                if start_date_str:
+                    start_date = parse_date(start_date_str) 
+                    ts = time.mktime(start_date.timetuple())
             
+                end_date = None
                 if end_date_str:
                     end_date = parse_date(end_date_str)
-                    te = time.mktime(end_date.timetuple())
-                else:
-                    te = ts + duration
+                elif start_date and duration:
                     end_date = start_date + datetime.timedelta(seconds=duration)
                 
                 query = get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
@@ -276,6 +276,8 @@
                 for tw in query_res:
                     tweet_ts_dt = tw.created_at
                     tweet_ts = int(time.mktime(tweet_ts_dt.timetuple()))
+                    if ts is None:
+                        ts = tweet_ts
                     tweet_ts_rel = (tweet_ts-ts) * 1000
                     username = None
                     profile_url = ""
@@ -330,6 +332,8 @@
                     output.close()
                 
         finally:
-            session.close()
+            if session:
+                session.close()
     finally:
-        conn.close()
+        if conn:
+            conn.close()
--- a/script/lib/iri_tweet/models.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Thu Sep 22 12:37:53 2011 +0200
@@ -1,10 +1,11 @@
 from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String, 
     ForeignKey, DateTime, create_engine)
 from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import relationship
+from sqlalchemy.orm import relationship, sessionmaker
 import anyjson
 import datetime
 import email.utils
+import iri_tweet
 
 
 Base = declarative_base()
@@ -44,7 +45,7 @@
     __tablename__ = "tweet_process_event"
     id = Column(Integer, primary_key=True, autoincrement=True)
     ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-    type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", name="process_event_type_enum"), nullable=False)
+    type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False)
     args = Column(String)
     
 class EntityType(Base):
@@ -255,19 +256,35 @@
     media_id = Column(BigInteger, ForeignKey('tweet_media.id'))
     media = relationship(Media, primaryjoin=(media_id == Media.id))
 
+def add_model_version(session, must_commit=True):
+    pe = ProcessEvent(args=iri_tweet.get_version(), type="model_version")
+    session.add(pe)
+    if must_commit:
+        session.commit()
                 
 def setup_database(*args, **kwargs):
-        
-    create_all = True
-    if "create_all" in kwargs:
-        create_all = kwargs["create_all"]
-        del(kwargs["create_all"])
+    
+    session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"]
+    
+    kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all"))
+
+    engine = create_engine(*args, **kwargs_ce)
+    metadata = Base.metadata        
+                
+    kwargs_sm = {'bind': engine}
+    
+    kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs])
 
-    engine = create_engine(*args, **kwargs)
-    metadata = Base.metadata
-            
-    if create_all:
+    Session = sessionmaker(**kwargs_sm)
+    #set model version
+    
+    if kwargs.get('create_all', True):
         metadata.create_all(engine)
+        session = Session()
+        try:
+            add_model_version(session)
+        finally:
+            session.close()
 
-    return (engine, metadata)
+    return (engine, metadata, Session)
 
--- a/script/lib/iri_tweet/tests.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/tests.py	Thu Sep 22 12:37:53 2011 +0200
@@ -127,8 +127,7 @@
 class TestTwitterProcessor(unittest.TestCase):
     
     def setUp(self):
-        self.engine, self.metadata = models.setup_database('sqlite:///:memory:', echo=True)
-        sessionMaker = sessionmaker(bind=self.engine)
+        self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True)
         self.session = sessionMaker()
         file, self.tmpfilepath = tempfile.mkstemp()
         os.close(file)
--- a/script/lib/iri_tweet/tweet_twitter_user.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/tweet_twitter_user.py	Thu Sep 22 12:37:53 2011 +0200
@@ -3,14 +3,12 @@
     set_logging, parse_date, get_logger)
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy import BigInteger
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy.schema import MetaData, Table, Column
+from sqlalchemy.schema import Table, Column
 from sqlalchemy.sql import and_
 import datetime
+import re
 import sys
-import time
 import twitter
-import re
 
 APPLICATION_NAME = "Tweet recorder user"
 CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
@@ -22,7 +20,7 @@
     parser.add_option("-d", "--database", dest="database",
                       help="Input database", metavar="DATABASE")
     parser.add_option("-s", "--start-date", dest="start_date",
-                      help="start date", metavar="START_DATE")
+                      help="start date", metavar="START_DATE", default=None)
     parser.add_option("-e", "--end-date", dest="end_date",
                       help="end date", metavar="END_DATE")
     parser.add_option("-H", "--hashtag", dest="hashtag",
@@ -57,37 +55,37 @@
     get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable
 
     if not options.message or len(options.message) == 0:
+        get_logger().warning("No message exiting")
         sys.exit()
 
     conn_str = options.database.strip()
     if not re.match("^\w+://.+", conn_str):
         conn_str = 'sqlite:///' + conn_str
 
-    engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)        
+    engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)        
     
-    Session = sessionmaker()
-    conn = engine.connect()
+    conn = None
     try :
-        session = Session(bind=conn, autoflush=True, autocommit=True)
+        conn = engine.connect()  
+        session = None      
         try:
-            metadata = MetaData(bind=conn)
+            session = Session(bind=conn, autoflush=True, autocommit=True)
             tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY'])
-            metadata.create_all()
+            metadata.create_all(bind=conn,tables=[tweet_exclude_table])
 
             start_date_str = options.start_date
             end_date_str = options.end_date
             duration = options.duration
             hashtags = options.hashtag
             
-            
-            start_date = parse_date(start_date_str) 
-            ts = time.mktime(start_date.timetuple())
-        
+            start_date = None
+            if start_date_str:
+                start_date = parse_date(start_date_str) 
+
+            end_date = None
             if end_date_str:
                 end_date = parse_date(end_date_str)
-                te = time.mktime(end_date.timetuple())
-            else:
-                te = ts + duration
+            elif start_date and duration:
                 end_date = start_date + datetime.timedelta(seconds=duration)
             
             base_message = options.message.decode(sys.getfilesystemencoding())
@@ -119,8 +117,10 @@
                     session.add(user_message)
                     session.flush()
         finally:
-            # if message created and simulate, do not  
-            session.close()
+            # if message created and simulate, do not
+            if session:  
+                session.close()
     finally:
-        conn.close()
+        if conn:
+            conn.close()
 
--- a/script/lib/iri_tweet/utils.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/utils.py	Thu Sep 22 12:37:53 2011 +0200
@@ -497,14 +497,17 @@
     parser.add_option("-q", dest="quiet", action="count",
                       help="quiet", metavar="QUIET", default=0)
 
+def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
     
-def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
+    query = query.join(EntityHashtag).join(Hashtag)
     
-    query = session.query(Tweet).join(EntityHashtag).join(Hashtag)
     if tweet_exclude_table is not None:
         query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
-        
-    query = query.filter(Tweet.created_at >=  start_date).filter(Tweet.created_at <=  end_date)
+    
+    if start_date:
+        query = query.filter(Tweet.created_at >=  start_date)
+    if end_date:
+        query = query.filter(Tweet.created_at <=  end_date)
 
     if user_whitelist:
         query = query.join(User).filter(User.screen_name.in_(user_whitelist))
@@ -517,26 +520,23 @@
         htags = reduce(merge_hash, hashtags, [])
         
         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
-        
     
     return query
+
+    
+    
+def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
+    
+    query = session.query(Tweet)
+    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist) 
+    return query.order_by(Tweet.created_at)
     
 
 def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
     
-    query = session.query(User).join(Tweet).join(EntityHashtag).join(Hashtag)
-    if tweet_exclude_table is not None:
-        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
-        
-    query = query.filter(Tweet.created_at >=  start_date).filter(Tweet.created_at <=  end_date)
+    query = session.query(User).join(Tweet)
     
-    if hashtags :
-        def merge_hash(l,h):
-            l.extend(h.split(","))
-            return l
-        htags = reduce(merge_hash, hashtags, [])
-        
-        query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
+    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)    
     
     return query.distinct()
 
--- a/script/rest/search_twitter.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/rest/search_twitter.py	Thu Sep 22 12:37:53 2011 +0200
@@ -42,10 +42,10 @@
         conn_str = 'sqlite:///' + conn_str
 
     
-    engine, metadata = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
-    Session = sessionmaker(bind=engine)
-    session = Session()
+    engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
+    session = None
     try:
+        session = Session()
         #conn.row_factory = sqlite3.Row
         #curs = conn.cursor()
         #curs.execute("create table if not exists tweet_tweet (json);")
@@ -67,6 +67,7 @@
             page += 1
             #session.commit()
     finally:
-        session.close()
+        if session:
+            session.close()
 
 
--- a/script/stream/recorder_tweetstream.py	Tue Sep 20 16:55:16 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Thu Sep 22 12:37:53 2011 +0200
@@ -1,11 +1,11 @@
 from getpass import getpass
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
     get_logger)
 from optparse import OptionParser
 from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session, sessionmaker
+from sqlalchemy.orm import scoped_session
 import Queue
 import StringIO
 import anyjson
@@ -25,7 +25,6 @@
 import tweepy.auth
 import tweetstream
 import urllib2
-#from iri_tweet.utils import get_logger
 socket._fileobject.default_bufsize = 0
 
 
@@ -160,7 +159,7 @@
             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
             self.do_run()
         finally:
-            add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
         
     def do_run(self):
         raise NotImplementedError()
@@ -287,8 +286,8 @@
 
 
 def get_sessionmaker(conn_str):
-    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
-    Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
+    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
+    Session = scoped_session(Session)
     return Session, engine, metadata
 
             
@@ -424,7 +423,7 @@
     for cprocess in tweet_processes:
         cprocess.start()
 
-    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
+    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
 
     if options.duration >= 0:
         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
@@ -515,6 +514,12 @@
             sys.exit(message)
     
     metadata.create_all(engine)
+    session = Session()
+    try:
+        models.add_model_version(session)
+    finally:
+        session.close()
+    
     stop_args = {}
     try:
         add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)