tweetcast/nodejs/server/node-direct.js
author Raphael Velt <raph.velt@gmail.com>
Tue, 18 Oct 2011 16:19:08 +0200
changeset 326 c28048fb63b4
parent 325 7d9c576bfaac
child 331 03c69425efa6
permissions -rw-r--r--
Added visual timeline feature to node client

// Configuration. These are assigned without `var`, so (outside strict mode)
// they become properties of the global object rather than module locals.
// READ_OLD_TWEETS: at startup, replay tweets recorded by earlier runs (readTweetsFromFile).
READ_OLD_TWEETS = true;
// RECORD_NEW_TWEETS: connect to the Twitter streaming API and append every received tweet to a new file.
RECORD_NEW_TWEETS = true;
// Recorded files are named
//   TWEET_FILE_DIR + TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-' + timestamp + TWEET_FILE_EXT
TWEET_FILE_DIR = './';
TWEET_FILE_START = 'tweets-';
TWEET_FILE_EXT = '.txt';
// Keyword sent as the `track` parameter of the streaming request.
TRACKING_KEYWORD = '#p2';

var fs = require('fs'),
    https = require('https'),
    // socket.io server for browser clients, listening on port 8000.
    io = require('socket.io')
        .listen(8000),
    // Every tweet in insertion order; a tweet's index here is stored as tweet.pos.
    tweets = [],
    // Links between tweets: { from: pos, to: pos, type: "reply" | "retweet" }.
    arcs = [],
    // Tweet ids, parallel to `tweets`; used for duplicate detection and reply/retweet lookups.
    tweet_ids = [],
    // Top-level time slices (each date_levels[0] ms long) built by populateDateStruct.
    date_struct = [],
    // Slice duration for each nesting level, in milliseconds: 1 h, 15 min, 5 min, 1 min.
    date_levels = [
        3600 * 1000,
        15 * 60 * 1000,
        5 * 60 * 1000,
        60 * 1000
    ],
    // Annotation name -> marker substring; a tweet gets every annotation whose marker occurs in its text.
    annkw = {
        'positive' : '++',
        'negative' : '--',
        'reference' : '==',
        'question' : '??'
    };

/**
 * Build the time-slice subtree for one interval.
 * The node spans date_levels[level] milliseconds from `start`. Non-leaf
 * nodes get a `slices` array of children one level deeper, covering the
 * same interval in date_levels[level + 1] steps. Leaf nodes (the last
 * level) get an empty `tweets` list and an empty `annotations` map.
 * @param {number} level index into date_levels
 * @param {number|Date} start interval start, epoch milliseconds or a Date
 * @returns {object} { level, start: Date, end: Date, slices } or
 *                   { level, start: Date, end: Date, tweets, annotations }
 */
function populateDateStruct(level, start) {
    var startMs = (typeof start == "object") ? start.valueOf() : start,
        endMs = startMs + date_levels[level],
        node = {
            "level" : level,
            "start" : new Date(startMs),
            "end" : new Date(endMs)
        };
    if (!(level < date_levels.length - 1)) {
        // Deepest level: this is where tweet positions are stored.
        node.tweets = [];
        node.annotations = {};
        return node;
    }
    node.slices = [];
    for (var childStart = startMs; childStart < endMs; childStart += date_levels[level + 1]) {
        node.slices.push(populateDateStruct(level + 1, childStart));
    }
    return node;
}

/**
 * File a tweet's position into the first slice whose end is after its
 * creation date, descending recursively until a leaf slice is reached.
 * At the leaf, tweet.pos is appended to `tweets`. If the tweet has
 * annotations, only its FIRST annotation is also recorded in
 * `annotations[name]`.
 * A tweet dated at or after the end of every slice is not stored anywhere.
 * @param {Array} slices sibling slices, in chronological order
 * @param {object} tweet needs `created_at` and `pos`; `annotations` is optional
 */
function insertIntoDateStruct(slices, tweet) {
    var creadate = new Date(tweet.created_at);
    for (var i = 0; i < slices.length; i++) {
        var slice = slices[i];
        if (creadate < slice.end) {
            if (slice.slices) {
                insertIntoDateStruct(slice.slices, tweet);
            } else {
                slice.tweets.push(tweet.pos);
                // Tweets recorded without annotations (e.g. files written before
                // annotations existed) have no `annotations` array at all.
                if (tweet.annotations && tweet.annotations.length) {
                    var ann = tweet.annotations[0];
                    if (slice.annotations[ann]) {
                        slice.annotations[ann].push(tweet.pos);
                    } else {
                        slice.annotations[ann] = [ tweet.pos ];
                    }
                }
            }
            return;
        }
    }
}

/**
 * Collect every tweet position and annotation bucket beneath a slice.
 * A leaf slice returns its own `tweets` array and `annotations` object
 * (the same references, not copies). A non-leaf slice concatenates its
 * children's results in order and merges annotation lists by name.
 * @param {object} slice a node built by populateDateStruct
 * @returns {{tweets: Array, annotations: Object}}
 */
function getSliceContent(slice) {
    if (!slice.slices) {
        return { "tweets" : slice.tweets, "annotations" : slice.annotations };
    }
    var allTweets = [],
        byName = {};
    for (var k = 0; k < slice.slices.length; k++) {
        var child = getSliceContent(slice.slices[k]);
        allTweets = allTweets.concat(child.tweets);
        for (var name in child.annotations) {
            byName[name] = byName[name]
                ? byName[name].concat(child.annotations[name])
                : child.annotations[name];
        }
    }
    return { "tweets" : allTweets, "annotations" : byName };
}

/**
 * Flatten the slice tree into a chronological list of buckets at one level.
 * Descends through `slices` until `target_level` is reached. If the tree is
 * shallower than requested, it stops at the deepest level instead of
 * returning nothing. Each bucket is summarised with getSliceContent.
 * @param {Array} slices sibling slices, all at the same level
 * @param {number} target_level desired depth (0 = top-level slices)
 * @returns {Array<{start: Date, end: Date, tweets: Array, annotations: Object}>}
 *          empty when `slices` is empty (e.g. no tweets received yet)
 */
function flattenDateStruct(slices, target_level) {
    var result = [],
        i;
    if (!slices || !slices.length) {
        return result;
    }
    // Descend only while a deeper level actually exists.
    if (slices[0].level < target_level && slices[0].slices) {
        for (i = 0; i < slices.length; i++) {
            result = result.concat(flattenDateStruct(slices[i].slices, target_level));
        }
        return result;
    }
    for (i = 0; i < slices.length; i++) {
        var data = getSliceContent(slices[i]);
        result.push({
            "start" : slices[i].start,
            "end" : slices[i].end,
            "tweets" : data.tweets,
            "annotations" : data.annotations
        });
    }
    return result;
}

/**
 * Drop leading and trailing buckets that contain no tweets, in place.
 * Interior empty buckets are kept so the timeline stays contiguous.
 * @param {Array<{tweets: Array}>} slices output of flattenDateStruct
 * @returns {Array} the same array, trimmed; empty if every bucket was empty
 */
function trimFDS(slices) {
    // The length checks stop the loops once everything has been removed.
    while (slices.length && slices[0].tweets.length == 0) {
        slices.shift();
    }
    while (slices.length && slices[slices.length - 1].tweets.length == 0) {
        slices.pop();
    }
    return slices;
}

/**
 * Register a tweet in memory.
 * It sets tweet.pos, appends the tweet to `tweets`/`tweet_ids`, and files it
 * into the `date_struct` time slices, growing the top-level slice list in
 * either direction as needed. It then records reply/retweet arcs to tweets
 * already known. A tweet whose `id` is already present is logged and ignored.
 * @param {object} tweet parsed tweet; ids are compared as-is, so they are
 *        expected to be strings already (textids is applied before this)
 */
function addToList(tweet) {
    if (tweet_ids.indexOf(tweet.id) != -1) {
        console.log("Error: Tweet already in list");
        return;
    }
    tweet.pos = tweets.length;
    tweets.push(tweet);
    tweet_ids.push(tweet.id);

    var creadate = new Date(tweet.created_at),
        span = date_levels[0];
    if (isNaN(creadate.getTime())) {
        // NaN slice bounds would make every later comparison false and drop all
        // following tweets, so keep this one out of the timeline.
        console.log("Error: Tweet", tweet.id, "has an invalid created_at:", tweet.created_at);
    } else {
        if (!date_struct.length) {
            // Align the first top-level slice on a multiple of its duration.
            date_struct = [ populateDateStruct(0, span * Math.floor(creadate.valueOf() / span)) ];
        }
        // Tweets older than the first slice (e.g. replayed out of order): prepend slices.
        while (creadate < date_struct[0].start) {
            date_struct.unshift(populateDateStruct(0, date_struct[0].start.valueOf() - span));
        }
        // Slices are half-open [start, end): a tweet exactly at `end` belongs to the
        // next slice, so one must exist (insertIntoDateStruct tests `< end`).
        while (creadate >= date_struct[date_struct.length - 1].end) {
            date_struct.push(populateDateStruct(0, date_struct[date_struct.length - 1].end.valueOf()));
        }
        insertIntoDateStruct(date_struct, tweet);
    }

    // Record an arc to an already-known tweet; unknown or missing ids are skipped.
    function link(refId, type) {
        if (!refId) {
            return;
        }
        var ref = tweet_ids.indexOf(refId);
        if (ref != -1) {
            arcs.push({
                "from" : tweet.pos,
                "to" : ref,
                "type" : type
            });
        }
    }
    link(tweet.in_reply_to_status_id, "reply");
    if (tweet.retweeted_status) {
        link(tweet.retweeted_status.id_str, "retweet");
    }
}

/**
 * Replace numeric id fields with their string twins, in place.
 * For every own key ending in "id" that has a matching "<key>_str" property,
 * the string value is copied over the numeric one and "<key>_str" is deleted.
 * Twitter ids can exceed 2^53, which JSON.parse cannot represent exactly, so
 * the "_str" values are the reliable ones. Keys without a "_str" twin are left
 * untouched rather than being overwritten with undefined.
 * @param {object|undefined|null} object parsed tweet or user; non-objects are ignored
 */
function textids(object) {
    if (!object || typeof object != 'object') {
        return;
    }
    // Iterate over a snapshot because "_str" keys are deleted along the way.
    var keys = Object.keys(object);
    for (var i = 0; i < keys.length; i++) {
        var key = keys[i],
            twin = key + '_str';
        if (key.slice(-2) == 'id' && Object.prototype.hasOwnProperty.call(object, twin)) {
            object[key] = object[twin];
            delete object[twin];
        }
    }
}

/**
 * Replay tweets recorded by earlier runs for TRACKING_KEYWORD into memory.
 * Only files in TWEET_FILE_DIR named
 * TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-' ... + TWEET_FILE_EXT
 * are read; that is the naming used when recording. Files are read in
 * sorted name order, which is chronological for the YYYY-MM-DD.HH-MM stamp.
 * Each non-blank line is one JSON tweet passed to addToList. A line that
 * does not parse (e.g. truncated when the process was killed mid-write) is
 * logged and skipped instead of aborting startup.
 */
function readTweetsFromFile() {
    // The trailing '-' stops e.g. '#p2' from also matching files recorded for '#p20'.
    var prefix = TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-',
        // readdirSync makes no ordering promise; sort so replay is chronological.
        files = fs.readdirSync(TWEET_FILE_DIR).sort();
    for (var i = 0; i < files.length; i++) {
        var name = files[i];
        if (name.indexOf(prefix) != 0 ||
                name.slice(name.length - TWEET_FILE_EXT.length) != TWEET_FILE_EXT) {
            continue;
        }
        // readdirSync returns bare names, so resolve against TWEET_FILE_DIR
        // (the same way the recording file path is built).
        var lines = fs.readFileSync(TWEET_FILE_DIR + name, 'utf8').split('\n'),
            tweetscopied = 0;
        for (var j = 0; j < lines.length; j++) {
            if (lines[j].length <= 1) {
                continue;
            }
            var tweet;
            try {
                tweet = JSON.parse(lines[j]);
            } catch (e) {
                console.log("Error: skipping unparseable line", j + 1, "of", name, "-", e.message);
                continue;
            }
            addToList(tweet);
            tweetscopied++;
        }
        console.log(tweetscopied, "tweets copied from", name);
        console.log(arcs);
    }
}

/**
 * 'data' handler for the Twitter streaming response.
 * Messages are separated by '\r\n', but a chunk may end in the middle of a
 * message. The unfinished tail is kept in callBackNewTweets.pending and
 * prepended to the next chunk. For each complete tweet, it:
 *   - tags the tweet with every annkw annotation whose marker occurs in its text,
 *   - normalises ids with textids,
 *   - stores it with addToList,
 *   - appends it as one JSON line to `writestream`.
 * Empty keep-alive lines and messages without a `text` string (limit/delete
 * notices) are skipped. Lines that fail to parse are logged and skipped.
 * Finally, the in-memory tweet count is broadcast to all socket.io clients.
 * @param {string} chunk utf8 text received from the stream
 */
function callBackNewTweets(chunk) {
    var lines = ((callBackNewTweets.pending || '') + chunk).split('\r\n');
    // The last piece is '' when the chunk ended on a delimiter, otherwise an incomplete message.
    callBackNewTweets.pending = lines.pop();
    for (var i = 0; i < lines.length; i++) {
        if (!lines[i].length) {
            continue;
        }
        var tweet;
        try {
            tweet = JSON.parse(lines[i]);
        } catch (e) {
            console.log("Error: could not parse stream message:", e.message);
            continue;
        }
        if (!tweet || typeof tweet.text != 'string') {
            // Not a tweet (e.g. {"limit": ...} or {"delete": ...}).
            continue;
        }
        var annotations = [];
        for (var a in annkw) {
            if (tweet.text.indexOf(annkw[a]) != -1) {
                annotations.push(a);
            }
        }
        tweet.annotations = annotations;
        textids(tweet);
        textids(tweet.user);

        addToList(tweet);
        var txt = JSON.stringify(tweet)+'\n';
        writestream.write(txt);
    }
    console.log("New tweets received. We now have", tweets.length, "tweets in memory");
    io.sockets.emit('tweetSummary', { tweetcount : tweets.length });
}

/* MAIN CODE */

if (READ_OLD_TWEETS) {
    readTweetsFromFile();
}

if (RECORD_NEW_TWEETS) {
    // Zero-pad a number to two digits for the file timestamp.
    var pad2 = function(n) { return ("0" + n).slice(-2); },
        now = new Date(),
        // Local-time stamp YYYY-MM-DD.HH-MM (zero-padded, so file names sort chronologically).
        fileTimestamp = now.getFullYear() + "-" + pad2(1 + now.getMonth()) + "-" + pad2(now.getDate()) +
            "." + pad2(now.getHours()) + "-" + pad2(now.getMinutes()),
        fileName = TWEET_FILE_DIR + TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-' + fileTimestamp + TWEET_FILE_EXT,
        // Module-level `var`: callBackNewTweets writes every received tweet here.
        writestream = fs.createWriteStream(fileName, { flags: 'w', encoding: 'utf-8' });
    console.log('Writing to',fileName);
    var req = https.request({
        host: "stream.twitter.com",
        path: "/1/statuses/filter.json",
        method: "POST",
        headers: {
            // FIXME(security): Basic-auth credentials are committed in source. Prefer setting
            // TWITTER_BASIC_AUTH (base64 "user:password") in the environment and rotating these.
            'Authorization': 'Basic ' + (process.env.TWITTER_BASIC_AUTH || 'cmFwaHY6N3czMzdMZkMyM2dF'),
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    }, function(res) {
        console.log('STATUS: ' + res.statusCode);
        console.log('HEADERS: ' + JSON.stringify(res.headers));
        res.setEncoding('utf8');
        res.on('data', callBackNewTweets);
        res.on('end', function() {
            console.log('Stream connection closed');
        });
    });
    // An 'error' event with no listener is thrown, which would take the whole server down.
    req.on('error', function(err) {
        console.log('Error: stream request failed:', err.message);
    });

    req.write('track=' + encodeURIComponent(TRACKING_KEYWORD));
    req.end();
}

io.set('log level', 0);
io.sockets.on('connection', function(socket) {
    console.log("New connection from", socket.handshake.address.address,"id :", socket.id);
    // Tell the new client how many tweets are already in memory.
    socket.emit('tweetSummary', { tweetcount : tweets.length });
    // getTweets: the client's payload is echoed back with `tweets` = tweets.slice(max(0, from), to).
    socket.on('getTweets', function(data) {
        console.log('getTweets from',socket.id);
        // Client-supplied payload: a missing or non-object value must not throw inside the handler.
        if (!data || typeof data != 'object') {
            data = {};
        }
        data.tweets = tweets.slice(Math.max(0, data.from), data.to);
        socket.emit('tweets', data);
    });
    // getTimeline: reply with the trimmed, flattened slice list at data.level plus all arcs.
    socket.on('getTimeline', function(data) {
        var level = (data && typeof data == 'object') ? data.level : undefined,
            // Before the first tweet arrives there is no time structure to flatten.
            timeline = date_struct.length ? trimFDS(flattenDateStruct(date_struct, level)) : [];
        socket.emit('timeline', { "timeline" : timeline, "arcs" : arcs });
        console.log('getTimeline from',socket.id);
    });
});