script/utils/search_twitter_api.py
author ymh <ymh.work@gmail.com>
Wed, 18 Dec 2024 15:24:41 +0100
changeset 1584 257c14dae52a
parent 1523 53f1b28188f0
permissions -rw-r--r--
Added tag V09.006 for changeset 459a88818bec

import argparse
import datetime
import functools
import json
import logging
import math
import re
import time
import urllib
from enum import Enum

import requests
import twitter
from blessings import Terminal

from iri_tweet import models, utils
from iri_tweet.processor import TwitterProcessorStatus

# Module-level logger; the main block hands it to TwitterProcessorStatus.
logger = logging.getLogger(__name__)

# Application name given to utils.get_oauth2_token when the bearer token is obtained.
# NOTE(review): "seach" looks like a typo for "search"; it is left untouched because
# the string is used at runtime - confirm nothing depends on it before fixing.
APPLICATION_NAME = "Tweet seach json"


class SearchType(Enum):
    """Twitter search types accepted by the ``-a`` command line option.

    Each member's value is the string typed on the command line, so
    ``SearchType("30day")`` gives back the matching member.
    """

    standard = "standard"
    # An identifier cannot start with a digit, hence the leading underscore.
    _30day = "30day"
    full = "full"

    def __str__(self):
        """Return the bare value so members print as the user typed them."""
        return str(self.value)

def pass_kwargs_as_json(f):
    """Wrap *f* so its regular keyword arguments travel as one ``_json`` dict.

    Keyword arguments whose name starts with an underscore are forwarded to
    *f* unchanged. Every other keyword argument is gathered into a dict that
    is passed to *f* as ``_json``; a ``_json`` supplied by the caller is
    replaced by that dict. Positional arguments are forwarded as they are.

    Args:
        f: the callable to wrap.

    Returns:
        A function accepting the same positional arguments as *f* and any
        keyword arguments.
    """
    def kwargs_json_wrapper(*args, **kwargs):
        special_kwargs = {}
        json_payload = {}
        # Single pass; startswith() also copes with an empty keyword name,
        # which indexing the first character would not.
        for key, value in kwargs.items():
            bucket = special_kwargs if key.startswith("_") else json_payload
            bucket[key] = value
        # Assigned last so the collected payload always wins over a
        # caller-provided "_json".
        special_kwargs["_json"] = json_payload
        return f(*args, **special_kwargs)
    return kwargs_json_wrapper

# TODO: implement some more parameters
# script to "scrape twitter results"
# Shamelessly taken from https://github.com/Jefferson-Henrique/GetOldTweets-python
# pyquery cssselect
class TweetManager:
    """Iterate over the tweets matching a query, one API page at a time.

    Iterating the manager sends one request per page through the endpoint
    selected by ``search_type`` and yields the tweets of each page in turn.
    Every raw response is also written to ``json_dump_<n>.json`` in the
    current working directory, ``<n>`` being the 1-based page counter.

    Attributes set while iterating:
        next: pagination cursor; ``""`` before the first request and
            ``None`` once the API reports no further page.
        rate_limit_remaining, rate_limit_limit, rate_limit_reset: copied
            from the attributes of the same name on the last response.
        i: number of requests sent so far.
    """

    def __init__(self, twitter_con, query, search_type, api_env):
        """Store the search settings and resolve the endpoint to call.

        Args:
            twitter_con: API client object the endpoints are looked up on.
            query: search query sent with every request.
            search_type: a ``SearchType`` member selecting the endpoint.
            api_env: environment name appended to the 30day/full endpoints.
        """
        self.query = query
        self.search_type = search_type
        self.next = ""
        self.t = twitter_con
        self.api_env = api_env
        # Must come after search_type, t and api_env are set: it reads them.
        self.twitter_api = self.get_twitter_api()
        self.rate_limit_remaining = 0
        self.rate_limit_limit = 0
        self.rate_limit_reset = 0
        self.i = 0

    def get_twitter_api(self):
        """Return the callable that performs one search request.

        The standard search uses ``search.tweets`` directly. The 30day and
        full searches use ``tweets.search.<product>.<api_env>``, forced to
        POST and wrapped so that keyword arguments are sent as ``_json``.

        Raises:
            KeyError: if ``search_type`` is not a known ``SearchType``.
        """
        def premium_endpoint(product):
            endpoint = getattr(getattr(self.t.tweets.search, product), self.api_env)
            return pass_kwargs_as_json(functools.partial(endpoint, _method="POST"))

        # Builders are lazy so only the selected endpoint is ever resolved.
        builders = {
            SearchType.standard: lambda: self.t.search.tweets,
            SearchType._30day: lambda: premium_endpoint('30day'),
            SearchType.full: lambda: premium_endpoint('fullarchive'),
        }
        return builders[self.search_type]()

    def __iter__(self):
        """Yield tweets page after page until there is no next page.

        Iteration stops when the cursor becomes ``None`` or when a page
        comes back without any tweet.
        """
        while self.next is not None:
            self.i += 1

            jsondata = self.get_json_response()

            self.rate_limit_remaining = jsondata.rate_limit_remaining
            self.rate_limit_limit = jsondata.rate_limit_limit
            self.rate_limit_reset = jsondata.rate_limit_reset

            # Keep a copy of each raw page on disk.
            with open("json_dump_%s.json" % self.i, 'w') as fp:
                json.dump(jsondata, fp)

            tweet_list = self._read_page(jsondata)
            if not tweet_list:
                break

            yield from tweet_list

    def _read_page(self, jsondata):
        """Update ``self.next`` from *jsondata* and return its list of tweets."""
        # Imported explicitly: a bare ``import urllib`` does not load the
        # ``parse`` submodule, so ``urllib.parse`` is only reachable when some
        # other import happened to load it.
        from urllib.parse import parse_qs

        if self.search_type == SearchType.standard:
            # "next_results" is treated as a query string with a leading
            # character to drop (the default is "?"); its "max_id" parameter
            # becomes the cursor, or None when it is absent.
            next_results = jsondata['search_metadata'].get('next_results', "?")[1:]
            self.next = parse_qs(next_results).get('max_id', [None])[0]
            return jsondata['statuses']

        self.next = jsondata.get('next')
        return jsondata['results']

    def get_json_response(self):
        """Send the request for the current page and return the response.

        The standard search is paginated with ``max_id`` (0 on the first
        request); the other searches send the ``next`` token once one has
        been received and ask for 100 results per page.
        """
        if self.search_type == SearchType.standard:
            max_id = int(self.next) if self.next else 0
            return self.twitter_api(q=self.query, include_entities=True, max_id=max_id)

        kwargs = {"query": self.query, "maxResults": 100}
        if self.next:
            kwargs["next"] = self.next
        return self.twitter_api(**kwargs)

def get_options():
    """Parse the command line and return the resulting namespace.

    Besides the options declared here, ``utils.set_logging_options`` adds
    its own options to the parser before parsing.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args()``.
    """
    # argparse prepends "usage: " on its own; repeating it in the template
    # would print "usage: usage: ...".
    usage = "%(prog)s [options] <connection_str_or_filepath>"

    parser = argparse.ArgumentParser(usage=usage)

    parser.add_argument(dest="conn_str",
                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
    parser.add_argument("-Q", dest="query",
                        help="query", metavar="QUERY")
    parser.add_argument("-k", "--key", dest="consumer_key",
                        help="Twitter consumer key", metavar="CONSUMER_KEY")
    parser.add_argument("-s", "--secret", dest="consumer_secret",
                        help="Twitter consumer secret", metavar="CONSUMER_SECRET")
    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                        help="Token file name")
    # type=SearchType converts the raw string into the enum member before it
    # is checked against the choices.
    parser.add_argument("-a", dest="search_type", metavar="SEARCH_TYPE", default=SearchType.standard, choices=list(SearchType), type=SearchType,
                        help="Twitter search type ('standard', '30day', 'full')")
    parser.add_argument("-e", dest="api_env", metavar="API_ENV", default="dev",
                        help="Twitter api dev environment")

    utils.set_logging_options(parser)

    return parser.parse_args()


if __name__ == "__main__":
    # Script entry point: search Twitter for the given query and store every
    # tweet that is not already in the database.

    options = get_options()

    print("the search type is : %s" % options.search_type)

    utils.set_logging(options)

    bearer_token = utils.get_oauth2_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
    twitter_auth = twitter.OAuth2(options.consumer_key, options.consumer_secret, bearer_token)

    t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
    t.secure = True

    conn_str = options.conn_str.strip()
    # A value without a "scheme://" prefix is taken as the path of a SQLite file.
    if not re.match(r"^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + conn_str

    engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
    session = None

    term = Terminal()

    try:
        session = Session()

        print(options.query)

        tm = TweetManager(t, options.query, options.search_type, options.api_env)

        # Number of progress lines printed since the cursor was last moved up;
        # lets each new tweet overwrite the previous progress line.
        move_up = 0

        for i, tweet in enumerate(tm):
            tweet_id = tweet.get("id")

            if not tweet_id:
                continue

            if move_up > 0:
                print((move_up+1)*term.move_up())
                move_up = 0

            print("%d: %s - %r" % (i+1, tweet_id, tweet.get("text", "")) + term.clear_eol())
            move_up += 1

            # "id" is read as-is from the tweet while the lookup targets id_str;
            # converting avoids comparing a number with a string column on
            # strictly typed backends.
            # NOTE(review): assumes models.Tweet.id_str is a string column - confirm.
            count_tweet = session.query(models.Tweet).filter_by(id_str=str(tweet_id)).count()

            # Already stored: nothing to do for this tweet.
            if count_tweet:
                continue

            processor = TwitterProcessorStatus(tweet, None, None, session, twitter_auth=twitter_auth, logger=logger)
            processor.process()
            session.flush()
            session.commit()

    except twitter.api.TwitterHTTPError as e:
        fmt = ("." + e.format) if e.format else ""
        print("Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data)))

    finally:
        # session stays None when Session() itself failed.
        if session:
            session.close()