tweetcast/nodejs/node-direct.js
changeset 410 bf5cf5a9e737
parent 409 f7ceddf99d6d
parent 406 86b62b98fea7
child 411 0471e6eb8a1b
child 412 97c082990f8d
--- a/tweetcast/nodejs/node-direct.js	Wed Dec 07 19:28:46 2011 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,459 +0,0 @@
-/* DEFAULT CONFIGURATION */
-
-//var TWITTER_USER = 'materiauxnum',
-//    TWITTER_PASS = 'm473r14ux7w337',
-var TWITTER_USER = 'raphv',
-    TWITTER_PASS = '7w337LfC23gE',
-    RECORD_NEW_TWEETS = true,
-    commit_script = '';
-
-/* CALLING COMMON CONFIGURATION FILE */
-
-var conf_file = flagOption('-c', 'conf.js');
-
-myLog('Reading Configuration from ' + conf_file);
-
-var fs = require('fs');
-eval(fs.readFileSync(conf_file,'utf8'));
-
-if (typeof sqlfile == "undefined") {
-    sqlfile = __dirname + '/tweets-' + encodeURIComponent(tracking_keyword) + '.sqlite'
-    }
-
-/* FUNCTIONS */
-
-function flagOption(flag, defaultValue) {
-    var flagPos = process.argv.indexOf(flag);
-    return ( flagPos != -1 && flagPos < process.argv.length - 1) ? process.argv[flagPos + 1] : defaultValue;
-}
-
-function myLog() {
-    var args = ["\033[1;34m["+new Date().toLocaleTimeString()+"]\033[0m"];
-    for (var i in arguments) {
-        args.push(arguments[i]);
-    }
-    console.log.apply(console, args);
-}
-
-function annotationMap(callback, options) {
-    var includeDefault = ( options && options.includeDefault ? options.includeDefault : false );
-    var returnObject = ( options && options.returnObject ? options.returnObject : false );
-    res = (returnObject ? {} : []);
-    for (var i in annotations) {
-        if (i != "default" || includeDefault) {
-            var el = callback(i, annotations[i])
-            if (returnObject) {
-                res[i] = el;
-            } else {
-                res.push(el);
-            }
-        }
-    }
-    return res;
-}
-
-function createTables() {
-
-    var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT"
-        + annotationMap(function(a) { return ', a_' + a + ' INTEGER' }).join("")
-        + " );\n"
-        + "CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT );\n"
-        + "CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at );";
-    db.executeScript(requete, function(err) {
-        if (err) { myLog("SQLITE error",err.stack); }
-        getSendLastPos();
-    });
-}
-
-function commitReference(from_id, to_id, ref_type) {
-    commit_script += 'INSERT OR IGNORE INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( "' + from_id + '", "' + to_id + '", "' + ref_type + '" );\n';
-}
-
-function commitTweet(data) {
-
-    var tweet = JSON.parse(data),
-        ann = [];
-    
-    if (!tweet.id) {
-        myLog("Error: Could not parse data",data);
-        return;
-    }
-    
-    try {
-        textids(tweet);
-        for (var j in keys_to_delete) {
-            delete tweet[keys_to_delete[j]];
-        }
-        textids(tweet.user);
-        for (var j in user_keys_to_delete) {
-            delete tweet.user[user_keys_to_delete[j]];
-        }
-        if (tweet.retweeted_status) {
-            textids(tweet.retweeted_status);
-            for (var j in keys_to_delete) {
-                delete tweet.retweeted_status[keys_to_delete[j]];
-            }
-        }
-        annotationMap(function(i, annotation) {
-            for (var j in annotation.keywords) {
-                if (tweet.text.search(annotation.keywords[j]) != -1) {
-                    ann.push(i);
-                    break;
-                }
-            }
-        });
-        tweet.annotations = ann;
-        tweet.created_at = new Date(tweet.created_at);
-//        myLog("Time delta :",(new Date() - tweet.created_at) / 1000);
-    } catch (err) {
-        myLog("Error while processing tweet",err.stack);
-    }
-    
-    if (tweet.in_reply_to_status_id) {
-        commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
-    }
-    if (tweet.retweeted_status) {
-        commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
-    }
-    commit_script += 'INSERT INTO tweets ( tweet_id, created_at, json '
-        + annotationMap(function(a) { return ', a_' + a }).join("")
-        + ' ) VALUES ( "'
-        + tweet.id
-        + '", '
-        + tweet.created_at.valueOf()
-        + ', "'
-        + JSON.stringify(tweet).replace(/\"/g, '""')
-        + '"'
-        + annotationMap(function(a) {
-            return ann.indexOf(a) == -1 ? ', 0' : ', 1'
-        }).join("")
-        + ' );\n';
-}
-
-function callBackNewTweets(chunk) {
-    var newdata = chunk.split('\r\n');
-    for (var i in newdata) {
-        if (newdata[i].length > 0) {
-            commitTweet(newdata[i]);
-        }
-    }
-//    myLog("Data received - length :",chunk.length);
-}
-
-function requestTwitter() {
-    myLog("Requesting Twitter to track keyword(s): "+tracking_keyword);
-    var req = https.request({
-        host: "stream.twitter.com",
-        path: "/1/statuses/filter.json",
-        method: "POST",
-        headers: {
-            'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
-            'Content-Type': 'application/x-www-form-urlencoded'
-        }
-    }, function(res) {
-        myLog('Reply from stream.twitter.com: ' + res.statusCode);
-        myLog('Headers: ' + JSON.stringify(res.headers));
-        res.setEncoding('utf8');
-        res.on('data', callBackNewTweets);
-        res.on('end', function() {
-            myLog('End Twitter Connection — Trying to reconnect');
-            requestTwitter();
-        });
-    });
-    
-    req.write('track=' + encodeURIComponent(tracking_keyword));
-    req.socket.setTimeout(60000);
-    req.socket.on('timeout', function() {
-        myLog('TimeOut - Trying to reconnect');
-        requestTwitter();
-    });
-    req.end();
-}
-
-function getSendLastPos() {
-    db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) {
-        if (err) { myLog("SQLITE error",err.stack); }
-        if (results[0].lastpos != lastpos) {
-            lastpos = results[0].lastpos ? results[0].lastpos : 0;
-            try {
-                io.sockets.emit('tweetSummary', {
-                    tweetcount : lastpos
-                });
-            } catch(err) {
-                myLog("SOCKET.IO error while Broadcasting tweetSummary",err.stack);
-            }
-        }
-    });
-}
-
-function commitTweets() {
-    if (commit_script != '') {
-        var requete = commit_script;
-        commit_script = '';
-      //  console.log(requete);
-      //  var reqd = new Date();
-        db.executeScript(requete, function (err) {
-            if (err) { myLog("SQLITE error",err.stack); }
-        //    myLog("Commit took",(new Date() - reqd),"ms");
-            getSendLastPos();
-        });
-    }
-}
-
-function getSendTweetPosByDate(date, socket) {
-    db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,9", function (err, results) {
-        if (err) { myLog("SQLITE error",err.stack); }
-        if (results.length) {
-            try {
-                socket.emit('tweetPosByDate', {
-                    tweetpos : results[0].pos,
-                    date : results[0].created_at
-                });
-            } catch(err) {
-                myLog("SOCKET.IO error while sending tweetPosByDate",err.stack);
-            }
-        }
-    });
-}
-
-function getSendLinkedTweets(pos, socket) {
-//    myLog("request for tweets linked to",pos);
-    db.execute("SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10", [ pos, pos ], function(err, results) {
-        if (err) { myLog("SQLITE error: ",err.stack); }
-        var struct = {
-            "tweetpos" : pos,
-            "referencing" : [],
-            "referenced_by" : []
-        };
-        for (var i in results) {
-            if (results[i].pos_a == pos) {
-                struct.referencing.push({
-                    "pos" : results[i].pos_b,
-                    "ref_type" : results[i].ref_type
-                });
-            } else {
-                struct.referenced_by.push({
-                    "pos" : results[i].pos_a,
-                    "ref_type" : results[i].ref_type
-                });
-            }
-        }
-        try {
-            socket.emit('linkedTweets', struct);
-        } catch(err) {
-            myLog("SOCKET.IO error while sending linkedTweets: ",err.stack);
-        }
-    });
-}
-
-function getSendTweets(posList, socket) {
-//    myLog("request for tweets ("+posList.join(',')+") from "+socket.id);
-    db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) {
-        if (err) { myLog("SQLITE error",err.stack); }
-        try {
-            socket.emit('tweets',
-                results.map(function(line) {
-                    var tw = JSON.parse(line.json);
-                    tw.pos = line.pos;
-                    return tw;
-                })
-            );
-        } catch (err) {
-            myLog("SOCKET.IO error while sending tweets",err.stack);
-        }
-    });
-}
-
-function getSendTimeline(data, socket) {
-//    myLog("request for timeline (",data.level, data.full,") from "+socket.id);
-    var lvl = date_levels[data.level],
-        requete = "SELECT COUNT(*) AS nb, "
-        + lvl
-        + "*ROUND(created_at/"
-        + lvl
-        + ") AS tranche"
-        + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
-        + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" );
-    db.execute(requete, function (err, results) {
-        if (err) { myLog("SQLITE error",err.stack); }
-        if (!results.length) {
-            return;
-        }
-        var tbl = [],
-            lastend = parseInt(results[results.length - 1].tranche);
-        for (var i = results.length - 1; i >= 0; i--) {
-            var start = parseInt(results[i].tranche);
-            while (start > lastend) {
-                var struct = { "start": lastend, "tweets" : 0, "annotations" : {} };
-                lastend += lvl;
-                struct.end = lastend;
-                tbl.push(struct);
-            }
-            lastend += lvl;
-            var struct = {
-                "start" : start,
-                "end" : lastend,
-                "tweets" : results[i].nb,
-                "annotations" : annotationMap(function (a) {
-                    return results[i]['s_'+a];
-                },{returnObject: true})
-            }
-            tbl.push(struct);
-        }
-        try {
-            socket.emit('timeline', {
-                "full" : data.full,
-                "level" : data.level,
-                "data" : tbl
-            });
-        } catch (err) {
-            myLog("SOCKET.IO error while sending timeline",err.stack);
-        }
-    });
-}
-
-function textids(object) {
-    for (var key in object) {
-        // Workaround for Unicode bug in socket.io.
-        
-        if (typeof object[key] == "string") {
-            var tmp = '';
-            for (var i = 0; i < object[key].length; i++) {
-                tmp += ( object[key].charCodeAt(i) < 128 ? object[key].charAt(i) : "&#" + object[key].charCodeAt(i) + ";" );
-            }
-            object[key] = tmp;
-        }
-        
-        if (key.substr(-2) == 'id') {
-            object[key] = object[key + '_str'];
-            delete object[key + '_str'];
-        }
-        
-    }
-}
-
-function httpHandler(req, res) {
-    myLog("HTTP Request for URL "+req.url);
-    var url = ( req.url == "/config" ? conf_file : __dirname + "/client" + req.url + ( req.url[req.url.length - 1] == "/" ? "index.html" : "" ) );
-    fs.readFile( url, function(err, data) {
-        if (err) {
-            myLog("Error 404");
-            res.writeHead(404);
-            return res.end('File not found');
-        }
-        res.writeHead(200);
-        res.end(data);
-    });
-}
-
-/* Initialization */
-
-var http = require('http'),
-    https = require('https'),
-    sqlite = require('sqlite'),
-    socketio = require('socket.io'),
-    tweets = [],
-    lastpos = 0,
-    arcs = [],
-    tweet_ids = [],
-    date_struct = [],
-    date_levels = [
-        3600 * 1000,
-        15 * 60 * 1000,
-        5 * 60 * 1000,
-        60 * 1000,
-        15 * 1000
-    ],
-    keys_to_delete = [
-        'in_reply_to_screen_name',
-        'in_reply_to_user_id',
-        'retweeted',
-        'place',
-        'geo',
-        'source',
-        'contributors',
-        'coordinates',
-        'retweet_count',
-        'favorited',
-        'truncated',
-        'possibly_sensitive'
-    ],
-    user_keys_to_delete = [
-        'default_profile_image',
-        'show_all_inline_media',
-        'contributors_enabled',
-        'profile_sidebar_fill_color',
-        'created_at',
-        'lang',
-        'time_zone',
-        'profile_sidebar_border_color',
-        'follow_request_sent',
-        'profile_background_image_url',
-        'profile_background_image_url_https',
-        'followers_count',
-        'description',
-        'url',
-        'geo_enabled',
-        'profile_use_background_image',
-        'default_profile',
-        'following',
-        'profile_text_color',
-        'is_translator',
-        'favourites_count',
-        'listed_count',
-        'friends_count',
-        'profile_link_color',
-        'protected',
-        'location',
-        'notifications',
-        'profile_image_url_https',
-        'statuses_count',
-        'verified',
-        'profile_background_color',
-        'profile_background_tile',
-        'utc_offset'
-    ],
-    app = http.createServer(httpHandler),
-    io = socketio.listen(app),
-    db = new sqlite.Database();
-
-/* MAIN CODE */
-
-app.listen(app_port);
-myLog("Listening on port: "+app_port);
-myLog("Opening SQLITE file: "+sqlfile);
-db.open(sqlfile , function(err) {
-    if (err) { myLog("SQLITE error",err.stack); }
-    createTables();
-});
-
-setInterval(commitTweets,500);
-setInterval(function(){myLog("Still alive, tweet count",lastpos)}, 60000);
-
-if (RECORD_NEW_TWEETS) {
-    requestTwitter();
-}
-
-io.set('log level', 0);
-io.sockets.on('connection', function(socket) {
-    myLog("New connection from", socket.handshake.address.address, "with id=", socket.id);
-    try {
-        socket.emit('tweetSummary', { tweetcount : lastpos });
-    } catch (err) {
-        myLog("SOCKET.IO error while sending tweetSummary",err.stack);
-    }
-    socket.on('updateTweets', function(data) {
-        if (data.tweets.length) {
-            getSendTweets(data.tweets, socket);
-        }
-    });
-    socket.on('updateTimeline', function(data) {
-        getSendTimeline(data, socket);
-    });
-    socket.on('tweetPosByDate', function(data) {
-        getSendTweetPosByDate(data.date, socket);
-    });
-    socket.on('linkedTweets', function(data) {
-        getSendLinkedTweets(data.tweetpos, socket);
-    });
-});
\ No newline at end of file