tweetcast/nodejs/server/node-direct.js
changeset 326 c28048fb63b4
parent 325 7d9c576bfaac
child 331 03c69425efa6
--- a/tweetcast/nodejs/server/node-direct.js	Mon Oct 17 17:40:58 2011 +0200
+++ b/tweetcast/nodejs/server/node-direct.js	Tue Oct 18 16:19:08 2011 +0200
@@ -1,13 +1,16 @@
 READ_OLD_TWEETS = true;
 RECORD_NEW_TWEETS = true;
-TWEET_FILE_PATH = 'tweets.txt';
-TRACKING_KEYWORD = 'Bieber';
+TWEET_FILE_DIR = './';
+TWEET_FILE_START = 'tweets-';
+TWEET_FILE_EXT = '.txt';
+TRACKING_KEYWORD = '#p2';
 
 var fs = require('fs'),
     https = require('https'),
     io = require('socket.io')
         .listen(8000),
     tweets = [],
+    arcs = [],
     tweet_ids = [],
     date_struct = [],
     date_levels = [
@@ -21,9 +24,12 @@
         'negative' : '--',
         'reference' : '==',
         'question' : '??'
-    }
+    };
 
 function populateDateStruct(level, start) {
+    if (typeof start == "object") {
+        start = start.valueOf();
+    }
     var end = start + date_levels[level],
         struct = {
             "level" : level,
@@ -39,7 +45,7 @@
         }
     } else {
         struct.tweets = [];
-        struct.annotations = [];
+        struct.annotations = {};
     }
     return struct;
 }
@@ -52,13 +58,13 @@
                 insertIntoDateStruct(slices[i].slices, tweet);
             } else {
                 slices[i].tweets.push(tweet.pos);
-                for (var j in tweet.annotations) {
-                    if (slices[i].annotations[j]) {
-                        slices[i].annotations[j]++;
+                if (tweet.annotations.length) {
+                    var ann = tweet.annotations[0];
+                    if (slices[i].annotations[ann]) {
+                        slices[i].annotations[ann].push(tweet.pos);
                     } else {
-                        slices[i].annotations[j] = 1;
+                        slices[i].annotations[ann] = [ tweet.pos ];
                     }
-                    break;
                 }
             }
             break;
@@ -66,6 +72,62 @@
     }
 }
 
+function getSliceContent(slice) {
+    if (slice.slices) {
+        var twz = [],
+            annotations = {};
+        for (var i in slice.slices) {
+            var data = getSliceContent(slice.slices[i]);
+            twz = twz.concat(data.tweets);
+            for (var j in data.annotations) {
+                if (annotations[j]) {
+                    annotations[j] = annotations[j].concat(data.annotations[j]);
+                } else {
+                    annotations[j] = data.annotations[j];
+                }
+            }
+        }
+    } else {
+        twz = slice.tweets;
+        annotations = slice.annotations;
+    }
+    return { "tweets" : twz, "annotations" : annotations }
+};
+
+function flattenDateStruct(slices, target_level) {
+    var current_level = slices[0].level,
+        result = [];
+    if (current_level < target_level) {
+        if (slices[0].slices) {
+            for (var i in slices) {
+                result = result.concat(flattenDateStruct(slices[i].slices, target_level));
+            }
+        }
+    }
+    else {
+        for (var i in slices) {
+            var data = getSliceContent(slices[i]);
+            result.push({
+                "start" : slices[i].start,
+                "end" : slices[i].end,
+                "tweets" : data.tweets,
+                "annotations" : data.annotations
+            });
+        }
+    }
+    return result;
+}
+
+function trimFDS(slices) {
+    while (slices[0].tweets.length == 0) {
+        slices.splice(0,1);
+    }
+    while (slices[slices.length - 1].tweets.length == 0) {
+        slices.pop();
+    }
+    return slices;
+}
+
 function addToList(tweet) {
     if (tweet_ids.indexOf(tweet.id) != -1) {
         console.log("Error: Tweet already in list");
@@ -79,9 +141,29 @@
         date_struct = [ populateDateStruct(0, date_levels[0] * parseInt(creadate / date_levels[0])) ]
     }
     while (creadate > date_struct[date_struct.length - 1].end) {
-        date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end) );
+        date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end.valueOf()) );
     }
     insertIntoDateStruct(date_struct, tweet);
+    if (tweet.in_reply_to_status_id) {
+        var ref = tweet_ids.indexOf(tweet.in_reply_to_status_id)
+        if (ref != -1) {
+            arcs.push({
+                "from" : tweet.pos,
+                "to" : ref,
+                "type" : "reply"
+            });
+        }
+    }
+    if (tweet.retweeted_status) {
+        var ref = tweet_ids.indexOf(tweet.retweeted_status.id_str)
+        if (ref != -1) {
+            arcs.push({
+                "from" : tweet.pos,
+                "to" : ref,
+                "type" : "retweet"
+            });
+        }
+    }
 }
 
 function textids(object) {
@@ -93,33 +175,61 @@
     }
 }
 
-if (READ_OLD_TWEETS) {
-    try {
-        var filebuff = "";
-            readstream = fs.createReadStream(TWEET_FILE_PATH, { flags: 'r', encoding: 'utf-8' });
-        readstream.on("data", function(data) {
-            console.log("data");
-            filebuff += data;
-        });
-        readstream.on("end", function() {
-            console.log("end");
-            oldtweets = filebuff.split('\r\n');
-            var tweetscopied = 0;
-            for (var i in oldtweets) {
-                if (oldtweets[i].length > 0) {
-                    addToList(JSON.parse(oldtweets[i]));
+function readTweetsFromFile() {
+    var dir = fs.readdirSync(TWEET_FILE_DIR),
+        fileName = null;
+    for (var i in dir) {
+        if ( dir[i].indexOf( TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) ) == 0 ) {
+            var oldtweets = fs.readFileSync(dir[i], 'utf8').split('\n'),
+                tweetscopied = 0;
+            for (var j in oldtweets) {
+                if (oldtweets[j].length > 1) {
+                    addToList(JSON.parse(oldtweets[j]));
                     tweetscopied++;
                 }
             }
-            console.log(tweetscopied, "tweets copied from", TWEET_FILE_PATH);
-        });
-    } catch(err) {
-        console.log(err);
+            console.log(tweetscopied, "tweets copied from", dir[i]);
+            console.log(arcs);
+        }
     }
 }
 
+function callBackNewTweets(chunk) {
+    var newdata = chunk.split('\r\n');
+    for (var i in newdata) {
+        if (newdata[i].length > 0) {
+            var tweet = JSON.parse(newdata[i]),
+                annotations = [];
+            for (var a in annkw) {
+                if (tweet.text.indexOf(annkw[a]) != -1) {
+                    annotations.push(a);
+                }
+            }
+            tweet.annotations = annotations;
+            textids(tweet);
+            textids(tweet.user);
+            
+            addToList(tweet);
+            var txt = JSON.stringify(tweet)+'\n';
+            writestream.write(txt);
+        }
+    }
+    console.log("New tweets received. We now have", tweets.length, "tweets in memory");
+    io.sockets.emit('tweetSummary', { tweetcount : tweets.length });
+}
+
+/* MAIN CODE */
+
+if (READ_OLD_TWEETS) {
+    readTweetsFromFile();
+}
+
 if (RECORD_NEW_TWEETS) {
-    var writestream = null;
+    var now = new Date(),
+        fileTimestamp = (1900+now.getYear())+"-"+("0"+(1+now.getMonth())).slice(-2)+"-"+("0"+now.getDate()).slice(-2)+"."+("0"+now.getHours()).slice(-2)+"-"+("0"+now.getMinutes()).slice(-2),
+        fileName = TWEET_FILE_DIR + TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-' + fileTimestamp + TWEET_FILE_EXT,
+        writestream = fs.createWriteStream(fileName, { flags: 'w', encoding: 'utf-8' });
+        console.log('Writing to',fileName);
     var req = https.request({
         host: "stream.twitter.com",
         path: "/1/statuses/filter.json",
@@ -132,34 +242,7 @@
         console.log('STATUS: ' + res.statusCode);
         console.log('HEADERS: ' + JSON.stringify(res.headers));
         res.setEncoding('utf8');
-        res.on('data', function(chunk) {
-            var newdata = chunk.split('\r\n');
-            try {
-                for (var i in newdata) {
-                    if (newdata[i].length > 0) {
-                        var tweet = JSON.parse(newdata[i]),
-                            annotations = [];
-                        for (var a in annkw) {
-                            if (tweet.text.indexOf(annkw[a]) != -1) {
-                                annotations.push(a);
-                            }
-                        }
-                        tweet.annotations = annotations;
-                        textids(tweet);
-                        addToList(tweet);
-                    }
-                }
-                if (!writestream) {
-                    writestream = fs.createWriteStream(TWEET_FILE_PATH, { flags: 'a', encoding: 'utf-8' });
-                }
-                writestream.write(chunk);
-                io.sockets.emit('tweetSummary', { tweetcount : tweets.length });
-                console.log("New tweets received. We now have", tweets.length, "tweets in memory");
-            }
-            catch(err) {
-                console.log(err);
-            }
-        });
+        res.on('data', callBackNewTweets);
     });
     
     req.write('track=' + encodeURIComponent(TRACKING_KEYWORD));
@@ -168,9 +251,15 @@
 
 io.set('log level', 0);
 io.sockets.on('connection', function(socket) {
-    console.log("New connection", socket);
+    console.log("New connection from", socket.handshake.address.address,"id :", socket.id);
     socket.emit('tweetSummary', { tweetcount : tweets.length });
     socket.on('getTweets', function(data) {
-        socket.emit('tweets', tweets.slice(Math.max(0, data.from), data.to));
+        console.log('getTweets from',socket.id);
+        data.tweets = tweets.slice(Math.max(0, data.from), data.to);
+        socket.emit('tweets', data);
+    });
+    socket.on('getTimeline', function(data) {
+        socket.emit('timeline', { "timeline" : trimFDS(flattenDateStruct(date_struct, data.level)), "arcs" : arcs });
+        console.log('getTimeline from',socket.id);
     });
 });
\ No newline at end of file