import datetime
import email.utils
import json
from sqlalchemy import (BigInteger, Boolean, Column, DateTime, Enum,
ForeignKey, Integer, String, create_engine, event)
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
import iri_tweet
# Declarative base class shared by every ORM model defined in this module.
Base = declarative_base()
# Application identifier string.
# NOTE(review): not referenced in this part of the file -- presumably read by client code; confirm.
APPLICATION_NAME = "IRI_TWITTER"
# Access-token placeholders, unset (None) by default.
# NOTE(review): presumably overridden by deployment configuration -- confirm.
ACCESS_TOKEN_KEY = None
ACCESS_TOKEN_SECRET = None
def adapt_date(date_str):
    """Convert an RFC 2822 style date string into a naive UTC ``datetime``.

    The string is parsed with ``email.utils.parsedate_tz``.  Any UTC offset
    found in the string is subtracted so that the result is expressed in
    UTC, like the ``datetime.datetime.utcnow`` defaults used by the models
    of this module.  Strings carrying a ``+0000`` offset (or none at all)
    are returned with their fields unchanged.

    :param date_str: date string, e.g. ``"Wed Aug 27 13:08:45 +0000 2008"``.
    :returns: naive ``datetime.datetime`` (no ``tzinfo``) in UTC.
    :raises TypeError: if ``date_str`` cannot be parsed.
    """
    ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    if ts is None:
        # TypeError (rather than ValueError) is kept on purpose: it is what
        # the previous unchecked slice of ``None`` raised.
        raise TypeError("unparseable date string: %r" % (date_str,))
    # Only the first six fields (year .. second) are meaningful; index 6 is
    # documented as unusable and must not be passed as the microsecond.
    parsed = datetime.datetime(*ts[:6])
    # Index 9 is the offset from UTC in seconds, or None when unknown.
    offset = ts[9]
    if offset:
        parsed -= datetime.timedelta(seconds=offset)
    return parsed
def adapt_json(obj):
    """Serialize ``obj`` to a JSON string, passing ``None`` through as ``None``.

    :param obj: any value accepted by ``json.dumps``, or ``None``.
    :returns: the JSON text, or ``None`` when ``obj`` is ``None``.
    """
    return None if obj is None else json.dumps(obj)
class TweetMeta(type(Base)):
    """Metaclass that gives every model a keyword-tolerant constructor.

    Each class created through this metaclass receives an ``__init__`` that
    accepts arbitrary keyword arguments, assigns only those whose name is
    already an attribute of the instance, and silently ignores the others.

    NOTE(review): the models select this metaclass through a
    ``__metaclass__`` class attribute, which only Python 2 honours; under
    Python 3 that attribute is inert and this metaclass would not be
    applied -- confirm the target interpreter.
    """
    def __init__(cls, name, bases, ns): #@NoSelf
        def init(self, **kwargs):
            # Copy only the keys that match an existing attribute; unknown
            # keys are skipped instead of raising.
            for key, value in kwargs.items():
                if hasattr(self, key):
                    setattr(self, key, value)
            # Chain to the parent constructor without forwarding any kwargs.
            super(cls, self).__init__()
        # Install the constructor on the class being created, then let the
        # declarative metaclass finish its own initialisation.
        setattr(cls, '__init__', init)
        super(TweetMeta, cls).__init__(name, bases, ns)
class ProcessEvent(Base):
    """Row of the ``tweet_process_event`` table: one timestamped, typed event.

    ``add_model_version`` in this module stores the model version through
    this class, using ``type="model_version"`` and the version as ``args``.
    """
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_process_event"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Defaults to the UTC time at insert when no value is supplied; indexed.
    ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
    # Mandatory event kind, restricted to the enumerated labels below.
    type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False)
    # Free-form string payload attached to the event.
    args = Column(String)
class EntityType(Base):
    """Row of the ``tweet_entity_type`` table: a labelled entity type.

    ``Entity.entity_type`` points here and adds an ``entities`` backref
    to this class.
    """
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_entity_type"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Text label of the type (no uniqueness constraint is declared).
    label = Column(String)
class Entity(Base):
    """Polymorphic base row of the ``tweet_entity`` table.

    The ``type`` column is the polymorphic discriminator.  The subclasses
    defined in this module (``EntityHashtag``, ``EntityUrl``, ``EntityUser``,
    ``EntityMedia``) each add their own table whose primary key is a foreign
    key to ``tweet_entity.id``.
    """
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_entity"
    id = Column(Integer, primary_key=True)
    # Owning tweet; ``Tweet.entity_list`` provides the ``tweet`` backref.
    tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
    # Discriminator value selecting the concrete Entity subclass.
    type = Column(String)
    # Mandatory link to the EntityType row.
    entity_type_id = Column(Integer, ForeignKey('tweet_entity_type.id'), nullable=False)
    entity_type = relationship("EntityType", backref="entities")
    # NOTE(review): presumably the start/end offsets of the entity inside
    # the tweet text -- confirm against the code that fills them.
    indice_start = Column(Integer)
    indice_end = Column(Integer)
    source = Column(String)
    # 'with_polymorphic': '*' makes queries on Entity load all subclass tables.
    __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
class TweetSource(Base):
    """Row of the ``tweet_tweet_source`` table: a received payload as text.

    ``TweetLog`` and ``Tweet`` both reference this table and add the
    ``logs`` and ``tweet`` backrefs to it.
    """
    __metaclass__ = TweetMeta
    __tablename__ = 'tweet_tweet_source'
    id = Column(Integer, primary_key=True, autoincrement=True)
    # NOTE(review): presumably the raw JSON exactly as received -- confirm.
    original_json = Column(String)
    # Defaults to the UTC time at insert when no value is supplied; indexed.
    received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
class TweetLog(Base):
    """Row of the ``tweet_tweet_log`` table: a log entry tied to a TweetSource."""
    # Symbolic status names mapped to integer codes.
    # NOTE(review): 'DELETE_PENDING' shares code 4 with 'DELETE' -- confirm
    # this alias is intended and not a missing distinct code.
    TWEET_STATUS = {
        'OK' : 1,
        'ERROR' : 2,
        'NOT_TWEET': 3,
        'DELETE': 4,
        'SCRUB_GEO': 5,
        'LIMIT': 6,
        'STATUS_WITHHELD': 7,
        'USER_WITHHELD': 8,
        'DISCONNECT': 9,
        'STALL_WARNING': 10,
        'DELETE_PENDING': 4
    }
    __metaclass__ = TweetMeta
    __tablename__ = 'tweet_tweet_log'
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Defaults to the UTC time at insert when no value is supplied; indexed.
    ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
    tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
    # Adds a ``logs`` backref on TweetSource.
    tweet_source = relationship("TweetSource", backref="logs")
    # Optional, indexed 64-bit id.
    # NOTE(review): presumably the id of the status concerned -- confirm.
    status_id = Column(BigInteger, index=True, nullable=True, default=None)
    # NOTE(review): presumably one of the TWEET_STATUS codes -- confirm.
    status = Column(Integer)
    error = Column(String)
    error_stack = Column(String)
class Tweet(Base):
    """Row of the ``tweet_tweet`` table: one stored tweet.

    The primary key is supplied by the caller (``autoincrement=False``).
    Each tweet links to its author (``User``), to the ``TweetSource`` it was
    built from, and owns a list of ``Entity`` rows.
    """
    __metaclass__ = TweetMeta
    __tablename__ = 'tweet_tweet'
    id = Column(BigInteger, primary_key=True, autoincrement=False)
    id_str = Column(String)
    contributors = Column(String)
    coordinates = Column(String)
    created_at = Column(DateTime, index=True)
    favorited = Column(Boolean)
    geo = Column(String)
    in_reply_to_screen_name = Column(String)
    in_reply_to_status_id = Column(BigInteger)
    in_reply_to_status_id_str = Column(String)
    in_reply_to_user_id = Column(BigInteger)
    in_reply_to_user_id_str = Column(String)
    place = Column(String)
    # NOTE(review): stored as text, not as an integer -- confirm intended.
    retweet_count = Column(String)
    retweeted = Column(Boolean)
    source = Column(String)
    text = Column(String)
    truncated = Column(Boolean)
    # BigInteger so the foreign key has the same type as the tweet_user.id
    # primary key it references (as EntityUser.user_id already does); a
    # plain Integer cannot hold every value of that BigInteger key on
    # backends where Integer is 32-bit.
    user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
    # Adds a ``tweets`` backref on User.
    user = relationship("User", backref="tweets")
    tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
    # Adds a ``tweet`` backref on TweetSource.
    tweet_source = relationship("TweetSource", backref="tweet")
    # Entities are owned by the tweet: they are deleted with it and when
    # removed from this list (delete-orphan).
    entity_list = relationship(Entity, backref='tweet', cascade="all, delete-orphan")
    # Defaults to the UTC time at insert when no value is supplied; indexed.
    received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
class UserMessage(Base):
    """Row of the ``tweet_user_message`` table: links a User to a Message.

    ``Message.users`` provides the ``message`` backref on this class.
    """
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_user_message"
    id = Column(Integer, primary_key=True)
    # BigInteger so the foreign key has the same type as the tweet_user.id
    # primary key it references (as EntityUser.user_id already does); a
    # plain Integer cannot hold every value of that BigInteger key on
    # backends where Integer is 32-bit.
    user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
    # Adds a ``messages`` backref on User.
    user = relationship("User", backref="messages")
    # Defaults to the UTC time at insert when no value is supplied.
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    message_id = Column(Integer, ForeignKey('tweet_message.id'))
class Message(Base):
    """Row of the ``tweet_message`` table: a text message with a creation time."""
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_message"
    id = Column(Integer, primary_key=True)
    # Defaults to the UTC time at insert when no value is supplied.
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    text = Column(String)
    # UserMessage link rows for this message; adds ``message`` on UserMessage.
    users = relationship(UserMessage, backref='message')
class User(Base):
    """Row of the ``tweet_user`` table: one stored user account.

    The primary key is supplied by the caller (``autoincrement=False``).
    ``Tweet.user`` and ``UserMessage.user`` add the ``tweets`` and
    ``messages`` backrefs to this class.
    NOTE(review): the columns presumably mirror the fields of the Twitter
    user object -- confirm against the code that fills them.
    """
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_user"
    id = Column(BigInteger, primary_key=True, autoincrement=False)
    id_str = Column(String)
    created_at = Column(DateTime, index=True)
    description = Column(String)
    favourites_count = Column(Integer)
    followers_count = Column(Integer)
    friends_count = Column(Integer)
    listed_count = Column(Integer)
    location = Column(String)
    name = Column(String)
    # NOTE(review): declared String while default_profile is Boolean --
    # confirm this is intended.
    default_profile_image = Column(String)
    default_profile = Column(Boolean)
    protected = Column(Boolean)
    # Indexed, but not declared unique.
    screen_name = Column(String, index=True)
    statuses_count = Column(Integer)
    url = Column(String)
    verified = Column(Boolean)
    derived = Column(String) # JSON content kept in a String column
    profile_banner_url = Column(String)
    profile_image_url_https = Column(String)
    withheld_in_countries = Column(String) # ARRAY content kept in a String column
    withheld_scope = Column(String)
class Hashtag(Base):
    """Row of the ``tweet_hashtag`` table: one distinct hashtag text."""
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_hashtag"
    id = Column(Integer, primary_key=True)
    # Unique and indexed: each hashtag text is stored at most once.
    text = Column(String, unique=True, index=True)
class Url(Base):
    """Row of the ``tweet_url`` table: a URL and its expanded form."""
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_url"
    id = Column(Integer, primary_key=True)
    # Unique: each URL string is stored at most once.
    url = Column(String, unique=True)
    expanded_url = Column(String)
class MediaType(Base):
    """Row of the ``tweet_media_type`` table: a labelled media type."""
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_media_type"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Unique and indexed: each label is stored at most once.
    label = Column(String, unique=True, index=True)
class Media(Base):
    """Row of the ``tweet_media`` table: one media item and its URLs.

    The primary key is supplied by the caller (``autoincrement=False``).
    """
    __metaclass__ = TweetMeta
    __tablename__ = "tweet_media"
    id = Column(BigInteger, primary_key=True, autoincrement=False)
    # id_str and both media URLs carry a uniqueness constraint.
    id_str = Column(String, unique=True)
    media_url = Column(String, unique=True)
    media_url_https = Column(String, unique=True)
    url = Column(String)
    display_url = Column(String)
    expanded_url = Column(String)
    sizes = Column(String)
    type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
    # MediaType row of this media, joined explicitly on type_id.
    type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
class EntityHashtag(Entity):
    """Entity subclass (identity ``entity_hashtag``) pointing to a Hashtag row."""
    __tablename__ = "tweet_entity_hashtag"
    __mapper_args__ = {'polymorphic_identity': 'entity_hashtag'}
    # Primary key shared with the parent tweet_entity row.
    id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
    hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
    # Hashtag row referenced by this entity, joined explicitly on hashtag_id.
    hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
class EntityUrl(Entity):
    """Entity subclass (identity ``entity_url``) pointing to a Url row."""
    __tablename__ = "tweet_entity_url"
    __mapper_args__ = {'polymorphic_identity': 'entity_url'}
    # Primary key shared with the parent tweet_entity row.
    id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
    url_id = Column(Integer, ForeignKey("tweet_url.id"))
    # Url row referenced by this entity, joined explicitly on url_id.
    url = relationship(Url, primaryjoin=url_id == Url.id)
class EntityUser(Entity):
    """Entity subclass (identity ``entity_user``) pointing to a User row."""
    __tablename__ = "tweet_entity_user"
    __mapper_args__ = {'polymorphic_identity': 'entity_user'}
    # Primary key shared with the parent tweet_entity row.
    id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
    # BigInteger, matching the type of the tweet_user.id primary key.
    user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
    # User row referenced by this entity, joined explicitly on user_id.
    user = relationship(User, primaryjoin=(user_id == User.id))
class EntityMedia(Entity):
    """Entity subclass (identity ``entity_media``) pointing to a Media row."""
    __tablename__ = "tweet_entity_media"
    __mapper_args__ = {'polymorphic_identity': 'entity_media'}
    # Primary key shared with the parent tweet_entity row.
    id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
    # BigInteger, matching the type of the tweet_media.id primary key.
    media_id = Column(BigInteger, ForeignKey('tweet_media.id'))
    # Media row referenced by this entity, joined explicitly on media_id.
    media = relationship(Media, primaryjoin=(media_id == Media.id))
def add_model_version(session, must_commit=True):
    """Record the current model version as a ``model_version`` ProcessEvent.

    :param session: session the new event is added to.
    :param must_commit: when true (the default) the session is committed
        right after the event has been added.
    """
    version = iri_tweet.get_version()
    session.add(ProcessEvent(args=version, type="model_version"))
    if not must_commit:
        return
    session.commit()
def setup_database(*args, **kwargs):
    """Create the engine and session factory, optionally creating the schema.

    Positional arguments are passed to ``create_engine``.  Keyword arguments
    are split three ways: those named in ``session_argname`` go to
    ``sessionmaker``, ``create_all`` is consumed here, and every other
    keyword goes to ``create_engine``.

    For SQLite engines a ``connect`` listener turns foreign-key enforcement
    on for each new connection.  The listener is attached to the engine
    created here, not to the ``Engine`` class, so it never runs on other
    engines of the process and is not registered again on the class at
    every call.

    :param create_all: keyword flag, default ``True``.  When true the
        tables are created and a ``model_version`` event is recorded.
    :returns: tuple ``(engine, metadata, Session)`` where ``Session`` is the
        configured session factory.
    """
    session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"]

    # Everything that is neither a session option nor our own flag
    # belongs to create_engine.
    kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all"))

    engine = create_engine(*args, **kwargs_ce)

    if engine.name == "sqlite":
        # Listen on this engine only: the pragma is SQLite-specific.
        @event.listens_for(engine, "connect")
        def set_sqlite_pragma(dbapi_connection, connection_record): #pylint: W0612
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute("PRAGMA foreign_keys=ON")
            finally:
                # Release the cursor even if the pragma fails.
                cursor.close()

    metadata = Base.metadata

    kwargs_sm = {'bind': engine}
    for argname in session_argname:
        if argname in kwargs:
            kwargs_sm[argname] = kwargs[argname]

    Session = sessionmaker(**kwargs_sm)
    #set model version

    if kwargs.get('create_all', True):
        metadata.create_all(engine)
        session = Session()
        try:
            add_model_version(session)
        finally:
            session.close()

    return (engine, metadata, Session)