# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1316687873 -7200 # Node ID a5eff8f2b81dbda569f4f32d9cc9147fddea9160 # Parent 4c870c767d3edc19c09b7412cee8967eb87784a6 improve session maker creation + models version + add model version in db diff -r 4c870c767d3e -r a5eff8f2b81d script/lib/iri_tweet/__init__.py --- a/script/lib/iri_tweet/__init__.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/lib/iri_tweet/__init__.py Thu Sep 22 12:37:53 2011 +0200 @@ -0,0 +1,17 @@ +VERSION = (0, 82, 0, "final", 0) + +VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2]))) + + +def get_version(): + version = '%s.%s' % (VERSION[0], VERSION[1]) + if VERSION[2]: + version = '%s.%s' % (version, VERSION[2]) + if VERSION[3:] == ('alpha', 0): + version = '%s pre-alpha' % version + else: + if VERSION[3] != 'final': + version = '%s %s %s' % (version, VERSION[3], VERSION[4]) + return version + +__version__ = get_version() diff -r 4c870c767d3e -r a5eff8f2b81d script/lib/iri_tweet/export_tweet_db.py --- a/script/lib/iri_tweet/export_tweet_db.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/lib/iri_tweet/export_tweet_db.py Thu Sep 22 12:37:53 2011 +0200 @@ -26,8 +26,7 @@ set_logging(options) with sqlite3.connect(args[0]) as conn_in: - engine, metadata = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0)) - Session = sessionmaker(bind=engine) + engine, metadata, Session = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0)) session = Session() try: curs_in = conn_in.cursor() diff -r 4c870c767d3e -r a5eff8f2b81d script/lib/iri_tweet/export_twitter_alchemy.py --- a/script/lib/iri_tweet/export_twitter_alchemy.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/lib/iri_tweet/export_twitter_alchemy.py Thu Sep 22 12:37:53 2011 +0200 @@ -2,19 +2,19 @@ # coding=utf-8 from lxml import etree +from models import setup_database from optparse import OptionParser #@UnresolvedImport -from sqlalchemy import Table, Column, BigInteger, MetaData -from sqlalchemy.orm import sessionmaker -from utils import parse_date, set_logging_options, set_logging, get_filter_query, get_logger -from models import setup_database +from sqlalchemy import Table, Column, BigInteger +from utils import (parse_date, set_logging_options, set_logging, get_filter_query, + get_logger) +import anyjson import datetime +import httplib2 import os.path import re import sys import time import uuid #@UnresolvedImport -import httplib2 -import anyjson #class TweetExclude(object): # def __init__(self, id): @@ -57,9 +57,9 @@ parser.add_option("-d", "--database", dest="database", help="Input database", metavar="DATABASE") parser.add_option("-s", "--start-date", dest="start_date", - help="start date", metavar="START_DATE") + help="start date", metavar="START_DATE", default=None) parser.add_option("-e", "--end-date", dest="end_date", - help="end date", metavar="END_DATE") + help="end date", metavar="END_DATE", default=None) parser.add_option("-I", "--content-file", dest="content_file", help="Content file", metavar="CONTENT_FILE") parser.add_option("-c", "--content", dest="content", @@ -110,18 +110,16 @@ if not re.match("^\w+://.+", conn_str): conn_str = 'sqlite:///' + conn_str - engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) - - Session = sessionmaker() - conn = engine.connect() + engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) + conn = None try : - session = Session(bind=conn) - try : - - metadata = MetaData(bind=conn) + conn = engine.connect() + session = None + try : + session = Session(bind=conn) tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY']) #mapper(TweetExclude, tweet_exclude_table) - metadata.create_all() + metadata.create_all(bind=conn, tables=[tweet_exclude_table]) if options.exclude and os.path.exists(options.exclude): with open(options.exclude, 'r+') as f: @@ -174,14 +172,16 @@ with open(user_whitelist_file, 'r+') as f: user_whitelist = list(set([s.strip() for s in f])) - start_date = parse_date(start_date_str) - ts = time.mktime(start_date.timetuple()) + start_date = None + ts = None + if start_date_str: + start_date = parse_date(start_date_str) + ts = time.mktime(start_date.timetuple()) + end_date = None if end_date_str: end_date = parse_date(end_date_str) - te = time.mktime(end_date.timetuple()) - else: - te = ts + duration + elif start_date and duration: end_date = start_date + datetime.timedelta(seconds=duration) query = get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist) @@ -276,6 +276,8 @@ for tw in query_res: tweet_ts_dt = tw.created_at tweet_ts = int(time.mktime(tweet_ts_dt.timetuple())) + if ts is None: + ts = tweet_ts tweet_ts_rel = (tweet_ts-ts) * 1000 username = None profile_url = "" @@ -330,6 +332,8 @@ output.close() finally: - session.close() + if session: + session.close() finally: - conn.close() + if conn: + conn.close() diff -r 4c870c767d3e -r a5eff8f2b81d script/lib/iri_tweet/models.py --- a/script/lib/iri_tweet/models.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/lib/iri_tweet/models.py Thu Sep 22 12:37:53 2011 +0200 @@ -1,10 +1,11 @@ from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String, ForeignKey, DateTime, create_engine) from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import relationship +from sqlalchemy.orm import relationship, sessionmaker import anyjson import datetime import email.utils +import iri_tweet Base = declarative_base() @@ -44,7 +45,7 @@ __tablename__ = "tweet_process_event" id = Column(Integer, primary_key=True, autoincrement=True) ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) - type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", name="process_event_type_enum"), nullable=False) + type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False) args = Column(String) class EntityType(Base): @@ -255,19 +256,35 @@ media_id = Column(BigInteger, ForeignKey('tweet_media.id')) media = relationship(Media, primaryjoin=(media_id == Media.id)) +def add_model_version(session, must_commit=True): + pe = ProcessEvent(args=iri_tweet.get_version(), type="model_version") + session.add(pe) + if must_commit: + session.commit() def setup_database(*args, **kwargs): - - create_all = True - if "create_all" in kwargs: - create_all = kwargs["create_all"] - del(kwargs["create_all"]) + + session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"] + + kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all")) + + engine = create_engine(*args, **kwargs_ce) + metadata = Base.metadata + + kwargs_sm = {'bind': engine} + + kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs]) - engine = create_engine(*args, **kwargs) - metadata = Base.metadata - - if create_all: + Session = sessionmaker(**kwargs_sm) + #set model version + + if kwargs.get('create_all', True): metadata.create_all(engine) + session = Session() + try: + add_model_version(session) + finally: + session.close() - return (engine, metadata) + return (engine, metadata, Session) diff -r 4c870c767d3e -r a5eff8f2b81d script/lib/iri_tweet/tests.py --- a/script/lib/iri_tweet/tests.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/lib/iri_tweet/tests.py Thu Sep 22 12:37:53 2011 +0200 @@ -127,8 +127,7 @@ class TestTwitterProcessor(unittest.TestCase): def setUp(self): - self.engine, self.metadata = models.setup_database('sqlite:///:memory:', echo=True) - sessionMaker = sessionmaker(bind=self.engine) + self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True) self.session = sessionMaker() file, self.tmpfilepath = tempfile.mkstemp() os.close(file) diff -r 4c870c767d3e -r a5eff8f2b81d script/lib/iri_tweet/tweet_twitter_user.py --- a/script/lib/iri_tweet/tweet_twitter_user.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/lib/iri_tweet/tweet_twitter_user.py Thu Sep 22 12:37:53 2011 +0200 @@ -3,14 +3,12 @@ set_logging, parse_date, get_logger) from optparse import OptionParser #@UnresolvedImport from sqlalchemy import BigInteger -from sqlalchemy.orm import sessionmaker -from sqlalchemy.schema import MetaData, Table, Column +from sqlalchemy.schema import Table, Column from sqlalchemy.sql import and_ import datetime +import re import sys -import time import twitter -import re APPLICATION_NAME = "Tweet recorder user" CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg" @@ -22,7 +20,7 @@ parser.add_option("-d", "--database", dest="database", help="Input database", metavar="DATABASE") parser.add_option("-s", "--start-date", dest="start_date", - help="start date", metavar="START_DATE") + help="start date", metavar="START_DATE", default=None) parser.add_option("-e", "--end-date", dest="end_date", help="end date", metavar="END_DATE") parser.add_option("-H", "--hashtag", dest="hashtag", @@ -57,37 +55,37 @@ get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable if not options.message or len(options.message) == 0: + get_logger().warning("No message exiting") sys.exit() conn_str = options.database.strip() if not re.match("^\w+://.+", conn_str): conn_str = 'sqlite:///' + conn_str - engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) + engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) - Session = sessionmaker() - conn = engine.connect() + conn = None try : - session = Session(bind=conn, autoflush=True, autocommit=True) + conn = engine.connect() + session = None try: - metadata = MetaData(bind=conn) + session = Session(bind=conn, autoflush=True, autocommit=True) tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY']) - metadata.create_all() + metadata.create_all(bind=conn,tables=[tweet_exclude_table]) start_date_str = options.start_date end_date_str = options.end_date duration = options.duration hashtags = options.hashtag - - start_date = parse_date(start_date_str) - ts = time.mktime(start_date.timetuple()) - + start_date = None + if start_date_str: + start_date = parse_date(start_date_str) + + end_date = None if end_date_str: end_date = parse_date(end_date_str) - te = time.mktime(end_date.timetuple()) - else: - te = ts + duration + elif start_date and duration: end_date = start_date + datetime.timedelta(seconds=duration) base_message = options.message.decode(sys.getfilesystemencoding()) @@ -119,8 +117,10 @@ session.add(user_message) session.flush() finally: - # if message created and simulate, do not - session.close() + # if message created and simulate, do not + if session: + session.close() finally: - conn.close() + if conn: + conn.close() diff -r 4c870c767d3e -r a5eff8f2b81d script/lib/iri_tweet/utils.py --- a/script/lib/iri_tweet/utils.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/lib/iri_tweet/utils.py Thu Sep 22 12:37:53 2011 +0200 @@ -497,14 +497,17 @@ parser.add_option("-q", dest="quiet", action="count", help="quiet", metavar="QUIET", default=0) +def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): -def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): + query = query.join(EntityHashtag).join(Hashtag) - query = session.query(Tweet).join(EntityHashtag).join(Hashtag) if tweet_exclude_table is not None: query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable - - query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date) + + if start_date: + query = query.filter(Tweet.created_at >= start_date) + if end_date: + query = query.filter(Tweet.created_at <= end_date) if user_whitelist: query = query.join(User).filter(User.screen_name.in_(user_whitelist)) @@ -517,26 +520,23 @@ htags = reduce(merge_hash, hashtags, []) query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable - return query + + + +def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): + + query = session.query(Tweet) + query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist) + return query.order_by(Tweet.created_at) def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table): - query = session.query(User).join(Tweet).join(EntityHashtag).join(Hashtag) - if tweet_exclude_table is not None: - query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable - - query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date) + query = session.query(User).join(Tweet) - if hashtags : - def merge_hash(l,h): - l.extend(h.split(",")) - return l - htags = reduce(merge_hash, hashtags, []) - - query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable + query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None) return query.distinct() diff -r 4c870c767d3e -r a5eff8f2b81d script/rest/search_twitter.py --- a/script/rest/search_twitter.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/rest/search_twitter.py Thu Sep 22 12:37:53 2011 +0200 @@ -42,10 +42,10 @@ conn_str = 'sqlite:///' + conn_str - engine, metadata = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True) - Session = sessionmaker(bind=engine) - session = Session() + engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True) + session = None try: + session = Session() #conn.row_factory = sqlite3.Row #curs = conn.cursor() #curs.execute("create table if not exists tweet_tweet (json);") @@ -67,6 +67,7 @@ page += 1 #session.commit() finally: - session.close() + if session: + session.close() diff -r 4c870c767d3e -r a5eff8f2b81d script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Tue Sep 20 16:55:16 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Thu Sep 22 12:37:53 2011 +0200 @@ -1,11 +1,11 @@ from getpass import getpass from iri_tweet import models, utils from iri_tweet.models import TweetSource, TweetLog, ProcessEvent -from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, +from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, get_logger) from optparse import OptionParser from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.orm import scoped_session import Queue import StringIO import anyjson @@ -25,7 +25,6 @@ import tweepy.auth import tweetstream import urllib2 -#from iri_tweet.utils import get_logger socket._fileobject.default_bufsize = 0 @@ -160,7 +159,7 @@ add_process_event("start_worker", self.__get_process_event_args(), self.session_maker) self.do_run() finally: - add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker) + add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker) def do_run(self): raise NotImplementedError() @@ -287,8 +286,8 @@ def get_sessionmaker(conn_str): - engine, metadata = models.setup_database(conn_str, echo=False, create_all=False) - Session = scoped_session(sessionmaker(bind=engine, autocommit=False)) + engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) + Session = scoped_session(Session) return Session, engine, metadata @@ -424,7 +423,7 @@ for cprocess in tweet_processes: cprocess.start() - add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker) + add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) if options.duration >= 0: end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) @@ -515,6 +514,12 @@ sys.exit(message) metadata.create_all(engine) + session = Session() + try: + models.add_model_version(session) + finally: + session.close() + stop_args = {} try: add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)