--- a/script/lib/iri_tweet/__init__.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/__init__.py Thu Sep 22 12:37:53 2011 +0200
@@ -0,0 +1,17 @@
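+# Version tuple: (major, minor, micro, release level, serial); rendered by get_version() below.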
+VERSION = (0, 82, 0, "final", 0)
+
+VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2])))
+
+
+def get_version():
+ version = '%s.%s' % (VERSION[0], VERSION[1])
+ if VERSION[2]:
+ version = '%s.%s' % (version, VERSION[2])
+ if VERSION[3:] == ('alpha', 0):
+ version = '%s pre-alpha' % version
+ else:
+ if VERSION[3] != 'final':
+ version = '%s %s %s' % (version, VERSION[3], VERSION[4])
+ return version
+
+__version__ = get_version()
--- a/script/lib/iri_tweet/export_tweet_db.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/export_tweet_db.py Thu Sep 22 12:37:53 2011 +0200
@@ -26,8 +26,7 @@
set_logging(options)
with sqlite3.connect(args[0]) as conn_in:
- engine, metadata = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0))
- Session = sessionmaker(bind=engine)
+ engine, metadata, Session = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0))
session = Session()
try:
curs_in = conn_in.cursor()
--- a/script/lib/iri_tweet/export_twitter_alchemy.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py Thu Sep 22 12:37:53 2011 +0200
@@ -2,19 +2,19 @@
# coding=utf-8
from lxml import etree
+from models import setup_database
from optparse import OptionParser #@UnresolvedImport
-from sqlalchemy import Table, Column, BigInteger, MetaData
-from sqlalchemy.orm import sessionmaker
-from utils import parse_date, set_logging_options, set_logging, get_filter_query, get_logger
-from models import setup_database
+from sqlalchemy import Table, Column, BigInteger
+from utils import (parse_date, set_logging_options, set_logging, get_filter_query,
+ get_logger)
+import anyjson
import datetime
+import httplib2
import os.path
import re
import sys
import time
import uuid #@UnresolvedImport
-import httplib2
-import anyjson
#class TweetExclude(object):
# def __init__(self, id):
@@ -57,9 +57,9 @@
parser.add_option("-d", "--database", dest="database",
help="Input database", metavar="DATABASE")
parser.add_option("-s", "--start-date", dest="start_date",
- help="start date", metavar="START_DATE")
+ help="start date", metavar="START_DATE", default=None)
parser.add_option("-e", "--end-date", dest="end_date",
- help="end date", metavar="END_DATE")
+ help="end date", metavar="END_DATE", default=None)
parser.add_option("-I", "--content-file", dest="content_file",
help="Content file", metavar="CONTENT_FILE")
parser.add_option("-c", "--content", dest="content",
@@ -110,18 +110,16 @@
if not re.match("^\w+://.+", conn_str):
conn_str = 'sqlite:///' + conn_str
- engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
-
- Session = sessionmaker()
- conn = engine.connect()
+ engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
+ conn = None
try :
- session = Session(bind=conn)
- try :
-
- metadata = MetaData(bind=conn)
+ conn = engine.connect()
+ session = None
+ try :
+ session = Session(bind=conn)
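+ # tweet_exclude is a TEMPORARY table bound to this connection; only that table is created below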
tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY'])
#mapper(TweetExclude, tweet_exclude_table)
- metadata.create_all()
+ metadata.create_all(bind=conn, tables=[tweet_exclude_table])
if options.exclude and os.path.exists(options.exclude):
with open(options.exclude, 'r+') as f:
@@ -174,14 +172,16 @@
with open(user_whitelist_file, 'r+') as f:
user_whitelist = list(set([s.strip() for s in f]))
- start_date = parse_date(start_date_str)
- ts = time.mktime(start_date.timetuple())
+ start_date = None
+ ts = None
+ if start_date_str:
+ start_date = parse_date(start_date_str)
+ ts = time.mktime(start_date.timetuple())
+ end_date = None
if end_date_str:
end_date = parse_date(end_date_str)
- te = time.mktime(end_date.timetuple())
- else:
- te = ts + duration
+ elif start_date and duration:
end_date = start_date + datetime.timedelta(seconds=duration)
query = get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
@@ -276,6 +276,8 @@
for tw in query_res:
tweet_ts_dt = tw.created_at
tweet_ts = int(time.mktime(tweet_ts_dt.timetuple()))
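+ # no explicit start date: anchor relative timestamps on the first tweet returned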
+ if ts is None:
+ ts = tweet_ts
tweet_ts_rel = (tweet_ts-ts) * 1000
username = None
profile_url = ""
@@ -330,6 +332,8 @@
output.close()
finally:
- session.close()
+ if session:
+ session.close()
finally:
- conn.close()
+ if conn:
+ conn.close()
--- a/script/lib/iri_tweet/models.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/models.py Thu Sep 22 12:37:53 2011 +0200
@@ -1,10 +1,11 @@
from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String,
ForeignKey, DateTime, create_engine)
from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import relationship
+from sqlalchemy.orm import relationship, sessionmaker
import anyjson
import datetime
import email.utils
+import iri_tweet
Base = declarative_base()
@@ -44,7 +45,7 @@
__tablename__ = "tweet_process_event"
id = Column(Integer, primary_key=True, autoincrement=True)
ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
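+ # "model_version", "application_name" and "application_version" events record versioning metadata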
- type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", name="process_event_type_enum"), nullable=False)
+ type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False)
args = Column(String)
class EntityType(Base):
@@ -255,19 +256,35 @@
media_id = Column(BigInteger, ForeignKey('tweet_media.id'))
media = relationship(Media, primaryjoin=(media_id == Media.id))
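+# Record the version of the iri_tweet model as a "model_version" ProcessEvent.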
+def add_model_version(session, must_commit=True):
+ pe = ProcessEvent(args=iri_tweet.get_version(), type="model_version")
+ session.add(pe)
+ if must_commit:
+ session.commit()
def setup_database(*args, **kwargs):
-
- create_all = True
- if "create_all" in kwargs:
- create_all = kwargs["create_all"]
- del(kwargs["create_all"])
+
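+ # split kwargs: session_argname entries go to sessionmaker(), the rest to create_engine()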
+ session_argname = ['autoflush', 'binds', 'class_', '_enable_transaction_accounting', 'expire_on_commit', 'extension', 'query_cls', 'twophase', 'weak_identity_map', 'autocommit']
+
+ kwargs_ce = dict((k, v) for k, v in kwargs.items() if (k not in session_argname and k != "create_all"))
+
+ engine = create_engine(*args, **kwargs_ce)
+ metadata = Base.metadata
+
+ kwargs_sm = {'bind': engine}
+
+ kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs])
- engine = create_engine(*args, **kwargs)
- metadata = Base.metadata
-
- if create_all:
+ Session = sessionmaker(**kwargs_sm)
+
+ # create the schema and record the model version unless create_all is disabled
+ if kwargs.get('create_all', True):
metadata.create_all(engine)
+ session = Session()
+ try:
+ add_model_version(session)
+ finally:
+ session.close()
- return (engine, metadata)
+ return (engine, metadata, Session)
--- a/script/lib/iri_tweet/tests.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/tests.py Thu Sep 22 12:37:53 2011 +0200
@@ -127,8 +127,7 @@
class TestTwitterProcessor(unittest.TestCase):
def setUp(self):
- self.engine, self.metadata = models.setup_database('sqlite:///:memory:', echo=True)
- sessionMaker = sessionmaker(bind=self.engine)
+ self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True)
self.session = sessionMaker()
file, self.tmpfilepath = tempfile.mkstemp()
os.close(file)
--- a/script/lib/iri_tweet/tweet_twitter_user.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/tweet_twitter_user.py Thu Sep 22 12:37:53 2011 +0200
@@ -3,14 +3,12 @@
set_logging, parse_date, get_logger)
from optparse import OptionParser #@UnresolvedImport
from sqlalchemy import BigInteger
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy.schema import MetaData, Table, Column
+from sqlalchemy.schema import Table, Column
from sqlalchemy.sql import and_
import datetime
+import re
import sys
-import time
import twitter
-import re
APPLICATION_NAME = "Tweet recorder user"
CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
@@ -22,7 +20,7 @@
parser.add_option("-d", "--database", dest="database",
help="Input database", metavar="DATABASE")
parser.add_option("-s", "--start-date", dest="start_date",
- help="start date", metavar="START_DATE")
+ help="start date", metavar="START_DATE", default=None)
parser.add_option("-e", "--end-date", dest="end_date",
help="end date", metavar="END_DATE")
parser.add_option("-H", "--hashtag", dest="hashtag",
@@ -57,37 +55,37 @@
get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable
if not options.message or len(options.message) == 0:
+ get_logger().warning("No message given, exiting")
sys.exit()
conn_str = options.database.strip()
if not re.match("^\w+://.+", conn_str):
conn_str = 'sqlite:///' + conn_str
- engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
+ engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
- Session = sessionmaker()
- conn = engine.connect()
+ conn = None
try :
- session = Session(bind=conn, autoflush=True, autocommit=True)
+ conn = engine.connect()
+ session = None
try:
- metadata = MetaData(bind=conn)
+ session = Session(bind=conn, autoflush=True, autocommit=True)
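+ # tweet_exclude is a TEMPORARY table bound to this connection; only that table is created below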
tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY'])
- metadata.create_all()
+ metadata.create_all(bind=conn, tables=[tweet_exclude_table])
start_date_str = options.start_date
end_date_str = options.end_date
duration = options.duration
hashtags = options.hashtag
-
- start_date = parse_date(start_date_str)
- ts = time.mktime(start_date.timetuple())
-
+ start_date = None
+ if start_date_str:
+ start_date = parse_date(start_date_str)
+
+ end_date = None
if end_date_str:
end_date = parse_date(end_date_str)
- te = time.mktime(end_date.timetuple())
- else:
- te = ts + duration
+ elif start_date and duration:
end_date = start_date + datetime.timedelta(seconds=duration)
base_message = options.message.decode(sys.getfilesystemencoding())
@@ -119,8 +117,10 @@
session.add(user_message)
session.flush()
finally:
- # if message created and simulate, do not
- session.close()
+ # if message created and simulate, do not
+ if session:
+ session.close()
finally:
- conn.close()
+ if conn:
+ conn.close()
--- a/script/lib/iri_tweet/utils.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/lib/iri_tweet/utils.py Thu Sep 22 12:37:53 2011 +0200
@@ -497,14 +497,17 @@
parser.add_option("-q", dest="quiet", action="count",
help="quiet", metavar="QUIET", default=0)
+def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
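+ # shared filtering: hashtag join, exclude table, optional date range and user whitelist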
-def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
+ query = query.join(EntityHashtag).join(Hashtag)
- query = session.query(Tweet).join(EntityHashtag).join(Hashtag)
if tweet_exclude_table is not None:
query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
-
- query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date)
+
+ if start_date:
+ query = query.filter(Tweet.created_at >= start_date)
+ if end_date:
+ query = query.filter(Tweet.created_at <= end_date)
if user_whitelist:
query = query.join(User).filter(User.screen_name.in_(user_whitelist))
@@ -517,26 +520,23 @@
htags = reduce(merge_hash, hashtags, [])
query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
-
return query
+
+
+
+def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
+
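+ # delegate the shared filters to get_base_query and return tweets in chronological order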
+ query = session.query(Tweet)
+ query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
+ return query.order_by(Tweet.created_at)
def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
- query = session.query(User).join(Tweet).join(EntityHashtag).join(Hashtag)
- if tweet_exclude_table is not None:
- query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
-
- query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date)
+ query = session.query(User).join(Tweet)
- if hashtags :
- def merge_hash(l,h):
- l.extend(h.split(","))
- return l
- htags = reduce(merge_hash, hashtags, [])
-
- query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
+ query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)
return query.distinct()
--- a/script/rest/search_twitter.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/rest/search_twitter.py Thu Sep 22 12:37:53 2011 +0200
@@ -42,10 +42,10 @@
conn_str = 'sqlite:///' + conn_str
- engine, metadata = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
- Session = sessionmaker(bind=engine)
- session = Session()
+ engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
+ session = None
try:
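+ # create the session inside the try block so the finally clause can always close it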
+ session = Session()
#conn.row_factory = sqlite3.Row
#curs = conn.cursor()
#curs.execute("create table if not exists tweet_tweet (json);")
@@ -67,6 +67,7 @@
page += 1
#session.commit()
finally:
- session.close()
+ if session:
+ session.close()
--- a/script/stream/recorder_tweetstream.py Tue Sep 20 16:55:16 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Thu Sep 22 12:37:53 2011 +0200
@@ -1,11 +1,11 @@
from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
get_logger)
from optparse import OptionParser
from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session, sessionmaker
+from sqlalchemy.orm import scoped_session
import Queue
import StringIO
import anyjson
@@ -25,7 +25,6 @@
import tweepy.auth
import tweetstream
import urllib2
-#from iri_tweet.utils import get_logger
socket._fileobject.default_bufsize = 0
@@ -160,7 +159,7 @@
add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
self.do_run()
finally:
- add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+ add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
def do_run(self):
raise NotImplementedError()
@@ -287,8 +286,8 @@
def get_sessionmaker(conn_str):
- engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
- Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
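+ # setup_database returns a ready sessionmaker; scoped_session makes it safe to share across threads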
+ engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
+ Session = scoped_session(Session)
return Session, engine, metadata
@@ -424,7 +423,7 @@
for cprocess in tweet_processes:
cprocess.start()
- add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
+ add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
if options.duration >= 0:
end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
@@ -515,6 +514,12 @@
sys.exit(message)
metadata.create_all(engine)
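+ # schema just created: record the model version in the new database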
+ session = Session()
+ try:
+ models.add_model_version(session)
+ finally:
+ session.close()
+
stop_args = {}
try:
add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)