from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
from sqlalchemy.sql import select, or_ #@UnresolvedImport
import anyjson #@UnresolvedImport
import datetime
import email.utils
import logging #@UnresolvedImport
import os.path
import sys
import twitter.oauth #@UnresolvedImport
import twitter.oauth_dance #@UnresolvedImport
import twitter_text #@UnresolvedImport
# In-process cache of OAuth access tokens, keyed by application name.
# Populated lazily by get_oauth_token().
CACHE_ACCESS_TOKEN = {}
def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
    """Return an OAuth (access_token_key, access_token_secret) pair.

    Resolution order:
      1. the in-process CACHE_ACCESS_TOKEN cache,
      2. the token file at *token_file_path* (if it exists),
      3. the ACCESS_TOKEN_KEY/ACCESS_TOKEN_SECRET constants from models,
      4. an interactive oauth dance (which may also write the token file).

    The result is cached under *application_name* for subsequent calls.
    """
    global CACHE_ACCESS_TOKEN
    if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN:
        return CACHE_ACCESS_TOKEN[application_name]
    if token_file_path and os.path.exists(token_file_path):
        logging.debug("reading token from file %s" % token_file_path) #@UndefinedVariable
        CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path)
        return CACHE_ACCESS_TOKEN[application_name]
    # Fall back to statically configured credentials, if any.
    # BUG FIX: the original tested "'ACCESS_TOKEN_KEY' in dict()" -- membership
    # in a brand-new empty dict -- which is always False, so this branch was
    # unreachable.  The constants are imported at module level; test them
    # directly.
    if ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
        return ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET
    # Last resort: interactive authorization (side effect: may write the
    # token file for next time).
    CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
    return CACHE_ACCESS_TOKEN[application_name]
def parse_date(date_str):
    """Parse an RFC 2822 date string (Twitter's ``created_at``) into a
    naive :class:`datetime.datetime`.

    The timezone offset reported by ``parsedate_tz`` is discarded, so the
    result is in whatever zone the string was expressed in (UTC for
    Twitter payloads).

    Returns None when *date_str* cannot be parsed.
    """
    ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    # BUG FIX: parsedate_tz returns None for unparsable input, which used
    # to raise TypeError here.
    if ts is None:
        return None
    # Only the first six fields are date/time components; index 6 onward
    # are placeholder values and the tz offset in the parsedate_tz tuple
    # (the original sliced [0:7], passing the dummy 7th field as the
    # microsecond argument).
    return datetime.datetime(*ts[0:6])
def clean_keys(dict_val):
    """Return a shallow copy of *dict_val* with every key coerced to str."""
    cleaned = {}
    for key, value in dict_val.items():
        cleaned[str(key)] = value
    return cleaned
# Adapter functions applied to raw json field values before row creation.
# Keyed first by API flavour ("stream" for the streaming API, "rest" for
# the search/REST API), then by object kind ("tweet" / "user"), then by
# field name.  A value of adapt_date / adapt_json converts the raw value;
# fields not listed are passed through unchanged (see adapt_fields below).
fields_adapter = {
    'stream': {
        "tweet": {
            "created_at" : adapt_date,
            "coordinates" : adapt_json,
            "place" : adapt_json,
            "geo" : adapt_json,
#            "original_json" : adapt_json,
        },
        "user": {
            "created_at" : adapt_date,
        },
    },
    'rest': {
        "tweet" : {
            "place" : adapt_json,
            "geo" : adapt_json,
            "created_at" : adapt_date,
#            "original_json" : adapt_json,
        },
    },
}
def adapt_fields(fields_dict, adapter_mapping):
    """Return a copy of *fields_dict* with adapters applied.

    For every (field, value) pair, if *adapter_mapping* holds a non-None
    callable for that field, the value is replaced by the adapter's
    result; otherwise it is kept as-is.  Keys are coerced to str.
    """
    adapted = {}
    for field, value in fields_dict.items():
        adapter = adapter_mapping.get(field)
        if adapter is not None:
            value = adapter(value)
        adapted[str(field)] = value
    return adapted
class ObjectBufferProxy(object):
    """Deferred construction of a model instance.

    Holds the model class and its constructor arguments until persists()
    is called.  Before that, attribute access returns a zero-argument
    thunk that resolves the attribute lazily (see __getattr__); these
    thunks are what other proxies store as constructor arguments, so
    cross-references (e.g. foreign keys) resolve at persist time.
    """
    def __init__(self, klass, args, kwargs, must_flush, instance=None):
        # klass: model class to instantiate; args/kwargs: constructor
        # arguments (may contain callables to be resolved at persist
        # time); must_flush: whether to flush the session right after
        # adding (needed when later rows reference this row's id);
        # instance: pre-existing model instance, if already persisted.
        self.klass= klass
        self.args = args
        self.kwargs = kwargs
        self.must_flush = must_flush
        self.instance = instance
    def persists(self, session):
        """Instantiate the model and add it to *session*.

        Callable values in args/kwargs are invoked first, resolving any
        lazy attribute thunks produced by other proxies.
        """
        new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
        new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
        self.instance = self.klass(*new_args, **new_kwargs)
        session.add(self.instance)
        if self.must_flush:
            session.flush()
    def __getattr__(self, name):
        # Returns a thunk, not the value: the conditional is INSIDE the
        # lambda, so evaluation is deferred until the thunk is called
        # (typically by persists() of another proxy), by which time
        # self.instance may finally be set; until then it yields None.
        return lambda : getattr(self.instance, name) if self.instance else None
class ObjectsBuffer(object):
    """Accumulates ObjectBufferProxy entries and persists them in order."""

    def __init__(self):
        self.__bufferlist = []

    def persists(self, session):
        """Persist every buffered proxy, in insertion order."""
        for proxy in self.__bufferlist:
            proxy.persists(session)

    def add_object(self, klass, args, kwargs, must_flush):
        """Queue a new object for deferred creation; return its proxy."""
        proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
        self.__bufferlist.append(proxy)
        return proxy

    def get(self, klass, **kwargs):
        """Return the first buffered proxy of *klass* whose kwargs contain
        every given key/value pair, or None when nothing matches.

        Proxies with no kwargs (positionally-built or preloaded ones)
        never match.
        """
        for proxy in self.__bufferlist:
            if proxy.klass != klass or not proxy.kwargs:
                continue
            matches = all(
                key in proxy.kwargs and proxy.kwargs[key] == value
                for key, value in kwargs.items()
            )
            if matches:
                return proxy
        return None
class TwitterProcessorException(Exception):
    """Raised when a tweet payload cannot be processed (no json, or no id)."""
    pass
class TwitterProcessor(object):
    """Persist a single tweet, its user and its entities into the database.

    The payload may come from the streaming API ("stream" format) or from
    the search/REST API ("rest" format, detected by the presence of a
    "metadata" field).  All rows are staged in an ObjectsBuffer and
    written to the SQLAlchemy session at the end of process().
    """
    def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
        """Store the payload in both parsed and serialized form.

        At least one of json_dict/json_txt is required; the missing
        representation is (de)serialized from the other.

        Raises TwitterProcessorException when both are None or when the
        parsed payload has no "id".
        """
        if json_dict is None and json_txt is None:
            raise TwitterProcessorException("No json")
        if json_dict is None:
            self.json_dict = anyjson.deserialize(json_txt)
        else:
            self.json_dict = json_dict
        if not json_txt:
            self.json_txt = anyjson.serialize(json_dict)
        else:
            self.json_txt = json_txt
        if "id" not in self.json_dict:
            raise TwitterProcessorException("No id in json")
        self.source_id = source_id
        self.session = session
        self.token_filename = token_filename
        self.obj_buffer = ObjectsBuffer()

    def __get_user(self, user_dict):
        """Resolve *user_dict* to a User proxy, creating the row if needed.

        Lookup order: objects buffer, database, then -- when the dict is
        partial (no "created_at") -- the Twitter REST API.  Returns None
        when neither id nor screen name is available, or when the final
        dict still has no "id".
        """
        logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
        user_id = user_dict.get("id",None)
        user_name = user_dict.get("screen_name", user_dict.get("name", None))
        if user_id is None and user_name is None:
            return None
        user = None
        # 1) already staged in the buffer?
        if user_id:
            user = self.obj_buffer.get(User, id=user_id)
        else:
            user = self.obj_buffer.get(User, screen_name=user_name)
        if user is not None:
            return user
        # 2) already in the database?  Wrap the existing row in a proxy.
        # TODO: add methods to ObjectsBuffer to get a buffered user
        user_obj = None
        if user_id:
            user_obj = self.session.query(User).filter(User.id == user_id).first()
        else:
            user_obj = self.session.query(User).filter(User.screen_name == user_name).first()
        if user_obj is not None:
            user = ObjectBufferProxy(User, None, None, False, user_obj)
            return user
        # 3) partial payload (no created_at): fetch the full user from
        # the Twitter REST API, best effort.
        user_created_at = user_dict.get("created_at", None)
        if user_created_at is None:
            acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
            t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
            try:
                if user_id:
                    user_dict = t.users.show(user_id=user_id)
                else:
                    user_dict = t.users.show(screen_name=user_name)
            except Exception as e:
                # Log and fall through with whatever partial dict we have.
                logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
                logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
        if "id" not in user_dict:
            return None
        # must_flush=True so the generated user id is available to the tweet row.
        user = self.obj_buffer.add_object(User, None, user_dict, True)
        return user

    def __process_entity(self, ind, ind_type):
        """Stage one tweet entity row.

        *ind* is the raw entity dict (must contain "indices"); *ind_type*
        is one of "hashtags", "user_mentions" or "urls".  Requires
        self.tweet to have been set by the caller.
        """
        logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
        ind = clean_keys(ind)
        # Common columns shared by all entity kinds; tweet_id is a lazy
        # thunk resolved when the buffer is persisted.
        entity_dict = {
            "indice_start": ind["indices"][0],
            "indice_end" : ind["indices"][1],
            "tweet_id" : self.tweet.id,
        }
        def process_hashtags():
            # Hashtag text key differs between stream ("text") and
            # twitter_text extraction ("hashtag").
            text = ind.get("text", ind.get("hashtag", None))
            if text is None:
                return None
            # buffer first, then database, then create.
            hashtag = self.obj_buffer.get(Hashtag, text=text)
            if hashtag is None:
                hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text == text).first()
                if hashtag_obj is not None:
                    hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
            if not hashtag:
                ind["text"] = text
                hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
            entity_dict['hashtag_id'] = hashtag.id
            return EntityHashtag, entity_dict
        def process_user_mentions():
            user_mention = self.__get_user(ind)
            if user_mention is None:
                entity_dict['user_id'] = None
            else:
                entity_dict['user_id'] = user_mention.id
            return EntityUser, entity_dict
        def process_urls():
            # buffer first, then database, then create.
            url = self.obj_buffer.get(Url, url=ind["url"])
            if url is None:
                url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
                if url_obj is not None:
                    url = ObjectBufferProxy(Url, None, None, False, url_obj)
            if url is None:
                url = self.obj_buffer.add_object(Url, None, ind, True)
            entity_dict['url_id'] = url.id
            return EntityUrl, entity_dict
        # Dispatch on the entity type; each handler returns the entity
        # model class and the completed column dict.
        entity_klass, entity_dict = {
            'hashtags': process_hashtags,
            'user_mentions' : process_user_mentions,
            'urls' : process_urls
        }[ind_type]()
        logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
        if entity_klass:
            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)

    def __process_twitter_stream(self):
        """Stage a tweet in streaming-API format (skips already-stored ids)."""
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return
        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
        # get or create user
        user = self.__get_user(self.json_dict["user"])
        if user is None:
            logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
            ts_copy["user_id"] = None
        else:
            ts_copy["user_id"] = user.id
        # The nested user dict was flattened to user_id above.
        del(ts_copy['user'])
        ts_copy["tweet_source_id"] = self.source_id
        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
        # get entities
        for ind_type, entity_list in self.json_dict["entities"].items():
            for ind in entity_list:
                self.__process_entity(ind, ind_type)

    def __process_twitter_rest(self):
        """Stage a tweet in search/REST-API format.

        Entities are not included in this format, so they are extracted
        from the tweet text with twitter_text.
        """
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return
        tweet_fields = {
            'created_at': self.json_dict["created_at"],
            'favorited': False,
            'id': self.json_dict["id"],
            'id_str': self.json_dict["id_str"],
            #'in_reply_to_screen_name': ts["to_user"],
            'in_reply_to_user_id': self.json_dict["to_user_id"],
            'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
            #'place': ts["place"],
            'source': self.json_dict["source"],
            'text': self.json_dict["text"],
            'truncated': False,
            'tweet_source_id' : self.source_id,
        }
        #user
        user_fields = {
            'lang' : self.json_dict.get('iso_language_code',None),
            'profile_image_url' : self.json_dict["profile_image_url"],
            'screen_name' : self.json_dict["from_user"],
        }
        user = self.__get_user(user_fields)
        if user is None:
            logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
            tweet_fields["user"] = None
            tweet_fields["user_id"] = None
        else:
            tweet_fields["user_id"] = user.id
        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
        # NOTE(review): self.tweet is a proxy, so .text yields a lazy
        # thunk, not the string -- verify twitter_text.Extractor accepts
        # this (or whether json_dict["text"] was intended here).
        text = self.tweet.text
        extractor = twitter_text.Extractor(text)
        for ind in extractor.extract_hashtags_with_indices():
            self.__process_entity(ind, "hashtags")
        for ind in extractor.extract_urls_with_indices():
            self.__process_entity(ind, "urls")
        for ind in extractor.extract_mentioned_screen_names_with_indices():
            self.__process_entity(ind, "user_mentions")

    def process(self):
        """Entry point: store the raw source if needed, stage the tweet in
        the appropriate format, log an OK status, and persist the buffer.
        """
        if self.source_id is None:
            # Keep the original json payload as the tweet source row.
            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
            self.source_id = tweet_source.id
        # The REST/search payload carries a "metadata" field; the
        # streaming payload does not.
        if "metadata" in self.json_dict:
            self.__process_twitter_rest()
        else:
            self.__process_twitter_stream()
        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
        self.obj_buffer.persists(self.session)
def set_logging(options, plogger=None):
    """Configure a logger from parsed command-line options.

    *options* must expose ``logfile`` ("stdout", "stderr" or a file
    path), ``verbose`` and ``quiet`` (counts).  The effective level is
    WARNING lowered by 10 per -v and raised by 10 per -q, clamped to
    [NOTSET, CRITICAL].  Uses *plogger* if given, else the root logger;
    a handler is only installed when the logger has none yet (mirrors
    logging.basicConfig).  Side effect: sets ``options.debug``.
    """
    raw_level = logging.WARNING - 10 * options.verbose + 10 * options.quiet #@UndefinedVariable
    config = {
        "format": '%(asctime)s %(levelname)s:%(name)s:%(message)s',
        "level": max(logging.NOTSET, min(logging.CRITICAL, raw_level)), #@UndefinedVariable
    }
    if options.logfile == "stdout":
        config["stream"] = sys.stdout
    elif options.logfile == "stderr":
        config["stream"] = sys.stderr
    else:
        config["filename"] = options.logfile

    logger = plogger if plogger is not None else logging.getLogger() #@UndefinedVariable

    if not logger.handlers:
        filename = config.get("filename")
        if filename:
            handler = logging.FileHandler(filename, config.get("filemode", 'a')) #@UndefinedVariable
        else:
            handler = logging.StreamHandler(config.get("stream")) #@UndefinedVariable
        formatter = logging.Formatter(config.get("format", logging.BASIC_FORMAT), #@UndefinedVariable
                                      config.get("datefmt", None))
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        level = config.get("level")
        if level is not None:
            logger.setLevel(level)

    options.debug = (options.verbose - options.quiet > 0)
def set_logging_options(parser):
    """Register the logging command-line options on an optparse parser.

    Adds -l/--log (destination, default "stderr") plus the repeatable
    -v (verbose) and -q (quiet) counters used by set_logging().
    """
    parser.add_option("-l", "--log", dest="logfile", default="stderr",
                      metavar="LOG", help="log to file")
    parser.add_option("-v", dest="verbose", action="count", default=0,
                      metavar="VERBOSE", help="verbose")
    parser.add_option("-q", dest="quiet", action="count", default=0,
                      metavar="QUIET", help="quiet")
def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
    """Build a Tweet query filtered by date range, hashtags, exclusion
    table and user whitelist.

    :param session: SQLAlchemy session.
    :param start_date, end_date: inclusive created_at bounds.
    :param hashtags: iterable of hashtag strings; each entry may itself
        be a comma-separated list.  Matching uses Hashtag.text.contains.
    :param tweet_exclude_table: optional table whose ``id`` column lists
        tweet ids to exclude.
    :param user_whitelist: optional iterable of screen names to restrict to.
    :returns: the composed query (not yet executed).
    """
    query = session.query(Tweet).join(EntityHashtag).join(Hashtag)
    if tweet_exclude_table is not None:
        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
    query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date)
    if user_whitelist:
        query = query.join(User).filter(User.screen_name.in_(user_whitelist))
    if hashtags:
        # BUG FIX: flatten comma-separated hashtag groups with a
        # comprehension instead of the builtin `reduce`, which no longer
        # exists as a builtin on Python 3.
        htags = [tag for group in hashtags for tag in group.split(",")]
        query = query.filter(or_(*[Hashtag.text.contains(h) for h in htags])) #@UndefinedVariable
    return query
def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
    """Build a distinct User query for authors of matching tweets.

    :param session: SQLAlchemy session.
    :param start_date, end_date: inclusive created_at bounds on the tweets.
    :param hashtags: iterable of hashtag strings; each entry may itself
        be a comma-separated list.  Matching uses Hashtag.text.contains.
    :param tweet_exclude_table: optional table whose ``id`` column lists
        tweet ids to exclude.
    :returns: the composed, distinct query (not yet executed).
    """
    query = session.query(User).join(Tweet).join(EntityHashtag).join(Hashtag)
    if tweet_exclude_table is not None:
        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
    query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date)
    if hashtags:
        # BUG FIX: flatten comma-separated hashtag groups with a
        # comprehension instead of the builtin `reduce`, which no longer
        # exists as a builtin on Python 3.
        htags = [tag for group in hashtags for tag in group.split(",")]
        query = query.filter(or_(*[Hashtag.text.contains(h) for h in htags])) #@UndefinedVariable
    return query.distinct()
# Module-level logger used throughout this module (the root logger;
# configured by set_logging()).  Defined last but resolved at call time
# by the functions above.
logger = logging.getLogger() #@UndefinedVariable