tweetcast/nodejs/node-direct.js
changeset 340 a99a04556e3b
parent 339 6a073c4a8578
child 341 cab5c9e10f90
--- a/tweetcast/nodejs/node-direct.js	Thu Oct 27 17:57:53 2011 +0200
+++ b/tweetcast/nodejs/node-direct.js	Fri Oct 28 16:46:13 2011 +0200
@@ -55,7 +55,6 @@
     db.execute(requete, function(err) {
         if (err) { myLog("SQLITE error",err.stack); }
         db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
-        getSendLastPos();
     });
     
     db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
@@ -73,31 +72,41 @@
 
     var tweet = JSON.parse(data),
         ann = [];
-
-    textids(tweet);
-    for (var j in keys_to_delete) {
-        delete tweet[keys_to_delete[j]];
-    }
-    textids(tweet.user);
-    for (var j in user_keys_to_delete) {
-        delete tweet.user[user_keys_to_delete[j]];
+    
+    if (!tweet.id) {
+        myLog("Error: Could not parse data",data);
+        return;
     }
-    if (tweet.retweeted_status) {
-        textids(tweet.retweeted_status);
+    
+    try {
+        textids(tweet);
         for (var j in keys_to_delete) {
-            delete tweet.retweeted_status[keys_to_delete[j]];
+            delete tweet[keys_to_delete[j]];
         }
-    }
-    annotationMap(function(i, annotation) {
-        for (var j in annotation.keywords) {
-            if (tweet.text.search(annotation.keywords[j]) != -1) {
-                ann.push(i);
-                break;
+        textids(tweet.user);
+        for (var j in user_keys_to_delete) {
+            delete tweet.user[user_keys_to_delete[j]];
+        }
+        if (tweet.retweeted_status) {
+            textids(tweet.retweeted_status);
+            for (var j in keys_to_delete) {
+                delete tweet.retweeted_status[keys_to_delete[j]];
             }
         }
-    });
-    tweet.annotations = ann;
-    tweet.created_at = new Date(tweet.created_at);
+        annotationMap(function(i, annotation) {
+            for (var j in annotation.keywords) {
+                if (tweet.text.search(annotation.keywords[j]) != -1) {
+                    ann.push(i);
+                    break;
+                }
+            }
+        });
+        tweet.annotations = ann;
+        tweet.created_at = new Date(tweet.created_at);
+//        myLog("Time delta :",(new Date() - tweet.created_at) / 1000);
+    } catch (err) {
+        myLog("Error while processing tweet",err.stack);
+    }
     
     if (tweet.in_reply_to_status_id) {
         commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
@@ -114,7 +123,6 @@
         [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })),
         function(err) {
             if (err) { myLog("SQLITE error",err.stack); }
-            getSendLastPos();
         }
     );
 }
@@ -126,7 +134,7 @@
             commitTweet(newdata[i]);
         }
     }
-//    myLog("New tweets received");
+    myLog("Data received - length :",chunk.length);
 }
 
 function requestTwitter() {
@@ -153,7 +161,7 @@
     req.write('track=' + encodeURIComponent(tracking_keyword));
     req.socket.setTimeout(60000);
     req.socket.on('timeout', function() {
-        myLog('TimeOut — Trying to reconnect');
+        myLog('TimeOut - Trying to reconnect');
         requestTwitter();
     });
     req.end();
@@ -177,7 +185,7 @@
 }
 
 function getSendTweetPosByDate(date, socket) {
-    db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,1", function (err, results) {
+    db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,9", function (err, results) {
         if (err) { myLog("SQLITE error",err.stack); }
         if (results.length) {
             try {
@@ -193,8 +201,8 @@
 }
 
 function getSendLinkedTweets(pos, socket) {
-    myLog("request for tweets linked to",pos);
-    db.execute("SELECT A.pos pos_a, A.tweet_id id_a, A.json json_a, B.pos pos_b, B.tweet_id id_b, B.json json_b, ref_type FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?)", [ pos, pos ], function(err, results) {
+//    myLog("request for tweets linked to",pos);
+    db.execute("SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10", [ pos, pos ], function(err, results) {
         if (err) { myLog("SQLITE error: ",err.stack); }
         var struct = {
             "tweetpos" : pos,
@@ -203,17 +211,13 @@
         };
         for (var i in results) {
             if (results[i].pos_a == pos) {
-                var tw = JSON.parse(results[i].json_b);
-                tw.pos = results[i].pos_b;
                 struct.referencing.push({
-                    "tweet" : tw,
+                    "pos" : results[i].pos_b,
                     "ref_type" : results[i].ref_type
                 });
             } else {
-                var tw = JSON.parse(results[i].json_a);
-                tw.pos = results[i].pos_a;
                 struct.referenced_by.push({
-                    "tweet" : tw,
+                    "pos" : results[i].pos_a,
                     "ref_type" : results[i].ref_type
                 });
             }
@@ -245,7 +249,7 @@
 }
 
 function getSendTimeline(data, socket) {
-    myLog("request for timeline (",data.level, data.full,") from "+socket.id);
+//    myLog("request for timeline (",data.level, data.full,") from "+socket.id);
     var lvl = date_levels[data.level],
         requete = "SELECT COUNT(*) AS nb, "
         + lvl
@@ -256,6 +260,9 @@
         + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" );
     db.execute(requete, function (err, results) {
         if (err) { myLog("SQLITE error",err.stack); }
+        if (!results.length) {
+            return;
+        }
         var tbl = [],
             lastend = parseInt(results[results.length - 1].tranche);
         for (var i = results.length - 1; i >= 0; i--) {
@@ -404,6 +411,8 @@
     createTables();
 });
 
+setInterval(getSendLastPos,300);
+
 if (RECORD_NEW_TWEETS) {
     requestTwitter();
 }