diff -r 526d3e411736 -r 13702105c5ee tweetcast/server/tweetcast.py --- a/tweetcast/server/tweetcast.py Mon Oct 10 15:24:28 2011 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,96 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import anyjson -from twisted.enterprise import adbapi -from twisted.internet import reactor, task -from twisted.internet.protocol import Protocol, Factory -from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol - -connectstring = "dbname='tweet_live' user='postgres' host='localhost' password='doiteshimashite'" -columns = [ 'id', 'created_at', 'text', 'user_id', 'screen_name', 'profile_image_url' ] -selectcommon = "SELECT tweet_tweet.id, tweet_tweet.created_at, text, user_id, screen_name, profile_image_url FROM tweet_tweet JOIN tweet_user ON tweet_tweet.user_id = tweet_user.id" - -dbpool = adbapi.ConnectionPool("psycopg2",connectstring) - -class TweetCast: - - def __init__(self): - self.lastid = 0L - self.serverfactory = TweetcastServerFactory(tweetcast=self) - dbpool.runQuery("SELECT MAX(tweet_tweet.id) FROM tweet_tweet").addCallback(self.callbackInit) - - def callbackInit(self, result): - self.lastid = result[0][0] - task.LoopingCall(self.scheduleTweets).start(1) - - def scheduleTweets(self): - dbpool.runQuery("%s WHERE tweet_tweet.id > %d ORDER BY tweet_tweet.id ASC"%(selectcommon, self.lastid)).addCallback(self.callbackTweets) - - def callbackTweets(self, result): - if result: - self.lastid = result[len(result)-1][0] - data = [dict((columns[i], str(ligne[i])) for i in range(len(columns))) for ligne in result] - else: - data = None - self.serverfactory.broadcast(anyjson.serialize(data)) - -class TweetcastServerProtocol(WebSocketServerProtocol): - - def onOpen(self): - self.factory.register(self) - dbpool.runQuery("%s WHERE tweet_tweet.id <= %d ORDER BY tweet_tweet.id DESC LIMIT 100"%(selectcommon, self.factory.tweetcast.lastid)).addCallback(self.callbackOldTweets) - - def callbackOldTweets(self, result): - if result: - data = [dict((columns[i], str(ligne[i])) for i in range(len(columns))) for ligne in result] - data.reverse() - else: - data = None - self.sendMessage(anyjson.serialize(data)) - print "sending old tweets to new client" - - def connectionLost(self, reason): - WebSocketServerProtocol.connectionLost(self, reason) - self.factory.unregister(self) - - def onMessage(self, msg, binary): - print "Got message: " + msg - -class TweetcastServerFactory(WebSocketServerFactory): - - protocol = TweetcastServerProtocol - - def __init__(self, tweetcast=None): - WebSocketServerFactory.__init__(self) - self.clients = [] - self.tweetcast = tweetcast - - def register(self, client): - if not client in self.clients: - print "registered client " + client.peerstr - self.clients.append(client) - - def unregister(self, client): - if client in self.clients: - print "unregistered client " + client.peerstr - self.clients.remove(client) - - def broadcast(self, msg): - print "broadcasting ids up to %d" % self.tweetcast.lastid - for c in self.clients: - print "send to " + c.peerstr - c.sendMessage(msg) - -class FlashPolicySocketProtocol(Protocol): - def dataReceived(self, data): - self.transport.write("\0") - -class FlashPolicyFactory(Factory): - def __init__(self): - self.protocol = FlashPolicySocketProtocol; - -tc = TweetCast() -reactor.listenTCP(843, FlashPolicyFactory()) -reactor.listenTCP(9000, tc.serverfactory) -reactor.run() \ No newline at end of file