tweetcast/twisted/server/tweetcast.py
changeset 404 89968844eb7d
parent 403 dd1686ae5506
child 405 6626b728b142
equal deleted inserted replaced
403:dd1686ae5506 404:89968844eb7d
     1 #!/usr/bin/env python
       
     2 # -*- coding: utf-8 -*-
       
     3 
       
     4 import anyjson
       
     5 from twisted.enterprise import adbapi
       
     6 from twisted.internet import reactor, task
       
     7 from twisted.internet.protocol import Protocol, Factory
       
     8 from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol
       
     9 
       
    10 connectstring = "dbname='tweet_live' user='postgres' host='localhost' password='doiteshimashite'"
       
    11 columns = [ 'id', 'created_at', 'text', 'user_id', 'screen_name', 'profile_image_url' ]
       
    12 selectcommon = "SELECT tweet_tweet.id, tweet_tweet.created_at, text, user_id, screen_name, profile_image_url FROM tweet_tweet JOIN tweet_user ON tweet_tweet.user_id = tweet_user.id"
       
    13 annotations = {
       
    14 	"positive" : '++',
       
    15 	"negative" : '--',
       
    16 	"reference" : '==',
       
    17 	"question" : '??'
       
    18 	}
       
    19 
       
    20 dbpool = adbapi.ConnectionPool("psycopg2",connectstring)
       
    21 
       
    22 class Tweet:
       
    23 
       
    24 	def __init__(self, ligne):
       
    25 		self.data = dict((columns[i], str(ligne[i])) for i in range(len(columns)))
       
    26 		self.data['annotations'] = []
       
    27 		for a in annotations:
       
    28 			n = self.data['text'].count(annotations[a])
       
    29 			if n:
       
    30 				self.data['annotations'].append({
       
    31 					"name" : a,
       
    32 					"text" : annotations[a],
       
    33 					"count" : n
       
    34 				})
       
    35 
       
    36 	def __repr__(self):
       
    37 		return anyjson.serialize(self.data)
       
    38 
       
    39 class TweetCast:
       
    40 
       
    41 	lastid = 0L
       
    42 	tweets = []
       
    43 	
       
    44 	def __init__(self):
       
    45 		self.lastid = 0L
       
    46 		self.serverfactory = TweetcastServerFactory(tweetcast=self)
       
    47 		dbpool.runQuery("%s ORDER BY tweet_tweet.id DESC LIMIT 20"%selectcommon).addCallback(self.callbackInit)
       
    48 	
       
    49 	def callbackInit(self, result):
       
    50 		if result:
       
    51 			self.lastid = result[0][0]
       
    52 			for ligne in result:
       
    53 				self.tweets.insert(0, Tweet(ligne))
       
    54 		print "Requesting older tweets"
       
    55 		task.LoopingCall(self.scheduleTweets).start(1)
       
    56 
       
    57 	def scheduleTweets(self):
       
    58 		dbpool.runQuery("%s WHERE tweet_tweet.id > %d ORDER BY tweet_tweet.id ASC"%(selectcommon, self.lastid)).addCallback(self.callbackTweets)
       
    59 	
       
    60 	def callbackTweets(self, result):
       
    61 		if result:
       
    62 			self.lastid = result[len(result)-1][0]
       
    63 			newtweets = [ Tweet(ligne) for ligne in result ]
       
    64 			data = { "tweets" : [ tweet.data for tweet in newtweets ] }
       
    65 			self.serverfactory.broadcast(anyjson.serialize(data))
       
    66 			print "%d new tweets"%len(result)
       
    67 			for tweet in newtweets:
       
    68 				self.tweets.append(tweet)
       
    69 
       
    70 class TweetcastServerProtocol(WebSocketServerProtocol):
       
    71 
       
    72 	def onOpen(self):
       
    73 		self.factory.register(self)
       
    74 		if len(self.factory.tweetcast.tweets):
       
    75 			data = { "tweets" : [ tweet.data for tweet in self.factory.tweetcast.tweets ] }
       
    76 			self.sendMessage(anyjson.serialize(data))
       
    77 			print "sending %d old tweets to new client"%len(self.factory.tweetcast.tweets)
       
    78 
       
    79 	def connectionLost(self, reason):
       
    80 		WebSocketServerProtocol.connectionLost(self, reason)
       
    81 		self.factory.unregister(self)
       
    82 
       
    83 	def onMessage(self, msg, binary):
       
    84 		print "Got message: " + msg
       
    85 
       
    86 class TweetcastServerFactory(WebSocketServerFactory):
       
    87  
       
    88    protocol = TweetcastServerProtocol
       
    89  
       
    90    def __init__(self, tweetcast=None):
       
    91       WebSocketServerFactory.__init__(self)
       
    92       self.clients = []
       
    93       self.tweetcast = tweetcast
       
    94  
       
    95    def register(self, client):
       
    96       if not client in self.clients:
       
    97          print "registered client " + client.peerstr
       
    98          self.clients.append(client)
       
    99  
       
   100    def unregister(self, client):
       
   101       if client in self.clients:
       
   102          print "unregistered client " + client.peerstr
       
   103          self.clients.remove(client)
       
   104  
       
   105    def broadcast(self, msg):
       
   106       print "broadcasting ids up to %d" % self.tweetcast.lastid
       
   107       for c in self.clients:
       
   108          print "send to " + c.peerstr
       
   109          c.sendMessage(msg)
       
   110 
       
   111 class FlashPolicySocketProtocol(Protocol):
       
   112     def dataReceived(self, data):
       
   113         self.transport.write("<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><site-control permitted-cross-domain-policies=\"master-only\"/><allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>\0") 
       
   114  
       
   115 class FlashPolicyFactory(Factory):
       
   116     def __init__(self):
       
   117         self.protocol = FlashPolicySocketProtocol;
       
   118 
       
   119 tc = TweetCast()
       
   120 reactor.listenTCP(843, FlashPolicyFactory())
       
   121 reactor.listenTCP(9000, tc.serverfactory)
       
   122 reactor.run()