diff -r 0a2505c3b547 -r 8276f3ff7a3f tweetcast/server-gevent/tweetcast.py --- a/tweetcast/server-gevent/tweetcast.py Mon Mar 05 18:41:11 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,211 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from gevent import monkey; monkey.patch_all() -# Importer d'abord, sinon exception -import anyjson, gevent, psycopg2 -from sqlalchemy import (Boolean, Column, BigInteger, Integer, String, - ForeignKey, DateTime, create_engine, asc, func) -from sqlalchemy.orm import backref, relationship, sessionmaker, joinedload -from sqlalchemy.ext.declarative import declarative_base -from gevent.pywsgi import WSGIServer -from urlparse import parse_qs -import datetime -from server_setup import SQL_CONNECT, LISTENER -import os -import _socket -import pwd -from gevent.server import StreamServer - -Base = declarative_base() -engine = create_engine(SQL_CONNECT) -Session = sessionmaker(bind=engine) - -class TweetSource(Base): - __tablename__ = 'tweet_tweet_source' - id = Column(Integer, primary_key=True, autoincrement=True) - original_json = Column(String) - received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True) - -class Tweet(Base): - __tablename__ = 'tweet_tweet' - id = Column(BigInteger, primary_key=True, autoincrement=False) - tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) - tweet_source = relationship("TweetSource", backref="tweet") - def jsondict(self): - tweetdict = anyjson.deserialize(self.tweet_source.original_json) - keys_to_delete = [ - 'in_reply_to_screen_name', - 'in_reply_to_user_id', - 'retweeted', - 'place', - 'geo', - 'source', - 'contributors', - 'coordinates', - 'retweet_count', - 'favorited', - 'truncated', - 'possibly_sensitive' - ] - user_keys_to_delete = [ - 'default_profile_image', - 'show_all_inline_media', - 'contributors_enabled', - 'profile_sidebar_fill_color', - 'created_at', - 'lang', - 'time_zone', - 'profile_sidebar_border_color', - 'follow_request_sent', - 'profile_background_image_url', - 'profile_background_image_url_https', - 'followers_count', - 'description', - 'url', - 'geo_enabled', - 'profile_use_background_image', - 'default_profile', - 'following', - 'profile_text_color', - 'is_translator', - 'favourites_count', - 'listed_count', - 'friends_count', - 'profile_link_color', - 'protected', - 'location', - 'notifications', - 'profile_image_url_https', - 'statuses_count', - 'verified', - 'profile_background_color', - 'profile_background_tile', - 'utc_offset' - ] - - def textids(dictionary): - idfields = [key for key in dictionary if key[-2:] == 'id'] - for key in idfields: - keystr = key + '_str' - if keystr in dictionary: - dictionary[key] = dictionary[keystr] - del dictionary[keystr] - - for key in keys_to_delete: - if key in tweetdict: - del tweetdict[key] - for key in user_keys_to_delete: - if key in tweetdict['user']: - del tweetdict['user'][key] - textids(tweetdict) - textids(tweetdict['user']) - if 'retweeted_status' in tweetdict: - for key in keys_to_delete: - if key in tweetdict['retweeted_status']: - del tweetdict['retweeted_status'][key] - for key in user_keys_to_delete: - if key in tweetdict['retweeted_status']['user']: - del tweetdict['retweeted_status']['user'][key] - textids(tweetdict['retweeted_status']) - return tweetdict - - -def unlink(path): - from errno import ENOENT - try: - os.unlink(path) - except OSError, ex: - if ex.errno != ENOENT: - raise - -def link(src, dest): - from errno import ENOENT - try: - os.link(src, dest) - except OSError, ex: - if ex.errno != ENOENT: - raise - -def bind_unix_listener(path, backlog=50, user=None): - pid = os.getpid() - tempname = '%s.%s.tmp' % (path, pid) - backname = '%s.%s.bak' % (path, pid) - unlink(tempname) - unlink(backname) - link(path, backname) - try: - sock = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM) - sock.setblocking(0) - sock.bind(tempname) - try: - if user is not None: - user = pwd.getpwnam(user) - os.chown(tempname, user.pw_uid, user.pw_gid) - os.chmod(tempname, 0600) - sock.listen(backlog) - try: - os.rename(tempname, path) - except: - os.rename(backname, path) - backname = None - raise - tempname = None - return sock - finally: - if tempname is not None: - unlink(tempname) - finally: - if backname is not None: - unlink(backname) - -class Tweetcast(object): - - def __init__(self): - print "Starting server" - self.data = [] - self.lastid = 0L - self.session = Session() - self.refresh() - - def refresh(self): - print "refreshing" - query = self.session.query(Tweet).order_by(asc(Tweet.id)).options(joinedload(Tweet.tweet_source)).filter(Tweet.id > self.lastid) - for tweet in query: - self.lastid = tweet.id - self.data.append(anyjson.serialize(tweet.jsondict())) - print len(self.data), "tweets in memory" - self.lastrefresh = datetime.datetime.now() - - def webserver(self, env, start_response): - if (datetime.datetime.now() - self.lastrefresh).seconds > 10: - self.refresh() - if env['PATH_INFO'] == '/': - httpquery = parse_qs(env['QUERY_STRING']) - print "serving tweets to", env['REMOTE_ADDR'], httpquery - frompos = 0 - if "from" in httpquery: - frompos = int(httpquery["from"][0]) - result = '%s{"tweets" : [ %s ] }%s'%( - "%s("%httpquery["callback"][0] if "callback" in httpquery else "", - ",".join(self.data[frompos:]), - ")" if "callback" in httpquery else "" - ) - print "Sending response" - start_response('200 OK', [('Content-Type', 'application/javascript' if "callback" in httpquery else 'application/json' )]) - return [result] - else: - start_response('404 Not Found', [('Content-Type', 'text/html')]) - return ['

Not Found

'] - - def startserver(self, listener): - - if isinstance(listener, str): - listener = bind_unix_listener(listener) - - WSGIServer(listener, self.webserver).serve_forever() - -if __name__ == "__main__": - tc = Tweetcast() - - tc.startserver(LISTENER)