script/iri_tweet/utils.py
changeset 11 54d7f1486ac4
parent 9 bb44692e09ee
--- a/script/iri_tweet/utils.py	Wed Jan 12 13:25:01 2011 +0100
+++ b/script/iri_tweet/utils.py	Tue Jan 18 10:08:03 2011 +0100
@@ -1,11 +1,27 @@
-import email.utils
-import logging
 from models import *
 import datetime
+import email.utils
+import json
+import logging
+import sys
 import twitter
 import twitter_text
+import os.path
+import twitter.oauth
 
 
+def get_oauth_token(token_file_path=None):
+    
+    if token_file_path and os.path.file_exists(token_file_path):
+        logging.debug("reading token from file %s" % token_file_path)
+        return twitter.oauth.read_token_file(token_file_path)
+        #read access token info from path
+    
+    if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
+        return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
+    
+    return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_filename)
+
 def parse_date(date_str):
     ts = email.utils.parsedate_tz(date_str)
     return datetime.datetime(*ts[0:7])
@@ -45,196 +61,250 @@
             return value
     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
 
-def get_user(user_dict, session):
-    
-    logging.debug("Get user : " + repr(user_dict))
-    
-    user_id = user_dict.get("id",None)    
-    user_name = user_dict.get("screen_name", user_dict.get("name", None))
-    
-    if user_id is None and user_name is None:
-        return None
-
-    if user_id:
-        user = session.query(User).filter(User.id == user_id).first()
-    else:
-        user = session.query(User).filter(User.screen_name == user_name).first()
-
-    if user is not None:
-        return user
-
-    user_created_at = user_dict.get("created_at", None)
-    
-    if user_created_at is None:
-        t = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET))
-        try:
-            if user_id:
-                user_dict = t.users.show(user_id=user_id)
-            else:
-                user_dict = t.users.show(screen_name=user_name)            
-        except Exception as e:
-            logging.info("get_user : TWITTER ERROR : " + repr(e))
-            logging.info("get_user : TWITTER ERROR : " + str(e))
-
-    user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
-    if "id" not in user_dict:
-        return None
-    
-    user = User(**user_dict)
-    
-    session.add(user)
-    session.flush()
-    
-    return user 
-    # if not, if needed get info from twitter
-    # create user
-    # return it
-
-def process_entity(ind, ind_type, tweet, session):
-
-    logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
-
-    entity_dict = {
-       "indice_start": ind["indices"][0],
-       "indice_end"  : ind["indices"][1],
-       "tweet_id"    : tweet.id,
-       "tweet"       : tweet
-    }
-
-    def process_hashtags():
-        text = ind.get("text", ind.get("hashtag", None))
-        if text is None:
-            return None 
-        hashtag = session.query(Hashtag).filter(Hashtag.text == text).first()
-        if not hashtag:
-            ind["text"] = text
-            hashtag = Hashtag(**ind)
-            session.add(hashtag)
-            session.flush()
-        entity_dict['hashtag'] = hashtag
-        entity_dict['hashtag_id'] = hashtag.id
-        entity = EntityHashtag(**entity_dict)
-        return entity
-    
-    def process_user_mentions():
-        user_mention = get_user(ind, session)
-        if user_mention is None:
-            entity_dict['user'] = None
-            entity_dict['user_id'] = None
-        else:
-            entity_dict['user'] = user_mention
-            entity_dict['user_id'] = user_mention.id
-        entity = EntityUser(**entity_dict)
-        return entity
-    
-    def process_urls():
-        url = session.query(Url).filter(Url.url == ind["url"]).first()
-        if url is None:
-            url = Url(**ind)
-            session.add(url)
-            session.flush()
-        entity_dict['url'] = url
-        entity_dict['url_id'] = url.id
-        entity = EntityUrl(**entity_dict)
-        return entity
-    
-    #{'': lambda }
-    entity =  { 
-        'hashtags': process_hashtags,
-        'user_mentions' : process_user_mentions,
-        'urls' : process_urls
-        }[ind_type]()
-        
-    logging.debug("Process_entity entity_dict: " + repr(entity_dict))
-    if entity:
-        session.add(entity)
 
 
+class TwitterProcessorException(Exception):
+    pass
 
-def from_twitter_rest(ts, jsontxt, session):
+class TwitterProcessor(object):
     
-    tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
-    if tweet_nb > 0:
-        return
+    def __init__(self, json_dict, json_txt, session):
+
+        if json_dict is None and json_txt is None:
+            raise TwitterProcessorException("No json")
+        
+        if json_dict is None:
+            self.json_dict = json.loads(json_txt)
+        else:
+            self.json_dict = json_dict
+        
+        if not json_txt:
+            self.json_txt = json.dumps(json_dict)
+        else:
+            self.json_txt = json_txt
+        
+        if "id" not in self.json_dict:
+            raise TwitterProcessorException("No id in json")
+        
+        self.session = session
+
+    def __get_user(self, user_dict):
+        logging.debug("Get user : " + repr(user_dict))
+    
+        user_id = user_dict.get("id",None)    
+        user_name = user_dict.get("screen_name", user_dict.get("name", None))
         
-    tweet_fields = {
-        'created_at': ts["created_at"], 
-        'favorited': False,
-        'id': ts["id"],
-        'id_str': ts["id_str"],
-        #'in_reply_to_screen_name': ts["to_user"], 
-        'in_reply_to_user_id': ts["to_user_id"],
-        'in_reply_to_user_id_str': ts["to_user_id_str"],
-        #'place': ts["place"],
-        'source': ts["source"],
-        'text': ts["text"],
-        'truncated': False,
-        'original_json' : jsontxt,
-    }
+        if user_id is None and user_name is None:
+            return None
+    
+        if user_id:
+            user = self.session.query(User).filter(User.id == user_id).first()
+        else:
+            user = self.session.query(User).filter(User.screen_name == user_name).first()
+    
+        if user is not None:
+            return user
     
-    #user
+        user_created_at = user_dict.get("created_at", None)
+        
+        if user_created_at is None:
+            acess_token_key, access_token_secret = get_oauth_token()
+            t = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET))
+            try:
+                if user_id:
+                    user_dict = t.users.show(user_id=user_id)
+                else:
+                    user_dict = t.users.show(screen_name=user_name)            
+            except Exception as e:
+                logging.info("get_user : TWITTER ERROR : " + repr(e))
+                logging.info("get_user : TWITTER ERROR : " + str(e))
+    
+        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
+        if "id" not in user_dict:
+            return None
+        
+        user = User(**user_dict)
+        
+        self.session.add(user)
+        self.session.flush()
+        
+        return user 
 
-    user_fields = {
-        'id' : ts['from_user_id'],
-        'id_str' : ts['from_user_id_str'],
-        'lang' : ts['iso_language_code'],
-        'profile_image_url' : ts["profile_image_url"],
-        'screen_name' : ts["from_user"],                   
-    }
+    def __process_entity(self, ind, ind_type):
+        logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
     
-    user = get_user(user_fields, session)
-    if user is None:
-        log.warning("USER not found " + repr(user_fields))
-        tweet_fields["user"] = None
-        tweet_fields["user_id"] = None
-    else:
-        tweet_fields["user"] = user
-        tweet_fields["user_id"] = user.id
+        entity_dict = {
+           "indice_start": ind["indices"][0],
+           "indice_end"  : ind["indices"][1],
+           "tweet_id"    : self.tweet.id,
+           "tweet"       : self.tweet
+        }
     
-    tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
-    tweet = Tweet(**tweet_fields)
-    session.add(tweet)
-    
-    text = tweet.text
-    
-    extractor = twitter_text.Extractor(text)
-    
-    for ind in extractor.extract_hashtags_with_indices():
-        process_entity(ind, "hashtags", tweet, session)
+        def process_hashtags():
+            text = ind.get("text", ind.get("hashtag", None))
+            if text is None:
+                return None 
+            hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
+            if not hashtag:
+                ind["text"] = text
+                hashtag = Hashtag(**ind)
+                self.session.add(hashtag)
+                self.session.flush()
+            entity_dict['hashtag'] = hashtag
+            entity_dict['hashtag_id'] = hashtag.id
+            entity = EntityHashtag(**entity_dict)
+            return entity
+        
+        def process_user_mentions():
+            user_mention = self.__get_user(ind)
+            if user_mention is None:
+                entity_dict['user'] = None
+                entity_dict['user_id'] = None
+            else:
+                entity_dict['user'] = user_mention
+                entity_dict['user_id'] = user_mention.id
+            entity = EntityUser(**entity_dict)
+            return entity
+        
+        def process_urls():
+            url = self.session.query(Url).filter(Url.url == ind["url"]).first()
+            if url is None:
+                url = Url(**ind)
+                self.session.add(url)
+                self.session.flush()
+            entity_dict['url'] = url
+            entity_dict['url_id'] = url.id
+            entity = EntityUrl(**entity_dict)
+            return entity
         
-    for ind in extractor.extract_mentioned_screen_names_with_indices():
-        process_entity(ind, "user_mentions", tweet, session)
-    
-    for ind in extractor.extract_urls_with_indices():
-        process_entity(ind, "urls", tweet, session)
-    
-    
-    
+        #{'': lambda }
+        entity =  { 
+            'hashtags': process_hashtags,
+            'user_mentions' : process_user_mentions,
+            'urls' : process_urls
+            }[ind_type]()
+            
+        logging.debug("Process_entity entity_dict: " + repr(entity_dict))
+        if entity:
+            self.session.add(entity)
+            self.session.flush()
+
 
-def from_twitter_stream(ts, jsontxt, session):
+    def __process_twitter_stream(self):
+        
+        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+        if tweet_nb > 0:
+            return
+        
+        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
+        
+        # get or create user
+        user = self.__get_user(self.json_dict["user"])
+        if user is None:
+            log.warning("USER not found " + repr(ts["user"]))
+            ts_copy["user"] = None
+            ts_copy["user_id"] = None
+        else:
+            ts_copy["user"] = user
+            ts_copy["user_id"] = ts_copy["user"].id
+        ts_copy["original_json"] = self.json_txt
+        
+        self.tweet = Tweet(**ts_copy)
+        self.session.add(self.tweet)
+        self.session.flush()
+            
+        # get entities
+        for ind_type, entity_list in self.json_dict["entities"].items():
+            for ind in entity_list:
+                self.__process_entity(ind, ind_type)
+
+
+    def __process_twitter_rest(self):
+        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+        if tweet_nb > 0:
+            return
+            
+        tweet_fields = {
+            'created_at': self.json_dict["created_at"], 
+            'favorited': False,
+            'id': self.json_dict["id"],
+            'id_str': self.json_dict["id_str"],
+            #'in_reply_to_screen_name': ts["to_user"], 
+            'in_reply_to_user_id': self.json_dict["to_user_id"],
+            'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
+            #'place': ts["place"],
+            'source': self.json_dict["source"],
+            'text': self.json_dict["text"],
+            'truncated': False,
+            'original_json' : self.json_txt,
+        }
+        
+        #user
     
-    tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
-    if tweet_nb > 0:
-        return
-    
-    ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"])
+        user_fields = {
+            'id' : self.json_dict['from_user_id'],
+            'id_str' : self.json_dict['from_user_id_str'],
+            'lang' : self.json_dict['iso_language_code'],
+            'profile_image_url' : self.json_dict["profile_image_url"],
+            'screen_name' : self.json_dict["from_user"],                   
+        }
+        
+        user = self.__get_user(user_fields)
+        if user is None:
+            log.warning("USER not found " + repr(user_fields))
+            tweet_fields["user"] = None
+            tweet_fields["user_id"] = None
+        else:
+            tweet_fields["user"] = user
+            tweet_fields["user_id"] = user.id
+        
+        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
+        self.tweet = Tweet(**tweet_fields)
+        session.add(self.tweet)
+        
+        text = self.tweet.text
+        
+        extractor = twitter_text.Extractor(text)
+        
+        for ind in extractor.extract_hashtags_with_indices():
+            self.__process_entity(ind, "hashtags")
+            
+        for ind in extractor.extract_mentioned_screen_names_with_indices():
+            self.__process_entity(ind, "user_mentions")
+        
+        for ind in extractor.extract_urls_with_indices():
+            self.__process_entity(ind, "urls")
+        
+        self.session.flush()
+
+
+    def process(self):
+        if "metadata" in self.json_dict:
+            self.__process_twitter_rest()
+        else:
+            self.__process_twitter_stream()
+        
+
+def set_logging(options):
     
-    # get or create user
-    user = get_user(ts["user"], session)
-    if user is None:
-        log.warning("USER not found " + repr(ts["user"]))
-        ts_copy["user"] = None
-        ts_copy["user_id"] = None
+    logging_config = {}
+    
+    if options.logfile == "stdout":
+        logging_config["stream"] = sys.stdout
+    elif options.logfile == "stderr":
+        logging_config["stream"] = sys.stderr
     else:
-        ts_copy["user"] = user
-        ts_copy["user_id"] = ts_copy["user"].id
-    ts_copy["original_json"] = jsontxt
+        logging_config["filename"] = options.logfile
+        
+    logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet))
+    logging.basicConfig(**logging_config)
     
-    tweet = Tweet(**ts_copy)
-    session.add(tweet)
-    session.flush()
-        
-    # get entities
-    for ind_type, entity_list in ts["entities"].items():
-        for ind in entity_list:
-            process_entity(ind, ind_type, tweet, session)
+    options.debug = (options.verbose-options.quiet > 0)
+
+def set_logging_options(parser):
+    parser.add_option("-l", "--log", dest="logfile",
+                      help="log to file", metavar="LOG", default="stderr")
+    parser.add_option("-v", dest="verbose", action="count",
+                      help="verbose", metavar="VERBOSE", default=0)
+    parser.add_option("-q", dest="quiet", action="count",
+                      help="quiet", metavar="QUIET", default=0)