--- a/tweetcast/nodejs/node-direct.js Thu Oct 27 17:57:53 2011 +0200
+++ b/tweetcast/nodejs/node-direct.js Fri Oct 28 16:46:13 2011 +0200
@@ -55,7 +55,6 @@
db.execute(requete, function(err) {
if (err) { myLog("SQLITE error",err.stack); }
db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
- getSendLastPos();
});
db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
@@ -73,31 +72,41 @@
var tweet = JSON.parse(data),
ann = [];
-
- textids(tweet);
- for (var j in keys_to_delete) {
- delete tweet[keys_to_delete[j]];
- }
- textids(tweet.user);
- for (var j in user_keys_to_delete) {
- delete tweet.user[user_keys_to_delete[j]];
+
+ if (!tweet.id) {
+ myLog("Error: Could not parse data",data);
+ return;
}
- if (tweet.retweeted_status) {
- textids(tweet.retweeted_status);
+
+ try {
+ textids(tweet);
for (var j in keys_to_delete) {
- delete tweet.retweeted_status[keys_to_delete[j]];
+ delete tweet[keys_to_delete[j]];
}
- }
- annotationMap(function(i, annotation) {
- for (var j in annotation.keywords) {
- if (tweet.text.search(annotation.keywords[j]) != -1) {
- ann.push(i);
- break;
+ textids(tweet.user);
+ for (var j in user_keys_to_delete) {
+ delete tweet.user[user_keys_to_delete[j]];
+ }
+ if (tweet.retweeted_status) {
+ textids(tweet.retweeted_status);
+ for (var j in keys_to_delete) {
+ delete tweet.retweeted_status[keys_to_delete[j]];
}
}
- });
- tweet.annotations = ann;
- tweet.created_at = new Date(tweet.created_at);
+ annotationMap(function(i, annotation) {
+ for (var j in annotation.keywords) {
+ if (tweet.text.search(annotation.keywords[j]) != -1) {
+ ann.push(i);
+ break;
+ }
+ }
+ });
+ tweet.annotations = ann;
+ tweet.created_at = new Date(tweet.created_at);
+// myLog("Time delta :",(new Date() - tweet.created_at) / 1000);
+ } catch (err) {
+ myLog("Error while processing tweet",err.stack);
+ }
if (tweet.in_reply_to_status_id) {
commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
@@ -114,7 +123,6 @@
[ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })),
function(err) {
if (err) { myLog("SQLITE error",err.stack); }
- getSendLastPos();
}
);
}
@@ -126,7 +134,7 @@
commitTweet(newdata[i]);
}
}
-// myLog("New tweets received");
+ myLog("Data received - length :",chunk.length);
}
function requestTwitter() {
@@ -153,7 +161,7 @@
req.write('track=' + encodeURIComponent(tracking_keyword));
req.socket.setTimeout(60000);
req.socket.on('timeout', function() {
- myLog('TimeOut — Trying to reconnect');
+ myLog('TimeOut - Trying to reconnect');
requestTwitter();
});
req.end();
@@ -177,7 +185,7 @@
}
function getSendTweetPosByDate(date, socket) {
- db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,1", function (err, results) {
+ db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,9", function (err, results) {
if (err) { myLog("SQLITE error",err.stack); }
if (results.length) {
try {
@@ -193,8 +201,8 @@
}
function getSendLinkedTweets(pos, socket) {
- myLog("request for tweets linked to",pos);
- db.execute("SELECT A.pos pos_a, A.tweet_id id_a, A.json json_a, B.pos pos_b, B.tweet_id id_b, B.json json_b, ref_type FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?)", [ pos, pos ], function(err, results) {
+// myLog("request for tweets linked to",pos);
+ db.execute("SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10", [ pos, pos ], function(err, results) {
if (err) { myLog("SQLITE error: ",err.stack); }
var struct = {
"tweetpos" : pos,
@@ -203,17 +211,13 @@
};
for (var i in results) {
if (results[i].pos_a == pos) {
- var tw = JSON.parse(results[i].json_b);
- tw.pos = results[i].pos_b;
struct.referencing.push({
- "tweet" : tw,
+ "pos" : results[i].pos_b,
"ref_type" : results[i].ref_type
});
} else {
- var tw = JSON.parse(results[i].json_a);
- tw.pos = results[i].pos_a;
struct.referenced_by.push({
- "tweet" : tw,
+ "pos" : results[i].pos_a,
"ref_type" : results[i].ref_type
});
}
@@ -245,7 +249,7 @@
}
function getSendTimeline(data, socket) {
- myLog("request for timeline (",data.level, data.full,") from "+socket.id);
+// myLog("request for timeline (",data.level, data.full,") from "+socket.id);
var lvl = date_levels[data.level],
requete = "SELECT COUNT(*) AS nb, "
+ lvl
@@ -256,6 +260,9 @@
+ " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" );
db.execute(requete, function (err, results) {
if (err) { myLog("SQLITE error",err.stack); }
+ if (!results.length) {
+ return;
+ }
var tbl = [],
lastend = parseInt(results[results.length - 1].tranche);
for (var i = results.length - 1; i >= 0; i--) {
@@ -404,6 +411,8 @@
createTables();
});
+setInterval(getSendLastPos,300);
+
if (RECORD_NEW_TWEETS) {
requestTwitter();
}