tweetcast/server-gevent/tweetcast.py
author Raphael Velt <raph.velt@gmail.com>
Tue, 14 Feb 2012 14:17:43 +0100
changeset 506 f7fd2458fc5c
parent 462 663f82cc659b
permissions -rwxr-xr-x
Added museo 14.02.12 and several corrections

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from gevent import monkey; monkey.patch_all()
# Must be imported first, otherwise an exception is raised
import anyjson, gevent, psycopg2
from sqlalchemy import (Boolean, Column, BigInteger, Integer, String, 
    ForeignKey, DateTime, create_engine, asc, func)
from sqlalchemy.orm import backref, relationship, sessionmaker, joinedload
from sqlalchemy.ext.declarative import declarative_base
from gevent.pywsgi import WSGIServer
from urlparse import parse_qs
import datetime
from server_setup import SQL_CONNECT, LISTENER
import os
import _socket
import pwd
from gevent.server import StreamServer 

# Declarative base class that the ORM model classes in this module derive from.
Base = declarative_base()
# Database engine built from the SQL_CONNECT setting imported from server_setup.
engine = create_engine(SQL_CONNECT)
# Session factory bound to that engine; call Session() to obtain a session.
Session = sessionmaker(bind=engine)

class TweetSource(Base):
    """ORM mapping for the ``tweet_tweet_source`` table.

    Each row holds the JSON text of a tweet as it was received, together with
    the time at which the row was created.
    """
    __tablename__ = 'tweet_tweet_source'
    # Surrogate primary key, auto-incremented.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # JSON text of the tweet; Tweet.jsondict() deserializes this column.
    original_json = Column(String)
    # Defaults to datetime.datetime.utcnow() at insert time; indexed.
    received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)

class Tweet(Base):
    """ORM mapping for the ``tweet_tweet`` table.

    A row carries the tweet id and a foreign key to the ``TweetSource`` row
    that stores the tweet's original JSON.
    """
    __tablename__ = 'tweet_tweet'
    id = Column(BigInteger, primary_key=True, autoincrement=False)
    tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
    tweet_source = relationship("TweetSource", backref="tweet")

    def jsondict(self):
        """Return the tweet's original JSON as a trimmed-down dictionary.

        The source JSON is deserialized, a fixed set of keys is removed from
        the tweet and from its ``user`` entry, and every key ending in ``id``
        that has a ``<key>_str`` sibling is replaced by that string value
        (the ``_str`` sibling is then removed).  When a ``retweeted_status``
        entry is present, it and its ``user`` are pruned the same way and the
        status itself gets the same id-to-string treatment.

        Raises:
            KeyError: if the tweet (or its ``retweeted_status``) has no
                ``user`` entry.
        """
        unwanted_tweet_keys = (
            'in_reply_to_screen_name',
            'in_reply_to_user_id',
            'retweeted',
            'place',
            'geo',
            'source',
            'contributors',
            'coordinates',
            'retweet_count',
            'favorited',
            'truncated',
            'possibly_sensitive',
        )
        unwanted_user_keys = (
            'default_profile_image',
            'show_all_inline_media',
            'contributors_enabled',
            'profile_sidebar_fill_color',
            'created_at',
            'lang',
            'time_zone',
            'profile_sidebar_border_color',
            'follow_request_sent',
            'profile_background_image_url',
            'profile_background_image_url_https',
            'followers_count',
            'description',
            'url',
            'geo_enabled',
            'profile_use_background_image',
            'default_profile',
            'following',
            'profile_text_color',
            'is_translator',
            'favourites_count',
            'listed_count',
            'friends_count',
            'profile_link_color',
            'protected',
            'location',
            'notifications',
            'profile_image_url_https',
            'statuses_count',
            'verified',
            'profile_background_color',
            'profile_background_tile',
            'utc_offset',
        )

        def prune(mapping, unwanted):
            # Drop every unwanted key that is actually present in the mapping.
            for name in [k for k in unwanted if k in mapping]:
                del mapping[name]

        def textids(mapping):
            # Collect the matching keys first: the mapping is mutated below.
            for name in [k for k in mapping if k[-2:] == 'id']:
                text_name = name + '_str'
                if text_name in mapping:
                    mapping[name] = mapping.pop(text_name)

        payload = anyjson.deserialize(self.tweet_source.original_json)

        prune(payload, unwanted_tweet_keys)
        prune(payload['user'], unwanted_user_keys)
        textids(payload)
        textids(payload['user'])

        if 'retweeted_status' in payload:
            retweet = payload['retweeted_status']
            prune(retweet, unwanted_tweet_keys)
            prune(retweet['user'], unwanted_user_keys)
            # NOTE(review): unlike the top-level user, the retweeted status'
            # user is not passed through textids() -- confirm this is intended.
            textids(retweet)

        return payload


def unlink(path):
    """Remove the file at *path*, tolerating the case where it is missing.

    Args:
        path: filesystem path to remove.

    Raises:
        OSError: for any failure other than ``ENOENT`` (for example when
            *path* is a directory or permissions are insufficient).
    """
    from errno import ENOENT
    try:
        os.unlink(path)
    except OSError as ex:  # "as" form parses on Python 2.6+ and Python 3
        if ex.errno != ENOENT:
            raise

def link(src, dest):
    """Create a hard link *dest* pointing at *src*, ignoring ``ENOENT``.

    Nothing happens (and nothing is raised) when ``os.link`` fails with
    ``ENOENT``, e.g. because *src* does not exist.

    Args:
        src: path of the existing file.
        dest: path of the hard link to create.

    Raises:
        OSError: for any failure other than ``ENOENT`` (for example when
            *dest* already exists).
    """
    from errno import ENOENT
    try:
        os.link(src, dest)
    except OSError as ex:  # "as" form parses on Python 2.6+ and Python 3
        if ex.errno != ENOENT:
            raise

def bind_unix_listener(path, backlog=50, user=None):
    """Create a listening Unix-domain stream socket at *path*.

    The socket is first bound to a temporary name (``<path>.<pid>.tmp``) and
    then renamed over *path*.  Before that, any existing file at *path* is
    hard-linked to ``<path>.<pid>.bak`` so it can be renamed back into place
    if the final rename fails.

    Args:
        path: filesystem path at which the socket should end up.
        backlog: value passed to ``socket.listen``.
        user: optional user name; when given, the socket file is chown'ed to
            that user's uid/gid and its mode is set to ``0600``.

    Returns:
        The non-blocking, listening socket object.

    Raises:
        Whatever the underlying socket/os/pwd calls raise.  On failure the
        socket is closed and the temporary file is removed.
    """
    pid = os.getpid()
    tempname = '%s.%s.tmp' % (path, pid)
    backname = '%s.%s.bak' % (path, pid)
    # Clear leftovers from a previous process that had the same pid.
    unlink(tempname)
    unlink(backname)
    # Keep a second name for whatever currently lives at `path` (if anything).
    link(path, backname)
    try:
        # NOTE(review): presumably `_socket` is used to obtain a socket that
        # is not affected by gevent's monkey patching -- confirm.
        sock = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
        try:
            sock.setblocking(0)
            sock.bind(tempname)
            try:
                if user is not None:
                    user = pwd.getpwnam(user)
                    os.chown(tempname, user.pw_uid, user.pw_gid)
                    os.chmod(tempname, 0o600)
                sock.listen(backlog)
                try:
                    os.rename(tempname, path)
                except:
                    # Put the previous file back under its original name and
                    # keep it: clearing `backname` stops the outer `finally`
                    # from unlinking it.
                    os.rename(backname, path)
                    backname = None
                    raise
                # Success: the temp name no longer exists, nothing to remove.
                tempname = None
                return sock
            finally:
                if tempname is not None:
                    unlink(tempname)
        except:
            # Do not leak the file descriptor when any step above failed.
            sock.close()
            raise
    finally:
        if backname is not None:
            unlink(backname)

class Tweetcast(object):
    """In-memory cache of serialized tweets, exposed through a WSGI handler.

    Tweets are loaded from the database in ascending id order, serialized to
    JSON strings once, and kept in ``self.data``.  The WSGI handler serves
    slices of that list as JSON, or as JSONP when a ``callback`` query
    parameter is supplied.
    """

    # Characters accepted in a JSONP callback name.  The callback is echoed
    # verbatim into an executable JavaScript response, so anything outside
    # this set is rejected.
    _CALLBACK_CHARS = frozenset(
        'abcdefghijklmnopqrstuvwxyz'
        'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        '0123456789_$.[]'
    )

    def __init__(self):
        """Open a database session and load all tweets currently stored."""
        print("Starting server")
        self.data = []      # serialized tweets, ordered by ascending tweet id
        self.lastid = 0     # highest tweet id loaded so far
        self.session = Session()
        self.refresh()

    def refresh(self):
        """Append tweets with an id above ``self.lastid`` to the cache.

        Also records the current time in ``self.lastrefresh``.
        """
        print("refreshing")
        query = self.session.query(Tweet).order_by(asc(Tweet.id)).options(joinedload(Tweet.tweet_source)).filter(Tweet.id > self.lastid)
        for tweet in query:
            self.lastid = tweet.id
            self.data.append(anyjson.serialize(tweet.jsondict()))
        print("%d tweets in memory" % len(self.data))
        self.lastrefresh = datetime.datetime.now()

    @staticmethod
    def _bad_request(start_response):
        """Send a 400 response and return its body."""
        start_response('400 Bad Request', [('Content-Type', 'text/html')])
        return ['<h1>Bad Request</h1>']

    def webserver(self, env, start_response):
        """WSGI application serving the cached tweets.

        ``GET /`` returns ``{"tweets" : [ ... ] }``.  Query parameters:

        * ``from`` -- integer index into the cache; only tweets from that
          position onwards are returned (default 0).
        * ``callback`` -- when present, the JSON is wrapped in
          ``callback(...)`` and served as ``application/javascript``.

        Any other path yields a 404.  A non-integer ``from`` or a
        ``callback`` containing characters outside ``_CALLBACK_CHARS``
        yields a 400.

        Args:
            env: WSGI environ dictionary.
            start_response: WSGI ``start_response`` callable.

        Returns:
            A one-element list holding the response body.
        """
        elapsed = datetime.datetime.now() - self.lastrefresh
        # `timedelta.seconds` only holds the sub-day remainder, so the `days`
        # component must be checked too or a cache idle for ~24h would be
        # considered fresh.
        if elapsed.days != 0 or elapsed.seconds > 10:
            self.refresh()

        if env['PATH_INFO'] != '/':
            start_response('404 Not Found', [('Content-Type', 'text/html')])
            return ['<h1>Not Found</h1>']

        httpquery = parse_qs(env['QUERY_STRING'])
        print("serving tweets to %s %s" % (env['REMOTE_ADDR'], httpquery))

        frompos = 0
        if "from" in httpquery:
            try:
                frompos = int(httpquery["from"][0])
            except ValueError:
                # Client error, not a server fault.
                return self._bad_request(start_response)

        callback = None
        if "callback" in httpquery:
            callback = httpquery["callback"][0]
            if not all(ch in self._CALLBACK_CHARS for ch in callback):
                return self._bad_request(start_response)

        payload = '{"tweets" : [ %s ] }' % ",".join(self.data[frompos:])
        if callback is None:
            content_type = 'application/json'
            result = payload
        else:
            content_type = 'application/javascript'
            result = '%s(%s)' % (callback, payload)

        print("Sending response")
        start_response('200 OK', [('Content-Type', content_type)])
        return [result]

    def startserver(self, listener):
        """Serve ``self.webserver`` forever on *listener*.

        Args:
            listener: either a string, treated as the path of a Unix socket
                to create via ``bind_unix_listener``, or any listener value
                accepted directly by ``WSGIServer``.
        """
        if isinstance(listener, str):
            listener = bind_unix_listener(listener)

        WSGIServer(listener, self.webserver).serve_forever()

if __name__ == "__main__":
    # Build the tweet cache, then hand the configured listener to the server.
    server = Tweetcast()
    server.startserver(LISTENER)