import argparse
import functools
import json
import logging
import re
import urllib.parse
from enum import Enum
import twitter
from blessings import Terminal
from iri_tweet import models, utils
from iri_tweet.processor import TwitterProcessorStatus
logger = logging.getLogger(__name__)
APPLICATION_NAME = "Tweet search json"
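# Twitter exposes three search tiers: the standard 7-day search API and the
# premium "30day" and "fullarchive" endpoints, which are addressed through a
# per-application dev environment label.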
class SearchType(Enum):
standard = 'standard'
_30day = '30day'
full = 'full'
def __str__(self):
return self.value
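# The premium search endpoints are queried with a POST whose parameters travel
# in a JSON body. This decorator repacks plain keyword arguments into the
# single `_json` argument that the `twitter` client sends as the request body,
# leaving underscore-prefixed control arguments (such as `_method`) untouched.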
def pass_kwargs_as_json(f):
def kwargs_json_wrapper(*args, **kwargs):
normal_kwargs = { k:v for k,v in kwargs.items() if k[0] != "_" }
special_kwargs = { k:v for k,v in kwargs.items() if k[0] == "_" }
new_kwargs = { **special_kwargs, '_json': normal_kwargs }
return f(*args, **new_kwargs)
return kwargs_json_wrapper
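# For example, wrapped(query="foo", maxResults=100) calls the underlying
# endpoint as f(_json={"query": "foo", "maxResults": 100}); any underscore
# arguments (such as _method) are passed through unchanged.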
# TODO: implement some more parameters
# Script to scrape Twitter search results.
# Shamelessly taken from https://github.com/Jefferson-Henrique/GetOldTweets-python
# pyquery cssselect
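# TweetManager wraps a single search query and iterates over every tweet it
# returns, following pagination transparently for both the standard and the
# premium (30day / full archive) endpoints.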
class TweetManager:
    def __init__(self, twitter_con, query, search_type, api_env):
        self.query = query
        self.search_type = search_type
        # Pagination cursor: "" before the first request, None once the last
        # page has been reached.
        self.next = ""
        self.t = twitter_con
        self.api_env = api_env
        self.twitter_api = self.get_twitter_api()
        self.rate_limit_remaining = 0
        self.rate_limit_limit = 0
        self.rate_limit_reset = 0
        # Page counter, also used to name the json_dump_<n>.json debug files.
        self.i = 0
    def get_twitter_api(self):
        # Return the API callable for the selected tier. The premium 30day and
        # fullarchive endpoints are selected per dev environment label and are
        # called with POST, their parameters wrapped into a JSON body by
        # pass_kwargs_as_json.
        return {
            SearchType.standard: lambda t: t.search.tweets,
            SearchType._30day: lambda t: pass_kwargs_as_json(functools.partial(getattr(getattr(t.tweets.search, '30day'), self.api_env), _method="POST")),
            SearchType.full: lambda t: pass_kwargs_as_json(functools.partial(getattr(t.tweets.search.fullarchive, self.api_env), _method="POST")),
        }[self.search_type](self.t)
    def __iter__(self):
        while True:
            if self.next is None:
                break
            self.i += 1
            # with open("json_dump_%s.json" % self.i, 'r') as fp:
            #     jsondata = json.load(fp)
            jsondata = self.get_json_response()
            # The twitter client exposes the rate limit headers on the response.
            self.rate_limit_remaining = jsondata.rate_limit_remaining
            self.rate_limit_limit = jsondata.rate_limit_limit
            self.rate_limit_reset = jsondata.rate_limit_reset
            # Keep a raw dump of every page for debugging and reprocessing.
            with open("json_dump_%s.json" % self.i, 'w') as fp:
                json.dump(jsondata, fp)
            if self.search_type == SearchType.standard:
                # Standard search: the next page is addressed by the max_id found
                # in the search_metadata.next_results query string.
                next_results = jsondata['search_metadata'].get('next_results', "?")[1:]
                self.next = urllib.parse.parse_qs(next_results).get('max_id', [None])[0]
                tweet_list = jsondata['statuses']
            else:
                # Premium search: pagination uses an opaque "next" token.
                self.next = jsondata.get('next')
                tweet_list = jsondata['results']
            if len(tweet_list) == 0:
                break
            for tweet in tweet_list:
                yield tweet
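    # Build and send one search request. Standard search takes q / include_entities /
    # max_id as query-string parameters; the premium endpoints expect query /
    # maxResults / next in the JSON body (handled by pass_kwargs_as_json).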
    def get_json_response(self):
        if self.search_type == SearchType.standard:
            kwargs = {"q": self.query, "include_entities": True}
            if self.next:
                # Resume below the max_id extracted from the previous page.
                kwargs["max_id"] = int(self.next)
            return self.twitter_api(**kwargs)
        else:
            kwargs = {"query": self.query, "maxResults": 100}
            if self.next:
                kwargs["next"] = self.next
            return self.twitter_api(**kwargs)
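# A minimal usage sketch, assuming an already authenticated twitter.Twitter
# client `t` and a premium environment labelled "dev":
#     tm = TweetManager(t, "#example", SearchType.standard, "dev")
#     for tweet in tm:
#         print(tweet["id"], tweet.get("text", ""))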
def get_options():
usage = "usage: %(prog)s [options] <connection_str_or_filepath>"
parser = argparse.ArgumentParser(usage=usage)
parser.add_argument(dest="conn_str",
help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
parser.add_argument("-Q", dest="query",
help="query", metavar="QUERY")
parser.add_argument("-k", "--key", dest="consumer_key",
help="Twitter consumer key", metavar="CONSUMER_KEY")
parser.add_argument("-s", "--secret", dest="consumer_secret",
help="Twitter consumer secret", metavar="CONSUMER_SECRET")
parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
help="Token file name")
parser.add_argument("-a", dest="search_type", metavar="SEARCH_TYPE", default=SearchType.standard, choices=list(SearchType), type=SearchType,
help="Twitter search type ('standard', '30day', 'full')")
parser.add_argument("-e", dest="api_env", metavar="API_ENV", default="dev",
help="Twitter api dev environment")
utils.set_logging_options(parser)
return parser.parse_args()
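# Entry point: authenticate with application-only OAuth2, open the target
# database, then run the search and feed every tweet not already stored to
# TwitterProcessorStatus.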
if __name__ == "__main__":
options = get_options()
print("the search type is : %s" % options.search_type)
utils.set_logging(options)
bearer_token = utils.get_oauth2_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
twitter_auth = twitter.OAuth2(options.consumer_key, options.consumer_secret, bearer_token)
    t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
conn_str = options.conn_str.strip()
if not re.match(r"^\w+://.+", conn_str):
conn_str = 'sqlite:///' + conn_str
engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
session = None
term = Terminal()
try:
session = Session()
        print("Query: %s" % options.query)
tm = TweetManager(t, options.query, options.search_type, options.api_env)
move_up = 0
for i,tweet in enumerate(tm):
            # Skip entries without an id.
            tweet_id = tweet.get("id")
            if not tweet_id:
                continue
            if move_up > 0:
                # Move the cursor back up so the progress line is rewritten in place.
                print((move_up + 1) * term.move_up())
                move_up = 0
            print("%d: %s - %r" % (i + 1, tweet_id, tweet.get("text", "")) + term.clear_eol())
            move_up += 1
            # Skip tweets already stored in the database.
            count_tweet = session.query(models.Tweet).filter_by(id_str=str(tweet_id)).count()
            if count_tweet:
                continue
processor = TwitterProcessorStatus(tweet, None, None, session, twitter_auth=twitter_auth, logger=logger)
processor.process()
session.flush()
session.commit()
except twitter.api.TwitterHTTPError as e:
fmt = ("." + e.format) if e.format else ""
print("Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data)))
finally:
if session:
session.close()