tweetcast/twisted/server/tweetcast.py
author Raphael Velt <raph.velt@gmail.com>
Wed, 30 Nov 2011 12:11:59 +0100
changeset 403 dd1686ae5506
parent 311 13702105c5ee
permissions -rwxr-xr-x
minor changes

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import anyjson
from twisted.enterprise import adbapi
from twisted.internet import reactor, task
from twisted.internet.protocol import Protocol, Factory
from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol

# libpq-style connection string handed to psycopg2 through adbapi below.
# NOTE(review): the database password is hard-coded in source; consider
# loading it from the environment or an untracked config file.
connectstring = "dbname='tweet_live' user='postgres' host='localhost' password='doiteshimashite'"
# Keys under which each selected value is stored, in the same order as the
# SELECT list of `selectcommon`; Tweet pairs them with row values by position.
columns = [ 'id', 'created_at', 'text', 'user_id', 'screen_name', 'profile_image_url' ]
# Shared SELECT ... FROM ... JOIN prefix; the queries below append their own
# WHERE / ORDER BY / LIMIT clauses to it.
selectcommon = "SELECT tweet_tweet.id, tweet_tweet.created_at, text, user_id, screen_name, profile_image_url FROM tweet_tweet JOIN tweet_user ON tweet_tweet.user_id = tweet_user.id"
# Annotation name -> marker substring that Tweet counts inside the tweet text.
annotations = {
	"positive" : '++',
	"negative" : '--',
	"reference" : '==',
	"question" : '??'
	}

# Connection pool used for every database query in this module.
dbpool = adbapi.ConnectionPool("psycopg2",connectstring)

class Tweet:
    """One row of the tweet/user join, held as a JSON-serializable dict.

    ``data`` maps every name in ``columns`` to ``str()`` of the value found at
    the same position in the row, plus an ``annotations`` list holding one
    entry per marker from the module-level ``annotations`` table that occurs
    in the tweet text.
    """

    def __init__(self, ligne):
        # Positional pairing: the n-th selected value is stored under columns[n].
        self.data = dict((name, str(ligne[pos])) for pos, name in enumerate(columns))
        found = []
        body = self.data['text']
        for label, marker in annotations.items():
            hits = body.count(marker)
            if hits == 0:
                # Marker absent from this tweet: no entry is recorded for it.
                continue
            found.append({"name": label, "text": marker, "count": hits})
        self.data['annotations'] = found

    def __repr__(self):
        """Return the tweet's data dict serialized as JSON."""
        return anyjson.serialize(self.data)

class TweetCast:
    """Poll the database for new tweets and push them to WebSocket clients.

    Attributes:
        lastid: highest tweet id seen so far; each poll only asks for rows
            with a larger id.
        tweets: every Tweet loaded since start-up, oldest first. The server
            protocol replays this list to each newly connected client.
        serverfactory: the TweetcastServerFactory used for broadcasting.
    """

    lastid = 0

    def __init__(self):
        self.lastid = 0
        # Per-instance list: a class-level [] would be shared by every instance.
        # NOTE(review): this list is never trimmed, so it grows for as long as
        # the process runs -- confirm that replaying everything is intended.
        self.tweets = []
        self.serverfactory = TweetcastServerFactory(tweetcast=self)
        backlog = dbpool.runQuery("%s ORDER BY tweet_tweet.id DESC LIMIT 20" % selectcommon)
        backlog.addCallback(self.callbackInit)
        backlog.addErrback(self._reportFailure, "initial tweet load")

    def _reportFailure(self, failure, what):
        """Print a failed Deferred's error message and swallow the failure.

        Returning None turns the failure into a success, which keeps the
        polling loop alive after a failed query.
        """
        print("%s failed: %s" % (what, failure.getErrorMessage()))

    def callbackInit(self, result):
        """Store the start-up backlog, then begin polling once per second.

        result: rows of the start-up query, newest first (may be empty).
        """
        if result:
            self.lastid = result[0][0]
            # Rows arrive newest first; the list is kept oldest first.
            self.tweets[:0] = [Tweet(ligne) for ligne in reversed(result)]
        print("Requesting older tweets")
        # Keep a reference so the loop can be inspected or stopped later.
        self._poller = task.LoopingCall(self.scheduleTweets)
        self._poller.start(1).addErrback(self._reportFailure, "polling loop")

    def scheduleTweets(self):
        """Query for tweets newer than ``lastid``.

        Returns the query's Deferred so that LoopingCall does not schedule the
        next poll until this one has finished. Otherwise a slow query would
        overlap the next one, both would use the same ``lastid``, and the same
        tweets would be broadcast twice.
        """
        # The id is passed as a bound parameter rather than interpolated.
        query = selectcommon + " WHERE tweet_tweet.id > %s ORDER BY tweet_tweet.id ASC"
        pending = dbpool.runQuery(query, (self.lastid,))
        pending.addCallback(self.callbackTweets)
        # A failed Deferred would stop the LoopingCall, so handle it here.
        pending.addErrback(self._reportFailure, "tweet poll")
        return pending

    def callbackTweets(self, result):
        """Broadcast newly fetched tweets and add them to the stored list.

        result: rows with id greater than ``lastid``, in ascending id order;
        an empty result leaves all state untouched.
        """
        if not result:
            return
        self.lastid = result[-1][0]
        newtweets = [Tweet(ligne) for ligne in result]
        data = {"tweets": [tweet.data for tweet in newtweets]}
        self.serverfactory.broadcast(anyjson.serialize(data))
        print("%d new tweets" % len(result))
        self.tweets.extend(newtweets)

class TweetcastServerProtocol(WebSocketServerProtocol):
    """Server side of one WebSocket client connection."""

    def onOpen(self):
        """Register with the factory and replay the stored tweets, if any."""
        factory = self.factory
        factory.register(self)
        backlog = factory.tweetcast.tweets
        if not backlog:
            return
        payload = anyjson.serialize({"tweets": [item.data for item in backlog]})
        self.sendMessage(payload)
        print("sending %d old tweets to new client" % len(backlog))

    def connectionLost(self, reason):
        """Run the base-class teardown, then drop this client from the factory."""
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregister(self)

    def onMessage(self, msg, binary):
        """Print whatever the client sent; incoming messages are not acted on."""
        print("Got message: " + msg)

class TweetcastServerFactory(WebSocketServerFactory):
    """WebSocket factory that tracks connected clients for broadcasting."""

    protocol = TweetcastServerProtocol

    def __init__(self, tweetcast=None):
        """Keep a reference to the owning tweetcast and start with no clients."""
        WebSocketServerFactory.__init__(self)
        self.tweetcast = tweetcast
        self.clients = []

    def register(self, client):
        """Add ``client`` to the broadcast list unless it is already there."""
        if client in self.clients:
            return
        print("registered client " + client.peerstr)
        self.clients.append(client)

    def unregister(self, client):
        """Remove ``client`` from the broadcast list if it is present."""
        if client not in self.clients:
            return
        print("unregistered client " + client.peerstr)
        self.clients.remove(client)

    def broadcast(self, msg):
        """Send ``msg`` to every registered client, printing each recipient."""
        print("broadcasting ids up to %d" % self.tweetcast.lastid)
        for peer in self.clients:
            print("send to " + peer.peerstr)
            peer.sendMessage(msg)

class FlashPolicySocketProtocol(Protocol):
    """Answers any received data with a Flash cross-domain policy document."""

    def dataReceived(self, data):
        """Write the NUL-terminated policy XML; ``data`` itself is not inspected."""
        # NOTE(review): the policy allows every domain and every port.
        policy = (
            '<?xml version="1.0"?>'
            '<!DOCTYPE cross-domain-policy SYSTEM "/xml/dtds/cross-domain-policy.dtd">'
            '<cross-domain-policy>'
            '<site-control permitted-cross-domain-policies="master-only"/>'
            '<allow-access-from domain="*" to-ports="*" />'
            '</cross-domain-policy>\0'
        )
        self.transport.write(policy)
 
class FlashPolicyFactory(Factory):
    """Factory producing FlashPolicySocketProtocol instances.

    The protocol is declared as a class attribute, the same way
    TweetcastServerFactory declares its own, instead of being assigned in an
    ``__init__`` that did not call the base-class initializer.
    ``FlashPolicyFactory()`` is still constructed with no arguments.
    """

    protocol = FlashPolicySocketProtocol

# Build the poller first: its constructor issues the initial database query
# and creates the WebSocket factory that is bound to a port below.
tc = TweetCast()
# Flash socket-policy listener.
# NOTE(review): 843 is below 1024, so binding it presumably needs elevated
# privileges on most systems -- confirm against the deployment setup.
reactor.listenTCP(843, FlashPolicyFactory())
# WebSocket endpoint over which tweets are broadcast.
reactor.listenTCP(9000, tc.serverfactory)
# Hand control to the Twisted event loop.
reactor.run()