#!/usr/bin/env python
# -*- coding: utf-8 -*-
from gevent import monkey; monkey.patch_all()
# Import (and monkey-patch) first, otherwise an exception is raised
import anyjson, gevent, psycopg2
from sqlalchemy import (Boolean, Column, BigInteger, Integer, String,
ForeignKey, DateTime, create_engine, asc, func)
from sqlalchemy.orm import backref, relationship, sessionmaker, joinedload
from sqlalchemy.ext.declarative import declarative_base
from gevent.pywsgi import WSGIServer
from urlparse import parse_qs
import datetime
from server_setup import SQL_CONNECT, LISTENER
import os
import _socket
import pwd
from gevent.server import StreamServer
# Database wiring: one engine built from the configured connection string,
# and a session factory bound to it.
engine = create_engine(SQL_CONNECT)
Session = sessionmaker(bind=engine)
# Declarative base class shared by the ORM models below.
Base = declarative_base()
class TweetSource(Base):
    """Raw JSON text for one tweet (presumably as received) and when it was stored."""
    __tablename__ = 'tweet_tweet_source'
    id = Column(Integer, autoincrement=True, primary_key=True)
    # Unparsed JSON text; Tweet.jsondict() deserializes it.
    original_json = Column(String)
    # Filled with utcnow() at insert time when no value is supplied; indexed.
    received_at = Column(DateTime, index=True, default=datetime.datetime.utcnow)
class Tweet(Base):
    """A tweet row, linked to the TweetSource row that holds its raw JSON."""
    __tablename__ = 'tweet_tweet'
    # Not generated by the database (autoincrement=False); presumably the
    # Twitter status id supplied by whoever inserts the row.
    id = Column(BigInteger, primary_key=True, autoincrement=False)
    tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
    # Also adds a ``tweet`` backref on TweetSource.
    tweet_source = relationship("TweetSource", backref="tweet")

    # Status-level fields removed before a tweet is handed to clients.
    KEYS_TO_DELETE = (
        'in_reply_to_screen_name',
        'in_reply_to_user_id',
        'retweeted',
        'place',
        'geo',
        'source',
        'contributors',
        'coordinates',
        'retweet_count',
        'favorited',
        'truncated',
        'possibly_sensitive',
    )
    # Fields removed from every embedded ``user`` object.
    USER_KEYS_TO_DELETE = (
        'default_profile_image',
        'show_all_inline_media',
        'contributors_enabled',
        'profile_sidebar_fill_color',
        'created_at',
        'lang',
        'time_zone',
        'profile_sidebar_border_color',
        'follow_request_sent',
        'profile_background_image_url',
        'profile_background_image_url_https',
        'followers_count',
        'description',
        'url',
        'geo_enabled',
        'profile_use_background_image',
        'default_profile',
        'following',
        'profile_text_color',
        'is_translator',
        'favourites_count',
        'listed_count',
        'friends_count',
        'profile_link_color',
        'protected',
        'location',
        'notifications',
        'profile_image_url_https',
        'statuses_count',
        'verified',
        'profile_background_color',
        'profile_background_tile',
        'utc_offset',
    )

    @staticmethod
    def _textids(dictionary):
        """Replace each ``<name>id`` value with its ``<name>id_str`` twin, in place.

        The ``*_str`` key is removed afterwards. Presumably this exists so
        that clients parsing JSON numbers as doubles (e.g. JavaScript) do not
        lose precision on large ids.
        """
        # Snapshot the matching keys first: the dict is mutated in the loop.
        for key in [k for k in dictionary if k.endswith('id')]:
            text_key = key + '_str'
            if text_key in dictionary:
                dictionary[key] = dictionary.pop(text_key)

    @classmethod
    def _strip(cls, status):
        """Trim a status dict and its ``user`` in place, then stringify their ids."""
        for key in cls.KEYS_TO_DELETE:
            status.pop(key, None)
        user = status.get('user')
        if user is not None:
            for key in cls.USER_KEYS_TO_DELETE:
                user.pop(key, None)
        # Delete before converting: converting first would also consume the
        # ``*_str`` twin of a deleted id field such as in_reply_to_user_id.
        cls._textids(status)
        if user is not None:
            cls._textids(user)

    def jsondict(self):
        """Return this tweet's stored JSON as a dict trimmed for clients.

        Fields listed in KEYS_TO_DELETE / USER_KEYS_TO_DELETE are removed
        from the tweet, its user and, when present, the retweeted status and
        its user. In all of these, ``*id`` values are replaced by their
        ``*id_str`` strings.
        """
        tweetdict = anyjson.deserialize(self.tweet_source.original_json)
        self._strip(tweetdict)
        retweeted = tweetdict.get('retweeted_status')
        if retweeted is not None:
            self._strip(retweeted)
        return tweetdict
def unlink(path):
    """Remove the file at *path*; a file that does not exist is not an error.

    :raises OSError: for any failure other than ENOENT.
    """
    import errno
    try:
        os.unlink(path)
    except OSError as ex:
        # "Already gone" is the desired end state, so only re-raise other errors.
        if ex.errno != errno.ENOENT:
            raise
def link(src, dest):
    """Create hard link *dest* to *src*; silently do nothing on ENOENT.

    ENOENT typically means *src* does not exist. Per link(2), it can also come
    from a missing directory in *dest*'s path, and that case is ignored too.

    :raises OSError: for any failure other than ENOENT (e.g. *dest* exists).
    """
    import errno
    try:
        os.link(src, dest)
    except OSError as ex:
        if ex.errno != errno.ENOENT:
            raise
def bind_unix_listener(path, backlog=50, user=None):
    """Create a non-blocking, listening UNIX stream socket located at *path*.

    The socket is bound to a temporary name (``<path>.<pid>.tmp``) and then
    renamed over *path*. Any existing *path* is first hard-linked to
    ``<path>.<pid>.bak``. If the final rename fails, the backup is renamed
    back. The backup link is removed before returning.

    :param path: filesystem path the listening socket should end up at.
    :param backlog: queue length passed to ``listen()``.
    :param user: optional user name. When given, the socket file is chowned
        to that user's uid/gid and its mode is set to 0600.
    :returns: the bound, listening ``_socket.socket``.
    :raises KeyError: from ``pwd.getpwnam`` when *user* is unknown.
    :raises: any error from bind/chown/chmod/listen/rename. On failure the
        socket is closed and the temporary socket file is removed.
    """
    pid = os.getpid()
    tempname = '%s.%s.tmp' % (path, pid)
    backname = '%s.%s.bak' % (path, pid)
    # Clear leftovers from an earlier attempt that used the same pid.
    unlink(tempname)
    unlink(backname)
    # link() silently skips a missing *path*, so check whether a backup exists.
    link(path, backname)
    has_backup = os.path.lexists(backname)
    try:
        sock = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
        bound = False
        try:
            sock.setblocking(0)
            sock.bind(tempname)
            if user is not None:
                pw = pwd.getpwnam(user)
                os.chown(tempname, pw.pw_uid, pw.pw_gid)
                # NOTE(review): chmod is assumed to apply only when *user* is
                # given -- confirm the intended permissions.
                os.chmod(tempname, 0o600)
            sock.listen(backlog)
            try:
                os.rename(tempname, path)
            except BaseException:
                # Restore the previous socket only if one existed; renaming a
                # missing backup would raise ENOENT and hide the real error.
                if has_backup:
                    os.rename(backname, path)
                    backname = None
                raise
            bound = True
        finally:
            if not bound:
                # Do not leak the descriptor or the temporary socket file.
                sock.close()
                unlink(tempname)
        return sock
    finally:
        if backname is not None:
            unlink(backname)
class Tweetcast(object):
    """In-memory cache of serialized tweets, served over WSGI as JSON/JSONP.

    Tweets are loaded incrementally (ascending id) and kept as JSON strings
    in ``self.data``. The cache is refreshed lazily from request handling.
    """

    # A request triggers a refresh when the cache is older than this.
    refresh_interval = datetime.timedelta(seconds=10)

    def __init__(self):
        print("Starting server")
        self.data = []  # serialized tweets, ascending Tweet.id
        self.lastid = 0  # highest Tweet.id loaded so far
        self.session = Session()
        self.refresh()

    def refresh(self):
        """Append every tweet with an id above ``self.lastid`` to the cache."""
        print("refreshing")
        query = (self.session.query(Tweet)
                 .order_by(asc(Tweet.id))
                 .options(joinedload(Tweet.tweet_source))
                 .filter(Tweet.id > self.lastid))
        try:
            for tweet in query:
                self.lastid = tweet.id
                self.data.append(anyjson.serialize(tweet.jsondict()))
        finally:
            # End the read transaction, and discard a failed one, so the
            # long-lived session stays usable for the next refresh.
            self.session.close()
        print("%d tweets in memory" % len(self.data))
        self.lastrefresh = datetime.datetime.now()

    @staticmethod
    def _is_safe_callback(name):
        """Return True if *name* is a plain (dotted) JavaScript identifier.

        JSONP callbacks are echoed into the response, so anything else could
        inject arbitrary script.
        """
        import re
        pattern = r'[A-Za-z_$][\w$]*(?:\.[A-Za-z_$][\w$]*)*\Z'
        return re.match(pattern, name) is not None

    @staticmethod
    def _bad_request(start_response):
        """Start a 400 response and return its body."""
        start_response('400 Bad Request', [('Content-Type', 'text/html')])
        return ['<h1>Bad Request</h1>']

    def webserver(self, env, start_response):
        """WSGI app: ``GET /`` returns ``{"tweets" : [...]}`` from the cache.

        Query parameters:
        ``from`` is the index into the cache to start from (default 0).
        ``callback``, if present, wraps the payload as JSONP.
        A non-integer ``from`` or an unsafe ``callback`` gets a 400 response.
        Any other path gets a 404.
        """
        # abs() so a wall clock stepping backwards still triggers a refresh.
        age = abs(datetime.datetime.now() - self.lastrefresh)
        if age > self.refresh_interval:
            self.refresh()
        if env['PATH_INFO'] != '/':
            start_response('404 Not Found', [('Content-Type', 'text/html')])
            return ['<h1>Not Found</h1>']
        httpquery = parse_qs(env['QUERY_STRING'])
        print("serving tweets to %s %s" % (env['REMOTE_ADDR'], httpquery))
        try:
            frompos = int(httpquery["from"][0]) if "from" in httpquery else 0
        except ValueError:
            return self._bad_request(start_response)
        callback = httpquery["callback"][0] if "callback" in httpquery else None
        if callback is not None and not self._is_safe_callback(callback):
            return self._bad_request(start_response)
        payload = '{"tweets" : [ %s ] }' % ",".join(self.data[frompos:])
        if callback is None:
            result, content_type = payload, 'application/json'
        else:
            result = '%s(%s)' % (callback, payload)
            content_type = 'application/javascript'
        print("Sending response")
        start_response('200 OK', [('Content-Type', content_type)])
        return [result]

    def startserver(self, listener):
        """Serve ``webserver`` forever on *listener*.

        A ``str`` listener is treated as a UNIX socket path and bound with
        ``bind_unix_listener``. Anything else is passed to WSGIServer as-is.
        """
        if isinstance(listener, str):
            listener = bind_unix_listener(listener)
        WSGIServer(listener, self.webserver).serve_forever()
if __name__ == "__main__":
    # Build the tweet cache (loads from the DB), then serve it on LISTENER.
    # A str LISTENER is bound as a UNIX socket path by startserver().
    Tweetcast().startserver(LISTENER)