# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1295341683 -3600 # Node ID 54d7f1486ac4e6481082e0607c8d17a58e7f205c # Parent eb885a117aa06b47ad72f7323f3ea4cc116bf269 implement get_oauth_token diff -r eb885a117aa0 -r 54d7f1486ac4 .hgignore --- a/.hgignore Wed Jan 12 13:25:01 2011 +0100 +++ b/.hgignore Tue Jan 18 10:08:03 2011 +0100 @@ -4,4 +4,6 @@ syntax: regexp ^script/stream/virtualenv$ syntax: regexp -^script/rest/virtualenv$ \ No newline at end of file +^script/rest/virtualenv$ +syntax: regexp +^script/stream/streamwatcher\.py$ \ No newline at end of file diff -r eb885a117aa0 -r 54d7f1486ac4 script/iri_tweet/export_tweet_db.py --- a/script/iri_tweet/export_tweet_db.py Wed Jan 12 13:25:01 2011 +0100 +++ b/script/iri_tweet/export_tweet_db.py Tue Jan 18 10:08:03 2011 +0100 @@ -1,7 +1,6 @@ from models import * from utils import * from optparse import OptionParser -from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import logging import sqlite3 @@ -15,36 +14,18 @@ parser = OptionParser() - parser.add_option("-l", "--log", dest="logfile", - help="log to file", metavar="LOG", default="stderr") - parser.add_option("-v", dest="verbose", action="count", - help="verbose", metavar="VERBOSE", default=0) - parser.add_option("-q", dest="quiet", action="count", - help="quiet", metavar="QUIET", default=0) + set_logging_options(parser) return parser.parse_args() if __name__ == "__main__": (options, args) = get_option() - - logging_config = {} - - if options.logfile == "stdout": - logging_config["stream"] = sys.stdout - elif options.logfile == "stderr": - logging_config["stream"] = sys.stderr - else: - logging_config["filename"] = options.logfile - - logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) - - logging.basicConfig(**logging_config) + + set_logging(options) with sqlite3.connect(args[0]) as conn_in: - engine = create_engine('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0)) - metadata = Base.metadata - metadata.create_all(engine) + engine, metadata = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0)) Session = sessionmaker(bind=engine) session = Session() try: @@ -52,20 +33,8 @@ fields_mapping = {} for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")): logging.debug("main loop %d : %s" % (i, res[0])) - json = eval(res[0]) - if "metadata" in json: - from_twitter_rest(json, res[0], session) - else: - from_twitter_stream(json, res[0], session) - #if "user_mentions" in json["entities"]: - # for hash in json["entities"]["user_mentions"]: - ## for key,value in hash.items(): - # if key not in fields_mapping or fields_mapping[key] is type(None): - # fields_mapping[key] = type(value) - - - #for key,value in fields_mapping.items(): - # print key,value.__name__ + processor = TwitterProcessor(eval(res[0]), res[0], session) + processor.process() session.commit() logging.debug("main : %d tweet processed" % (i+1)) except Exception, e: diff -r eb885a117aa0 -r 54d7f1486ac4 script/iri_tweet/export_twitter_alchemy.py --- a/script/iri_tweet/export_twitter_alchemy.py Wed Jan 12 13:25:01 2011 +0100 +++ b/script/iri_tweet/export_twitter_alchemy.py Tue Jan 18 10:08:03 2011 +0100 @@ -5,17 +5,17 @@ from models import * from optparse import OptionParser from sqlalchemy import Table, Column, Integer, BigInteger, String, MetaData, \ - ForeignKey, create_engine + ForeignKey from sqlalchemy.orm import sessionmaker, mapper from sqlalchemy.sql import select import datetime -import time import email.utils import logging import os import os.path import re import sys +import time import uuid #class TweetExclude(object): @@ -29,9 +29,7 @@ ts = email.utils.parsedate_tz(date_str) return datetime.datetime(*ts[0:7]) - -if __name__ == "__main__" : - +def get_options(): parser = OptionParser() parser.add_option("-f", "--file", dest="filename", help="write export to file", metavar="FILE", default="project_enmi.ldt") @@ -63,36 +61,24 @@ help="Replace tweet ensemble", metavar="REPLACE", default=False) parser.add_option("-l", "--log", dest="logfile", help="log to file", metavar="LOG", default="stderr") - parser.add_option("-v", dest="verbose", action="count", - help="verbose", metavar="VERBOSE", default=0) - parser.add_option("-q", dest="quiet", action="count", - help="quiet", metavar="QUIET", default=0) - parser.add_option("-L", dest="listconf", - help="file containing the list of file to process", metavar="LIST", default=0) - + + set_logging_options(parser) - (options, args) = parser.parse_args() - - logging_config = {} - - if options.logfile == "stdout": - logging_config["stream"] = sys.stdout - elif options.logfile == "stderr": - logging_config["stream"] = sys.stderr - else: - logging_config["filename"] = options.logfile + return parser.parse_args() + + +if __name__ == "__main__" : - logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) - - logging.basicConfig(**logging_config) - + (options, args) = get_options() + + set_logging(options) + logging.debug("OPTIONS : " + repr(options)) - - engine = create_engine('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0)) + engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = False) + Session = sessionmaker() - conn = engine.connect() try : session = Session(bind=conn) diff -r eb885a117aa0 -r 54d7f1486ac4 script/iri_tweet/models.py --- a/script/iri_tweet/models.py Wed Jan 12 13:25:01 2011 +0100 +++ b/script/iri_tweet/models.py Tue Jan 18 10:08:03 2011 +0100 @@ -1,7 +1,7 @@ -from sqlalchemy import Boolean, Table, Column, BigInteger, \ - Integer, String, MetaData, ForeignKey, DateTime +from sqlalchemy import Boolean, Table, Column, BigInteger, Integer, String, \ + MetaData, ForeignKey, DateTime, create_engine from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import relationship, backref, sessionmaker +from sqlalchemy.orm import relationship, backref import datetime import email.utils import simplejson @@ -9,10 +9,11 @@ Base = declarative_base() +APPLICATION_NAME = "IRI_TWITTER" CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA" CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA" -ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc" -ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA" +#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc" +#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA" def adapt_date(date_str): ts = email.utils.parsedate_tz(date_str) @@ -179,6 +180,22 @@ for key, value in kwargs.items(): if hasattr(self,key): setattr(self,key,value) + + +def setup_database(*args, **kwargs): + + create_all = True + if "create_all" in kwargs: + create_all = kwargs["create_all"] + del(kwargs["create_all"]) + + engine = create_engine(*args, **kwargs) + metadata = Base.metadata + + if create_all: + metadata.create_all(engine) + + return (engine, metadata) rest_tweet_tweet = { u'iso_language_code': 'unicode', diff -r eb885a117aa0 -r 54d7f1486ac4 script/iri_tweet/utils.py --- a/script/iri_tweet/utils.py Wed Jan 12 13:25:01 2011 +0100 +++ b/script/iri_tweet/utils.py Tue Jan 18 10:08:03 2011 +0100 @@ -1,11 +1,27 @@ -import email.utils -import logging from models import * import datetime +import email.utils +import json +import logging +import sys import twitter import twitter_text +import os.path +import twitter.oauth +def get_oauth_token(token_file_path=None): + + if token_file_path and os.path.file_exists(token_file_path): + logging.debug("reading token from file %s" % token_file_path) + return twitter.oauth.read_token_file(token_file_path) + #read access token info from path + + if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: + return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET + + return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_filename) + def parse_date(date_str): ts = email.utils.parsedate_tz(date_str) return datetime.datetime(*ts[0:7]) @@ -45,196 +61,250 @@ return value return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) -def get_user(user_dict, session): - - logging.debug("Get user : " + repr(user_dict)) - - user_id = user_dict.get("id",None) - user_name = user_dict.get("screen_name", user_dict.get("name", None)) - - if user_id is None and user_name is None: - return None - - if user_id: - user = session.query(User).filter(User.id == user_id).first() - else: - user = session.query(User).filter(User.screen_name == user_name).first() - - if user is not None: - return user - - user_created_at = user_dict.get("created_at", None) - - if user_created_at is None: - t = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET)) - try: - if user_id: - user_dict = t.users.show(user_id=user_id) - else: - user_dict = t.users.show(screen_name=user_name) - except Exception as e: - logging.info("get_user : TWITTER ERROR : " + repr(e)) - logging.info("get_user : TWITTER ERROR : " + str(e)) - - user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) - if "id" not in user_dict: - return None - - user = User(**user_dict) - - session.add(user) - session.flush() - - return user - # if not, if needed get info from twitter - # create user - # return it - -def process_entity(ind, ind_type, tweet, session): - - logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) - - entity_dict = { - "indice_start": ind["indices"][0], - "indice_end" : ind["indices"][1], - "tweet_id" : tweet.id, - "tweet" : tweet - } - - def process_hashtags(): - text = ind.get("text", ind.get("hashtag", None)) - if text is None: - return None - hashtag = session.query(Hashtag).filter(Hashtag.text == text).first() - if not hashtag: - ind["text"] = text - hashtag = Hashtag(**ind) - session.add(hashtag) - session.flush() - entity_dict['hashtag'] = hashtag - entity_dict['hashtag_id'] = hashtag.id - entity = EntityHashtag(**entity_dict) - return entity - - def process_user_mentions(): - user_mention = get_user(ind, session) - if user_mention is None: - entity_dict['user'] = None - entity_dict['user_id'] = None - else: - entity_dict['user'] = user_mention - entity_dict['user_id'] = user_mention.id - entity = EntityUser(**entity_dict) - return entity - - def process_urls(): - url = session.query(Url).filter(Url.url == ind["url"]).first() - if url is None: - url = Url(**ind) - session.add(url) - session.flush() - entity_dict['url'] = url - entity_dict['url_id'] = url.id - entity = EntityUrl(**entity_dict) - return entity - - #{'': lambda } - entity = { - 'hashtags': process_hashtags, - 'user_mentions' : process_user_mentions, - 'urls' : process_urls - }[ind_type]() - - logging.debug("Process_entity entity_dict: " + repr(entity_dict)) - if entity: - session.add(entity) +class TwitterProcessorException(Exception): + pass -def from_twitter_rest(ts, jsontxt, session): +class TwitterProcessor(object): - tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count() - if tweet_nb > 0: - return + def __init__(self, json_dict, json_txt, session): + + if json_dict is None and json_txt is None: + raise TwitterProcessorException("No json") + + if json_dict is None: + self.json_dict = json.loads(json_txt) + else: + self.json_dict = json_dict + + if not json_txt: + self.json_txt = json.dumps(json_dict) + else: + self.json_txt = json_txt + + if "id" not in self.json_dict: + raise TwitterProcessorException("No id in json") + + self.session = session + + def __get_user(self, user_dict): + logging.debug("Get user : " + repr(user_dict)) + + user_id = user_dict.get("id",None) + user_name = user_dict.get("screen_name", user_dict.get("name", None)) - tweet_fields = { - 'created_at': ts["created_at"], - 'favorited': False, - 'id': ts["id"], - 'id_str': ts["id_str"], - #'in_reply_to_screen_name': ts["to_user"], - 'in_reply_to_user_id': ts["to_user_id"], - 'in_reply_to_user_id_str': ts["to_user_id_str"], - #'place': ts["place"], - 'source': ts["source"], - 'text': ts["text"], - 'truncated': False, - 'original_json' : jsontxt, - } + if user_id is None and user_name is None: + return None + + if user_id: + user = self.session.query(User).filter(User.id == user_id).first() + else: + user = self.session.query(User).filter(User.screen_name == user_name).first() + + if user is not None: + return user - #user + user_created_at = user_dict.get("created_at", None) + + if user_created_at is None: + acess_token_key, access_token_secret = get_oauth_token() + t = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET)) + try: + if user_id: + user_dict = t.users.show(user_id=user_id) + else: + user_dict = t.users.show(screen_name=user_name) + except Exception as e: + logging.info("get_user : TWITTER ERROR : " + repr(e)) + logging.info("get_user : TWITTER ERROR : " + str(e)) + + user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) + if "id" not in user_dict: + return None + + user = User(**user_dict) + + self.session.add(user) + self.session.flush() + + return user - user_fields = { - 'id' : ts['from_user_id'], - 'id_str' : ts['from_user_id_str'], - 'lang' : ts['iso_language_code'], - 'profile_image_url' : ts["profile_image_url"], - 'screen_name' : ts["from_user"], - } + def __process_entity(self, ind, ind_type): + logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) - user = get_user(user_fields, session) - if user is None: - log.warning("USER not found " + repr(user_fields)) - tweet_fields["user"] = None - tweet_fields["user_id"] = None - else: - tweet_fields["user"] = user - tweet_fields["user_id"] = user.id + entity_dict = { + "indice_start": ind["indices"][0], + "indice_end" : ind["indices"][1], + "tweet_id" : self.tweet.id, + "tweet" : self.tweet + } - tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) - tweet = Tweet(**tweet_fields) - session.add(tweet) - - text = tweet.text - - extractor = twitter_text.Extractor(text) - - for ind in extractor.extract_hashtags_with_indices(): - process_entity(ind, "hashtags", tweet, session) + def process_hashtags(): + text = ind.get("text", ind.get("hashtag", None)) + if text is None: + return None + hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first() + if not hashtag: + ind["text"] = text + hashtag = Hashtag(**ind) + self.session.add(hashtag) + self.session.flush() + entity_dict['hashtag'] = hashtag + entity_dict['hashtag_id'] = hashtag.id + entity = EntityHashtag(**entity_dict) + return entity + + def process_user_mentions(): + user_mention = self.__get_user(ind) + if user_mention is None: + entity_dict['user'] = None + entity_dict['user_id'] = None + else: + entity_dict['user'] = user_mention + entity_dict['user_id'] = user_mention.id + entity = EntityUser(**entity_dict) + return entity + + def process_urls(): + url = self.session.query(Url).filter(Url.url == ind["url"]).first() + if url is None: + url = Url(**ind) + self.session.add(url) + self.session.flush() + entity_dict['url'] = url + entity_dict['url_id'] = url.id + entity = EntityUrl(**entity_dict) + return entity - for ind in extractor.extract_mentioned_screen_names_with_indices(): - process_entity(ind, "user_mentions", tweet, session) - - for ind in extractor.extract_urls_with_indices(): - process_entity(ind, "urls", tweet, session) - - - + #{'': lambda } + entity = { + 'hashtags': process_hashtags, + 'user_mentions' : process_user_mentions, + 'urls' : process_urls + }[ind_type]() + + logging.debug("Process_entity entity_dict: " + repr(entity_dict)) + if entity: + self.session.add(entity) + self.session.flush() + -def from_twitter_stream(ts, jsontxt, session): + def __process_twitter_stream(self): + + tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() + if tweet_nb > 0: + return + + ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) + + # get or create user + user = self.__get_user(self.json_dict["user"]) + if user is None: + log.warning("USER not found " + repr(ts["user"])) + ts_copy["user"] = None + ts_copy["user_id"] = None + else: + ts_copy["user"] = user + ts_copy["user_id"] = ts_copy["user"].id + ts_copy["original_json"] = self.json_txt + + self.tweet = Tweet(**ts_copy) + self.session.add(self.tweet) + self.session.flush() + + # get entities + for ind_type, entity_list in self.json_dict["entities"].items(): + for ind in entity_list: + self.__process_entity(ind, ind_type) + + + def __process_twitter_rest(self): + tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() + if tweet_nb > 0: + return + + tweet_fields = { + 'created_at': self.json_dict["created_at"], + 'favorited': False, + 'id': self.json_dict["id"], + 'id_str': self.json_dict["id_str"], + #'in_reply_to_screen_name': ts["to_user"], + 'in_reply_to_user_id': self.json_dict["to_user_id"], + 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"], + #'place': ts["place"], + 'source': self.json_dict["source"], + 'text': self.json_dict["text"], + 'truncated': False, + 'original_json' : self.json_txt, + } + + #user - tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count() - if tweet_nb > 0: - return - - ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"]) + user_fields = { + 'id' : self.json_dict['from_user_id'], + 'id_str' : self.json_dict['from_user_id_str'], + 'lang' : self.json_dict['iso_language_code'], + 'profile_image_url' : self.json_dict["profile_image_url"], + 'screen_name' : self.json_dict["from_user"], + } + + user = self.__get_user(user_fields) + if user is None: + log.warning("USER not found " + repr(user_fields)) + tweet_fields["user"] = None + tweet_fields["user_id"] = None + else: + tweet_fields["user"] = user + tweet_fields["user_id"] = user.id + + tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) + self.tweet = Tweet(**tweet_fields) + session.add(self.tweet) + + text = self.tweet.text + + extractor = twitter_text.Extractor(text) + + for ind in extractor.extract_hashtags_with_indices(): + self.__process_entity(ind, "hashtags") + + for ind in extractor.extract_mentioned_screen_names_with_indices(): + self.__process_entity(ind, "user_mentions") + + for ind in extractor.extract_urls_with_indices(): + self.__process_entity(ind, "urls") + + self.session.flush() + + + def process(self): + if "metadata" in self.json_dict: + self.__process_twitter_rest() + else: + self.__process_twitter_stream() + + +def set_logging(options): - # get or create user - user = get_user(ts["user"], session) - if user is None: - log.warning("USER not found " + repr(ts["user"])) - ts_copy["user"] = None - ts_copy["user_id"] = None + logging_config = {} + + if options.logfile == "stdout": + logging_config["stream"] = sys.stdout + elif options.logfile == "stderr": + logging_config["stream"] = sys.stderr else: - ts_copy["user"] = user - ts_copy["user_id"] = ts_copy["user"].id - ts_copy["original_json"] = jsontxt + logging_config["filename"] = options.logfile + + logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) + logging.basicConfig(**logging_config) - tweet = Tweet(**ts_copy) - session.add(tweet) - session.flush() - - # get entities - for ind_type, entity_list in ts["entities"].items(): - for ind in entity_list: - process_entity(ind, ind_type, tweet, session) + options.debug = (options.verbose-options.quiet > 0) + +def set_logging_options(parser): + parser.add_option("-l", "--log", dest="logfile", + help="log to file", metavar="LOG", default="stderr") + parser.add_option("-v", dest="verbose", action="count", + help="verbose", metavar="VERBOSE", default=0) + parser.add_option("-q", dest="quiet", action="count", + help="quiet", metavar="QUIET", default=0) diff -r eb885a117aa0 -r 54d7f1486ac4 script/rest/search_enmi.py --- a/script/rest/search_enmi.py Wed Jan 12 13:25:01 2011 +0100 +++ b/script/rest/search_enmi.py Tue Jan 18 10:08:03 2011 +0100 @@ -1,5 +1,7 @@ +from iri_tweet import models, utils +from sqlalchemy.orm import sessionmaker import sqlite3 -import twython +import twitter def get_option(): @@ -13,6 +15,11 @@ help="quiet", metavar="QUIET", default=0) parser.add_option("-r", "--request", dest="request", help="twitter request", metavar="REQUEST", default=0) + parser.add_option("-Q", dest="query", + help="query", metavar="QUERY") + parser.add_option("-P", dest="rpp", metavar="RPP", default="50", + help="Result per page", metavar="RPP") + #add request token #add @@ -20,20 +27,31 @@ if __name__ == "__main__": - twitter = twython.Twython() - conn = sqlite3.connect('enmi2010_twitter_rest.db') + (options, args) = get_option() + + twitter = twitter.Twitter(domain="search.twitter.com") + engine, metadata = models.setup_database('sqlite:///'+args[0], echo=((options.verbose-options.quiet)>0)) + Session = sessionmaker(bind=engine) + session = Session() try: - conn.row_factory = sqlite3.Row - curs = conn.cursor() - curs.execute("create table if not exists tweet_tweet (json);") - conn.commit() + #conn.row_factory = sqlite3.Row + #curs = conn.cursor() + #curs.execute("create table if not exists tweet_tweet (json);") + #conn.commit() - results = twitter.searchTwitter(q="#enmi", rpp="50") - for tweet in results["results"]: - print tweet - curs.execute("insert into tweet_tweet (json) values (:json);", {"json":unicode(tweet)}) - conn.commit() + results = None + page = 1 + + while page <= int(1500/int(options.rpp)) and ( results is None or len(results) > 0): + results = twitter. search(q=options.query, rpp=options.rpp, page=page) + for tweet in results["results"]: + print tweet + processor = utils.TwitterProcessor(tweet, None, session) + processor.process() + session.flush() + page += 1 + session.commit() finally: - conn.close() + session.close() diff -r eb885a117aa0 -r 54d7f1486ac4 script/stream/recorder.py --- a/script/stream/recorder.py Wed Jan 12 13:25:01 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,76 +0,0 @@ -import time - -from getpass import getpass -from textwrap import TextWrapper - -import tweepy -import webbrowser - -CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA" -CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA" - -class StreamWatcherListener(tweepy.StreamListener): - - status_wrapper = TextWrapper(width=60, initial_indent=' ', subsequent_indent=' ') - - def on_status(self, status): - try: - print self.status_wrapper.fill(status.text) - print '\n %s %s via %s\n' % (status.author.screen_name, status.created_at, status.source) - except: - # Catch any unicode errors while printing to console - # and just ignore them to avoid breaking application. - pass - - def on_error(self, status_code): - print 'An error has occured! Status code = %s' % status_code - return True # keep stream alive - - def on_timeout(self): - print 'Snoozing Zzzzzz' - - - -def main(): - - auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) - auth_url = auth.get_authorization_url() - print 'Please authorize: ' + auth_url - webbrowser.open(auth_url) - - # Prompt for login credentials and setup stream object - verifier = raw_input('PIN: ').strip() - auth.get_access_token(verifier) - stream = tweepy.Stream(auth, StreamWatcherListener(), timeout=None) - - # Prompt for mode of streaming - valid_modes = ['sample', 'filter'] - while True: - mode = raw_input('Mode? [sample/filter] ') - if mode in valid_modes: - break - print 'Invalid mode! Try again.' - - if mode == 'sample': - stream.sample() - - elif mode == 'filter': - follow_list = raw_input('Users to follow (comma separated): ').strip() - track_list = raw_input('Keywords to track (comma seperated): ').strip() - if follow_list: - follow_list = [u for u in follow_list.split(',')] - else: - follow_list = None - if track_list: - track_list = [k for k in track_list.split(',')] - else: - track_list = None - - stream.filter(follow_list, track_list) - - -if __name__ == '__main__': - try: - main() - except KeyboardInterrupt: - print '\nGoodbye!' diff -r eb885a117aa0 -r 54d7f1486ac4 script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Wed Jan 12 13:25:01 2011 +0100 +++ b/script/stream/recorder_tweetstream.py Tue Jan 18 10:08:03 2011 +0100 @@ -1,10 +1,15 @@ +from getpass import getpass +from iri_tweet import models, utils +from optparse import OptionParser +from sqlalchemy.orm import sessionmaker +from sqlite3 import * +import logging +import os +import socket +import sys import tweetstream -from getpass import getpass -import socket socket._fileobject.default_bufsize = 0 -from sqlite3 import * -from optparse import OptionParser -import os + #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user'] @@ -63,11 +68,13 @@ -def process_tweet(tweet, cursor, debug): - print tweet - cursor.execute("insert into tweet_tweet (json) values (:json);", {"json":unicode(tweet)}); +def process_tweet(tweet, session, debug): + + logging.debug("Process_tweet :" + repr(tweet)) + processor = utils.TwitterProcessor(tweet, None, session) + processor.process() -def main(username, password, track, curs, debug, reconnects): +def main(username, password, track, session, debug, reconnects): username = username or raw_input('Twitter username: ') password = password or getpass('Twitter password: ') @@ -78,12 +85,12 @@ stream = ReconnectingTweetStream(username, password, track_list, reconnects=reconnects) try: for tweet in stream: - process_tweet(tweet, curs, debug) + process_tweet(tweet, session, debug) + session.commit() finally: stream.close() - -if __name__ == '__main__': - + +def get_options(): parser = OptionParser() parser.add_option("-f", "--file", dest="filename", help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") @@ -95,14 +102,21 @@ help="Twitter track", metavar="TRACK") parser.add_option("-n", "--new", dest="new", action="store_true", help="new database", default=False) - parser.add_option("-d", "--debug", dest="debug", action="store_true", - help="debug", default=False) parser.add_option("-r", "--reconnects", dest="reconnects", help="Reconnects", metavar="RECONNECTS", default=10, type='int') + + utils.set_logging_options(parser) + return parser.parse_args() + - (options, args) = parser.parse_args() +if __name__ == '__main__': + + (options, args) = get_options() + + utils.set_logging(options) + if options.debug: print "OPTIONS : " print repr(options) @@ -110,16 +124,15 @@ if options.new and os.path.exists(options.filename): os.remove(options.filename) - conn = connect(options.filename) + engine, metadata = models.setup_database('sqlite:///'+options.filename, echo=(options.debug)) + Session = sessionmaker(bind=engine) + session = Session() + try: - conn.row_factory = Row - curs = conn.cursor() - - curs.execute("create table if not exists tweet_tweet (json);") - try: - main(options.username, options.password, options.track, curs, options.debug, options.reconnects) + main(options.username, options.password, options.track, session, options.debug, options.reconnects) except KeyboardInterrupt: print '\nGoodbye!' + session.commit() finally: - conn.close() + session.close() diff -r eb885a117aa0 -r 54d7f1486ac4 script/virtualenv/res/distribute-0.6.14.tar.gz Binary file script/virtualenv/res/distribute-0.6.14.tar.gz has changed diff -r eb885a117aa0 -r 54d7f1486ac4 web/index.php --- a/web/index.php Wed Jan 12 13:25:01 2011 +0100 +++ b/web/index.php Tue Jan 18 10:08:03 2011 +0100 @@ -480,7 +480,7 @@