1 #!/usr/bin/env python |
|
2 # -*- coding: utf-8 -*- |
|
3 |
|
4 import anyjson |
|
5 from twisted.enterprise import adbapi |
|
6 from twisted.internet import reactor, task |
|
7 from twisted.internet.protocol import Protocol, Factory |
|
8 from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol |
|
9 |
|
10 connectstring = "dbname='tweet_live' user='postgres' host='localhost' password='doiteshimashite'" |
|
11 columns = [ 'id', 'created_at', 'text', 'user_id', 'screen_name', 'profile_image_url' ] |
|
12 selectcommon = "SELECT tweet_tweet.id, tweet_tweet.created_at, text, user_id, screen_name, profile_image_url FROM tweet_tweet JOIN tweet_user ON tweet_tweet.user_id = tweet_user.id" |
|
13 annotations = { |
|
14 "positive" : '++', |
|
15 "negative" : '--', |
|
16 "reference" : '==', |
|
17 "question" : '??' |
|
18 } |
|
19 |
|
20 dbpool = adbapi.ConnectionPool("psycopg2",connectstring) |
|
21 |
|
22 class Tweet: |
|
23 |
|
24 def __init__(self, ligne): |
|
25 self.data = dict((columns[i], str(ligne[i])) for i in range(len(columns))) |
|
26 self.data['annotations'] = [] |
|
27 for a in annotations: |
|
28 n = self.data['text'].count(annotations[a]) |
|
29 if n: |
|
30 self.data['annotations'].append({ |
|
31 "name" : a, |
|
32 "text" : annotations[a], |
|
33 "count" : n |
|
34 }) |
|
35 |
|
36 def __repr__(self): |
|
37 return anyjson.serialize(self.data) |
|
38 |
|
39 class TweetCast: |
|
40 |
|
41 lastid = 0L |
|
42 tweets = [] |
|
43 |
|
44 def __init__(self): |
|
45 self.lastid = 0L |
|
46 self.serverfactory = TweetcastServerFactory(tweetcast=self) |
|
47 dbpool.runQuery("%s ORDER BY tweet_tweet.id DESC LIMIT 20"%selectcommon).addCallback(self.callbackInit) |
|
48 |
|
49 def callbackInit(self, result): |
|
50 if result: |
|
51 self.lastid = result[0][0] |
|
52 for ligne in result: |
|
53 self.tweets.insert(0, Tweet(ligne)) |
|
54 print "Requesting older tweets" |
|
55 task.LoopingCall(self.scheduleTweets).start(1) |
|
56 |
|
57 def scheduleTweets(self): |
|
58 dbpool.runQuery("%s WHERE tweet_tweet.id > %d ORDER BY tweet_tweet.id ASC"%(selectcommon, self.lastid)).addCallback(self.callbackTweets) |
|
59 |
|
60 def callbackTweets(self, result): |
|
61 if result: |
|
62 self.lastid = result[len(result)-1][0] |
|
63 newtweets = [ Tweet(ligne) for ligne in result ] |
|
64 data = { "tweets" : [ tweet.data for tweet in newtweets ] } |
|
65 self.serverfactory.broadcast(anyjson.serialize(data)) |
|
66 print "%d new tweets"%len(result) |
|
67 for tweet in newtweets: |
|
68 self.tweets.append(tweet) |
|
69 |
|
70 class TweetcastServerProtocol(WebSocketServerProtocol): |
|
71 |
|
72 def onOpen(self): |
|
73 self.factory.register(self) |
|
74 if len(self.factory.tweetcast.tweets): |
|
75 data = { "tweets" : [ tweet.data for tweet in self.factory.tweetcast.tweets ] } |
|
76 self.sendMessage(anyjson.serialize(data)) |
|
77 print "sending %d old tweets to new client"%len(self.factory.tweetcast.tweets) |
|
78 |
|
79 def connectionLost(self, reason): |
|
80 WebSocketServerProtocol.connectionLost(self, reason) |
|
81 self.factory.unregister(self) |
|
82 |
|
83 def onMessage(self, msg, binary): |
|
84 print "Got message: " + msg |
|
85 |
|
86 class TweetcastServerFactory(WebSocketServerFactory): |
|
87 |
|
88 protocol = TweetcastServerProtocol |
|
89 |
|
90 def __init__(self, tweetcast=None): |
|
91 WebSocketServerFactory.__init__(self) |
|
92 self.clients = [] |
|
93 self.tweetcast = tweetcast |
|
94 |
|
95 def register(self, client): |
|
96 if not client in self.clients: |
|
97 print "registered client " + client.peerstr |
|
98 self.clients.append(client) |
|
99 |
|
100 def unregister(self, client): |
|
101 if client in self.clients: |
|
102 print "unregistered client " + client.peerstr |
|
103 self.clients.remove(client) |
|
104 |
|
105 def broadcast(self, msg): |
|
106 print "broadcasting ids up to %d" % self.tweetcast.lastid |
|
107 for c in self.clients: |
|
108 print "send to " + c.peerstr |
|
109 c.sendMessage(msg) |
|
110 |
|
111 class FlashPolicySocketProtocol(Protocol): |
|
112 def dataReceived(self, data): |
|
113 self.transport.write("<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy SYSTEM \"/xml/dtds/cross-domain-policy.dtd\"><cross-domain-policy><site-control permitted-cross-domain-policies=\"master-only\"/><allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>\0") |
|
114 |
|
115 class FlashPolicyFactory(Factory): |
|
116 def __init__(self): |
|
117 self.protocol = FlashPolicySocketProtocol; |
|
118 |
|
119 tc = TweetCast() |
|
120 reactor.listenTCP(843, FlashPolicyFactory()) |
|
121 reactor.listenTCP(9000, tc.serverfactory) |
|
122 reactor.run() |
|