|
/* CONFIGURATION */

/*
 * All settings are declared with `var` so that they are real module-level
 * bindings instead of implicit globals (an undeclared assignment throws a
 * ReferenceError in strict mode).
 */

/* When false, the Twitter streaming request in the main code is not opened. */
var RECORD_NEW_TWEETS = true;

/* Port the HTTP / socket.io server listens on. */
var DEFAULT_SIO_PORT = 8000;
/* Overridden by the "-p" parameter, e.g. node tweetcast.js -p 8080 */

/* The SQLite file name is built as DIR + START + encoded keyword + EXT. */
var SQLITE_FILE_DIR = __dirname + '/';
var SQLITE_FILE_START = 'tweets-';
var SQLITE_FILE_EXT = '.sqlite';

/* Keyword tracked on the Twitter stream. */
var DEFAULT_TRACKING_KEYWORD = 'Bieber';
/* Overridden by the "-T" parameter, e.g. node tweetcast.js -T "Bieber" */

/*
 * Credentials used for HTTP Basic authentication against stream.twitter.com.
 * The TWITTER_USER / TWITTER_PASS environment variables take precedence; the
 * literals are kept only as a backward-compatible fallback.
 * NOTE(review): these credentials are committed in source -- prefer supplying
 * them through the environment and rotating the values below.
 */
var TWITTER_USER = process.env.TWITTER_USER || 'materiauxnum';
var TWITTER_PASS = process.env.TWITTER_PASS || 'm473r14ux7w337';
|
13 |
|
14 /* FUNCTIONS */ |
|
15 |
|
/**
 * Creates the database schema when it does not exist yet.
 *
 * Issues three statements through `db.execute`:
 *   - the `tweets` table, with one `a_<name> INTEGER` column per entry of
 *     `annotations`;
 *   - once that table exists, an index on `tweets.created_at`, followed by a
 *     call to `getSendLastPos()`;
 *   - the `tweet_refs` table (issued without waiting for the first statement).
 *
 * Any error reported by the database is rethrown.
 */
function createTables() {

    // Shared completion callback: rethrow whatever the database reports.
    function failOnError(err) {
        if (err) {
            throw err;
        }
    }

    var annotationColumns = '';
    annotations.forEach(function(name) {
        annotationColumns += ', a_' + name + ' INTEGER';
    });

    var createTweetsSql = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT"
        + annotationColumns
        + " )";

    db.execute(createTweetsSql, function(err) {
        failOnError(err);
        db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", failOnError);
        getSendLastPos();
    });

    db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", failOnError);
}
|
27 |
|
/**
 * Stores one row in `tweet_refs` linking two tweets.
 *
 * @param from_id   id of the tweet that holds the reference
 * @param to_id     id of the tweet being referenced
 * @param ref_type  label stored in the `ref_type` column (the callers in this
 *                  file pass "reply" or "retweet")
 *
 * The values are passed as bound parameters. A database error is rethrown.
 */
function commitReference(from_id, to_id, ref_type) {
    var insertSql = "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )";
    var bindings = [ from_id, to_id, ref_type ];

    db.execute(insertSql, bindings, function onReferenceStored(err) {
        if (err) {
            throw err;
        }
    });
}
|
35 |
|
/**
 * Parses one raw stream message, trims it down, annotates it and stores it.
 *
 * Steps:
 *   1. parse the JSON text; unparseable messages are logged and skipped;
 *   2. skip messages that have no string `text` or no `user` object, since
 *      the code below dereferences both (presumably these are non-status
 *      notices carried on the same stream -- confirm against the API docs);
 *   3. run `textids` on the tweet, its user and its retweeted status, and
 *      delete the keys listed in `keys_to_delete` / `user_keys_to_delete`;
 *   4. collect every annotation whose keyword list has a match in the text;
 *   5. record reply / retweet links through `commitReference`;
 *   6. insert the tweet (id, timestamp in ms, JSON, one 0/1 flag per
 *      annotation) and call `getSendLastPos()` once the insert succeeded.
 *
 * @param {string} data  JSON text of a single stream message
 *
 * A database error on the insert is rethrown.
 */
function commitTweet(data) {

    var tweet,
        ann = [],
        i,
        j;

    try {
        tweet = JSON.parse(data);
    } catch (parseError) {
        console.log("Ignoring unparseable stream message: " + parseError.message);
        return;
    }

    // Without this guard `tweet.user[...]` and `tweet.text.indexOf` throw a
    // TypeError for any message that is not a regular status.
    if (!tweet || typeof tweet.text !== 'string' || !tweet.user || typeof tweet.user !== 'object') {
        return;
    }

    textids(tweet);
    for (j = 0; j < keys_to_delete.length; j++) {
        delete tweet[keys_to_delete[j]];
    }
    textids(tweet.user);
    for (j = 0; j < user_keys_to_delete.length; j++) {
        delete tweet.user[user_keys_to_delete[j]];
    }
    if (tweet.retweeted_status) {
        textids(tweet.retweeted_status);
        for (j = 0; j < keys_to_delete.length; j++) {
            delete tweet.retweeted_status[keys_to_delete[j]];
        }
    }

    // One annotation per keyword group: stop at the first matching keyword.
    for (i = 0; i < annotations_keywords.length; i++) {
        for (j = 0; j < annotations_keywords[i].length; j++) {
            if (tweet.text.indexOf(annotations_keywords[i][j]) !== -1) {
                ann.push(annotations[i]);
                break;
            }
        }
    }
    tweet.annotations = ann;
    tweet.created_at = new Date(tweet.created_at);

    if (tweet.in_reply_to_status_id) {
        commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
    }
    if (tweet.retweeted_status) {
        commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
    }

    db.execute(
        "INSERT INTO tweets ( tweet_id, created_at, json "
            + annotations.map(function(a) { return ', a_' + a; }).join("")
            + " ) VALUES ( ?, ?, ? "
            + annotations.map(function() { return ', ?'; }).join("")
            + " )",
        [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(
            annotations.map(function(a) { return ann.indexOf(a) === -1 ? 0 : 1; })
        ),
        function(err) {
            if (err) throw err;
            getSendLastPos();
        }
    );
}
|
85 |
|
/**
 * 'data' handler for the Twitter stream response.
 *
 * Messages are separated by "\r\n", but a network chunk may end in the middle
 * of a message (or in the middle of the separator). The unterminated tail of
 * each chunk is therefore kept in `callBackNewTweets.pending` and prepended
 * to the next chunk, so that `commitTweet` only ever receives whole messages.
 * Empty pieces (blank lines between separators) are skipped.
 *
 * @param {string} chunk  text received from the stream
 */
function callBackNewTweets(chunk) {
    var pieces = ((callBackNewTweets.pending || '') + chunk).split('\r\n'),
        committed = 0;

    // The last piece is either an incomplete message or '' when the chunk
    // ended exactly on a separator; keep it for the next call.
    callBackNewTweets.pending = pieces.pop();

    for (var i = 0; i < pieces.length; i++) {
        if (pieces[i].length > 0) {
            commitTweet(pieces[i]);
            committed++;
        }
    }

    // Only announce tweets when at least one message was actually handed over.
    if (committed > 0) {
        console.log("New tweets received");
    }
}
|
95 |
|
/**
 * Reads the highest `pos` stored in `tweets`, saves it in the module-level
 * `lastpos` variable (0 when the query yields a falsy value) and broadcasts
 * it to every connected socket as a 'tweetSummary' event of the form
 * `{ tweetcount: lastpos }`.
 *
 * A database error is rethrown.
 */
function getSendLastPos() {
    db.execute("SELECT MAX(pos) lastpos FROM tweets", function onMaxPos(err, results) {
        if (err) {
            throw err;
        }
        lastpos = results[0].lastpos || 0;
        console.log("Broadcasting last pos = ",lastpos);
        var summary = { tweetcount : lastpos };
        io.sockets.emit('tweetSummary', summary);
    });
}
|
106 |
|
107 |
|
/**
 * Sends the tweets stored at the requested positions to one socket.
 *
 * `posList` comes straight from the client, so it is never concatenated into
 * the SQL as-is: only entries that are plain non-negative decimal integers
 * (numbers or digit-only strings) are kept, everything else is dropped. The
 * surviving integers are inlined in the IN ( ... ) list, which is safe because
 * they are numbers formatted by this function, and which keeps long lists
 * working regardless of any bound-parameter limit.
 *
 * The socket receives a 'tweets' event carrying an array of the stored tweet
 * objects, each with its `pos` added. When no valid position remains, an
 * empty array is sent without querying the database.
 *
 * @param {Array}  posList  requested `pos` values (untrusted)
 * @param {Object} socket   socket to answer on (uses `id` and `emit`)
 *
 * A database error is rethrown.
 */
function getSendTweets(posList, socket) {
    var wanted = [],
        candidate,
        i;

    if (Array.isArray(posList)) {
        for (i = 0; i < posList.length; i++) {
            candidate = String(posList[i]);
            // At most 15 digits keeps the value an exactly representable integer.
            if (/^\d{1,15}$/.test(candidate)) {
                wanted.push(parseInt(candidate, 10));
            }
        }
    }

    console.log("request for tweets ("+wanted.join(',')+") from "+socket.id);

    if (wanted.length === 0) {
        socket.emit('tweets', []);
        return;
    }

    db.execute("SELECT * FROM tweets WHERE pos IN ( " + wanted.join(',') + " )", function (err, results) {
        if (err) throw err;
        socket.emit('tweets',
            results.map(function(line) {
                var tw = JSON.parse(line.json);
                tw.pos = line.pos;
                return tw;
            })
        );
    });
}
|
121 |
|
/**
 * Sends a histogram of tweet counts per time slice to one socket.
 *
 * `level` is an index into `date_levels`, which gives the slice width in
 * milliseconds. It comes from the client, so it must be a plain decimal index
 * that maps to a number in `date_levels`; anything else (missing, out of
 * range, or a property name such as "length") is logged and ignored instead
 * of being interpolated into the SQL.
 *
 * The query groups tweets by slice and keeps the 50 most recent slices. The
 * rows are then walked from oldest to newest and slices without any tweet are
 * filled in with `tweets: 0`, so the emitted list is contiguous. The socket
 * receives a 'timeline' event with an array of
 * `{ start, end, tweets, annotations }` objects, where `annotations` maps each
 * annotation name to its summed flag column (empty for filler slices). An
 * empty table yields an empty array.
 *
 * @param level            index into `date_levels` (untrusted)
 * @param {Object} socket  socket to answer on (uses `id` and `emit`)
 *
 * A database error is rethrown.
 */
function getSendTimeline(level, socket) {
    console.log("request for timeline ("+level+") from "+socket.id);

    if (!/^\d+$/.test(String(level)) || typeof date_levels[level] !== 'number') {
        console.log("Ignoring timeline request: unknown level");
        return;
    }

    var lvl = date_levels[level],
        requete = "SELECT COUNT(*) AS nb, "
            + lvl
            + "*ROUND(created_at/"
            + lvl
            + ") AS tranche"
            + annotations.map(function (a) { return " , SUM(a_" + a + ") AS s_" + a; }).join("")
            + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50";

    db.execute(requete, function (err, results) {
        if (err) throw err;

        var tbl = [];

        // Nothing stored yet: there is no oldest slice to start from.
        if (results.length === 0) {
            socket.emit('timeline', tbl);
            return;
        }

        // Rows arrive newest first; start at the oldest one (the last row).
        var lastend = parseInt(results[results.length - 1].tranche, 10);

        for (var i = results.length - 1; i >= 0; i--) {
            var start = parseInt(results[i].tranche, 10);

            // Insert empty slices until the gap before this row is closed.
            while (start > lastend) {
                var filler = { "start": lastend, "tweets" : 0, "annotations" : {} };
                lastend += lvl;
                filler.end = lastend;
                tbl.push(filler);
            }

            lastend += lvl;
            var slice = {
                "start" : start,
                "end" : lastend,
                "tweets" : results[i].nb,
                "annotations" : {}
            };
            for (var j = 0; j < annotations.length; j++) {
                slice.annotations[annotations[j]] = results[i]['s_' + annotations[j]];
            }
            tbl.push(slice);
        }

        socket.emit('timeline', tbl);
    });
}
|
159 |
|
/**
 * Replaces id values by their string twins, in place.
 *
 * For every key of `object` that ends in "id" and has a matching
 * `<key>_str` property, the value of `<key>` is replaced by the value of
 * `<key>_str` and the `_str` property is removed (presumably because the
 * numeric form cannot hold large ids exactly -- confirm against the API docs).
 *
 * Keys ending in "id" that have no `_str` twin are left untouched; without
 * that check they would be overwritten with `undefined`.
 *
 * @param {Object} object  object to rewrite; a null or undefined value is a no-op
 */
function textids(object) {
    for (var key in object) {
        var strKey = key + '_str';
        if (key.slice(-2) === 'id' && Object.prototype.hasOwnProperty.call(object, strKey)) {
            object[key] = object[strKey];
            delete object[strKey];
        }
    }
}
|
168 |
|
/**
 * Minimal static file handler: serves files from the `client` directory that
 * sits next to this script.
 *
 * - The query string is dropped before the path is mapped to a file.
 * - A path ending in "/" is completed with "index.html".
 * - A path containing a ".." segment (with "/" or "\" separators) or a NUL
 *   character is refused with 403, so a request cannot reach files outside
 *   the `client` directory. The path is not URL-decoded before being handed
 *   to `fs.readFile`, so only literal ".." segments need to be rejected here.
 * - A file that cannot be read yields 404 "File not found".
 *
 * @param req  incoming HTTP request (uses `url`)
 * @param res  HTTP response (uses `writeHead` and `end`)
 */
function httpHandler(req, res) {
    var pathname = req.url.split('?')[0];

    if (pathname.indexOf('\0') !== -1 || pathname.split(/[\\\/]/).indexOf('..') !== -1) {
        res.writeHead(403);
        return res.end('Forbidden');
    }

    var url = pathname + ( pathname[pathname.length - 1] == "/" ? "index.html" : "" );

    fs.readFile(__dirname + "/client" + url, function(err, data) {
        if (err) {
            res.writeHead(404);
            return res.end('File not found');
        }
        res.writeHead(200);
        res.end(data);
    });
}
|
180 |
|
181 /* Initialization */ |
|
182 |
|
/*
 * Module-level state, declared in a single `var` statement.
 *
 * Every declarator must end with a comma: without the comma after `sio_port`,
 * automatic semicolon insertion closes the statement there and `io`,
 * `track_flag`, `tracking_keyword`, `sqlfile` and `db` turn into implicit
 * globals.
 */
var fs = require('fs'),
    http = require('http'),
    https = require('https'),
    sqlite = require('sqlite'),
    socketio = require('socket.io'),
    tweets = [],
    /* Highest `pos` known in the tweets table; updated by getSendLastPos(). */
    lastpos = 0,
    arcs = [],
    tweet_ids = [],
    date_struct = [],
    /* Timeline slice widths in milliseconds, indexed by the client's "level". */
    date_levels = [
        3600 * 1000,
        15 * 60 * 1000,
        5 * 60 * 1000,
        60 * 1000,
        15 * 1000
    ],
    /* Annotation names; annotations_keywords[i] lists the keywords for annotations[i]. */
    annotations = [ 'positive', 'negative', 'reference', 'question' ],
    annotations_keywords = [ [ '++' ], [ '--' ], [ '==' ], [ '??' ] ],
    annkw = {
        'positive' : '++',
        'negative' : '--',
        'reference' : '==',
        'question' : '??'
    },
    /* Keys removed from each tweet (and retweeted status) before storage. */
    keys_to_delete = [
        'in_reply_to_screen_name',
        'in_reply_to_user_id',
        'retweeted',
        'place',
        'geo',
        'source',
        'contributors',
        'coordinates',
        'retweet_count',
        'favorited',
        'truncated',
        'possibly_sensitive'
    ],
    /* Keys removed from each tweet's user object before storage. */
    user_keys_to_delete = [
        'default_profile_image',
        'show_all_inline_media',
        'contributors_enabled',
        'profile_sidebar_fill_color',
        'created_at',
        'lang',
        'time_zone',
        'profile_sidebar_border_color',
        'follow_request_sent',
        'profile_background_image_url',
        'profile_background_image_url_https',
        'followers_count',
        'description',
        'url',
        'geo_enabled',
        'profile_use_background_image',
        'default_profile',
        'following',
        'profile_text_color',
        'is_translator',
        'favourites_count',
        'listed_count',
        'friends_count',
        'profile_link_color',
        'protected',
        'location',
        'notifications',
        'profile_image_url_https',
        'statuses_count',
        'verified',
        'profile_background_color',
        'profile_background_tile',
        'utc_offset'
    ],
    app = http.createServer(httpHandler),
    /* "-p <port>": falls back to DEFAULT_SIO_PORT when absent or not a number. */
    port_flag = process.argv.indexOf("-p"),
    sio_port = ( port_flag != -1 && port_flag < process.argv.length - 1 && parseInt(process.argv[port_flag + 1], 10) ? parseInt(process.argv[port_flag + 1], 10) : DEFAULT_SIO_PORT ),
    io = socketio.listen(app),
    /* "-T <keyword>": falls back to DEFAULT_TRACKING_KEYWORD when absent. */
    track_flag = process.argv.indexOf("-T"),
    tracking_keyword = ( track_flag != -1 && track_flag < process.argv.length - 1 ? process.argv[track_flag + 1] : DEFAULT_TRACKING_KEYWORD ),
    /* One database file per tracked keyword. */
    sqlfile = SQLITE_FILE_DIR + SQLITE_FILE_START + encodeURIComponent(tracking_keyword) + SQLITE_FILE_EXT,
    db = new sqlite.Database();
|
265 |
|
266 /* MAIN CODE */ |
|
267 |
|
/* Start the HTTP / socket.io server. */
app.listen(sio_port);

console.log("Listening on port: "+sio_port);
console.log("Opening SQLITE file: "+sqlfile);

/* Open the database, then make sure the schema exists. */
db.open(sqlfile , function(err) {
    if (err) throw err;
    createTables();
});

/* Open the Twitter streaming request and feed every chunk to callBackNewTweets. */
if (RECORD_NEW_TWEETS) {
    console.log("Requesting Twitter to track keyword(s): "+tracking_keyword);
    var req = https.request({
        host: "stream.twitter.com",
        path: "/1/statuses/filter.json",
        method: "POST",
        headers: {
            'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    }, function(res) {
        console.log('Reply from stream.twitter.com: ' + res.statusCode);
        console.log('Headers: ' + JSON.stringify(res.headers));
        res.setEncoding('utf8');
        res.on('data', callBackNewTweets);
    });

    // Without a listener, an 'error' event on the request is thrown and
    // takes the whole server down.
    req.on('error', function(err) {
        console.log('Twitter stream request failed: ' + err.message);
    });

    req.write('track=' + encodeURIComponent(tracking_keyword));
    req.end();
}

io.set('log level', 0);
io.sockets.on('connection', function(socket) {
    console.log("New connection from" + socket.handshake.address.address + " with id=" + socket.id);
    // Send the current count right away: `lastpos` is the value that
    // getSendLastPos() broadcasts under the same `tweetcount` key.
    socket.emit('tweetSummary', { tweetcount : lastpos });
    // Payloads come from the client: check their shape before using them.
    socket.on('updateTweets', function(data) {
        if (data && Array.isArray(data.tweets) && data.tweets.length) {
            getSendTweets(data.tweets, socket);
        }
    });
    socket.on('updateTimeline', function(data) {
        if (data) {
            getSendTimeline(data.level, socket);
        }
    });
});