--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tweetcast/twisted/server/tweetcast.py Wed Oct 12 18:11:19 2011 +0200
@@ -0,0 +1,122 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import anyjson
+from twisted.enterprise import adbapi
+from twisted.internet import reactor, task
+from twisted.internet.protocol import Protocol, Factory
+from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol
+
+connectstring = "dbname='tweet_live' user='postgres' host='localhost' password='doiteshimashite'"
+columns = [ 'id', 'created_at', 'text', 'user_id', 'screen_name', 'profile_image_url' ]
+selectcommon = "SELECT tweet_tweet.id, tweet_tweet.created_at, text, user_id, screen_name, profile_image_url FROM tweet_tweet JOIN tweet_user ON tweet_tweet.user_id = tweet_user.id"
+annotations = {
+ "positive" : '++',
+ "negative" : '--',
+ "reference" : '==',
+ "question" : '??'
+ }
+
+dbpool = adbapi.ConnectionPool("psycopg2",connectstring)
+
+class Tweet:
+
+ def __init__(self, ligne):
+ self.data = dict((columns[i], str(ligne[i])) for i in range(len(columns)))
+ self.data['annotations'] = []
+ for a in annotations:
+ n = self.data['text'].count(annotations[a])
+ if n:
+ self.data['annotations'].append({
+ "name" : a,
+ "text" : annotations[a],
+ "count" : n
+ })
+
+ def __repr__(self):
+ return anyjson.serialize(self.data)
+
+class TweetCast:
+
+ lastid = 0L
+ tweets = []
+
+ def __init__(self):
+ self.lastid = 0L
+ self.serverfactory = TweetcastServerFactory(tweetcast=self)
+ dbpool.runQuery("%s ORDER BY tweet_tweet.id DESC LIMIT 20"%selectcommon).addCallback(self.callbackInit)
+
+ def callbackInit(self, result):
+ if result:
+ self.lastid = result[0][0]
+ for ligne in result:
+ self.tweets.insert(0, Tweet(ligne))
+ print "Requesting older tweets"
+ task.LoopingCall(self.scheduleTweets).start(1)
+
+ def scheduleTweets(self):
+ dbpool.runQuery("%s WHERE tweet_tweet.id > %d ORDER BY tweet_tweet.id ASC"%(selectcommon, self.lastid)).addCallback(self.callbackTweets)
+
+ def callbackTweets(self, result):
+ if result:
+ self.lastid = result[len(result)-1][0]
+ newtweets = [ Tweet(ligne) for ligne in result ]
+ data = { "tweets" : [ tweet.data for tweet in newtweets ] }
+ self.serverfactory.broadcast(anyjson.serialize(data))
+ print "%d new tweets"%len(result)
+ for tweet in newtweets:
+ self.tweets.append(tweet)
+
+class TweetcastServerProtocol(WebSocketServerProtocol):
+
+ def onOpen(self):
+ self.factory.register(self)
+ if len(self.factory.tweetcast.tweets):
+ data = { "tweets" : [ tweet.data for tweet in self.factory.tweetcast.tweets ] }
+ self.sendMessage(anyjson.serialize(data))
+ print "sending %d old tweets to new client"%len(self.factory.tweetcast.tweets)
+
+ def connectionLost(self, reason):
+ WebSocketServerProtocol.connectionLost(self, reason)
+ self.factory.unregister(self)
+
+ def onMessage(self, msg, binary):
+ print "Got message: " + msg
+
+class TweetcastServerFactory(WebSocketServerFactory):
+
+ protocol = TweetcastServerProtocol
+
+ def __init__(self, tweetcast=None):
+ WebSocketServerFactory.__init__(self)
+ self.clients = []
+ self.tweetcast = tweetcast
+
+ def register(self, client):
+ if not client in self.clients:
+ print "registered client " + client.peerstr
+ self.clients.append(client)
+
+ def unregister(self, client):
+ if client in self.clients:
+ print "unregistered client " + client.peerstr
+ self.clients.remove(client)
+
+ def broadcast(self, msg):
+ print "broadcasting ids up to %d" % self.tweetcast.lastid
+ for c in self.clients:
+ print "send to " + c.peerstr
+ c.sendMessage(msg)
+
+class FlashPolicySocketProtocol(Protocol):
+ def dataReceived(self, data):
+ self.transport.write("<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><site-control permitted-cross-domain-policies=\"master-only\"/><allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>\0")
+
+class FlashPolicyFactory(Factory):
+ def __init__(self):
+ self.protocol = FlashPolicySocketProtocol;
+
+tc = TweetCast()
+reactor.listenTCP(843, FlashPolicyFactory())
+reactor.listenTCP(9000, tc.serverfactory)
+reactor.run()
\ No newline at end of file