tweetcast/nodejs/server/node-direct.js
changeset 325 7d9c576bfaac
parent 314 0f1e6ce19b6d
child 326 c28048fb63b4
--- a/tweetcast/nodejs/server/node-direct.js	Fri Oct 14 17:36:34 2011 +0200
+++ b/tweetcast/nodejs/server/node-direct.js	Mon Oct 17 17:40:58 2011 +0200
@@ -1,10 +1,21 @@
+READ_OLD_TWEETS = true;
+RECORD_NEW_TWEETS = true;
+TWEET_FILE_PATH = 'tweets.txt';
+TRACKING_KEYWORD = 'Bieber';
+
 var fs = require('fs'),
     https = require('https'),
     io = require('socket.io')
-        .listen(8000)
-    keyword = "Bieber",
+        .listen(8000),
     tweets = [],
     tweet_ids = [],
+    date_struct = [],
+    date_levels = [
+        3600 * 1000,
+        15 * 60 * 1000,
+        5 * 60 * 1000,
+        60 * 1000
+    ],
     annkw = {
         'positive' : '++',
         'negative' : '--',
@@ -12,6 +23,67 @@
         'question' : '??'
     }
 
+function populateDateStruct(level, start) {
+    var end = start + date_levels[level],
+        struct = {
+            "level" : level,
+            "start" : new Date(start),
+            "end" : new Date(end)
+        };
+    if (level < date_levels.length - 1) {
+        struct.slices = [];
+        var newstart = start;
+        while (newstart < end) {
+            struct.slices.push(populateDateStruct(level + 1, newstart));
+            newstart += date_levels[level + 1];
+        }
+    } else {
+        struct.tweets = [];
+        struct.annotations = [];
+    }
+    return struct;
+}
+
+function insertIntoDateStruct(slices, tweet) {
+    var creadate = new Date(tweet.created_at);
+    for (var i in slices) {
+        if (creadate < slices[i].end) {
+            if (slices[i].slices) {
+                insertIntoDateStruct(slices[i].slices, tweet);
+            } else {
+                slices[i].tweets.push(tweet.pos);
+                for (var j in tweet.annotations) {
+                    if (slices[i].annotations[j]) {
+                        slices[i].annotations[j]++;
+                    } else {
+                        slices[i].annotations[j] = 1;
+                    }
+                    break;
+                }
+            }
+            break;
+        }
+    }
+}
+
+function addToList(tweet) {
+    if (tweet_ids.indexOf(tweet.id) != -1) {
+        console.log("Error: Tweet already in list");
+        return;
+    }
+    tweet.pos = tweets.length;
+    tweets.push(tweet);
+    tweet_ids.push(tweet.id);
+    var creadate = new Date(tweet.created_at);
+    if (!date_struct.length) {
+        date_struct = [ populateDateStruct(0, date_levels[0] * parseInt(creadate / date_levels[0])) ]
+    }
+    while (creadate > date_struct[date_struct.length - 1].end) {
+        date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end) );
+    }
+    insertIntoDateStruct(date_struct, tweet);
+}
+
 function textids(object) {
     for (var key in object) {
         if (key.substr(-2) == 'id') {
@@ -21,54 +93,84 @@
     }
 }
 
-var fd = fs.createWriteStream('tweets.txt');
-
-req = https.request({
-    host: "stream.twitter.com",
-    path: "/1/statuses/filter.json",
-    method: "POST",
-    headers: {
-        'Authorization': 'Basic cmFwaHY6N3czMzdMZkMyM2dF',
-        'Content-Type': 'application/x-www-form-urlencoded'
-    }
-}, function(res) {
-    console.log('STATUS: ' + res.statusCode);
-    console.log('HEADERS: ' + JSON.stringify(res.headers));
-    res.setEncoding('utf8');
-    res.on('data', function(chunk) {
-        newdata = chunk.split('\r\n');
-        try {
-            for (var i in newdata) {
-                if (newdata[i].length > 0) {
-                    tweet = JSON.parse(newdata[i]);
-                    annotations = [];
-                    for (var a in annkw) {
-                        if (tweet.text.indexOf(annkw[a]) != -1) {
-                            annotations.push(a);
-                        }
-                    }
-                    tweet.annotations = annotations;
-                    tweets.push(tweet);
-                    textids(tweet);
-                    tweet_ids.push(tweet.id_str);
-                    io.sockets.emit('newtweet', tweet);
+if (READ_OLD_TWEETS) {
+    try {
+        var filebuff = "";
+            readstream = fs.createReadStream(TWEET_FILE_PATH, { flags: 'r', encoding: 'utf-8' });
+        readstream.on("data", function(data) {
+            console.log("data");
+            filebuff += data;
+        });
+        readstream.on("end", function() {
+            console.log("end");
+            oldtweets = filebuff.split('\r\n');
+            var tweetscopied = 0;
+            for (var i in oldtweets) {
+                if (oldtweets[i].length > 0) {
+                    addToList(JSON.parse(oldtweets[i]));
+                    tweetscopied++;
                 }
             }
-            fd.write(chunk); 
-        }
-        catch(err) {
-            console.log(err);
+            console.log(tweetscopied, "tweets copied from", TWEET_FILE_PATH);
+        });
+    } catch(err) {
+        console.log(err);
+    }
+}
+
+if (RECORD_NEW_TWEETS) {
+    var writestream = null;
+    var req = https.request({
+        host: "stream.twitter.com",
+        path: "/1/statuses/filter.json",
+        method: "POST",
+        headers: {
+            'Authorization': 'Basic cmFwaHY6N3czMzdMZkMyM2dF',
+            'Content-Type': 'application/x-www-form-urlencoded'
         }
+    }, function(res) {
+        console.log('STATUS: ' + res.statusCode);
+        console.log('HEADERS: ' + JSON.stringify(res.headers));
+        res.setEncoding('utf8');
+        res.on('data', function(chunk) {
+            var newdata = chunk.split('\r\n');
+            try {
+                for (var i in newdata) {
+                    if (newdata[i].length > 0) {
+                        var tweet = JSON.parse(newdata[i]),
+                            annotations = [];
+                        for (var a in annkw) {
+                            if (tweet.text.indexOf(annkw[a]) != -1) {
+                                annotations.push(a);
+                            }
+                        }
+                        tweet.annotations = annotations;
+                        textids(tweet);
+                        addToList(tweet);
+                    }
+                }
+                if (!writestream) {
+                    writestream = fs.createWriteStream(TWEET_FILE_PATH, { flags: 'a', encoding: 'utf-8' });
+                }
+                writestream.write(chunk);
+                io.sockets.emit('tweetSummary', { tweetcount : tweets.length });
+                console.log("New tweets received. We now have", tweets.length, "tweets in memory");
+            }
+            catch(err) {
+                console.log(err);
+            }
+        });
     });
-});
+    
+    req.write('track=' + encodeURIComponent(TRACKING_KEYWORD));
+    req.end();
+}
 
-req.write('track=' + encodeURIComponent(keyword));
-req.end();
 io.set('log level', 0);
 io.sockets.on('connection', function(socket) {
-    socket.emit('tweets', tweets.slice(-10));
-    socket.on('tweetsbefore', function(data) {
-        tweetpos = tweet_ids.indexOf(data);
-        socket.emit('oldtweets', tweets.slice(0, tweetpos).slice(-10));
+    console.log("New connection", socket);
+    socket.emit('tweetSummary', { tweetcount : tweets.length });
+    socket.on('getTweets', function(data) {
+        socket.emit('tweets', tweets.slice(Math.max(0, data.from), data.to));
     });
 });
\ No newline at end of file