tweetcast/nodejs/node-direct.js
author Raphael Velt <raph.velt@gmail.com>
Wed, 26 Oct 2011 18:29:46 +0200
changeset 338 60dff8a71024
parent 335 5f83c21dee69
child 339 6a073c4a8578
permissions -rw-r--r--
Added alternate configuration

/* DEFAULT CONFIGURATION */

// Default Twitter account and recording switch. They are declared before the
// configuration file is evaluated below, presumably so that file can override them.
// NOTE(review): account credentials are hard-coded in source; consider moving
// them out of version control.
var TWITTER_USER = 'materiauxnum',
    TWITTER_PASS = 'm473r14ux7w337',
    RECORD_NEW_TWEETS = true;

/* CALLING COMMON CONFIGURATION FILE */

// Configuration file path: the argument following "-c" on the command line,
// or 'conf.js' when the flag is absent.
var conf_file = flagOption('-c', 'conf.js');

myLog('Reading Configuration from ' + conf_file);

var fs = require('fs');
// NOTE(review): the configuration file is executed with eval(), so its contents
// run with access to this script's scope. Names used later in this file but not
// defined here (tracking_keyword, app_port, annotations) are presumably supplied
// by it. The same file is also served to HTTP clients at "/config" by httpHandler.
eval(fs.readFileSync(conf_file,'utf8'));

// When the configuration did not define sqlfile, fall back to a database file
// next to this script, named after the URL-encoded tracking keyword.
if (typeof sqlfile == "undefined") {
    sqlfile = __dirname + '/tweets-' + encodeURIComponent(tracking_keyword) + '.sqlite'
    }

/* FUNCTIONS */

/**
 * Looks up a command-line option given as "<flag> <value>" in process.argv.
 *
 * @param {string} flag          Flag to search for (e.g. '-c').
 * @param {*}      defaultValue  Returned when the flag is absent or is the last argument.
 * @returns {*} The argument that follows the flag, or defaultValue.
 */
function flagOption(flag, defaultValue) {
    var argv = process.argv,
        index = argv.indexOf(flag),
        hasValue = index != -1 && index < argv.length - 1;

    if (!hasValue) {
        return defaultValue;
    }
    return argv[index + 1];
}

/**
 * Writes its arguments to the console, prefixed with the current local time
 * wrapped in ANSI escape codes (bold blue, then reset).
 *
 * Accepts any number of arguments; they are forwarded to console.log unchanged.
 */
function myLog() {
    // "\x1b" is the ESC character. The hexadecimal escape is used instead of the
    // octal form "\033", which is a syntax error in strict mode and ES modules.
    var stamp = "\x1b[1;34m[" + new Date().toLocaleTimeString() + "]\x1b[0m",
        args = [stamp].concat(Array.prototype.slice.call(arguments));
    console.log.apply(console, args);
}

/**
 * Applies a callback to every entry of the `annotations` object and collects
 * the results.
 *
 * @param {function(string, *)} callback  Called with (annotation name, annotation value).
 * @param {Object} [options]
 * @param {boolean} [options.includeDefault]  When truthy, the "default" entry is
 *        visited too; it is skipped otherwise.
 * @param {boolean} [options.returnObject]    When truthy, results are returned as an
 *        object keyed by annotation name instead of an array.
 * @returns {Array|Object} Callback results, in enumeration order of `annotations`.
 */
function annotationMap(callback, options) {
    var includeDefault = !!(options && options.includeDefault),
        returnObject = !!(options && options.returnObject),
        // Declared locally: the accumulator used to be an implicit global, which
        // leaked between calls and is a ReferenceError in strict mode.
        res = returnObject ? {} : [],
        name,
        el;

    for (name in annotations) {
        if (name != "default" || includeDefault) {
            el = callback(name, annotations[name]);
            if (returnObject) {
                res[name] = el;
            } else {
                res.push(el);
            }
        }
    }
    return res;
}

/**
 * Creates the `tweets` table (with one INTEGER column per annotation), its
 * index on created_at, and the `tweet_refs` table, if they do not exist yet.
 * Once the `tweets` table statement has completed, the current last position
 * is broadcast through getSendLastPos(). Any database error is thrown.
 */
function createTables() {

    function failOnError(err) {
        if (err) throw err;
    }

    var annotationColumns = annotationMap(function(name) {
        return ', a_' + name + ' INTEGER';
    }).join("");
    var createTweets = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationColumns + " )";

    db.execute(createTweets, function(err) {
        failOnError(err);
        // The index is created only after the table statement has completed.
        db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", failOnError);
        getSendLastPos();
    });

    db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", failOnError);
}

/**
 * Inserts one row into `tweet_refs` linking two tweets.
 *
 * @param {*} from_id   Identifier of the referring tweet.
 * @param {*} to_id     Identifier of the referenced tweet.
 * @param {*} ref_type  Kind of reference (callers in this file pass "reply" or "retweet").
 * @throws The database error, if the insert fails.
 */
function commitReference(from_id, to_id, ref_type) {
    var sql = "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )",
        bindings = [ from_id, to_id, ref_type ];

    db.execute(sql, bindings, function onInserted(insertError) {
        if (insertError) {
            throw insertError;
        }
    });
}

/**
 * Parses one raw JSON message from the stream, trims it, tags it with the
 * annotations whose keywords match its text, and stores it.
 *
 * Steps: ids are replaced by their string form (textids), the keys listed in
 * keys_to_delete / user_keys_to_delete are removed, reply and retweet links
 * are written through commitReference, and the tweet is inserted into the
 * `tweets` table with one 0/1 flag per annotation. After a successful insert
 * the new last position is broadcast through getSendLastPos().
 *
 * Messages that are not valid JSON, or that have no `text` string or no `user`
 * object, are logged and skipped instead of throwing.
 *
 * @param {string} data  One JSON-encoded message.
 * @throws The database error, if the insert fails.
 */
function commitTweet(data) {

    var tweet;
    try {
        tweet = JSON.parse(data);
    } catch (parseError) {
        myLog("Ignoring unparseable stream message: " + parseError.message);
        return;
    }
    // Everything below dereferences tweet.user and tweet.text.
    if (!tweet || typeof tweet.text != "string" || !tweet.user) {
        myLog("Ignoring stream message that is not a tweet");
        return;
    }

    var ann = [],
        j;

    textids(tweet);
    for (j = 0; j < keys_to_delete.length; j++) {
        delete tweet[keys_to_delete[j]];
    }
    textids(tweet.user);
    for (j = 0; j < user_keys_to_delete.length; j++) {
        delete tweet.user[user_keys_to_delete[j]];
    }
    if (tweet.retweeted_status) {
        textids(tweet.retweeted_status);
        for (j = 0; j < keys_to_delete.length; j++) {
            delete tweet.retweeted_status[keys_to_delete[j]];
        }
    }
    annotationMap(function(name, annotation) {
        // for...in is kept here because the shape of `keywords` is defined
        // outside this file.
        for (var k in annotation.keywords) {
            // NOTE(review): String.prototype.search treats a string argument as a
            // regular expression; confirm keywords are RegExp objects or contain
            // no regex metacharacters.
            if (tweet.text.search(annotation.keywords[k]) != -1) {
                ann.push(name);
                break;
            }
        }
    });
    tweet.annotations = ann;
    tweet.created_at = new Date(tweet.created_at);

    if (tweet.in_reply_to_status_id) {
        commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
    }
    if (tweet.retweeted_status) {
        commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
    }

    var columnList = annotationMap(function(a) { return ', a_' + a; }).join(""),
        placeholderList = annotationMap(function() { return ', ?'; }).join(""),
        flags = annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1; });

    db.execute(
        "INSERT INTO tweets ( tweet_id, created_at, json "
        + columnList
        + " ) VALUES ( ?, ?, ? "
        + placeholderList
        + " )",
        [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(flags),
        function(err) {
            if (err) throw err;
            getSendLastPos();
        }
    );
}

/**
 * 'data' handler for the streaming response: splits the incoming text on
 * "\r\n" and passes every complete, non-empty message to commitTweet.
 *
 * A chunk is not guaranteed to end on a message boundary, so the text after
 * the last "\r\n" is kept (on callBackNewTweets.pending) and prepended to the
 * next chunk instead of being committed as a truncated message.
 *
 * @param {string} chunk  Text received from the stream.
 */
function callBackNewTweets(chunk) {
    var lines = ((callBackNewTweets.pending || '') + chunk).split('\r\n'),
        committed = 0;

    // The last element is either '' (chunk ended on a delimiter) or the start
    // of a message that is still incomplete.
    callBackNewTweets.pending = lines.pop();

    for (var i = 0; i < lines.length; i++) {
        if (lines[i].length > 0) {
            commitTweet(lines[i]);
            committed++;
        }
    }
    if (committed > 0) {
        myLog("New tweets received");
    }
}

/**
 * Reads the highest `pos` stored in the `tweets` table, saves it in the
 * module-level `lastpos` (0 when the query yields no value), and broadcasts
 * it to all connected sockets as a 'tweetSummary' event.
 *
 * @throws The database error, if the query fails.
 */
function getSendLastPos() {
    var query = "SELECT MAX(pos) lastpos FROM tweets";

    db.execute(query, function onLastPos(err, rows) {
        if (err) {
            throw err;
        }
        lastpos = rows[0].lastpos || 0;
        myLog("Broadcasting last pos = ",lastpos);
        var summary = { tweetcount : lastpos };
        io.sockets.emit('tweetSummary', summary);
    });
}


/**
 * Sends the stored tweets at the requested positions to one socket as a
 * 'tweets' event. Each tweet is the parsed `json` column with its `pos` added.
 *
 * The position list comes from the client, so only plain non-negative integers
 * (up to 15 digits) are kept before being placed into the SQL text; anything
 * else is dropped. When nothing valid remains, an empty list is emitted and no
 * query is run.
 *
 * @param {Array}  posList  Requested `pos` values (numbers or digit strings).
 * @param {Object} socket   Socket to answer; its `id` is logged.
 * @throws The database error, if the query fails.
 */
function getSendTweets(posList, socket) {
    var requested = Array.isArray(posList) ? posList : [],
        safePositions = requested.filter(function(pos) {
            return /^\d{1,15}$/.test(String(pos));
        }).map(function(pos) {
            return parseInt(String(pos), 10);
        }),
        positionsSql = safePositions.join(',');

    myLog("request for tweets ("+positionsSql+") from "+socket.id);

    if (!safePositions.length) {
        socket.emit('tweets', []);
        return;
    }
    db.execute("SELECT * FROM tweets WHERE pos IN ( " + positionsSql + " )", function (err, results) {
        if (err) throw err;
        socket.emit('tweets',
            results.map(function(line) {
                var tw = JSON.parse(line.json);
                tw.pos = line.pos;
                return tw;
            })
        );
    });
}

/**
 * Sends a histogram of stored tweets to one socket as a 'timeline' event.
 *
 * Tweets are grouped into slices of date_levels[level] milliseconds (at most
 * the 50 most recent non-empty slices are queried). The emitted array runs
 * from oldest to newest; each entry has `start`, `end`, `tweets` (count) and
 * `annotations` (per-annotation sums). Slices with no tweets between two
 * returned slices are filled in with a zero count and empty annotations.
 *
 * A level that does not map to a number in date_levels is logged and ignored.
 * When no tweets are stored, an empty array is emitted.
 *
 * @param {number|string} level  Index into date_levels, supplied by the client.
 * @param {Object} socket        Socket to answer; its `id` is logged.
 * @throws The database error, if the query fails.
 */
function getSendTimeline(level, socket) {
    myLog("request for timeline ("+level+") from "+socket.id);
    var lvl = date_levels[level];
    if (typeof lvl != "number") {
        // Building the query with an unknown level would put "undefined" into the SQL.
        myLog("Ignoring timeline request with unknown level");
        return;
    }
    var requete = "SELECT COUNT(*) AS nb, "
        + lvl
        + "*ROUND(created_at/"
        + lvl
        + ") AS tranche"
        + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a; }).join("")
        + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50";
    db.execute(requete, function (err, results) {
        if (err) throw err;
        var tbl = [];
        if (!results.length) {
            socket.emit('timeline', tbl);
            return;
        }
        // Rows arrive newest first; walk them backwards to build the table oldest first.
        var lastend = parseInt(results[results.length - 1].tranche, 10);
        for (var i = results.length - 1; i >= 0; i--) {
            var row = results[i],
                start = parseInt(row.tranche, 10);
            // Fill the gap between the previous slice and this one with empty slices.
            while (start > lastend) {
                var gap = { "start": lastend, "tweets" : 0, "annotations" : {} };
                lastend += lvl;
                gap.end = lastend;
                tbl.push(gap);
            }
            lastend += lvl;
            tbl.push({
                "start" : start,
                "end" : lastend,
                "tweets" : row.nb,
                "annotations" : annotationMap(function (a) {
                    return row['s_'+a];
                },{returnObject: true})
            });
        }
        socket.emit('timeline', tbl);
    });
}

/**
 * Replaces, in place, every property whose name ends in "id" with the value of
 * its "<name>_str" counterpart and removes that counterpart.
 *
 * A property ending in "id" that has no "_str" counterpart is left untouched
 * (it used to be overwritten with undefined).
 *
 * @param {Object} object  Object to rewrite; a null/undefined argument is a no-op.
 */
function textids(object) {
    var hasOwn = Object.prototype.hasOwnProperty;
    for (var key in object) {
        var strKey = key + '_str';
        if (key.slice(-2) == 'id' && hasOwn.call(object, strKey)) {
            object[key] = object[strKey];
            delete object[strKey];
        }
    }
}

/**
 * HTTP request handler: serves the configuration file for "/config" and files
 * from the "client" directory next to this script for every other URL
 * ("index.html" is appended when the URL ends with "/").
 *
 * Responds 200 with the file contents, or 404 'File not found' when the file
 * cannot be read. URLs containing a ".." path segment are answered with 404
 * without touching the file system, so a request cannot climb out of the
 * client directory.
 *
 * @param {Object} req  Incoming request; only `req.url` is read.
 * @param {Object} res  Response; `writeHead` and `end` are called on it.
 */
function httpHandler(req, res) {
    myLog("HTTP Request for URL "+req.url);

    function sendNotFound() {
        myLog("Error 404");
        res.writeHead(404);
        return res.end('File not found');
    }

    // req.url is appended to a directory path as-is; reject parent-directory
    // segments (with either slash style) before building the path.
    if (req.url.split(/[\/\\]/).indexOf("..") != -1) {
        return sendNotFound();
    }

    var url = ( req.url == "/config" ? conf_file : __dirname + "/client" + req.url + ( req.url[req.url.length - 1] == "/" ? "index.html" : "" ) );
    fs.readFile( url, function(err, data) {
        if (err) {
            return sendNotFound();
        }
        res.writeHead(200);
        res.end(data);
    });
}

/* Initialization */

// Node core and third-party modules.
var http = require('http');
var https = require('https');
var sqlite = require('sqlite');
var socketio = require('socket.io');

// Module-level state. `lastpos` is updated by getSendLastPos and sent to newly
// connected sockets; the other four are not referenced by the code visible in
// this file.
var tweets = [];
var lastpos = 0;
var arcs = [];
var tweet_ids = [];
var date_struct = [];

// Timeline slice widths in milliseconds, indexed by the level a client sends
// with 'updateTimeline' (1 hour, 15 minutes, 5 minutes, 1 minute, 15 seconds).
var date_levels = [
    3600 * 1000,
    15 * 60 * 1000,
    5 * 60 * 1000,
    60 * 1000,
    15 * 1000
];

// Keys removed from a tweet (and from its retweeted_status) before it is stored.
var keys_to_delete = [
    'in_reply_to_screen_name',
    'in_reply_to_user_id',
    'retweeted',
    'place',
    'geo',
    'source',
    'contributors',
    'coordinates',
    'retweet_count',
    'favorited',
    'truncated',
    'possibly_sensitive'
];

// Keys removed from tweet.user before the tweet is stored.
var user_keys_to_delete = [
    'default_profile_image',
    'show_all_inline_media',
    'contributors_enabled',
    'profile_sidebar_fill_color',
    'created_at',
    'lang',
    'time_zone',
    'profile_sidebar_border_color',
    'follow_request_sent',
    'profile_background_image_url',
    'profile_background_image_url_https',
    'followers_count',
    'description',
    'url',
    'geo_enabled',
    'profile_use_background_image',
    'default_profile',
    'following',
    'profile_text_color',
    'is_translator',
    'favourites_count',
    'listed_count',
    'friends_count',
    'profile_link_color',
    'protected',
    'location',
    'notifications',
    'profile_image_url_https',
    'statuses_count',
    'verified',
    'profile_background_color',
    'profile_background_tile',
    'utc_offset'
];

// HTTP server, the socket.io instance attached to it, and the database handle
// (opened later in the main code).
var app = http.createServer(httpHandler);
var io = socketio.listen(app);
var db = new sqlite.Database();

/* MAIN CODE */

// Start accepting HTTP / socket.io connections, then open the database and
// make sure its tables exist. A failure to open the database is thrown.
app.listen(app_port);
myLog("Listening on port: "+app_port);
myLog("Opening SQLITE file: "+sqlfile);
db.open(sqlfile, function onDatabaseOpen(openError) {
    if (openError) {
        throw openError;
    }
    createTables();
});

// When recording is enabled, open a long-lived POST request to the Twitter
// streaming endpoint (HTTP Basic authentication) that tracks the configured
// keyword(s), and feed every received chunk to callBackNewTweets.
if (RECORD_NEW_TWEETS) {
    myLog("Requesting Twitter to track keyword(s): "+tracking_keyword);
    var req = https.request({
        host: "stream.twitter.com",
        path: "/1/statuses/filter.json",
        method: "POST",
        headers: {
            'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    }, function(res) {
        myLog('Reply from stream.twitter.com: ' + res.statusCode);
        myLog('Headers: ' + JSON.stringify(res.headers));
        res.setEncoding('utf8');
        res.on('data', callBackNewTweets);
        res.on('end', function() {
            myLog('Stream from stream.twitter.com ended');
        });
    });

    // Without an 'error' listener a connection failure is raised as an unhandled
    // 'error' event and terminates the process. Log it instead, so the server
    // keeps serving the tweets already stored.
    req.on('error', function(err) {
        myLog('Error on stream.twitter.com request: ' + err.message);
    });

    req.write('track=' + encodeURIComponent(tracking_keyword));
    req.end();
}

// socket.io wiring: every new client immediately receives the current tweet
// count, and can then ask for specific tweets ('updateTweets') or for a
// timeline at a given level ('updateTimeline').
io.set('log level', 0);
io.sockets.on('connection', function(socket) {
    myLog("New connection from", socket.handshake.address.address, "with id=", socket.id);
    socket.emit('tweetSummary', { tweetcount : lastpos });
    socket.on('updateTweets', function(data) {
        // Payloads come from the client: ignore anything that is not a
        // non-empty array instead of throwing inside the event handler.
        if (data && Array.isArray(data.tweets) && data.tweets.length) {
            getSendTweets(data.tweets, socket);
        }
    });
    socket.on('updateTimeline', function(data) {
        // A missing payload would otherwise throw when reading data.level.
        if (data) {
            getSendTimeline(data.level, socket);
        }
    });
});