1 from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String, |
1 from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String, |
2 ForeignKey, DateTime, create_engine) |
2 ForeignKey, DateTime, create_engine) |
3 from sqlalchemy.ext.declarative import declarative_base |
3 from sqlalchemy.ext.declarative import declarative_base |
4 from sqlalchemy.orm import relationship |
4 from sqlalchemy.orm import relationship, sessionmaker |
5 import anyjson |
5 import anyjson |
6 import datetime |
6 import datetime |
7 import email.utils |
7 import email.utils |
|
8 import iri_tweet |
8 |
9 |
9 |
10 |
10 Base = declarative_base() |
11 Base = declarative_base() |
11 |
12 |
12 APPLICATION_NAME = "IRI_TWITTER" |
13 APPLICATION_NAME = "IRI_TWITTER" |
42 class ProcessEvent(Base): |
43 class ProcessEvent(Base): |
43 __metaclass__ = TweetMeta |
44 __metaclass__ = TweetMeta |
44 __tablename__ = "tweet_process_event" |
45 __tablename__ = "tweet_process_event" |
45 id = Column(Integer, primary_key=True, autoincrement=True) |
46 id = Column(Integer, primary_key=True, autoincrement=True) |
46 ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) |
47 ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) |
47 type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", name="process_event_type_enum"), nullable=False) |
48 type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False) |
48 args = Column(String) |
49 args = Column(String) |
49 |
50 |
50 class EntityType(Base): |
51 class EntityType(Base): |
51 __metaclass__ = TweetMeta |
52 __metaclass__ = TweetMeta |
52 __tablename__ = "tweet_entity_type" |
53 __tablename__ = "tweet_entity_type" |
253 __mapper_args__ = {'polymorphic_identity': 'entity_media'} |
254 __mapper_args__ = {'polymorphic_identity': 'entity_media'} |
254 id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) |
255 id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) |
255 media_id = Column(BigInteger, ForeignKey('tweet_media.id')) |
256 media_id = Column(BigInteger, ForeignKey('tweet_media.id')) |
256 media = relationship(Media, primaryjoin=(media_id == Media.id)) |
257 media = relationship(Media, primaryjoin=(media_id == Media.id)) |
257 |
258 |
|
259 def add_model_version(session, must_commit=True): |
|
260 pe = ProcessEvent(args=iri_tweet.get_version(), type="model_version") |
|
261 session.add(pe) |
|
262 if must_commit: |
|
263 session.commit() |
258 |
264 |
259 def setup_database(*args, **kwargs): |
265 def setup_database(*args, **kwargs): |
260 |
266 |
261 create_all = True |
267 session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"] |
262 if "create_all" in kwargs: |
268 |
263 create_all = kwargs["create_all"] |
269 kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all")) |
264 del(kwargs["create_all"]) |
270 |
265 |
271 engine = create_engine(*args, **kwargs_ce) |
266 engine = create_engine(*args, **kwargs) |
272 metadata = Base.metadata |
267 metadata = Base.metadata |
273 |
268 |
274 kwargs_sm = {'bind': engine} |
269 if create_all: |
275 |
|
276 kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs]) |
|
277 |
|
278 Session = sessionmaker(**kwargs_sm) |
|
279 #set model version |
|
280 |
|
281 if kwargs.get('create_all', True): |
270 metadata.create_all(engine) |
282 metadata.create_all(engine) |
271 |
283 session = Session() |
272 return (engine, metadata) |
284 try: |
273 |
285 add_model_version(session) |
|
286 finally: |
|
287 session.close() |
|
288 |
|
289 return (engine, metadata, Session) |
|
290 |