diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/utils.py
--- a/script/lib/iri_tweet/utils.py	Mon Feb 20 18:52:19 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,616 +0,0 @@
-from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
-    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
-    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType,
-    Media, EntityMedia, Entity, EntityType)
-from sqlalchemy.sql import select, or_ #@UnresolvedImport
-import Queue #@UnresolvedImport
-import anyjson #@UnresolvedImport
-import datetime
-import email.utils
-import logging
-import os.path
-import sys
-import math
-import twitter.oauth #@UnresolvedImport
-import twitter.oauth_dance #@UnresolvedImport
-import twitter_text #@UnresolvedImport
-
-
-CACHE_ACCESS_TOKEN = {}
-
-def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
-
-    global CACHE_ACCESS_TOKEN
-
-    if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
-        return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-
-    res = CACHE_ACCESS_TOKEN.get(application_name, None)
-
-    if res is None and token_file_path and os.path.exists(token_file_path):
-        get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
-        res = twitter.oauth.read_token_file(token_file_path)
-
-    if res is not None and check_access_token:
-        get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
-        t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET))
-        status = None
-        try:
-            status = t.account.rate_limit_status()
-        except Exception as e:
-            get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e))
-            status = None
-        get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
-        if status is None or status['remaining_hits'] == 0:
-            get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
-            res = None
-
-    if res is None:
-        get_logger().debug("get_oauth_token : doing the oauth dance")
-        res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
-
-    CACHE_ACCESS_TOKEN[application_name] = res
-
-    return res
-
-def parse_date(date_str):
-    ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
-    return datetime.datetime(*ts[0:7])
-
-def clean_keys(dict_val):
-    return dict([(str(key),value) for key,value in dict_val.items()])
-
-fields_adapter = {
-    'stream': {
-        "tweet": {
-            "created_at" : adapt_date,
-            "coordinates" : adapt_json,
-            "place" : adapt_json,
-            "geo" : adapt_json,
-#            "original_json" : adapt_json,
-        },
-        "user": {
-            "created_at" : adapt_date,
-        },
-
-    },
-
-    'entities' : {
-        "medias": {
-            "sizes" : adapt_json,
-        },
-    },
-    'rest': {
-        "tweet" : {
-            "place" : adapt_json,
-            "geo" : adapt_json,
-            "created_at" : adapt_date,
-#            "original_json" : adapt_json,
-        },
-    },
-}
-
-#
-# adapt fields, return a copy of the field_dict with adapted fields
-#
-def adapt_fields(fields_dict, adapter_mapping):
-    def adapt_one_field(field, value):
-        if field in adapter_mapping and adapter_mapping[field] is not None:
-            return adapter_mapping[field](value)
-        else:
-            return value
-    return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
-
-
-class ObjectBufferProxy(object):
-    def __init__(self, klass, args, kwargs, must_flush, instance=None):
-        self.klass= klass
-        self.args = args
-        self.kwargs = kwargs
-        self.must_flush = must_flush
-        self.instance = instance
-
-    def persists(self, session):
-        new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
-        new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
-
-        if self.instance is None:
-            self.instance = self.klass(*new_args, **new_kwargs)
-        else:
-            self.instance = self.klass(*new_args, **new_kwargs)
-            self.instance = session.merge(self.instance)
-
-        session.add(self.instance)
-        if self.must_flush:
-            session.flush()
-
-    def __getattr__(self, name):
-        return lambda : getattr(self.instance, name) if self.instance else None
-
-
-
-
-class ObjectsBuffer(object):
-
-    def __init__(self):
-        self.__bufferlist = []
-        self.__bufferdict = {}
-
-    def __add_proxy_object(self, proxy):
-        proxy_list = self.__bufferdict.get(proxy.klass, None)
-        if proxy_list is None:
-            proxy_list = []
-            self.__bufferdict[proxy.klass] = proxy_list
-        proxy_list.append(proxy)
-        self.__bufferlist.append(proxy)
-
-    def persists(self, session):
-        for object_proxy in self.__bufferlist:
-            object_proxy.persists(session)
-
-    def add_object(self, klass, args, kwargs, must_flush, instance=None):
-        new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
-        self.__add_proxy_object(new_proxy)
-        return new_proxy
-
-    def get(self, klass, **kwargs):
-        if klass in self.__bufferdict:
-            for proxy in self.__bufferdict[klass]:
-                if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
-                    continue
-                found = True
-                for k,v in kwargs.items():
-                    if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
-                        found = False
-                        break
-                if found:
-                    return proxy
-        return None
-
-class TwitterProcessorException(Exception):
-    pass
-
-class TwitterProcessor(object):
-
-    def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False):
-
-        if json_dict is None and json_txt is None:
-            raise TwitterProcessorException("No json")
-
-        if json_dict is None:
-            self.json_dict = anyjson.deserialize(json_txt)
-        else:
-            self.json_dict = json_dict
-
-        if not json_txt:
-            self.json_txt = anyjson.serialize(json_dict)
-        else:
-            self.json_txt = json_txt
-
-        if "id" not in self.json_dict:
-            raise TwitterProcessorException("No id in json")
-
-        self.source_id = source_id
-        self.session = session
-        self.token_filename = token_filename
-        self.access_token = access_token
-        self.obj_buffer = ObjectsBuffer()
-        self.user_query_twitter = user_query_twitter
-
-
-    def __get_user(self, user_dict, do_merge):
-        get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable
-
-        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
-
-        user_id = user_dict.get("id",None)
-        user_name = user_dict.get("screen_name", user_dict.get("name", None))
-
-        if user_id is None and user_name is None:
-            return None
-
-        user = None
-        if user_id:
-            user = self.obj_buffer.get(User, id=user_id)
-        else:
-            user = self.obj_buffer.get(User, screen_name=user_name)
-
-        #to do update user id needed
-        if user is not None:
-            user_created_at = None
-            if user.args is not None:
-                user_created_at = user.args.get('created_at', None)
-            if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
-                if user.args is None:
-                    user.args = user_dict
-                else:
-                    user.args.update(user_dict)
-            return user
-
-        #todo : add methpds to objectbuffer to get buffer user
-        user_obj = None
-        if user_id:
-            user_obj = self.session.query(User).filter(User.id == user_id).first()
-        else:
-            user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
-
-        #todo update user if needed
-        if user_obj is not None:
-            if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
-                user = ObjectBufferProxy(User, None, None, False, user_obj)
-            else:
-                user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
-            return user
-
-        user_created_at = user_dict.get("created_at", None)
-
-        if user_created_at is None and self.user_query_twitter:
-
-            if self.access_token is not None:
-                acess_token_key, access_token_secret = self.access_token
-            else:
-                acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
-            t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
-            try:
-                if user_id:
-                    user_dict = t.users.show(user_id=user_id)
-                else:
-                    user_dict = t.users.show(screen_name=user_name)
-            except Exception as e:
-                get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
-                get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
-                return None
-
-        if "id" not in user_dict:
-            return None
-
-        #TODO filter get, wrap in proxy
-        user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
-
-        if user_obj is not None and not do_merge:
-            return ObjectBufferProxy(User, None, None, False, user_obj)
-        else:
-            return self.obj_buffer.add_object(User, None, user_dict, True)
-
-    def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge):
-
-        obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
-        if obj_proxy is None:
-            query = self.session.query(klass)
-            if filter is not None:
-                query = query.filter(filter)
-            else:
-                query = query.filter_by(**filter_by_kwargs)
-            obj_instance = query.first()
-            if obj_instance is not None:
-                if not do_merge:
-                    obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
-                else:
-                    obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
-        if obj_proxy is None:
-            obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
-        return obj_proxy
-
-
-    def __process_entity(self, ind, ind_type):
-        get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
-
-        ind = clean_keys(ind)
-
-        entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
-
-        entity_dict = {
-            "indice_start" : ind["indices"][0],
-            "indice_end" : ind["indices"][1],
-            "tweet_id" : self.tweet.id,
-            "entity_type_id" : entity_type.id,
-            "source" : adapt_json(ind)
-        }
-
-        def process_medias():
-
-            media_id = ind.get('id', None)
-            if media_id is None:
-                return None, None
-
-            type_str = ind.get("type", "photo")
-            media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
-            media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
-            if "type" in media_ind:
-                del(media_ind["type"])
-            media_ind['type_id'] = media_type.id
-            media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
-
-            entity_dict['media_id'] = media.id
-            return EntityMedia, entity_dict
-
-        def process_hashtags():
-            text = ind.get("text", ind.get("hashtag", None))
-            if text is None:
-                return None, None
-            ind['text'] = text
-            hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
-            entity_dict['hashtag_id'] = hashtag.id
-            return EntityHashtag, entity_dict
-
-        def process_user_mentions():
-            user_mention = self.__get_user(ind, False)
-            if user_mention is None:
-                entity_dict['user_id'] = None
-            else:
-                entity_dict['user_id'] = user_mention.id
-            return EntityUser, entity_dict
-
-        def process_urls():
-            url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
-            entity_dict['url_id'] = url.id
-            return EntityUrl, entity_dict
-
-        #{'': lambda }
-        entity_klass, entity_dict = {
-            'hashtags': process_hashtags,
-            'user_mentions' : process_user_mentions,
-            'urls' : process_urls,
-            'media': process_medias,
-        }.get(ind_type, lambda: (Entity, entity_dict))()
-
-        get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
-        if entity_klass:
-            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
-
-
-    def __process_twitter_stream(self):
-
-        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
-        if tweet_nb > 0:
-            return
-
-        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
-
-        # get or create user
-        user = self.__get_user(self.json_dict["user"], True)
-        if user is None:
-            get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
-            ts_copy["user_id"] = None
-        else:
-            ts_copy["user_id"] = user.id
-
-        del(ts_copy['user'])
-        ts_copy["tweet_source_id"] = self.source_id
-
-        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
-
-        self.__process_entities()
-
-
-    def __process_entities(self):
-        if "entities" in self.json_dict:
-            for ind_type, entity_list in self.json_dict["entities"].items():
-                for ind in entity_list:
-                    self.__process_entity(ind, ind_type)
-        else:
-
-            text = self.tweet.text
-            extractor = twitter_text.Extractor(text)
-            for ind in extractor.extract_hashtags_with_indices():
-                self.__process_entity(ind, "hashtags")
-
-            for ind in extractor.extract_urls_with_indices():
-                self.__process_entity(ind, "urls")
-
-            for ind in extractor.extract_mentioned_screen_names_with_indices():
-                self.__process_entity(ind, "user_mentions")
-
-    def __process_twitter_rest(self):
-        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
-        if tweet_nb > 0:
-            return
-
-
-        tweet_fields = {
-            'created_at': self.json_dict["created_at"],
-            'favorited': False,
-            'id': self.json_dict["id"],
-            'id_str': self.json_dict["id_str"],
-            #'in_reply_to_screen_name': ts["to_user"],
-            'in_reply_to_user_id': self.json_dict["to_user_id"],
-            'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
-            #'place': ts["place"],
-            'source': self.json_dict["source"],
-            'text': self.json_dict["text"],
-            'truncated': False,
-            'tweet_source_id' : self.source_id,
-        }
-
-        #user
-
-        user_fields = {
-            'lang' : self.json_dict.get('iso_language_code',None),
-            'profile_image_url' : self.json_dict["profile_image_url"],
-            'screen_name' : self.json_dict["from_user"],
-            'id' : self.json_dict["from_user_id"],
-            'id_str' : self.json_dict["from_user_id_str"],
-            'name' : self.json_dict['from_user_name'],
-        }
-
-        user = self.__get_user(user_fields, do_merge=False)
-        if user is None:
-            get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
-            tweet_fields["user_id"] = None
-        else:
-            tweet_fields["user_id"] = user.id
-
-        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
-        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
-
-        self.__process_entities()
-
-
-
-    def process(self):
-
-        if self.source_id is None:
-            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
-            self.source_id = tweet_source.id
-
-        if "metadata" in self.json_dict:
-            self.__process_twitter_rest()
-        else:
-            self.__process_twitter_stream()
-
-        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
-
-        self.obj_buffer.persists(self.session)
-
-
-def set_logging(options, plogger=None, queue=None):
-
-    logging_config = {
-        "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
-        "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
-    }
-
-    if options.logfile == "stdout":
-        logging_config["stream"] = sys.stdout
-    elif options.logfile == "stderr":
-        logging_config["stream"] = sys.stderr
-    else:
-        logging_config["filename"] = options.logfile
-
-    logger = plogger
-    if logger is None:
-        logger = get_logger() #@UndefinedVariable
-
-    if len(logger.handlers) == 0:
-        filename = logging_config.get("filename")
-        if queue is not None:
-            hdlr = QueueHandler(queue, True)
-        elif filename:
-            mode = logging_config.get("filemode", 'a')
-            hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
-        else:
-            stream = logging_config.get("stream")
-            hdlr = logging.StreamHandler(stream) #@UndefinedVariable
-
-        fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
-        dfs = logging_config.get("datefmt", None)
-        fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
-        hdlr.setFormatter(fmt)
-        logger.addHandler(hdlr)
-        level = logging_config.get("level")
-        if level is not None:
-            logger.setLevel(level)
-
-    options.debug = (options.verbose-options.quiet > 0)
-    return logger
-
-def set_logging_options(parser):
-    parser.add_option("-l", "--log", dest="logfile",
-                      help="log to file", metavar="LOG", default="stderr")
-    parser.add_option("-v", dest="verbose", action="count",
-                      help="verbose", metavar="VERBOSE", default=0)
-    parser.add_option("-q", dest="quiet", action="count",
-                      help="quiet", metavar="QUIET", default=0)
-
-def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-
-    query = query.join(EntityHashtag).join(Hashtag)
-
-    if tweet_exclude_table is not None:
-        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
-
-    if start_date:
-        query = query.filter(Tweet.created_at >= start_date)
-    if end_date:
-        query = query.filter(Tweet.created_at <= end_date)
-
-    if user_whitelist:
-        query = query.join(User).filter(User.screen_name.in_(user_whitelist))
-
-
-    if hashtags :
-        def merge_hash(l,h):
-            l.extend(h.split(","))
-            return l
-        htags = reduce(merge_hash, hashtags, [])
-
-        query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
-
-    return query
-
-
-
-def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-
-    query = session.query(Tweet)
-    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
-    return query.order_by(Tweet.created_at)
-
-
-def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
-
-    query = session.query(User).join(Tweet)
-
-    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)
-
-    return query.distinct()
-
-logger_name = "iri.tweet"
-
-def get_logger():
-    global logger_name
-    return logging.getLogger(logger_name) #@UndefinedVariable
-
-
-# Next two import lines for this demo only
-
-class QueueHandler(logging.Handler): #@UndefinedVariable
-    """
-    This is a logging handler which sends events to a multiprocessing queue.
-    """
-
-    def __init__(self, queue, ignore_full):
-        """
-        Initialise an instance, using the passed queue.
-        """
-        logging.Handler.__init__(self) #@UndefinedVariable
-        self.queue = queue
-        self.ignore_full = True
-
-    def emit(self, record):
-        """
-        Emit a record.
-
-        Writes the LogRecord to the queue.
-        """
-        try:
-            ei = record.exc_info
-            if ei:
-                dummy = self.format(record) # just to get traceback text into record.exc_text
-                record.exc_info = None # not needed any more
-            if not self.ignore_full or not self.queue.full():
-                self.queue.put_nowait(record)
-        except Queue.Full:
-            if self.ignore_full:
-                pass
-            else:
-                raise
-        except (KeyboardInterrupt, SystemExit):
-            raise
-        except:
-            self.handleError(record)
-
-def show_progress(current_line, total_line, label, width):
-
-    percent = (float(current_line) / float(total_line)) * 100.0
-
-    marks = math.floor(width * (percent / 100.0))
-    spaces = math.floor(width - marks)
-
-    loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']'
-
-    sys.stdout.write(u"%s %d%% %d/%d - %r\r" % (loader, percent, current_line - 1, total_line - 1, label[:50].rjust(50))) #takes the header into account
-    if percent >= 100:
-        sys.stdout.write("\n")
-    sys.stdout.flush()
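
Reviewer note on the deferred-write pattern at the heart of the removed file: ObjectsBuffer.add_object() only records a (class, args, kwargs) triple, and attribute access on the returned ObjectBufferProxy yields a zero-argument callable rather than a value; persists() resolves such callables only after earlier proxies have been flushed, which is what lets a buffered Tweet reference a User primary key that does not exist yet. A minimal sketch of that usage, assuming the module were still importable from this repository and that the iri_tweet.models schema already exists in the target database (the URL and values are illustrative):

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from iri_tweet.models import Tweet, User
    from iri_tweet.utils import ObjectsBuffer

    engine = create_engine("sqlite:///tweets.db")  # illustrative URL
    session = sessionmaker(bind=engine)()

    buf = ObjectsBuffer()
    # must_flush=True: the user row is flushed during persists(), so its
    # primary key is assigned before later proxies are resolved.
    user = buf.add_object(User, None, {"screen_name": "alice"}, True)
    # user.id is a callable here, not a value; persists() invokes it while
    # building the Tweet, i.e. after the user row has been flushed.
    buf.add_object(Tweet, None, {"text": "hello", "user_id": user.id}, False)
    buf.persists(session)
    session.commit()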
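
TwitterProcessor.process() routes a payload by shape: REST search results carry a "metadata" key, anything else is treated as a streaming status; either way it buffers a TweetSource, the Tweet with its entities, and an 'OK' TweetLog, then persists the whole batch in one pass. A hypothetical driver, reusing `session` from the sketch above (the JSON literal is invented for illustration):

    from iri_tweet.utils import TwitterProcessor

    json_txt = ('{"id": 1, "text": "hello #world", '
                '"created_at": "Mon Feb 20 18:52:19 +0000 2012", '
                '"user": {"id": 42, "screen_name": "alice"}}')
    # source_id=None makes the processor create the TweetSource row itself.
    TwitterProcessor(None, json_txt, None, session).process()
    session.commit()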
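
The -v/-q handling in set_logging() is arithmetic: level = WARNING - 10*verbose + 10*quiet, clamped to the [NOTSET, CRITICAL] range, so two -v flags land on DEBUG and a single -q on ERROR. A short sketch of how the optparse-based scripts in this repository presumably wired it up (the argument list is illustrative):

    from optparse import OptionParser
    from iri_tweet.utils import set_logging, set_logging_options

    parser = OptionParser()
    set_logging_options(parser)
    options, _ = parser.parse_args(["-v", "-v", "--log", "stdout"])
    logger = set_logging(options)   # 30 - 2*10 = 10 -> DEBUG, on stdout
    logger.debug("threshold derived from -v/-q counts")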
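
QueueHandler is the multiprocessing half of that setup (set_logging(queue=...) installs it): worker processes push LogRecords onto a shared queue instead of writing to handlers directly, and Queue.Full is swallowed when ignore_full is set. The listener side below is not part of the removed file; it is one conventional way to drain such a queue, and every name other than QueueHandler and get_logger is illustrative:

    import logging
    import multiprocessing
    from iri_tweet.utils import QueueHandler, get_logger

    def worker(queue):
        logger = get_logger()
        logger.addHandler(QueueHandler(queue, True))
        logger.setLevel(logging.INFO)
        logger.info("processed one status")

    def listener(queue):
        logging.basicConfig(level=logging.INFO)
        while True:
            record = queue.get()
            if record is None:  # sentinel posted by the parent
                break
            # replay the record through the normal logging machinery
            logging.getLogger(record.name).handle(record)

    if __name__ == "__main__":
        q = multiprocessing.Queue()
        lp = multiprocessing.Process(target=listener, args=(q,))
        lp.start()
        worker(q)
        q.put(None)
        lp.join()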