diff -r b7f4b0554ef8 -r bb44692e09ee script/iri_tweet/utils.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/iri_tweet/utils.py Tue Jan 11 11:17:17 2011 +0100 @@ -0,0 +1,240 @@ +import email.utils +import logging +from models import * +import datetime +import twitter +import twitter_text + + +def parse_date(date_str): + ts = email.utils.parsedate_tz(date_str) + return datetime.datetime(*ts[0:7]) + + +fields_adapter = { + 'stream': { + "tweet": { + "created_at" : adapt_date, + "coordinates" : adapt_json, + "place" : adapt_json, + "geo" : adapt_json, +# "original_json" : adapt_json, + }, + "user": { + "created_at" : adapt_date, + }, + }, + 'rest': { + "tweet" : { + "place" : adapt_json, + "geo" : adapt_json, + "created_at" : adapt_date, +# "original_json" : adapt_json, + }, + }, +} + +# +# adapt fields, return a copy of the field_dict with adapted fields +# +def adapt_fields(fields_dict, adapter_mapping): + def adapt_one_field(field, value): + if field in adapter_mapping and adapter_mapping[field] is not None: + return adapter_mapping[field](value) + else: + return value + return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) + +def get_user(user_dict, session): + + logging.debug("Get user : " + repr(user_dict)) + + user_id = user_dict.get("id",None) + user_name = user_dict.get("screen_name", user_dict.get("name", None)) + + if user_id is None and user_name is None: + return None + + if user_id: + user = session.query(User).filter(User.id == user_id).first() + else: + user = session.query(User).filter(User.screen_name == user_name).first() + + if user is not None: + return user + + user_created_at = user_dict.get("created_at", None) + + if user_created_at is None: + t = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET)) + try: + if user_id: + user_dict = t.users.show(user_id=user_id) + else: + user_dict = t.users.show(screen_name=user_name) + except Exception as e: + logging.info("get_user : TWITTER ERROR : " + repr(e)) + logging.info("get_user : TWITTER ERROR : " + str(e)) + + user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) + if "id" not in user_dict: + return None + + user = User(**user_dict) + + session.add(user) + session.flush() + + return user + # if not, if needed get info from twitter + # create user + # return it + +def process_entity(ind, ind_type, tweet, session): + + logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) + + entity_dict = { + "indice_start": ind["indices"][0], + "indice_end" : ind["indices"][1], + "tweet_id" : tweet.id, + "tweet" : tweet + } + + def process_hashtags(): + text = ind.get("text", ind.get("hashtag", None)) + if text is None: + return None + hashtag = session.query(Hashtag).filter(Hashtag.text == text).first() + if not hashtag: + ind["text"] = text + hashtag = Hashtag(**ind) + session.add(hashtag) + session.flush() + entity_dict['hashtag'] = hashtag + entity_dict['hashtag_id'] = hashtag.id + entity = EntityHashtag(**entity_dict) + return entity + + def process_user_mentions(): + user_mention = get_user(ind, session) + if user_mention is None: + entity_dict['user'] = None + entity_dict['user_id'] = None + else: + entity_dict['user'] = user_mention + entity_dict['user_id'] = user_mention.id + entity = EntityUser(**entity_dict) + return entity + + def process_urls(): + url = session.query(Url).filter(Url.url == ind["url"]).first() + if url is None: + url = Url(**ind) + session.add(url) + session.flush() + entity_dict['url'] = url + entity_dict['url_id'] = url.id + entity = EntityUrl(**entity_dict) + return entity + + #{'': lambda } + entity = { + 'hashtags': process_hashtags, + 'user_mentions' : process_user_mentions, + 'urls' : process_urls + }[ind_type]() + + logging.debug("Process_entity entity_dict: " + repr(entity_dict)) + if entity: + session.add(entity) + + + +def from_twitter_rest(ts, jsontxt, session): + + tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count() + if tweet_nb > 0: + return + + tweet_fields = { + 'created_at': ts["created_at"], + 'favorited': False, + 'id': ts["id"], + 'id_str': ts["id_str"], + #'in_reply_to_screen_name': ts["to_user"], + 'in_reply_to_user_id': ts["to_user_id"], + 'in_reply_to_user_id_str': ts["to_user_id_str"], + #'place': ts["place"], + 'source': ts["source"], + 'text': ts["text"], + 'truncated': False, + 'original_json' : jsontxt, + } + + #user + + user_fields = { + 'id' : ts['from_user_id'], + 'id_str' : ts['from_user_id_str'], + 'lang' : ts['iso_language_code'], + 'profile_image_url' : ts["profile_image_url"], + 'screen_name' : ts["from_user"], + } + + user = get_user(user_fields, session) + if user is None: + log.warning("USER not found " + repr(user_fields)) + tweet_fields["user"] = None + tweet_fields["user_id"] = None + else: + tweet_fields["user"] = user + tweet_fields["user_id"] = user.id + + tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) + tweet = Tweet(**tweet_fields) + session.add(tweet) + + text = tweet.text + + extractor = twitter_text.Extractor(text) + + for ind in extractor.extract_hashtags_with_indices(): + process_entity(ind, "hashtags", tweet, session) + + for ind in extractor.extract_mentioned_screen_names_with_indices(): + process_entity(ind, "user_mentions", tweet, session) + + for ind in extractor.extract_urls_with_indices(): + process_entity(ind, "urls", tweet, session) + + + + +def from_twitter_stream(ts, jsontxt, session): + + tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count() + if tweet_nb > 0: + return + + ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"]) + + # get or create user + user = get_user(ts["user"], session) + if user is None: + log.warning("USER not found " + repr(ts["user"])) + ts_copy["user"] = None + ts_copy["user_id"] = None + else: + ts_copy["user"] = user + ts_copy["user_id"] = ts_copy["user"].id + ts_copy["original_json"] = jsontxt + + tweet = Tweet(**ts_copy) + session.add(tweet) + session.flush() + + # get entities + for ind_type, entity_list in ts["entities"].items(): + for ind in entity_list: + process_entity(ind, ind_type, tweet, session)