script/utils/search_twitter_api.py
author ymh <ymh.work@gmail.com>
Wed, 18 Dec 2019 12:01:20 +0100
changeset 1525 3c96e9ef3d64
parent 1523 53f1b28188f0
permissions -rw-r--r--
update code settings + some dependencies version

import argparse
import datetime
import functools
import json
import logging
import math
import re
import time
import urllib
from enum import Enum

import requests
import twitter
from blessings import Terminal

from iri_tweet import models, utils
from iri_tweet.processor import TwitterProcessorStatus

# Module-level logger, named after the module.
logger = logging.getLogger(__name__)

# Passed as ``application_name`` to utils.get_oauth2_token by the script entry point.
# NOTE(review): "seach" looks like a typo for "search"; left unchanged because
# the value is used at runtime.
APPLICATION_NAME = "Tweet seach json"


class SearchType(Enum):
    """Twitter search flavours that can be selected on the command line.

    The value of each member is the string used to select it, so
    ``SearchType("30day")`` resolves to ``SearchType._30day``.
    """

    standard = "standard"
    # Leading underscore: a Python identifier cannot start with a digit.
    _30day = "30day"
    full = "full"

    def __str__(self):
        """Return the raw string value of the member."""
        raw_value = self.value
        return raw_value

def pass_kwargs_as_json(f):
    """Wrap *f* so its regular keyword arguments are sent as one ``_json`` argument.

    Keyword arguments whose name starts with an underscore are forwarded to
    *f* unchanged. All the other ones are gathered in a dict that is passed
    to *f* as the ``_json`` keyword argument (replacing any ``_json`` given by
    the caller). Positional arguments are forwarded as they are.

    :param f: the callable to wrap.
    :return: the wrapping callable.
    """
    @functools.wraps(f)
    def kwargs_json_wrapper(*args, **kwargs):
        special_kwargs = {}
        json_kwargs = {}
        for key, value in kwargs.items():
            # startswith() also copes with an empty key, where key[0] would raise.
            target = special_kwargs if key.startswith("_") else json_kwargs
            target[key] = value
        # "_json" comes last so it always carries the gathered arguments.
        new_kwargs = {**special_kwargs, "_json": json_kwargs}
        return f(*args, **new_kwargs)
    return kwargs_json_wrapper

# TODO: implement some more parameters
# script to "scrape twitter results"
# Shamelessly taken from https://github.com/Jefferson-Henrique/GetOldTweets-python
# pyquery cssselect
class TweetManager:
    """Iterate over the tweets returned by one of the Twitter search endpoints.

    Iterating over an instance requests the result pages one after the other
    and yields every tweet of every page. The ``rate_limit_*`` attributes are
    refreshed from each response.
    """

    def __init__(self, twitter_con, query, search_type, api_env):
        """Store the search parameters and resolve the endpoint to call.

        :param twitter_con: API connection object; the script entry point
            passes a ``twitter.Twitter`` instance.
        :param query: search query sent to the API.
        :param search_type: ``SearchType`` member selecting the endpoint.
        :param api_env: name of the API environment, used to build the
            ``30day`` and ``full`` endpoints.
        """
        self.query = query
        self.search_type = search_type
        # Pagination cursor: "" before the first request, None once a response
        # gives no cursor for a further page.
        self.next = ""
        self.t = twitter_con
        self.api_env = api_env
        self.twitter_api = self.get_twitter_api()
        self.rate_limit_remaining = 0
        self.rate_limit_limit = 0
        self.rate_limit_reset = 0
        # Number of pages requested so far.
        self.i = 0

    def get_twitter_api(self):
        """Return the callable querying the endpoint matching ``self.search_type``.

        The ``30day`` and ``full`` endpoints are called with ``_method="POST"``
        and get their parameters gathered in ``_json`` by
        ``pass_kwargs_as_json``.

        :raises KeyError: if ``self.search_type`` is not a known ``SearchType``.
        """
        return {
            SearchType.standard: lambda t: t.search.tweets,
            # "30day" is not a valid attribute name, hence the getattr().
            SearchType._30day:   lambda t: pass_kwargs_as_json(functools.partial(getattr(getattr(t.tweets.search,'30day'),self.api_env), _method="POST")),
            SearchType.full:     lambda t: pass_kwargs_as_json(functools.partial(getattr(t.tweets.search.fullarchive, self.api_env), _method="POST")),
        }[self.search_type](self.t)

    def __iter__(self):
        """Yield the tweets of every result page, requesting the pages lazily.

        The iteration stops when a response gives no cursor for a next page or
        when a page holds no tweet. Every response is also dumped to
        ``json_dump_<page number>.json`` (path relative to the current
        directory).
        """
        # Imported explicitly: a bare "import urllib" does not load the
        # "urllib.parse" submodule by itself.
        from urllib.parse import parse_qs

        while self.next is not None:
            self.i += 1

            jsondata = self.get_json_response()

            self.rate_limit_remaining = jsondata.rate_limit_remaining
            self.rate_limit_limit = jsondata.rate_limit_limit
            self.rate_limit_reset = jsondata.rate_limit_reset

            with open("json_dump_%s.json" % self.i, 'w') as fp:
                json.dump(jsondata, fp)

            if self.search_type == SearchType.standard:
                # "next_results" is a query string with a leading "?"; when it
                # is missing the parsed cursor is None and the loop ends.
                next_results = jsondata['search_metadata'].get('next_results', "?")[1:]
                self.next = parse_qs(next_results).get('max_id', [None])[0]
                tweet_list = jsondata['statuses']
            else:
                self.next = jsondata.get('next')
                tweet_list = jsondata['results']

            if not tweet_list:
                break

            yield from tweet_list

    def get_json_response(self):
        """Request the next page of results and return the API response.

        The pagination cursor stored in ``self.next`` is only sent when a
        previous response supplied one.
        """
        if self.search_type == SearchType.standard:
            kwargs = {"q": self.query, "include_entities": True}
            # No max_id on the first request: per the documented meaning of
            # max_id, a value of 0 would ask for tweets with an id <= 0.
            if self.next:
                kwargs["max_id"] = int(self.next)
            return self.twitter_api(**kwargs)

        kwargs = {"query": self.query, "maxResults": 100}
        if self.next:
            kwargs["next"] = self.next
        return self.twitter_api(**kwargs)

def get_options(argv=None):
    """Parse the command line options of the script.

    :param argv: list of argument strings to parse; ``None`` (the default)
        makes argparse read them from ``sys.argv``.
    :return: the ``argparse.Namespace`` holding the parsed options.
    """
    # argparse prepends "usage: " itself, so the template must not repeat it.
    usage = "%(prog)s [options] <connection_str_or_filepath>"

    parser = argparse.ArgumentParser(usage=usage)

    parser.add_argument(dest="conn_str",
                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
    parser.add_argument("-Q", dest="query",
                        help="query", metavar="QUERY")
    parser.add_argument("-k", "--key", dest="consumer_key",
                        help="Twitter consumer key", metavar="CONSUMER_KEY")
    parser.add_argument("-s", "--secret", dest="consumer_secret",
                        help="Twitter consumer secret", metavar="CONSUMER_SECRET")
    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                        help="Token file name")
    # type=SearchType converts the raw string into the matching enum member.
    parser.add_argument("-a", dest="search_type", metavar="SEARCH_TYPE", default=SearchType.standard, choices=list(SearchType), type=SearchType,
                        help="Twitter search type ('standard', '30day', 'full')")
    parser.add_argument("-e", dest="api_env", metavar="API_ENV", default="dev",
                        help="Twitter api dev environment")

    # Project helper; presumably registers the verbose/quiet options read by
    # the script entry point -- confirm in iri_tweet.utils.
    utils.set_logging_options(parser)

    return parser.parse_args(argv)


def main():
    """Run a Twitter search and process the tweets not yet in the database.

    Every tweet returned by the search is printed; the ones whose id is not
    found in the database are handed to ``TwitterProcessorStatus`` and the
    session is committed after each of them. A ``TwitterHTTPError`` ends the
    run with a message describing the failed request.
    """
    options = get_options()

    print("the search type is : %s" % options.search_type)

    utils.set_logging(options)

    bearer_token = utils.get_oauth2_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
    twitter_auth = twitter.OAuth2(options.consumer_key, options.consumer_secret, bearer_token)

    t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
    t.secure = True

    conn_str = options.conn_str.strip()
    # A value without a URL scheme is taken as the path of a SQLite file.
    if not re.match(r"^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + conn_str

    engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
    session = None

    term = Terminal()

    try:
        session = Session()

        print(options.query)

        tm = TweetManager(t, options.query, options.search_type, options.api_env)

        # Number of lines printed since the cursor was last moved back up.
        move_up = 0

        for i, tweet in enumerate(tm):
            tweet_id = tweet.get("id")

            if not tweet_id:
                continue

            # Move the cursor back up so the progress line is overwritten
            # instead of scrolling.
            if move_up > 0:
                print((move_up+1)*term.move_up())
                move_up = 0

            print ("%d: %s - %r" % (i+1, tweet_id, tweet.get("text", "") ) + term.clear_eol())
            move_up += 1

            # The "id" field is read as-is from the API payload while the
            # lookup is on the id_str column (presumably a string column --
            # confirm in iri_tweet.models), so compare with a string.
            count_tweet = session.query(models.Tweet).filter_by(id_str=str(tweet_id)).count()

            # Already in the database: nothing to do.
            if count_tweet:
                continue

            processor = TwitterProcessorStatus(tweet, None, None, session, twitter_auth=twitter_auth, logger=logger)
            processor.process()
            session.flush()
            session.commit()

    except twitter.api.TwitterHTTPError as e:
        fmt = ("." + e.format) if e.format else ""
        print("Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data)))

    finally:
        if session:
            session.close()


if __name__ == "__main__":
    main()