--- a/tweetcast/server-gevent/tweetcast.py Mon Mar 05 18:41:11 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,211 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from gevent import monkey; monkey.patch_all()
-# Importer d'abord, sinon exception
-import anyjson, gevent, psycopg2
-from sqlalchemy import (Boolean, Column, BigInteger, Integer, String,
- ForeignKey, DateTime, create_engine, asc, func)
-from sqlalchemy.orm import backref, relationship, sessionmaker, joinedload
-from sqlalchemy.ext.declarative import declarative_base
-from gevent.pywsgi import WSGIServer
-from urlparse import parse_qs
-import datetime
-from server_setup import SQL_CONNECT, LISTENER
-import os
-import _socket
-import pwd
-from gevent.server import StreamServer
-
-Base = declarative_base()
-engine = create_engine(SQL_CONNECT)
-Session = sessionmaker(bind=engine)
-
-class TweetSource(Base):
- __tablename__ = 'tweet_tweet_source'
- id = Column(Integer, primary_key=True, autoincrement=True)
- original_json = Column(String)
- received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-
-class Tweet(Base):
- __tablename__ = 'tweet_tweet'
- id = Column(BigInteger, primary_key=True, autoincrement=False)
- tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
- tweet_source = relationship("TweetSource", backref="tweet")
- def jsondict(self):
- tweetdict = anyjson.deserialize(self.tweet_source.original_json)
- keys_to_delete = [
- 'in_reply_to_screen_name',
- 'in_reply_to_user_id',
- 'retweeted',
- 'place',
- 'geo',
- 'source',
- 'contributors',
- 'coordinates',
- 'retweet_count',
- 'favorited',
- 'truncated',
- 'possibly_sensitive'
- ]
- user_keys_to_delete = [
- 'default_profile_image',
- 'show_all_inline_media',
- 'contributors_enabled',
- 'profile_sidebar_fill_color',
- 'created_at',
- 'lang',
- 'time_zone',
- 'profile_sidebar_border_color',
- 'follow_request_sent',
- 'profile_background_image_url',
- 'profile_background_image_url_https',
- 'followers_count',
- 'description',
- 'url',
- 'geo_enabled',
- 'profile_use_background_image',
- 'default_profile',
- 'following',
- 'profile_text_color',
- 'is_translator',
- 'favourites_count',
- 'listed_count',
- 'friends_count',
- 'profile_link_color',
- 'protected',
- 'location',
- 'notifications',
- 'profile_image_url_https',
- 'statuses_count',
- 'verified',
- 'profile_background_color',
- 'profile_background_tile',
- 'utc_offset'
- ]
-
- def textids(dictionary):
- idfields = [key for key in dictionary if key[-2:] == 'id']
- for key in idfields:
- keystr = key + '_str'
- if keystr in dictionary:
- dictionary[key] = dictionary[keystr]
- del dictionary[keystr]
-
- for key in keys_to_delete:
- if key in tweetdict:
- del tweetdict[key]
- for key in user_keys_to_delete:
- if key in tweetdict['user']:
- del tweetdict['user'][key]
- textids(tweetdict)
- textids(tweetdict['user'])
- if 'retweeted_status' in tweetdict:
- for key in keys_to_delete:
- if key in tweetdict['retweeted_status']:
- del tweetdict['retweeted_status'][key]
- for key in user_keys_to_delete:
- if key in tweetdict['retweeted_status']['user']:
- del tweetdict['retweeted_status']['user'][key]
- textids(tweetdict['retweeted_status'])
- return tweetdict
-
-
-def unlink(path):
- from errno import ENOENT
- try:
- os.unlink(path)
- except OSError, ex:
- if ex.errno != ENOENT:
- raise
-
-def link(src, dest):
- from errno import ENOENT
- try:
- os.link(src, dest)
- except OSError, ex:
- if ex.errno != ENOENT:
- raise
-
-def bind_unix_listener(path, backlog=50, user=None):
- pid = os.getpid()
- tempname = '%s.%s.tmp' % (path, pid)
- backname = '%s.%s.bak' % (path, pid)
- unlink(tempname)
- unlink(backname)
- link(path, backname)
- try:
- sock = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
- sock.setblocking(0)
- sock.bind(tempname)
- try:
- if user is not None:
- user = pwd.getpwnam(user)
- os.chown(tempname, user.pw_uid, user.pw_gid)
- os.chmod(tempname, 0600)
- sock.listen(backlog)
- try:
- os.rename(tempname, path)
- except:
- os.rename(backname, path)
- backname = None
- raise
- tempname = None
- return sock
- finally:
- if tempname is not None:
- unlink(tempname)
- finally:
- if backname is not None:
- unlink(backname)
-
-class Tweetcast(object):
-
- def __init__(self):
- print "Starting server"
- self.data = []
- self.lastid = 0L
- self.session = Session()
- self.refresh()
-
- def refresh(self):
- print "refreshing"
- query = self.session.query(Tweet).order_by(asc(Tweet.id)).options(joinedload(Tweet.tweet_source)).filter(Tweet.id > self.lastid)
- for tweet in query:
- self.lastid = tweet.id
- self.data.append(anyjson.serialize(tweet.jsondict()))
- print len(self.data), "tweets in memory"
- self.lastrefresh = datetime.datetime.now()
-
- def webserver(self, env, start_response):
- if (datetime.datetime.now() - self.lastrefresh).seconds > 10:
- self.refresh()
- if env['PATH_INFO'] == '/':
- httpquery = parse_qs(env['QUERY_STRING'])
- print "serving tweets to", env['REMOTE_ADDR'], httpquery
- frompos = 0
- if "from" in httpquery:
- frompos = int(httpquery["from"][0])
- result = '%s{"tweets" : [ %s ] }%s'%(
- "%s("%httpquery["callback"][0] if "callback" in httpquery else "",
- ",".join(self.data[frompos:]),
- ")" if "callback" in httpquery else ""
- )
- print "Sending response"
- start_response('200 OK', [('Content-Type', 'application/javascript' if "callback" in httpquery else 'application/json' )])
- return [result]
- else:
- start_response('404 Not Found', [('Content-Type', 'text/html')])
- return ['<h1>Not Found</h1>']
-
- def startserver(self, listener):
-
- if isinstance(listener, str):
- listener = bind_unix_listener(listener)
-
- WSGIServer(listener, self.webserver).serve_forever()
-
-if __name__ == "__main__":
- tc = Tweetcast()
-
- tc.startserver(LISTENER)