diff -r 6a073c4a8578 -r a99a04556e3b tweetcast/nodejs/node-direct.js --- a/tweetcast/nodejs/node-direct.js Thu Oct 27 17:57:53 2011 +0200 +++ b/tweetcast/nodejs/node-direct.js Fri Oct 28 16:46:13 2011 +0200 @@ -55,7 +55,6 @@ db.execute(requete, function(err) { if (err) { myLog("SQLITE error",err.stack); } db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) { myLog("SQLITE error",err.stack); } }); - getSendLastPos(); }); db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) { myLog("SQLITE error",err.stack); } }); @@ -73,31 +72,41 @@ var tweet = JSON.parse(data), ann = []; - - textids(tweet); - for (var j in keys_to_delete) { - delete tweet[keys_to_delete[j]]; - } - textids(tweet.user); - for (var j in user_keys_to_delete) { - delete tweet.user[user_keys_to_delete[j]]; + + if (!tweet.id) { + myLog("Error: Could not parse data",data); + return; } - if (tweet.retweeted_status) { - textids(tweet.retweeted_status); + + try { + textids(tweet); for (var j in keys_to_delete) { - delete tweet.retweeted_status[keys_to_delete[j]]; + delete tweet[keys_to_delete[j]]; } - } - annotationMap(function(i, annotation) { - for (var j in annotation.keywords) { - if (tweet.text.search(annotation.keywords[j]) != -1) { - ann.push(i); - break; + textids(tweet.user); + for (var j in user_keys_to_delete) { + delete tweet.user[user_keys_to_delete[j]]; + } + if (tweet.retweeted_status) { + textids(tweet.retweeted_status); + for (var j in keys_to_delete) { + delete tweet.retweeted_status[keys_to_delete[j]]; } } - }); - tweet.annotations = ann; - tweet.created_at = new Date(tweet.created_at); + annotationMap(function(i, annotation) { + for (var j in annotation.keywords) { + if (tweet.text.search(annotation.keywords[j]) != -1) { + ann.push(i); + break; + } + } + }); + tweet.annotations = ann; + tweet.created_at = new Date(tweet.created_at); +// myLog("Time delta :",(new Date() - tweet.created_at) / 1000); + } catch (err) { + myLog("Error while processing tweet",err.stack); + } if (tweet.in_reply_to_status_id) { commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" ); @@ -114,7 +123,6 @@ [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })), function(err) { if (err) { myLog("SQLITE error",err.stack); } - getSendLastPos(); } ); } @@ -126,7 +134,7 @@ commitTweet(newdata[i]); } } -// myLog("New tweets received"); + myLog("Data received - length :",chunk.length); } function requestTwitter() { @@ -153,7 +161,7 @@ req.write('track=' + encodeURIComponent(tracking_keyword)); req.socket.setTimeout(60000); req.socket.on('timeout', function() { - myLog('TimeOut — Trying to reconnect'); + myLog('TimeOut - Trying to reconnect'); requestTwitter(); }); req.end(); @@ -177,7 +185,7 @@ } function getSendTweetPosByDate(date, socket) { - db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,1", function (err, results) { + db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,9", function (err, results) { if (err) { myLog("SQLITE error",err.stack); } if (results.length) { try { @@ -193,8 +201,8 @@ } function getSendLinkedTweets(pos, socket) { - myLog("request for tweets linked to",pos); - db.execute("SELECT A.pos pos_a, A.tweet_id id_a, A.json json_a, B.pos pos_b, B.tweet_id id_b, B.json json_b, ref_type FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?)", [ pos, pos ], function(err, results) { +// myLog("request for tweets linked to",pos); + db.execute("SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10", [ pos, pos ], function(err, results) { if (err) { myLog("SQLITE error: ",err.stack); } var struct = { "tweetpos" : pos, @@ -203,17 +211,13 @@ }; for (var i in results) { if (results[i].pos_a == pos) { - var tw = JSON.parse(results[i].json_b); - tw.pos = results[i].pos_b; struct.referencing.push({ - "tweet" : tw, + "pos" : results[i].pos_b, "ref_type" : results[i].ref_type }); } else { - var tw = JSON.parse(results[i].json_a); - tw.pos = results[i].pos_a; struct.referenced_by.push({ - "tweet" : tw, + "pos" : results[i].pos_a, "ref_type" : results[i].ref_type }); } @@ -245,7 +249,7 @@ } function getSendTimeline(data, socket) { - myLog("request for timeline (",data.level, data.full,") from "+socket.id); +// myLog("request for timeline (",data.level, data.full,") from "+socket.id); var lvl = date_levels[data.level], requete = "SELECT COUNT(*) AS nb, " + lvl @@ -256,6 +260,9 @@ + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" ); db.execute(requete, function (err, results) { if (err) { myLog("SQLITE error",err.stack); } + if (!results.length) { + return; + } var tbl = [], lastend = parseInt(results[results.length - 1].tranche); for (var i = results.length - 1; i >= 0; i--) { @@ -404,6 +411,8 @@ createTables(); }); +setInterval(getSendLastPos,300); + if (RECORD_NEW_TWEETS) { requestTwitter(); }