diff -r 60dff8a71024 -r 6a073c4a8578 tweetcast/nodejs/node-direct.js --- a/tweetcast/nodejs/node-direct.js Wed Oct 26 18:29:46 2011 +0200 +++ b/tweetcast/nodejs/node-direct.js Thu Oct 27 17:57:53 2011 +0200 @@ -53,19 +53,19 @@ var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationMap(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )"; db.execute(requete, function(err) { - if (err) throw err; - db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) throw err; }); + if (err) { myLog("SQLITE error",err.stack); } + db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) { myLog("SQLITE error",err.stack); } }); getSendLastPos(); }); - db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) throw err; }); + db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) { myLog("SQLITE error",err.stack); } }); } function commitReference(from_id, to_id, ref_type) { db.execute( "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )", [ from_id, to_id, ref_type ], - function(err) { if (err) throw err; } + function(err) { if (err) { myLog("SQLITE error",err.stack); } } ); } @@ -113,7 +113,7 @@ + " )", [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })), function(err) { - if (err) throw err; + if (err) { myLog("SQLITE error",err.stack); } getSendLastPos(); } ); @@ -126,47 +126,136 @@ commitTweet(newdata[i]); } } - myLog("New tweets received"); +// myLog("New tweets received"); +} + +function requestTwitter() { + myLog("Requesting Twitter to track keyword(s): "+tracking_keyword); + var req = https.request({ + host: "stream.twitter.com", + path: "/1/statuses/filter.json", + method: "POST", + headers: { + 'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'), + 'Content-Type': 'application/x-www-form-urlencoded' + } + }, function(res) { + myLog('Reply from stream.twitter.com: ' + res.statusCode); + myLog('Headers: ' + JSON.stringify(res.headers)); + res.setEncoding('utf8'); + res.on('data', callBackNewTweets); + res.on('end', function() { + myLog('End Twitter Connection — Trying to reconnect'); + requestTwitter(); + }); + }); + + req.write('track=' + encodeURIComponent(tracking_keyword)); + req.socket.setTimeout(60000); + req.socket.on('timeout', function() { + myLog('TimeOut — Trying to reconnect'); + requestTwitter(); + }); + req.end(); } function getSendLastPos() { db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) { - if (err) throw err; - lastpos = results[0].lastpos ? results[0].lastpos : 0; - myLog("Broadcasting last pos = ",lastpos); - io.sockets.emit('tweetSummary', { - tweetcount : lastpos - }); + if (err) { myLog("SQLITE error",err.stack); } + if (results[0].lastpos != lastpos) { + lastpos = results[0].lastpos ? results[0].lastpos : 0; +// myLog("Broadcasting last pos = ",lastpos); + try { + io.sockets.emit('tweetSummary', { + tweetcount : lastpos + }); + } catch(err) { + myLog("SOCKET.IO error while Broadcasting tweetSummary",err.stack); + } + } + }); +} + +function getSendTweetPosByDate(date, socket) { + db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,1", function (err, results) { + if (err) { myLog("SQLITE error",err.stack); } + if (results.length) { + try { + socket.emit('tweetPosByDate', { + tweetpos : results[0].pos, + date : results[0].created_at + }); + } catch(err) { + myLog("SOCKET.IO error while sending tweetPosByDate",err.stack); + } + } }); } - -function getSendTweets(posList, socket) { - myLog("request for tweets ("+posList.join(',')+") from "+socket.id); - db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) { - if (err) throw err; - socket.emit('tweets', - results.map(function(line) { - var tw = JSON.parse(line.json); - tw.pos = line.pos; - return tw; - }) - ); +function getSendLinkedTweets(pos, socket) { + myLog("request for tweets linked to",pos); + db.execute("SELECT A.pos pos_a, A.tweet_id id_a, A.json json_a, B.pos pos_b, B.tweet_id id_b, B.json json_b, ref_type FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?)", [ pos, pos ], function(err, results) { + if (err) { myLog("SQLITE error: ",err.stack); } + var struct = { + "tweetpos" : pos, + "referencing" : [], + "referenced_by" : [] + }; + for (var i in results) { + if (results[i].pos_a == pos) { + var tw = JSON.parse(results[i].json_b); + tw.pos = results[i].pos_b; + struct.referencing.push({ + "tweet" : tw, + "ref_type" : results[i].ref_type + }); + } else { + var tw = JSON.parse(results[i].json_a); + tw.pos = results[i].pos_a; + struct.referenced_by.push({ + "tweet" : tw, + "ref_type" : results[i].ref_type + }); + } + } + try { + socket.emit('linkedTweets', struct); + } catch(err) { + myLog("SOCKET.IO error while sending linkedTweets: ",err.stack); + } }); } -function getSendTimeline(level, socket) { - myLog("request for timeline ("+level+") from "+socket.id); - var lvl = date_levels[level], +function getSendTweets(posList, socket) { +// myLog("request for tweets ("+posList.join(',')+") from "+socket.id); + db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) { + if (err) { myLog("SQLITE error",err.stack); } + try { + socket.emit('tweets', + results.map(function(line) { + var tw = JSON.parse(line.json); + tw.pos = line.pos; + return tw; + }) + ); + } catch (err) { + myLog("SOCKET.IO error while sending tweets",err.stack); + } + }); +} + +function getSendTimeline(data, socket) { + myLog("request for timeline (",data.level, data.full,") from "+socket.id); + var lvl = date_levels[data.level], requete = "SELECT COUNT(*) AS nb, " + lvl + "*ROUND(created_at/" + lvl + ") AS tranche" + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("") - + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50"; + + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" ); db.execute(requete, function (err, results) { - if (err) throw err; + if (err) { myLog("SQLITE error",err.stack); } var tbl = [], lastend = parseInt(results[results.length - 1].tranche); for (var i = results.length - 1; i >= 0; i--) { @@ -188,16 +277,35 @@ } tbl.push(struct); } - socket.emit('timeline', tbl); + try { + socket.emit('timeline', { + "full" : data.full, + "level" : data.level, + "data" : tbl + }); + } catch (err) { + myLog("SOCKET.IO error while sending timeline",err.stack); + } }); } function textids(object) { for (var key in object) { + // Workaround for Unicode bug in socket.io. + + if (typeof object[key] == "string") { + var tmp = ''; + for (var i = 0; i < object[key].length; i++) { + tmp += ( object[key].charCodeAt(i) < 128 ? object[key].charAt(i) : "&#" + object[key].charCodeAt(i) + ";" ); + } + object[key] = tmp; + } + if (key.substr(-2) == 'id') { object[key] = object[key + '_str']; delete object[key + '_str']; } + } } @@ -292,41 +400,34 @@ myLog("Listening on port: "+app_port); myLog("Opening SQLITE file: "+sqlfile); db.open(sqlfile , function(err) { - if (err) throw err; + if (err) { myLog("SQLITE error",err.stack); } createTables(); }); if (RECORD_NEW_TWEETS) { - myLog("Requesting Twitter to track keyword(s): "+tracking_keyword); - var req = https.request({ - host: "stream.twitter.com", - path: "/1/statuses/filter.json", - method: "POST", - headers: { - 'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'), - 'Content-Type': 'application/x-www-form-urlencoded' - } - }, function(res) { - myLog('Reply from stream.twitter.com: ' + res.statusCode); - myLog('Headers: ' + JSON.stringify(res.headers)); - res.setEncoding('utf8'); - res.on('data', callBackNewTweets); - }); - - req.write('track=' + encodeURIComponent(tracking_keyword)); - req.end(); + requestTwitter(); } io.set('log level', 0); io.sockets.on('connection', function(socket) { myLog("New connection from", socket.handshake.address.address, "with id=", socket.id); - socket.emit('tweetSummary', { tweetcount : lastpos }); + try { + socket.emit('tweetSummary', { tweetcount : lastpos }); + } catch (err) { + myLog("SOCKET.IO error while sending tweetSummary",err.stack); + } socket.on('updateTweets', function(data) { if (data.tweets.length) { getSendTweets(data.tweets, socket); } }); socket.on('updateTimeline', function(data) { - getSendTimeline(data.level, socket); + getSendTimeline(data, socket); + }); + socket.on('tweetPosByDate', function(data) { + getSendTweetPosByDate(data.date, socket); + }); + socket.on('linkedTweets', function(data) { + getSendLinkedTweets(data.tweetpos, socket); }); }); \ No newline at end of file