diff -r 54d7f1486ac4 -r 4daf47fcf792 script/iri_tweet/utils.py
--- a/script/iri_tweet/utils.py	Tue Jan 18 10:08:03 2011 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,310 +0,0 @@
-from models import *
-import datetime
-import email.utils
-import json
-import logging
-import sys
-import twitter
-import twitter_text
-import os.path
-import twitter.oauth
-
-
-def get_oauth_token(token_file_path=None):
-
-    if token_file_path and os.path.file_exists(token_file_path):
-        logging.debug("reading token from file %s" % token_file_path)
-        return twitter.oauth.read_token_file(token_file_path)
-    #read access token info from path
-
-    if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
-        return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-
-    return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_filename)
-
-def parse_date(date_str):
-    ts = email.utils.parsedate_tz(date_str)
-    return datetime.datetime(*ts[0:7])
-
-
-fields_adapter = {
-    'stream': {
-        "tweet": {
-            "created_at" : adapt_date,
-            "coordinates" : adapt_json,
-            "place" : adapt_json,
-            "geo" : adapt_json,
-#            "original_json" : adapt_json,
-        },
-        "user": {
-            "created_at" : adapt_date,
-        },
-    },
-    'rest': {
-        "tweet" : {
-            "place" : adapt_json,
-            "geo" : adapt_json,
-            "created_at" : adapt_date,
-#            "original_json" : adapt_json,
-        },
-    },
-}
-
-#
-# adapt fields, return a copy of the field_dict with adapted fields
-#
-def adapt_fields(fields_dict, adapter_mapping):
-    def adapt_one_field(field, value):
-        if field in adapter_mapping and adapter_mapping[field] is not None:
-            return adapter_mapping[field](value)
-        else:
-            return value
-    return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
-
-
-
-class TwitterProcessorException(Exception):
-    pass
-
-class TwitterProcessor(object):
-
-    def __init__(self, json_dict, json_txt, session):
-
-        if json_dict is None and json_txt is None:
-            raise TwitterProcessorException("No json")
-
-        if json_dict is None:
-            self.json_dict = json.loads(json_txt)
-        else:
-            self.json_dict = json_dict
-
-        if not json_txt:
-            self.json_txt = json.dumps(json_dict)
-        else:
-            self.json_txt = json_txt
-
-        if "id" not in self.json_dict:
-            raise TwitterProcessorException("No id in json")
-
-        self.session = session
-
-    def __get_user(self, user_dict):
-        logging.debug("Get user : " + repr(user_dict))
-
-        user_id = user_dict.get("id",None)
-        user_name = user_dict.get("screen_name", user_dict.get("name", None))
-
-        if user_id is None and user_name is None:
-            return None
-
-        if user_id:
-            user = self.session.query(User).filter(User.id == user_id).first()
-        else:
-            user = self.session.query(User).filter(User.screen_name == user_name).first()
-
-        if user is not None:
-            return user
-
-        user_created_at = user_dict.get("created_at", None)
-
-        if user_created_at is None:
-            acess_token_key, access_token_secret = get_oauth_token()
-            t = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET))
-            try:
-                if user_id:
-                    user_dict = t.users.show(user_id=user_id)
-                else:
-                    user_dict = t.users.show(screen_name=user_name)
-            except Exception as e:
-                logging.info("get_user : TWITTER ERROR : " + repr(e))
-                logging.info("get_user : TWITTER ERROR : " + str(e))
-
-        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
-        if "id" not in user_dict:
-            return None
-
-        user = User(**user_dict)
-
-        self.session.add(user)
-        self.session.flush()
-
-        return user
-
-    def __process_entity(self, ind, ind_type):
-        logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
-
-        entity_dict = {
-            "indice_start": ind["indices"][0],
-            "indice_end" : ind["indices"][1],
-            "tweet_id" : self.tweet.id,
-            "tweet" : self.tweet
-        }
-
-        def process_hashtags():
-            text = ind.get("text", ind.get("hashtag", None))
-            if text is None:
-                return None
-            hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
-            if not hashtag:
-                ind["text"] = text
-                hashtag = Hashtag(**ind)
-                self.session.add(hashtag)
-                self.session.flush()
-            entity_dict['hashtag'] = hashtag
-            entity_dict['hashtag_id'] = hashtag.id
-            entity = EntityHashtag(**entity_dict)
-            return entity
-
-        def process_user_mentions():
-            user_mention = self.__get_user(ind)
-            if user_mention is None:
-                entity_dict['user'] = None
-                entity_dict['user_id'] = None
-            else:
-                entity_dict['user'] = user_mention
-                entity_dict['user_id'] = user_mention.id
-            entity = EntityUser(**entity_dict)
-            return entity
-
-        def process_urls():
-            url = self.session.query(Url).filter(Url.url == ind["url"]).first()
-            if url is None:
-                url = Url(**ind)
-                self.session.add(url)
-                self.session.flush()
-            entity_dict['url'] = url
-            entity_dict['url_id'] = url.id
-            entity = EntityUrl(**entity_dict)
-            return entity
-
-        #{'': lambda }
-        entity = {
-            'hashtags': process_hashtags,
-            'user_mentions' : process_user_mentions,
-            'urls' : process_urls
-        }[ind_type]()
-
-        logging.debug("Process_entity entity_dict: " + repr(entity_dict))
-        if entity:
-            self.session.add(entity)
-            self.session.flush()
-
-
-    def __process_twitter_stream(self):
-
-        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
-        if tweet_nb > 0:
-            return
-
-        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
-
-        # get or create user
-        user = self.__get_user(self.json_dict["user"])
-        if user is None:
-            log.warning("USER not found " + repr(ts["user"]))
-            ts_copy["user"] = None
-            ts_copy["user_id"] = None
-        else:
-            ts_copy["user"] = user
-            ts_copy["user_id"] = ts_copy["user"].id
-        ts_copy["original_json"] = self.json_txt
-
-        self.tweet = Tweet(**ts_copy)
-        self.session.add(self.tweet)
-        self.session.flush()
-
-        # get entities
-        for ind_type, entity_list in self.json_dict["entities"].items():
-            for ind in entity_list:
-                self.__process_entity(ind, ind_type)
-
-
-    def __process_twitter_rest(self):
-        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
-        if tweet_nb > 0:
-            return
-
-        tweet_fields = {
-            'created_at': self.json_dict["created_at"],
-            'favorited': False,
-            'id': self.json_dict["id"],
-            'id_str': self.json_dict["id_str"],
-            #'in_reply_to_screen_name': ts["to_user"],
-            'in_reply_to_user_id': self.json_dict["to_user_id"],
-            'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
-            #'place': ts["place"],
-            'source': self.json_dict["source"],
-            'text': self.json_dict["text"],
-            'truncated': False,
-            'original_json' : self.json_txt,
-        }
-
-        #user
-
-        user_fields = {
-            'id' : self.json_dict['from_user_id'],
-            'id_str' : self.json_dict['from_user_id_str'],
-            'lang' : self.json_dict['iso_language_code'],
-            'profile_image_url' : self.json_dict["profile_image_url"],
-            'screen_name' : self.json_dict["from_user"],
-        }
-
-        user = self.__get_user(user_fields)
-        if user is None:
-            log.warning("USER not found " + repr(user_fields))
-            tweet_fields["user"] = None
-            tweet_fields["user_id"] = None
-        else:
-            tweet_fields["user"] = user
-            tweet_fields["user_id"] = user.id
-
-        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
-        self.tweet = Tweet(**tweet_fields)
-        session.add(self.tweet)
-
-        text = self.tweet.text
-
-        extractor = twitter_text.Extractor(text)
-
-        for ind in extractor.extract_hashtags_with_indices():
-            self.__process_entity(ind, "hashtags")
-
-        for ind in extractor.extract_mentioned_screen_names_with_indices():
-            self.__process_entity(ind, "user_mentions")
-
-        for ind in extractor.extract_urls_with_indices():
-            self.__process_entity(ind, "urls")
-
-        self.session.flush()
-
-
-    def process(self):
-        if "metadata" in self.json_dict:
-            self.__process_twitter_rest()
-        else:
-            self.__process_twitter_stream()
-
-
-def set_logging(options):
-
-    logging_config = {}
-
-    if options.logfile == "stdout":
-        logging_config["stream"] = sys.stdout
-    elif options.logfile == "stderr":
-        logging_config["stream"] = sys.stderr
-    else:
-        logging_config["filename"] = options.logfile
-
-    logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet))
-    logging.basicConfig(**logging_config)
-
-    options.debug = (options.verbose-options.quiet > 0)
-
-def set_logging_options(parser):
-    parser.add_option("-l", "--log", dest="logfile",
-                      help="log to file", metavar="LOG", default="stderr")
-    parser.add_option("-v", dest="verbose", action="count",
-                      help="verbose", metavar="VERBOSE", default=0)
-    parser.add_option("-q", dest="quiet", action="count",
-                      help="quiet", metavar="QUIET", default=0)