--- a/script/iri_tweet/utils.py Wed Jan 12 13:25:01 2011 +0100
+++ b/script/iri_tweet/utils.py Tue Jan 18 10:08:03 2011 +0100
@@ -1,11 +1,27 @@
-import email.utils
-import logging
from models import *
import datetime
+import email.utils
+import json
+import logging
+import sys
import twitter
import twitter_text
+import os.path
+import twitter.oauth
+def get_oauth_token(token_file_path=None):
+
+ if token_file_path and os.path.file_exists(token_file_path):
+ logging.debug("reading token from file %s" % token_file_path)
+ return twitter.oauth.read_token_file(token_file_path)
+ #read access token info from path
+
+ if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
+ return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
+
+ return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_filename)
+
def parse_date(date_str):
ts = email.utils.parsedate_tz(date_str)
return datetime.datetime(*ts[0:7])
@@ -45,196 +61,250 @@
return value
return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
-def get_user(user_dict, session):
-
- logging.debug("Get user : " + repr(user_dict))
-
- user_id = user_dict.get("id",None)
- user_name = user_dict.get("screen_name", user_dict.get("name", None))
-
- if user_id is None and user_name is None:
- return None
-
- if user_id:
- user = session.query(User).filter(User.id == user_id).first()
- else:
- user = session.query(User).filter(User.screen_name == user_name).first()
-
- if user is not None:
- return user
-
- user_created_at = user_dict.get("created_at", None)
-
- if user_created_at is None:
- t = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET))
- try:
- if user_id:
- user_dict = t.users.show(user_id=user_id)
- else:
- user_dict = t.users.show(screen_name=user_name)
- except Exception as e:
- logging.info("get_user : TWITTER ERROR : " + repr(e))
- logging.info("get_user : TWITTER ERROR : " + str(e))
-
- user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
- if "id" not in user_dict:
- return None
-
- user = User(**user_dict)
-
- session.add(user)
- session.flush()
-
- return user
- # if not, if needed get info from twitter
- # create user
- # return it
-
-def process_entity(ind, ind_type, tweet, session):
-
- logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
-
- entity_dict = {
- "indice_start": ind["indices"][0],
- "indice_end" : ind["indices"][1],
- "tweet_id" : tweet.id,
- "tweet" : tweet
- }
-
- def process_hashtags():
- text = ind.get("text", ind.get("hashtag", None))
- if text is None:
- return None
- hashtag = session.query(Hashtag).filter(Hashtag.text == text).first()
- if not hashtag:
- ind["text"] = text
- hashtag = Hashtag(**ind)
- session.add(hashtag)
- session.flush()
- entity_dict['hashtag'] = hashtag
- entity_dict['hashtag_id'] = hashtag.id
- entity = EntityHashtag(**entity_dict)
- return entity
-
- def process_user_mentions():
- user_mention = get_user(ind, session)
- if user_mention is None:
- entity_dict['user'] = None
- entity_dict['user_id'] = None
- else:
- entity_dict['user'] = user_mention
- entity_dict['user_id'] = user_mention.id
- entity = EntityUser(**entity_dict)
- return entity
-
- def process_urls():
- url = session.query(Url).filter(Url.url == ind["url"]).first()
- if url is None:
- url = Url(**ind)
- session.add(url)
- session.flush()
- entity_dict['url'] = url
- entity_dict['url_id'] = url.id
- entity = EntityUrl(**entity_dict)
- return entity
-
- #{'': lambda }
- entity = {
- 'hashtags': process_hashtags,
- 'user_mentions' : process_user_mentions,
- 'urls' : process_urls
- }[ind_type]()
-
- logging.debug("Process_entity entity_dict: " + repr(entity_dict))
- if entity:
- session.add(entity)
+class TwitterProcessorException(Exception):
+ pass
-def from_twitter_rest(ts, jsontxt, session):
+class TwitterProcessor(object):
- tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
- if tweet_nb > 0:
- return
+ def __init__(self, json_dict, json_txt, session):
+
+ if json_dict is None and json_txt is None:
+ raise TwitterProcessorException("No json")
+
+ if json_dict is None:
+ self.json_dict = json.loads(json_txt)
+ else:
+ self.json_dict = json_dict
+
+ if not json_txt:
+ self.json_txt = json.dumps(json_dict)
+ else:
+ self.json_txt = json_txt
+
+ if "id" not in self.json_dict:
+ raise TwitterProcessorException("No id in json")
+
+ self.session = session
+
+ def __get_user(self, user_dict):
+ logging.debug("Get user : " + repr(user_dict))
+
+ user_id = user_dict.get("id",None)
+ user_name = user_dict.get("screen_name", user_dict.get("name", None))
- tweet_fields = {
- 'created_at': ts["created_at"],
- 'favorited': False,
- 'id': ts["id"],
- 'id_str': ts["id_str"],
- #'in_reply_to_screen_name': ts["to_user"],
- 'in_reply_to_user_id': ts["to_user_id"],
- 'in_reply_to_user_id_str': ts["to_user_id_str"],
- #'place': ts["place"],
- 'source': ts["source"],
- 'text': ts["text"],
- 'truncated': False,
- 'original_json' : jsontxt,
- }
+ if user_id is None and user_name is None:
+ return None
+
+ if user_id:
+ user = self.session.query(User).filter(User.id == user_id).first()
+ else:
+ user = self.session.query(User).filter(User.screen_name == user_name).first()
+
+ if user is not None:
+ return user
- #user
+ user_created_at = user_dict.get("created_at", None)
+
+ if user_created_at is None:
+ acess_token_key, access_token_secret = get_oauth_token()
+ t = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET))
+ try:
+ if user_id:
+ user_dict = t.users.show(user_id=user_id)
+ else:
+ user_dict = t.users.show(screen_name=user_name)
+ except Exception as e:
+ logging.info("get_user : TWITTER ERROR : " + repr(e))
+ logging.info("get_user : TWITTER ERROR : " + str(e))
+
+ user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
+ if "id" not in user_dict:
+ return None
+
+ user = User(**user_dict)
+
+ self.session.add(user)
+ self.session.flush()
+
+ return user
- user_fields = {
- 'id' : ts['from_user_id'],
- 'id_str' : ts['from_user_id_str'],
- 'lang' : ts['iso_language_code'],
- 'profile_image_url' : ts["profile_image_url"],
- 'screen_name' : ts["from_user"],
- }
+ def __process_entity(self, ind, ind_type):
+ logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
- user = get_user(user_fields, session)
- if user is None:
- log.warning("USER not found " + repr(user_fields))
- tweet_fields["user"] = None
- tweet_fields["user_id"] = None
- else:
- tweet_fields["user"] = user
- tweet_fields["user_id"] = user.id
+ entity_dict = {
+ "indice_start": ind["indices"][0],
+ "indice_end" : ind["indices"][1],
+ "tweet_id" : self.tweet.id,
+ "tweet" : self.tweet
+ }
- tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
- tweet = Tweet(**tweet_fields)
- session.add(tweet)
-
- text = tweet.text
-
- extractor = twitter_text.Extractor(text)
-
- for ind in extractor.extract_hashtags_with_indices():
- process_entity(ind, "hashtags", tweet, session)
+ def process_hashtags():
+ text = ind.get("text", ind.get("hashtag", None))
+ if text is None:
+ return None
+ hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
+ if not hashtag:
+ ind["text"] = text
+ hashtag = Hashtag(**ind)
+ self.session.add(hashtag)
+ self.session.flush()
+ entity_dict['hashtag'] = hashtag
+ entity_dict['hashtag_id'] = hashtag.id
+ entity = EntityHashtag(**entity_dict)
+ return entity
+
+ def process_user_mentions():
+ user_mention = self.__get_user(ind)
+ if user_mention is None:
+ entity_dict['user'] = None
+ entity_dict['user_id'] = None
+ else:
+ entity_dict['user'] = user_mention
+ entity_dict['user_id'] = user_mention.id
+ entity = EntityUser(**entity_dict)
+ return entity
+
+ def process_urls():
+ url = self.session.query(Url).filter(Url.url == ind["url"]).first()
+ if url is None:
+ url = Url(**ind)
+ self.session.add(url)
+ self.session.flush()
+ entity_dict['url'] = url
+ entity_dict['url_id'] = url.id
+ entity = EntityUrl(**entity_dict)
+ return entity
- for ind in extractor.extract_mentioned_screen_names_with_indices():
- process_entity(ind, "user_mentions", tweet, session)
-
- for ind in extractor.extract_urls_with_indices():
- process_entity(ind, "urls", tweet, session)
-
-
-
+ #{'': lambda }
+ entity = {
+ 'hashtags': process_hashtags,
+ 'user_mentions' : process_user_mentions,
+ 'urls' : process_urls
+ }[ind_type]()
+
+ logging.debug("Process_entity entity_dict: " + repr(entity_dict))
+ if entity:
+ self.session.add(entity)
+ self.session.flush()
+
-def from_twitter_stream(ts, jsontxt, session):
+ def __process_twitter_stream(self):
+
+ tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+ if tweet_nb > 0:
+ return
+
+ ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
+
+ # get or create user
+ user = self.__get_user(self.json_dict["user"])
+ if user is None:
+ log.warning("USER not found " + repr(ts["user"]))
+ ts_copy["user"] = None
+ ts_copy["user_id"] = None
+ else:
+ ts_copy["user"] = user
+ ts_copy["user_id"] = ts_copy["user"].id
+ ts_copy["original_json"] = self.json_txt
+
+ self.tweet = Tweet(**ts_copy)
+ self.session.add(self.tweet)
+ self.session.flush()
+
+ # get entities
+ for ind_type, entity_list in self.json_dict["entities"].items():
+ for ind in entity_list:
+ self.__process_entity(ind, ind_type)
+
+
+ def __process_twitter_rest(self):
+ tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+ if tweet_nb > 0:
+ return
+
+ tweet_fields = {
+ 'created_at': self.json_dict["created_at"],
+ 'favorited': False,
+ 'id': self.json_dict["id"],
+ 'id_str': self.json_dict["id_str"],
+ #'in_reply_to_screen_name': ts["to_user"],
+ 'in_reply_to_user_id': self.json_dict["to_user_id"],
+ 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
+ #'place': ts["place"],
+ 'source': self.json_dict["source"],
+ 'text': self.json_dict["text"],
+ 'truncated': False,
+ 'original_json' : self.json_txt,
+ }
+
+ #user
- tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
- if tweet_nb > 0:
- return
-
- ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"])
+ user_fields = {
+ 'id' : self.json_dict['from_user_id'],
+ 'id_str' : self.json_dict['from_user_id_str'],
+ 'lang' : self.json_dict['iso_language_code'],
+ 'profile_image_url' : self.json_dict["profile_image_url"],
+ 'screen_name' : self.json_dict["from_user"],
+ }
+
+ user = self.__get_user(user_fields)
+ if user is None:
+ log.warning("USER not found " + repr(user_fields))
+ tweet_fields["user"] = None
+ tweet_fields["user_id"] = None
+ else:
+ tweet_fields["user"] = user
+ tweet_fields["user_id"] = user.id
+
+ tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
+ self.tweet = Tweet(**tweet_fields)
+ session.add(self.tweet)
+
+ text = self.tweet.text
+
+ extractor = twitter_text.Extractor(text)
+
+ for ind in extractor.extract_hashtags_with_indices():
+ self.__process_entity(ind, "hashtags")
+
+ for ind in extractor.extract_mentioned_screen_names_with_indices():
+ self.__process_entity(ind, "user_mentions")
+
+ for ind in extractor.extract_urls_with_indices():
+ self.__process_entity(ind, "urls")
+
+ self.session.flush()
+
+
+ def process(self):
+ if "metadata" in self.json_dict:
+ self.__process_twitter_rest()
+ else:
+ self.__process_twitter_stream()
+
+
+def set_logging(options):
- # get or create user
- user = get_user(ts["user"], session)
- if user is None:
- log.warning("USER not found " + repr(ts["user"]))
- ts_copy["user"] = None
- ts_copy["user_id"] = None
+ logging_config = {}
+
+ if options.logfile == "stdout":
+ logging_config["stream"] = sys.stdout
+ elif options.logfile == "stderr":
+ logging_config["stream"] = sys.stderr
else:
- ts_copy["user"] = user
- ts_copy["user_id"] = ts_copy["user"].id
- ts_copy["original_json"] = jsontxt
+ logging_config["filename"] = options.logfile
+
+ logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet))
+ logging.basicConfig(**logging_config)
- tweet = Tweet(**ts_copy)
- session.add(tweet)
- session.flush()
-
- # get entities
- for ind_type, entity_list in ts["entities"].items():
- for ind in entity_list:
- process_entity(ind, ind_type, tweet, session)
+ options.debug = (options.verbose-options.quiet > 0)
+
+def set_logging_options(parser):
+ parser.add_option("-l", "--log", dest="logfile",
+ help="log to file", metavar="LOG", default="stderr")
+ parser.add_option("-v", dest="verbose", action="count",
+ help="verbose", metavar="VERBOSE", default=0)
+ parser.add_option("-q", dest="quiet", action="count",
+ help="quiet", metavar="QUIET", default=0)