import email.utils
import logging
from models import *
import datetime
import twitter
import twitter_text
def parse_date(date_str):
    """Parse an RFC 2822-style date string (as used by Twitter) into a datetime.

    The result is a naive ``datetime`` normalised to UTC: the numeric timezone
    offset parsed from the string is subtracted from the wall-clock time.
    Strings without a zone (offset reported as 0/None) are taken as-is.

    Raises:
        ValueError: if ``date_str`` cannot be parsed.
    """
    ts = email.utils.parsedate_tz(date_str)
    if ts is None:
        raise ValueError("Unparseable date: " + repr(date_str))
    # parsedate_tz returns a 10-tuple: items 0-5 are year..second, item 6 is
    # the weekday (NOT microseconds), and item 9 is the UTC offset in seconds.
    parsed = datetime.datetime(*ts[:6])
    offset = ts[9]
    if offset:
        parsed -= datetime.timedelta(seconds=offset)
    return parsed
# Field adapters passed to adapt_fields() before building model objects:
# fields_adapter[source][entity][field] -> callable converting the raw value.
# "stream" is used for streaming-API tweet/user dicts, "rest" for REST
# search results. original_json has no adapter: the raw JSON text is stored
# as given.
fields_adapter = dict(
    stream=dict(
        tweet=dict(
            created_at=adapt_date,
            coordinates=adapt_json,
            place=adapt_json,
            geo=adapt_json,
        ),
        user=dict(
            created_at=adapt_date,
        ),
    ),
    rest=dict(
        tweet=dict(
            place=adapt_json,
            geo=adapt_json,
            created_at=adapt_date,
        ),
    ),
)
#
# adapt_fields: return a copy of fields_dict in which each field that has an adapter is converted by it
#
def adapt_fields(fields_dict, adapter_mapping):
    """Return a new dict built from ``fields_dict`` with adapters applied.

    Every key is converted with ``str()`` in the result (so it can be used
    as a keyword argument). A value is passed through ``adapter_mapping[key]``
    when that key has a non-None adapter; otherwise it is copied unchanged.
    ``fields_dict`` itself is not modified.
    """
    adapted = {}
    for key, value in fields_dict.items():
        adapter = adapter_mapping.get(key)
        if adapter is not None:
            value = adapter(value)
        adapted[str(key)] = value
    return adapted
def get_user(user_dict, session):
    """Return the ``User`` matching ``user_dict``, creating it if necessary.

    The user is looked up in ``session`` by ``id`` when one is given, otherwise
    by ``screen_name`` (falling back to ``name``). If no row matches and
    ``user_dict`` has no ``created_at``, the profile is fetched from the
    Twitter API with ``users.show``; if that call fails the error is logged
    and the given ``user_dict`` is used as-is.

    Returns:
        The existing ``User``, or a newly created one (added to the session
        and flushed). ``None`` when ``user_dict`` has neither an id nor a
        name, or when no ``id`` is available to create the user.
    """
    logging.debug("Get user : " + repr(user_dict))

    user_id = user_dict.get("id", None)
    user_name = user_dict.get("screen_name", user_dict.get("name", None))

    if user_id is None and user_name is None:
        return None

    if user_id:
        user = session.query(User).filter(User.id == user_id).first()
    else:
        user = session.query(User).filter(User.screen_name == user_name).first()

    if user is not None:
        return user

    user_created_at = user_dict.get("created_at", None)

    if user_created_at is None:
        # NOTE(review): the OAuth credentials are not defined in this chunk;
        # presumably they come from `from models import *` — confirm.
        t = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET))
        try:
            if user_id:
                user_dict = t.users.show(user_id=user_id)
            else:
                user_dict = t.users.show(screen_name=user_name)
        except Exception as e:
            # Best effort: continue with the partial user_dict we were given.
            logging.info("get_user : TWITTER ERROR : " + repr(e))
            logging.info("get_user : TWITTER ERROR : " + str(e))
        else:
            # If the lookup above was by screen name, Twitter has now supplied
            # the numeric id, which may already be stored (e.g. under a
            # different screen name). Reuse that row rather than inserting a
            # duplicate primary key.
            fetched_id = user_dict.get("id", None)
            if fetched_id is not None and fetched_id != user_id:
                user = session.query(User).filter(User.id == fetched_id).first()
                if user is not None:
                    return user

    user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
    if "id" not in user_dict:
        return None

    user = User(**user_dict)
    session.add(user)
    session.flush()
    return user
# get_user outline: look the user up in the database; if not found,
# fetch the profile from Twitter when needed, then create the user
# and return it.
def process_entity(ind, ind_type, tweet, session):
    """Create the entity row linking ``tweet`` to one hashtag, mention or url.

    Args:
        ind: entity dict. It must contain ``"indices"``, whose first two items
            become ``indice_start``/``indice_end``. The other keys read depend
            on ``ind_type``: ``text``/``hashtag`` for hashtags, ``url`` for
            urls, user fields (via ``get_user``) for mentions. ``ind`` is not
            modified.
        ind_type: ``"hashtags"``, ``"user_mentions"`` or ``"urls"``. Any other
            entity type has no handler here; it is logged and skipped instead
            of raising ``KeyError``.
        tweet: the ``Tweet`` the entity belongs to (its ``id`` is read).
        session: database session. New ``Hashtag``/``Url`` rows are added and
            flushed; the entity row itself is only added.

    Returns:
        None.
    """
    logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))

    entity_dict = {
        "indice_start": ind["indices"][0],
        "indice_end": ind["indices"][1],
        "tweet_id": tweet.id,
        "tweet": tweet,
    }

    def process_hashtags():
        # Accept either key; presumably the two entity sources name it differently.
        text = ind.get("text", ind.get("hashtag", None))
        if text is None:
            return None
        hashtag = session.query(Hashtag).filter(Hashtag.text == text).first()
        if not hashtag:
            # Build from a copy so the caller's entity dict is left untouched.
            hashtag_fields = dict(ind)
            hashtag_fields["text"] = text
            hashtag = Hashtag(**hashtag_fields)
            session.add(hashtag)
            # Flush so hashtag.id is available below (assumes a DB-assigned id).
            session.flush()
        entity_dict['hashtag'] = hashtag
        entity_dict['hashtag_id'] = hashtag.id
        return EntityHashtag(**entity_dict)

    def process_user_mentions():
        user_mention = get_user(ind, session)
        if user_mention is None:
            entity_dict['user'] = None
            entity_dict['user_id'] = None
        else:
            entity_dict['user'] = user_mention
            entity_dict['user_id'] = user_mention.id
        return EntityUser(**entity_dict)

    def process_urls():
        url = session.query(Url).filter(Url.url == ind["url"]).first()
        if url is None:
            url = Url(**ind)
            session.add(url)
            session.flush()
        entity_dict['url'] = url
        entity_dict['url_id'] = url.id
        return EntityUrl(**entity_dict)

    processors = {
        'hashtags': process_hashtags,
        'user_mentions': process_user_mentions,
        'urls': process_urls,
    }
    processor = processors.get(ind_type)
    if processor is None:
        # Entity kinds without a model here must not abort the whole tweet.
        logging.info("Process_entity : unsupported entity type " + repr(ind_type))
        return None

    entity = processor()
    logging.debug("Process_entity entity_dict: " + repr(entity_dict))
    if entity is not None:
        session.add(entity)
def from_twitter_rest(ts, jsontxt, session):
    """Store one tweet from a REST search result, with its user and entities.

    Args:
        ts: decoded result dict. Presumably the (v1) search-API format, given
            the keys read here (``from_user``, ``to_user_id``,
            ``iso_language_code``, ...) — confirm against the caller.
        jsontxt: raw JSON text of ``ts``; stored as ``original_json``.
        session: database session the new rows are added to.

    Does nothing if a tweet with ``ts["id"]`` is already stored. Hashtags,
    mentions and urls are extracted from the tweet text with ``twitter_text``
    rather than read from ``ts``.
    """
    if session.query(Tweet).filter(Tweet.id == ts["id"]).count() > 0:
        return

    # NOTE: in_reply_to_screen_name (ts["to_user"]) and place (ts["place"])
    # are currently not mapped.
    tweet_fields = {
        'created_at': ts["created_at"],
        'favorited': False,
        'id': ts["id"],
        'id_str': ts["id_str"],
        'in_reply_to_user_id': ts["to_user_id"],
        'in_reply_to_user_id_str': ts["to_user_id_str"],
        'source': ts["source"],
        'text': ts["text"],
        'truncated': False,
        'original_json': jsontxt,
    }

    user_fields = {
        'id': ts['from_user_id'],
        'id_str': ts['from_user_id_str'],
        'lang': ts['iso_language_code'],
        'profile_image_url': ts["profile_image_url"],
        'screen_name': ts["from_user"],
    }
    user = get_user(user_fields, session)
    if user is None:
        logging.warning("USER not found " + repr(user_fields))
        tweet_fields["user"] = None
        tweet_fields["user_id"] = None
    else:
        tweet_fields["user"] = user
        tweet_fields["user_id"] = user.id

    tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
    tweet = Tweet(**tweet_fields)
    session.add(tweet)
    # Flush so the tweet row exists before entity rows reference it, as
    # from_twitter_stream does.
    session.flush()

    extractor = twitter_text.Extractor(tweet.text)
    for ind in extractor.extract_hashtags_with_indices():
        process_entity(ind, "hashtags", tweet, session)
    for ind in extractor.extract_mentioned_screen_names_with_indices():
        process_entity(ind, "user_mentions", tweet, session)
    for ind in extractor.extract_urls_with_indices():
        process_entity(ind, "urls", tweet, session)
def from_twitter_stream(ts, jsontxt, session):
    """Store one tweet from the streaming API, with its user and entities.

    Args:
        ts: decoded tweet dict. Its fields (after adaptation) are passed to
            ``Tweet``; ``ts["user"]`` is resolved with ``get_user``, and each
            item of ``ts["entities"]`` (if present) becomes an entity row.
        jsontxt: raw JSON text of the tweet; stored as ``original_json``.
        session: database session the new rows are added to.

    Does nothing if a tweet with ``ts["id"]`` is already stored.
    """
    if session.query(Tweet).filter(Tweet.id == ts["id"]).count() > 0:
        return

    ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"])

    # get or create user
    user = get_user(ts["user"], session)
    if user is None:
        logging.warning("USER not found " + repr(ts["user"]))
        ts_copy["user"] = None
        ts_copy["user_id"] = None
    else:
        ts_copy["user"] = user
        ts_copy["user_id"] = user.id
    ts_copy["original_json"] = jsontxt

    tweet = Tweet(**ts_copy)
    session.add(tweet)
    session.flush()

    # A missing or null "entities" object simply means no entity rows.
    for ind_type, entity_list in (ts.get("entities") or {}).items():
        for ind in entity_list:
            process_entity(ind, ind_type, tweet, session)