tweetcast/nodejs/node-direct.js
changeset 339 6a073c4a8578
parent 338 60dff8a71024
child 340 a99a04556e3b
--- a/tweetcast/nodejs/node-direct.js	Wed Oct 26 18:29:46 2011 +0200
+++ b/tweetcast/nodejs/node-direct.js	Thu Oct 27 17:57:53 2011 +0200
@@ -53,19 +53,19 @@
 
     var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationMap(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )";
     db.execute(requete, function(err) {
-        if (err) throw err;
-        db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) throw err; });
+        if (err) { myLog("SQLITE error",err.stack); }
+        db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
         getSendLastPos();
     });
     
-    db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) throw err; });
+    db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
 }
 
 function commitReference(from_id, to_id, ref_type) {
     db.execute(
         "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )",
         [ from_id, to_id, ref_type ],
-        function(err) { if (err) throw err; }
+        function(err) { if (err) { myLog("SQLITE error",err.stack); } }
     );
 }
 
@@ -113,7 +113,7 @@
         + " )",
         [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })),
         function(err) {
-            if (err) throw err;
+            if (err) { myLog("SQLITE error",err.stack); }
             getSendLastPos();
         }
     );
@@ -126,47 +126,136 @@
             commitTweet(newdata[i]);
         }
     }
-    myLog("New tweets received");
+//    myLog("New tweets received");
+}
+
+function requestTwitter() {
+    myLog("Requesting Twitter to track keyword(s): "+tracking_keyword);
+    var req = https.request({
+        host: "stream.twitter.com",
+        path: "/1/statuses/filter.json",
+        method: "POST",
+        headers: {
+            'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
+            'Content-Type': 'application/x-www-form-urlencoded'
+        }
+    }, function(res) {
+        myLog('Reply from stream.twitter.com: ' + res.statusCode);
+        myLog('Headers: ' + JSON.stringify(res.headers));
+        res.setEncoding('utf8');
+        res.on('data', callBackNewTweets);
+        res.on('end', function() {
+            myLog('End Twitter Connection — Trying to reconnect');
+            requestTwitter();
+        });
+    });
+    
+    req.write('track=' + encodeURIComponent(tracking_keyword));
+    req.socket.setTimeout(60000);
+    req.socket.on('timeout', function() {
+        myLog('TimeOut — Trying to reconnect');
+        requestTwitter();
+    });
+    req.end();
 }
 
 function getSendLastPos() {
     db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) {
-        if (err) throw err;
-        lastpos = results[0].lastpos ? results[0].lastpos : 0;
-        myLog("Broadcasting last pos = ",lastpos);
-        io.sockets.emit('tweetSummary', {
-            tweetcount : lastpos
-        });
+        if (err) { myLog("SQLITE error",err.stack); }
+        if (results[0].lastpos != lastpos) {
+            lastpos = results[0].lastpos ? results[0].lastpos : 0;
+//            myLog("Broadcasting last pos = ",lastpos);
+            try {
+                io.sockets.emit('tweetSummary', {
+                    tweetcount : lastpos
+                });
+            } catch(err) {
+                myLog("SOCKET.IO error while Broadcasting tweetSummary",err.stack);
+            }
+        }
+    });
+}
+
+function getSendTweetPosByDate(date, socket) {
+    db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,1", function (err, results) {
+        if (err) { myLog("SQLITE error",err.stack); }
+        if (results.length) {
+            try {
+                socket.emit('tweetPosByDate', {
+                    tweetpos : results[0].pos,
+                    date : results[0].created_at
+                });
+            } catch(err) {
+                myLog("SOCKET.IO error while sending tweetPosByDate",err.stack);
+            }
+        }
     });
 }
 
-
-function getSendTweets(posList, socket) {
-    myLog("request for tweets ("+posList.join(',')+") from "+socket.id);
-    db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) {
-        if (err) throw err;
-        socket.emit('tweets',
-            results.map(function(line) {
-                var tw = JSON.parse(line.json);
-                tw.pos = line.pos;
-                return tw;
-            })
-        );
+function getSendLinkedTweets(pos, socket) {
+    myLog("request for tweets linked to",pos);
+    db.execute("SELECT A.pos pos_a, A.tweet_id id_a, A.json json_a, B.pos pos_b, B.tweet_id id_b, B.json json_b, ref_type FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?)", [ pos, pos ], function(err, results) {
+        if (err) { myLog("SQLITE error: ",err.stack); }
+        var struct = {
+            "tweetpos" : pos,
+            "referencing" : [],
+            "referenced_by" : []
+        };
+        for (var i in results) {
+            if (results[i].pos_a == pos) {
+                var tw = JSON.parse(results[i].json_b);
+                tw.pos = results[i].pos_b;
+                struct.referencing.push({
+                    "tweet" : tw,
+                    "ref_type" : results[i].ref_type
+                });
+            } else {
+                var tw = JSON.parse(results[i].json_a);
+                tw.pos = results[i].pos_a;
+                struct.referenced_by.push({
+                    "tweet" : tw,
+                    "ref_type" : results[i].ref_type
+                });
+            }
+        }
+        try {
+            socket.emit('linkedTweets', struct);
+        } catch(err) {
+            myLog("SOCKET.IO error while sending linkedTweets: ",err.stack);
+        }
     });
 }
 
-function getSendTimeline(level, socket) {
-    myLog("request for timeline ("+level+") from "+socket.id);
-    var lvl = date_levels[level],
+function getSendTweets(posList, socket) {
+//    myLog("request for tweets ("+posList.join(',')+") from "+socket.id);
+    db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) {
+        if (err) { myLog("SQLITE error",err.stack); }
+        try {
+            socket.emit('tweets',
+                results.map(function(line) {
+                    var tw = JSON.parse(line.json);
+                    tw.pos = line.pos;
+                    return tw;
+                })
+            );
+        } catch (err) {
+            myLog("SOCKET.IO error while sending tweets",err.stack);
+        }
+    });
+}
+
+function getSendTimeline(data, socket) {
+    myLog("request for timeline (",data.level, data.full,") from "+socket.id);
+    var lvl = date_levels[data.level],
         requete = "SELECT COUNT(*) AS nb, "
         + lvl
         + "*ROUND(created_at/"
         + lvl
         + ") AS tranche"
         + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
-        + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50";
+        + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" );
     db.execute(requete, function (err, results) {
-        if (err) throw err;
+        if (err) { myLog("SQLITE error",err.stack); }
         var tbl = [],
             lastend = parseInt(results[results.length - 1].tranche);
         for (var i = results.length - 1; i >= 0; i--) {
@@ -188,16 +277,35 @@
             }
             tbl.push(struct);
         }
-        socket.emit('timeline', tbl);
+        try {
+            socket.emit('timeline', {
+                "full" : data.full,
+                "level" : data.level,
+                "data" : tbl
+            });
+        } catch (err) {
+            myLog("SOCKET.IO error while sending timeline",err.stack);
+        }
     });
 }
 
 function textids(object) {
     for (var key in object) {
+        // Workaround for Unicode bug in socket.io.
+        
+        if (typeof object[key] == "string") {
+            var tmp = '';
+            for (var i = 0; i < object[key].length; i++) {
+                tmp += ( object[key].charCodeAt(i) < 128 ? object[key].charAt(i) : "&#" + object[key].charCodeAt(i) + ";" );
+            }
+            object[key] = tmp;
+        }
+        
         if (key.substr(-2) == 'id') {
             object[key] = object[key + '_str'];
             delete object[key + '_str'];
         }
+        
     }
 }
 
@@ -292,41 +400,34 @@
 myLog("Listening on port: "+app_port);
 myLog("Opening SQLITE file: "+sqlfile);
 db.open(sqlfile , function(err) {
-    if (err) throw err;
+    if (err) { myLog("SQLITE error",err.stack); }
     createTables();
 });
 
 if (RECORD_NEW_TWEETS) {
-    myLog("Requesting Twitter to track keyword(s): "+tracking_keyword);
-    var req = https.request({
-        host: "stream.twitter.com",
-        path: "/1/statuses/filter.json",
-        method: "POST",
-        headers: {
-            'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
-            'Content-Type': 'application/x-www-form-urlencoded'
-        }
-    }, function(res) {
-        myLog('Reply from stream.twitter.com: ' + res.statusCode);
-        myLog('Headers: ' + JSON.stringify(res.headers));
-        res.setEncoding('utf8');
-        res.on('data', callBackNewTweets);
-    });
-    
-    req.write('track=' + encodeURIComponent(tracking_keyword));
-    req.end();
+    requestTwitter();
 }
 
 io.set('log level', 0);
 io.sockets.on('connection', function(socket) {
     myLog("New connection from", socket.handshake.address.address, "with id=", socket.id);
-    socket.emit('tweetSummary', { tweetcount : lastpos });
+    try {
+        socket.emit('tweetSummary', { tweetcount : lastpos });
+    } catch (err) {
+        myLog("SOCKET.IO error while sending tweetSummary",err.stack);
+    }
     socket.on('updateTweets', function(data) {
         if (data.tweets.length) {
             getSendTweets(data.tweets, socket);
         }
     });
     socket.on('updateTimeline', function(data) {
-        getSendTimeline(data.level, socket);
+        getSendTimeline(data, socket);
+    });
+    socket.on('tweetPosByDate', function(data) {
+        getSendTweetPosByDate(data.date, socket);
+    });
+    socket.on('linkedTweets', function(data) {
+        getSendLinkedTweets(data.tweetpos, socket);
     });
 });
\ No newline at end of file