/* DEFAULT CONFIGURATION */

// Twitter credentials are taken from the environment so that no secret has to
// live in the repository. The configuration file evaluated below (see "-c")
// may still override any of these defaults.
// NOTE(review): passwords used to be hard-coded here and are in the VCS
// history — consider them compromised and rotate them.
var TWITTER_USER = process.env.TWITTER_USER || 'raphv',
    TWITTER_PASS = process.env.TWITTER_PASS || '',
    // When false, the Twitter stream is never requested (see the main code).
    RECORD_NEW_TWEETS = true,
    // SQL statements queued by commitTweet/commitReference, flushed by commitTweets().
    commit_script = '';
|
9 |
|
/* CALLING COMMON CONFIGURATION FILE */

// Path of the shared configuration script, overridable with "-c <file>".
// The same file is also served verbatim to browsers at /config (httpHandler).
var conf_file = flagOption('-c', 'conf.js');

myLog('Reading Configuration from ' + conf_file);

var fs = require('fs');

// Declared here so the fallback below does not create an implicit global;
// a `var sqlfile` inside the configuration file simply assigns this one.
var sqlfile;

// The configuration is plain JavaScript run in this scope (direct eval), so its
// `var` declarations become variables of this module — presumably including
// tracking_keyword, app_port and annotations, which are used below but not
// declared in this file.
// SECURITY: this executes arbitrary code; only point "-c" at trusted files.
eval(fs.readFileSync(conf_file, 'utf8'));

// Default database location: one SQLite file per tracked keyword.
if (typeof sqlfile == "undefined") {
    sqlfile = __dirname + '/tweets-' + encodeURIComponent(tracking_keyword) + '.sqlite';
}
|
22 |
|
23 /* FUNCTIONS */ |
|
24 |
|
/**
 * Looks up a command-line option of the form "<flag> <value>".
 *
 * @param {string} flag - the flag to look for, e.g. "-c".
 * @param {*} defaultValue - returned when the flag is absent or is the last argument.
 * @returns {*} the argument right after `flag`, otherwise `defaultValue`.
 */
function flagOption(flag, defaultValue) {
    var argv = process.argv;
    var idx = argv.indexOf(flag);
    // No flag, or a flag with nothing after it: fall back to the default.
    if (idx === -1 || idx >= argv.length - 1) {
        return defaultValue;
    }
    return argv[idx + 1];
}
|
29 |
|
/**
 * console.log wrapper that prefixes the line with the current local time,
 * wrapped in the ANSI escape codes for bold blue.
 * Takes any number of arguments and forwards each one unchanged to console.log.
 */
function myLog() {
    // "\x1b" (ESC) replaces the legacy octal escape "\033", which is a
    // SyntaxError in strict-mode code and in ES modules.
    var args = ["\x1b[1;34m[" + new Date().toLocaleTimeString() + "]\x1b[0m"];
    // push.apply appends every argument as-is (array arguments are not flattened).
    args.push.apply(args, arguments);
    console.log.apply(console, args);
}
|
37 |
|
/**
 * Calls `callback(name, annotation)` for every entry of the `annotations`
 * object (not declared in this file — presumably defined by the configuration
 * file) and collects the results.
 *
 * @param {function(string, *): *} callback - receives each annotation key and value.
 * @param {Object} [options]
 * @param {boolean} [options.includeDefault=false] - also visit the "default" entry.
 * @param {boolean} [options.returnObject=false] - collect results in an object keyed
 *     by annotation name instead of an array.
 * @returns {Array|Object} the callback results, in `for...in` order of `annotations`.
 */
function annotationMap(callback, options) {
    var includeDefault = !!(options && options.includeDefault),
        returnObject = !!(options && options.returnObject),
        // Local on purpose: an undeclared accumulator would leak a global that
        // other code could clobber (and is a ReferenceError in strict mode).
        result = returnObject ? {} : [];
    for (var name in annotations) {
        if (name == "default" && !includeDefault) {
            continue;
        }
        var value = callback(name, annotations[name]);
        if (returnObject) {
            result[name] = value;
        } else {
            result.push(value);
        }
    }
    return result;
}
|
54 |
|
/**
 * Creates the database schema if it does not exist yet:
 *  - `tweets`, with one `a_<name> INTEGER` column per non-default annotation,
 *  - `tweet_refs`, the reply/retweet links between tweets,
 *  - an index on tweets.created_at.
 * Once the script has run (successfully or not), getSendLastPos() refreshes
 * the cached tweet count.
 */
function createTables() {
    var annotationColumns = annotationMap(function (name) {
        return ', a_' + name + ' INTEGER';
    }).join("");
    var statements = [
        "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT"
            + annotationColumns + " );",
        "CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT );",
        "CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at );"
    ];
    // NOTE(review): tweet_refs has no UNIQUE constraint, so the
    // "INSERT OR IGNORE" used by commitReference never ignores duplicates.
    db.executeScript(statements.join("\n"), function (err) {
        if (err) {
            myLog("SQLITE error", err.stack);
        }
        getSendLastPos();
    });
}
|
67 |
|
/**
 * Queues an INSERT into tweet_refs recording that tweet `from_id` references
 * tweet `to_id` (commitTweet passes "reply" or "retweet" as `ref_type`).
 * The statement is appended to the global commit_script and run later by
 * commitTweets().
 */
function commitReference(from_id, to_id, ref_type) {
    // Standard single-quoted SQL literal with embedded quotes doubled. The old
    // double-quoted form relied on SQLite treating unresolvable identifiers as
    // strings, and broke on any value containing a quote.
    function sqlLiteral(value) {
        return "'" + String(value).replace(/'/g, "''") + "'";
    }
    commit_script += 'INSERT OR IGNORE INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( '
        + sqlLiteral(from_id) + ', '
        + sqlLiteral(to_id) + ', '
        + sqlLiteral(ref_type) + ' );\n';
}
|
71 |
|
/**
 * Parses one line of the Twitter stream and queues the SQL needed to store it.
 *
 * The tweet is trimmed (keys_to_delete / user_keys_to_delete, also applied to
 * retweeted_status), passed through textids(), tagged with every annotation
 * that has a keyword matching its text, and finally an INSERT into `tweets`
 * (plus tweet_refs rows for replies and retweets) is appended to the global
 * commit_script.
 *
 * Lines that are not valid JSON, have no `id`, or fail during processing are
 * logged and skipped. One bad line can therefore neither crash the stream
 * handler nor put an invalid statement into the batched SQL script.
 *
 * @param {string} data - a single JSON-encoded stream message.
 */
function commitTweet(data) {
    var tweet,
        ann = [];

    try {
        tweet = JSON.parse(data);
    } catch (err) {
        myLog("Error: Could not parse data", data);
        return;
    }

    if (!tweet || !tweet.id) {
        myLog("Error: Could not parse data", data);
        return;
    }

    try {
        textids(tweet);
        keys_to_delete.forEach(function (key) {
            delete tweet[key];
        });
        textids(tweet.user);
        user_keys_to_delete.forEach(function (key) {
            delete tweet.user[key];
        });
        if (tweet.retweeted_status) {
            textids(tweet.retweeted_status);
            keys_to_delete.forEach(function (key) {
                delete tweet.retweeted_status[key];
            });
        }
        // Tag the tweet with every annotation having at least one matching keyword.
        // NOTE: String#search turns string keywords into regular expressions.
        annotationMap(function (i, annotation) {
            for (var j in annotation.keywords) {
                if (tweet.text.search(annotation.keywords[j]) != -1) {
                    ann.push(i);
                    break;
                }
            }
        });
        tweet.annotations = ann;
        tweet.created_at = new Date(tweet.created_at);
    } catch (err) {
        // A half-processed tweet (e.g. created_at still a string) would produce
        // an invalid statement and make the whole batched commit_script fail.
        myLog("Error while processing tweet", err.stack);
        return;
    }

    // An unparseable date would be written as a bare NaN, which is invalid SQL.
    if (isNaN(tweet.created_at.valueOf())) {
        myLog("Error: invalid created_at", data);
        return;
    }

    if (tweet.in_reply_to_status_id) {
        commitReference(tweet.id, tweet.in_reply_to_status_id, "reply");
    }
    if (tweet.retweeted_status) {
        commitReference(tweet.id, tweet.retweeted_status.id, "retweet");
    }

    var columns = annotationMap(function (a) {
            return ', a_' + a;
        }).join(""),
        flags = annotationMap(function (a) {
            return ann.indexOf(a) == -1 ? ', 0' : ', 1';
        }).join("");

    // Single-quoted SQL literals with embedded quotes doubled.
    commit_script += 'INSERT INTO tweets ( tweet_id, created_at, json '
        + columns
        + " ) VALUES ( '"
        + String(tweet.id).replace(/'/g, "''")
        + "', "
        + tweet.created_at.valueOf()
        + ", '"
        + JSON.stringify(tweet).replace(/'/g, "''")
        + "'"
        + flags
        + ' );\n';
}
|
132 |
|
// Tail of the previous stream chunk that did not end on a "\r\n" delimiter yet.
// NOTE(review): it is not reset on reconnect, so a stale partial line may be
// glued to the first line of a new connection; that line then fails to parse
// and is logged and skipped.
var pendingStreamData = '';

/**
 * 'data' handler for the Twitter stream. It splits the received text on the
 * "\r\n" delimiter and hands each complete, non-empty line to commitTweet().
 *
 * A chunk can end in the middle of a message. The trailing incomplete line is
 * therefore kept in pendingStreamData and prefixed to the next chunk, instead
 * of being parsed (and rejected) on its own.
 *
 * @param {string} chunk - text received from the stream (utf8, see requestTwitter).
 */
function callBackNewTweets(chunk) {
    var lines = (pendingStreamData + chunk).split('\r\n');
    // Last element: '' if the chunk ended on a delimiter, otherwise a partial line.
    pendingStreamData = lines.pop();
    for (var i = 0; i < lines.length; i++) {
        if (lines[i].length > 0) {
            // An exception here would escape the stream's 'data' event.
            try {
                commitTweet(lines[i]);
            } catch (err) {
                myLog("Error while processing tweet", err.stack);
            }
        }
    }
}
|
142 |
|
/**
 * Opens a streaming POST to stream.twitter.com/1/statuses/filter.json that
 * tracks `tracking_keyword`, authenticating with HTTP Basic credentials
 * (TWITTER_USER / TWITTER_PASS). Every received chunk is fed to
 * callBackNewTweets().
 *
 * The connection is replaced by a new one:
 *  - when the response ends,
 *  - when the connection stays idle for 60 s,
 *  - after ERROR_RECONNECT_DELAY ms when the request emits 'error'.
 * A per-request guard ensures only one replacement is opened even if several
 * of these events fire for the same request.
 *
 * NOTE(review): Twitter's v1 streaming endpoint and Basic auth were presumably
 * retired long ago — confirm this still answers before relying on it.
 */
function requestTwitter() {
    var ERROR_RECONNECT_DELAY = 5000,
        reconnecting = false,
        req;

    // Closes this request and opens a new one, at most once per request.
    function reconnect(message, delay) {
        if (reconnecting) {
            return;
        }
        reconnecting = true;
        myLog(message);
        // Make sure the old stream is really closed so connections don't pile up.
        req.abort();
        if (delay) {
            setTimeout(requestTwitter, delay);
        } else {
            requestTwitter();
        }
    }

    myLog("Requesting Twitter to track keyword(s): " + tracking_keyword);
    req = https.request({
        host: "stream.twitter.com",
        path: "/1/statuses/filter.json",
        method: "POST",
        headers: {
            'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    }, function(res) {
        myLog('Reply from stream.twitter.com: ' + res.statusCode);
        myLog('Headers: ' + JSON.stringify(res.headers));
        res.setEncoding('utf8');
        res.on('data', callBackNewTweets);
        res.on('end', function() {
            reconnect('End Twitter Connection — Trying to reconnect');
        });
    });

    // Without an 'error' listener a network failure is an uncaught exception.
    req.on('error', function(err) {
        myLog('Twitter connection error', err.stack);
        reconnect('Trying to reconnect in ' + ERROR_RECONNECT_DELAY + ' ms', ERROR_RECONNECT_DELAY);
    });

    // req.socket may not be assigned yet at this point; request.setTimeout
    // applies the idle timeout to the socket once it exists.
    req.setTimeout(60000, function() {
        reconnect('TimeOut - Trying to reconnect');
    });

    req.write('track=' + encodeURIComponent(tracking_keyword));
    req.end();
}
|
172 |
|
/**
 * Reads the highest `pos` in `tweets`, which the app uses as its tweet count.
 * If the value differs from the cached global `lastpos`, it updates the cache
 * and broadcasts `tweetSummary` `{ tweetcount }` to all socket.io clients.
 */
function getSendLastPos() {
    db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) {
        if (err) {
            myLog("SQLITE error", err.stack);
            // results is unusable after an error; keep the cached count.
            return;
        }
        // MAX() yields NULL on an empty table: count that as 0 tweets.
        var newpos = (results && results[0] && results[0].lastpos) || 0;
        if (newpos === lastpos) {
            return;
        }
        lastpos = newpos;
        try {
            io.sockets.emit('tweetSummary', {
                tweetcount : lastpos
            });
        } catch(err) {
            myLog("SOCKET.IO error while Broadcasting tweetSummary", err.stack);
        }
    });
}
|
188 |
|
/**
 * Flushes the queued SQL in the global commit_script as one script.
 * The queue is emptied before the asynchronous execution, so statements queued
 * meanwhile go into the next batch. Afterwards getSendLastPos() refreshes the
 * tweet count, whether or not the script succeeded.
 */
function commitTweets() {
    if (commit_script === '') {
        return;
    }
    var batch = commit_script;
    commit_script = '';
    db.executeScript(batch, function (err) {
        if (err) {
            myLog("SQLITE error", err.stack);
        }
        getSendLastPos();
    });
}
|
202 |
|
/**
 * Finds the stored tweet whose created_at is closest to `date` and sends
 * `tweetPosByDate` `{ tweetpos, date }` to the requesting socket.
 *
 * @param {number|string} date - timestamp sent by the client; presumably ms
 *     since the epoch like created_at. Non-numeric values are ignored.
 * @param {Object} socket - socket.io socket of the requesting client.
 */
function getSendTweetPosByDate(date, socket) {
    // `date` comes straight from a client: bind it instead of splicing it into the SQL.
    var timestamp = Number(date);
    if (!isFinite(timestamp)) {
        myLog("Ignoring tweetPosByDate request with invalid date", date);
        return;
    }
    // Only the closest row is used, so fetch just that one.
    db.execute("SELECT pos, created_at, ABS(created_at - ?) AS dist FROM tweets ORDER BY dist ASC LIMIT 0,1", [ timestamp ], function (err, results) {
        if (err) {
            myLog("SQLITE error", err.stack);
            return;
        }
        if (!results || !results.length) {
            return;
        }
        try {
            socket.emit('tweetPosByDate', {
                tweetpos : results[0].pos,
                date : results[0].created_at
            });
        } catch(err) {
            myLog("SOCKET.IO error while sending tweetPosByDate", err.stack);
        }
    });
}
|
218 |
|
/**
 * Sends `linkedTweets` to `socket`: the reply/retweet links (tweet_refs) that
 * involve the tweet at position `pos`, at most 10, closest in time first.
 *  - Links where `pos` is the referencing tweet go to `referencing`, carrying
 *    the referenced tweet's pos.
 *  - All other links go to `referenced_by`, carrying the referencing tweet's pos.
 * An SQL error is logged and an empty result is still sent.
 *
 * @param {number} pos - tweet position requested by the client.
 * @param {Object} socket - socket.io socket of the requesting client.
 */
function getSendLinkedTweets(pos, socket) {
    var query = "SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10";
    db.execute(query, [ pos, pos ], function (err, rows) {
        if (err) {
            myLog("SQLITE error: ", err.stack);
        }
        var referencing = [],
            referencedBy = [];
        (rows || []).forEach(function (row) {
            if (row.pos_a == pos) {
                referencing.push({ "pos": row.pos_b, "ref_type": row.ref_type });
            } else {
                referencedBy.push({ "pos": row.pos_a, "ref_type": row.ref_type });
            }
        });
        try {
            socket.emit('linkedTweets', {
                "tweetpos": pos,
                "referencing": referencing,
                "referenced_by": referencedBy
            });
        } catch (emitErr) {
            myLog("SOCKET.IO error while sending linkedTweets: ", emitErr.stack);
        }
    });
}
|
248 |
|
/**
 * Loads the tweets whose `pos` is in `posList` and sends them to `socket` in a
 * single `tweets` event. The payload is an array of the stored tweet objects,
 * each with its `pos` added.
 *
 * @param {Array<number|string>} posList - positions requested by the client;
 *     non-integer entries are ignored, and a non-array value is rejected.
 * @param {Object} socket - socket.io socket of the requesting client.
 */
function getSendTweets(posList, socket) {
    if (!Array.isArray(posList)) {
        myLog("Ignoring tweets request with invalid position list");
        return;
    }
    // posList comes straight from the client: keep only integers and bind them
    // as parameters instead of splicing them into the SQL text.
    var positions = [];
    for (var i = 0; i < posList.length; i++) {
        var pos = Number(posList[i]);
        if (isFinite(pos) && pos % 1 === 0) {
            positions.push(pos);
        }
    }
    if (!positions.length) {
        return;
    }
    var placeholders = positions.map(function () { return '?'; }).join(',');
    db.execute("SELECT * FROM tweets WHERE pos IN ( " + placeholders + " )", positions, function (err, results) {
        if (err) {
            myLog("SQLITE error", err.stack);
            return;
        }
        try {
            socket.emit('tweets',
                results.map(function(line) {
                    var tw = JSON.parse(line.json);
                    tw.pos = line.pos;
                    return tw;
                })
            );
        } catch (err) {
            myLog("SOCKET.IO error while sending tweets", err.stack);
        }
    });
}
|
266 |
|
/**
 * Builds the tweet-count histogram for one zoom level and sends it to
 * `socket` as a `timeline` event `{ full, level, data }`.
 *
 * Tweets are grouped into buckets of date_levels[data.level] ms. The query
 * keeps the most recent 50 buckets, or only the latest one when `data.full`
 * is falsy. They are emitted oldest first, with empty buckets inserted for
 * gaps so the series is contiguous. Each bucket has the shape
 * `{ start, end, tweets, annotations: { <name>: <sum of a_<name>> } }`.
 *
 * @param {{level: (number|string), full: boolean}} data - client request.
 * @param {Object} socket - socket.io socket of the requesting client.
 */
function getSendTimeline(data, socket) {
    var lvl = data ? date_levels[data.level] : undefined;
    // data.level comes from the client: anything but a known positive bucket
    // size would be spliced into the SQL (or make the gap loop spin forever).
    if (typeof lvl !== "number" || !(lvl > 0)) {
        myLog("Ignoring timeline request with invalid level", data && data.level);
        return;
    }
    var requete = "SELECT COUNT(*) AS nb, "
        + lvl
        + "*ROUND(created_at/"
        + lvl
        + ") AS tranche"
        + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
        + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" );
    db.execute(requete, function (err, results) {
        if (err) {
            myLog("SQLITE error", err.stack);
            return;
        }
        if (!results || !results.length) {
            return;
        }
        var tbl = [],
            // Rows arrive newest first; the walk below goes oldest to newest.
            lastend = parseInt(results[results.length - 1].tranche, 10);
        for (var i = results.length - 1; i >= 0; i--) {
            var row = results[i],
                start = parseInt(row.tranche, 10);
            // Insert empty buckets for periods without any tweet.
            while (start > lastend) {
                tbl.push({ "start": lastend, "end": lastend + lvl, "tweets": 0, "annotations": {} });
                lastend += lvl;
            }
            lastend += lvl;
            tbl.push({
                "start" : start,
                "end" : lastend,
                "tweets" : row.nb,
                "annotations" : annotationMap(function (a) {
                    return row['s_' + a];
                }, { returnObject: true })
            });
        }
        try {
            socket.emit('timeline', {
                "full" : data.full,
                "level" : data.level,
                "data" : tbl
            });
        } catch (err) {
            myLog("SOCKET.IO error while sending timeline", err.stack);
        }
    });
}
|
314 |
|
/**
 * Rewrites a Twitter object (tweet or user) in place:
 *  - Every string value is made pure ASCII by replacing each non-ASCII
 *    character with an HTML numeric character reference ("é" -> "&#233;").
 *    This is a workaround for a Unicode bug in socket.io.
 *  - Every key ending in "id" that has a "<key>_str" twin takes the twin's
 *    string value, and the twin is removed. (JS numbers cannot represent large
 *    64-bit ids exactly.)
 * Nested objects are not visited; undefined/null arguments are ignored.
 *
 * @param {Object} object - the object to rewrite.
 */
function textids(object) {
    for (var key in object) {
        // Workaround for Unicode bug in socket.io.
        if (typeof object[key] == "string") {
            object[key] = toAsciiEntities(object[key]);
        }
        // Only swap when the string twin exists; otherwise the value would be
        // replaced by undefined.
        if (key.substr(-2) == 'id' && (key + '_str') in object) {
            object[key] = object[key + '_str'];
            delete object[key + '_str'];
        }
    }
}

// Replaces every non-ASCII character of `text` with "&#<code point>;".
// Surrogate pairs are combined, so astral characters (e.g. emoji) become one
// valid reference instead of two references to lone surrogates; unpaired
// surrogates are still emitted as their own code unit.
function toAsciiEntities(text) {
    var out = '';
    for (var i = 0; i < text.length; i++) {
        var code = text.charCodeAt(i);
        if (code < 128) {
            out += text.charAt(i);
            continue;
        }
        if (code >= 0xD800 && code <= 0xDBFF && i + 1 < text.length) {
            var low = text.charCodeAt(i + 1);
            if (low >= 0xDC00 && low <= 0xDFFF) {
                code = (code - 0xD800) * 0x400 + (low - 0xDC00) + 0x10000;
                i++;
            }
        }
        out += "&#" + code + ";";
    }
    return out;
}
|
334 |
|
/**
 * Minimal static file server for the web client.
 *  - "/config" serves the configuration file itself (conf_file).
 *  - Any other URL maps to a file under <this directory>/client; URLs ending
 *    in "/" get "index.html", and a query string is ignored.
 * Missing files, and paths that would escape the client directory, get a 404.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 */
function httpHandler(req, res) {
    var path = require('path');
    myLog("HTTP Request for URL " + req.url);

    function notFound() {
        myLog("Error 404");
        res.writeHead(404);
        return res.end('File not found');
    }

    // "/app.js?v=2" serves "/app.js".
    var urlPath = req.url.split('?')[0];
    var file;
    if (urlPath == "/config") {
        file = conf_file;
    } else {
        if (urlPath[urlPath.length - 1] == "/") {
            urlPath += "index.html";
        }
        var clientRoot = path.join(__dirname, "client");
        // path.join collapses "." and ".." segments; anything resolving outside
        // clientRoot (e.g. a raw "/../conf.js") is refused.
        file = path.join(clientRoot, urlPath);
        if (file.indexOf(clientRoot + path.sep) !== 0) {
            return notFound();
        }
    }

    fs.readFile(file, function(err, data) {
        if (err) {
            return notFound();
        }
        res.writeHead(200);
        res.end(data);
    });
}
|
348 |
|
349 /* Initialization */ |
|
350 |
|
var http = require('http');
var https = require('https');
var sqlite = require('sqlite');
var socketio = require('socket.io');

// NOTE(review): tweets, arcs, tweet_ids and date_struct are not referenced
// anywhere in this file.
var tweets = [];
// Cached tweet count (highest `pos`), maintained by getSendLastPos().
var lastpos = 0;
var arcs = [];
var tweet_ids = [];
var date_struct = [];

// Timeline bucket sizes in milliseconds, indexed by the zoom level a client
// requests (data.level in getSendTimeline): 1 h, 15 min, 5 min, 1 min, 15 s.
var date_levels = [
    60 * 60 * 1000,
    15 * 60 * 1000,
    5 * 60 * 1000,
    60 * 1000,
    15 * 1000
];

// Top-level fields removed from each tweet (and its retweeted_status) before
// storage in commitTweet.
var keys_to_delete = [
    'in_reply_to_screen_name',
    'in_reply_to_user_id',
    'retweeted',
    'place',
    'geo',
    'source',
    'contributors',
    'coordinates',
    'retweet_count',
    'favorited',
    'truncated',
    'possibly_sensitive'
];

// Fields removed from tweet.user before storage in commitTweet.
var user_keys_to_delete = [
    'default_profile_image',
    'show_all_inline_media',
    'contributors_enabled',
    'profile_sidebar_fill_color',
    'created_at',
    'lang',
    'time_zone',
    'profile_sidebar_border_color',
    'follow_request_sent',
    'profile_background_image_url',
    'profile_background_image_url_https',
    'followers_count',
    'description',
    'url',
    'geo_enabled',
    'profile_use_background_image',
    'default_profile',
    'following',
    'profile_text_color',
    'is_translator',
    'favourites_count',
    'listed_count',
    'friends_count',
    'profile_link_color',
    'protected',
    'location',
    'notifications',
    'profile_image_url_https',
    'statuses_count',
    'verified',
    'profile_background_color',
    'profile_background_tile',
    'utc_offset'
];

// HTTP server for the static client; socket.io is attached to the same server.
var app = http.createServer(httpHandler);
var io = socketio.listen(app);
// SQLite handle, opened in the main code with db.open(sqlfile, ...).
var db = new sqlite.Database();
|
419 |
|
420 /* MAIN CODE */ |
|
421 |
|
app.listen(app_port);
myLog("Listening on port: " + app_port);
myLog("Opening SQLITE file: " + sqlfile);
db.open(sqlfile, function(err) {
    if (err) { myLog("SQLITE error", err.stack); }
    createTables();
});

// Flush queued INSERT statements twice a second.
setInterval(commitTweets, 500);
setInterval(function() { myLog("Still alive, tweet count", lastpos); }, 60000);

if (RECORD_NEW_TWEETS) {
    requestTwitter();
}

io.set('log level', 0);
io.sockets.on('connection', function(socket) {
    myLog("New connection from", socket.handshake.address.address, "with id=", socket.id);
    try {
        socket.emit('tweetSummary', { tweetcount : lastpos });
    } catch (err) {
        myLog("SOCKET.IO error while sending tweetSummary", err.stack);
    }
    // Payloads come from untrusted clients: check their shape first so a
    // malformed message cannot throw inside these event handlers.
    socket.on('updateTweets', function(data) {
        if (data && data.tweets && data.tweets.length) {
            getSendTweets(data.tweets, socket);
        }
    });
    socket.on('updateTimeline', function(data) {
        if (data) {
            getSendTimeline(data, socket);
        }
    });
    socket.on('tweetPosByDate', function(data) {
        if (data) {
            getSendTweetPosByDate(data.date, socket);
        }
    });
    socket.on('linkedTweets', function(data) {
        if (data) {
            getSendLinkedTweets(data.tweetpos, socket);
        }
    });
});
|