tweetcast/nodejs/node-direct.js
changeset 336 d60efd677b50
parent 335 5f83c21dee69
child 338 60dff8a71024
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tweetcast/nodejs/node-direct.js	Tue Oct 25 18:27:08 2011 +0200
@@ -0,0 +1,314 @@
+/* CALLING COMMON CONFIGURATION FILE */
+
+// Announce where the shared settings are read from.
+console.log('Reading Configuration from conf.js');
+
+var fs = require('fs');
+// conf.js is evaluated in this file's scope instead of being require()d, so
+// whatever it declares becomes directly visible to the code below. The same
+// file is also served to HTTP clients by httpHandler ("/conf.js").
+// NOTE(review): presumably conf.js defines tracking_keyword, annotations and
+// app_port (used below but never defined in this file) — confirm.
+// NOTE(review): eval() executes the file with full privileges; conf.js must
+// remain a trusted, locally controlled file.
+eval(fs.readFileSync(__dirname + '/conf.js','utf8'));
+
+/* SERVER-SIDE ONLY CONFIGURATION */
+
+// Path of the SQLite database file. The tracked keyword is URL-encoded before
+// being embedded in the file name.
+var sqlfile = __dirname + '/tweets-' + encodeURIComponent(tracking_keyword) + '.sqlite';
+// Credentials sent as HTTP Basic authentication to stream.twitter.com.
+// They can be supplied through the TWITTER_USER / TWITTER_PASS environment
+// variables; the literals are only kept as a backward-compatible fallback.
+// NOTE(review): a password committed to source control should be rotated and
+// the fallback removed once deployments set the environment variables.
+var TWITTER_USER = process.env.TWITTER_USER || 'materiauxnum';
+var TWITTER_PASS = process.env.TWITTER_PASS || 'm473r14ux7w337';
+// When false, the Twitter stream is not opened and only the existing
+// database content is served.
+var RECORD_NEW_TWEETS = true;
+
+/* FUNCTIONS */
+
+/**
+ * Calls `callback(name, annotation)` for each entry of the `annotations`
+ * table and collects the values it returns.
+ *
+ * @param {Function} callback  Invoked as callback(name, annotations[name]).
+ * @param {Object} [options]
+ * @param {boolean} [options.includeDefault=false]  Also visit the "default" entry.
+ * @param {boolean} [options.returnObject=false]  Return an object keyed by
+ *        annotation name instead of an array of results.
+ * @returns {Array|Object} The collected callback results.
+ */
+function annotationMap(callback, options) {
+    var includeDefault = !!(options && options.includeDefault),
+        returnObject = !!(options && options.returnObject),
+        // Declared locally: the accumulator must not leak into (or be
+        // clobbered through) the global scope.
+        res = (returnObject ? {} : []),
+        name,
+        el;
+    for (name in annotations) {
+        // Skip anything inherited through the prototype chain.
+        if (!Object.prototype.hasOwnProperty.call(annotations, name)) {
+            continue;
+        }
+        if (name != "default" || includeDefault) {
+            el = callback(name, annotations[name]);
+            if (returnObject) {
+                res[name] = el;
+            } else {
+                res.push(el);
+            }
+        }
+    }
+    return res;
+}
+
+/**
+ * Creates the `tweets` table (one INTEGER column per non-default annotation),
+ * its `created_at` index and the `tweet_refs` table if they do not exist yet.
+ * Once the `tweets` table is ready, the current last position is broadcast.
+ * Any database error is rethrown.
+ */
+function createTables() {
+    // Shared completion callback: rethrow whatever the database reports.
+    function failOnError(err) {
+        if (err) {
+            throw err;
+        }
+    }
+
+    var annotationColumns = annotationMap(function(name) {
+            return ', a_' + name + ' INTEGER';
+        }).join(""),
+        createTweets = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationColumns + " )";
+
+    db.execute(createTweets, function(err) {
+        failOnError(err);
+        db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", failOnError);
+        getSendLastPos();
+    });
+
+    db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", failOnError);
+}
+
+/**
+ * Stores one row in `tweet_refs` linking two tweets.
+ *
+ * @param from_id   Id of the referring tweet.
+ * @param to_id     Id of the tweet being referred to.
+ * @param ref_type  Kind of link (callers in this file pass "reply" or "retweet").
+ */
+function commitReference(from_id, to_id, ref_type) {
+    var insertRef = "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )",
+        bindings = [ from_id, to_id, ref_type ];
+    db.execute(insertRef, bindings, function(err) {
+        if (err) {
+            throw err;
+        }
+    });
+}
+
+/**
+ * Parses one JSON message received from the Twitter stream, strips the
+ * fields listed in keys_to_delete / user_keys_to_delete, tags the tweet with
+ * the annotations whose keywords match its text, records reply / retweet
+ * references and inserts the tweet into the `tweets` table.
+ *
+ * Messages that are not valid JSON, or that are not tweets (no `text` or no
+ * `user`), are logged and skipped instead of crashing the process.
+ *
+ * @param {string} data  One message of the stream, as a JSON string.
+ */
+function commitTweet(data) {
+
+    var tweet,
+        ann = [],
+        k;
+
+    try {
+        tweet = JSON.parse(data);
+    } catch (e) {
+        console.log("Ignoring unparseable stream message: " + e.message);
+        return;
+    }
+    // The stream can also deliver messages that are not statuses; everything
+    // below needs a text and a user.
+    if (!tweet || typeof tweet.text != 'string' || !tweet.user) {
+        console.log("Ignoring stream message that is not a tweet");
+        return;
+    }
+
+    // Replace numeric ids by their string twins before anything reads them.
+    textids(tweet);
+    for (k = 0; k < keys_to_delete.length; k++) {
+        delete tweet[keys_to_delete[k]];
+    }
+    textids(tweet.user);
+    for (k = 0; k < user_keys_to_delete.length; k++) {
+        delete tweet.user[user_keys_to_delete[k]];
+    }
+    if (tweet.retweeted_status) {
+        textids(tweet.retweeted_status);
+        for (k = 0; k < keys_to_delete.length; k++) {
+            delete tweet.retweeted_status[keys_to_delete[k]];
+        }
+    }
+    // An annotation applies as soon as one of its keywords matches the text.
+    // String.prototype.search() treats each keyword as a regular expression.
+    annotationMap(function(i, annotation) {
+        for (var j in annotation.keywords) {
+            if (tweet.text.search(annotation.keywords[j]) != -1) {
+                ann.push(i);
+                break;
+            }
+        }
+    });
+    tweet.annotations = ann;
+    tweet.created_at = new Date(tweet.created_at);
+
+    if (tweet.in_reply_to_status_id) {
+        commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
+    }
+    if (tweet.retweeted_status) {
+        commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
+    }
+    // One a_<name> column per non-default annotation, holding 1 when the
+    // annotation applies to this tweet and 0 otherwise.
+    db.execute(
+        "INSERT INTO tweets ( tweet_id, created_at, json "
+        + annotationMap(function(a) { return ', a_' + a }).join("")
+        + " ) VALUES ( ?, ?, ? "
+        + annotationMap(function(a) { return ', ?' }).join("")
+        + " )",
+        [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })),
+        function(err) {
+            if (err) throw err;
+            getSendLastPos();
+        }
+    );
+}
+
+/**
+ * 'data' handler of the Twitter stream response. Messages are separated by
+ * "\r\n" and a message may be split across two chunks, so the trailing
+ * incomplete part of each chunk is kept (on callBackNewTweets.pending) and
+ * prepended to the next one. Every complete, non-empty line is passed to
+ * commitTweet().
+ *
+ * @param {string} chunk  Text received from the stream.
+ */
+function callBackNewTweets(chunk) {
+    var lines = ((callBackNewTweets.pending || '') + chunk).split('\r\n'),
+        committed = 0,
+        i;
+    // Whatever follows the last separator is not complete yet ('' when the
+    // chunk ended exactly on a separator).
+    callBackNewTweets.pending = lines.pop();
+    for (i = 0; i < lines.length; i++) {
+        if (lines[i].length > 0) {
+            commitTweet(lines[i]);
+            committed++;
+        }
+    }
+    // Blank keep-alive lines are not reported as new tweets.
+    if (committed > 0) {
+        console.log("New tweets received");
+    }
+}
+
+/**
+ * Reads the highest `pos` stored in `tweets`, stores it in the module-level
+ * `lastpos` (0 when the query yields a falsy value) and broadcasts it to all
+ * connected sockets as a 'tweetSummary' event. Database errors are rethrown.
+ */
+function getSendLastPos() {
+    var query = "SELECT MAX(pos) lastpos FROM tweets";
+    db.execute(query, function (err, rows) {
+        if (err) {
+            throw err;
+        }
+        lastpos = rows[0].lastpos || 0;
+        console.log("Broadcasting last pos = ",lastpos);
+        var summary = { tweetcount : lastpos };
+        io.sockets.emit('tweetSummary', summary);
+    });
+}
+
+
+/**
+ * Sends the tweets stored at the requested positions to one socket as a
+ * 'tweets' event. Each tweet is the parsed `json` column with the row's
+ * `pos` added.
+ *
+ * posList comes from the client, so only entries made purely of decimal
+ * digits are kept before being placed in the SQL text; anything else is
+ * dropped. When nothing valid remains, an empty list is sent without
+ * querying the database.
+ *
+ * @param {Array} posList  Requested `pos` values (numbers or digit strings).
+ * @param socket           Socket that issued the request.
+ */
+function getSendTweets(posList, socket) {
+    var safeList = [],
+        i,
+        candidate;
+    if (Array.isArray(posList)) {
+        for (i = 0; i < posList.length; i++) {
+            candidate = String(posList[i]);
+            if (/^\d+$/.test(candidate)) {
+                safeList.push(candidate);
+            }
+        }
+    }
+    console.log("request for tweets ("+safeList.join(',')+") from "+socket.id);
+    if (!safeList.length) {
+        socket.emit('tweets', []);
+        return;
+    }
+    db.execute("SELECT * FROM tweets WHERE pos IN ( " + safeList.join(',') + " )", function (err, results) {
+        if (err) throw err;
+        socket.emit('tweets',
+            results.map(function(line) {
+                var tw = JSON.parse(line.json);
+                tw.pos = line.pos;
+                return tw;
+            })
+        );
+    });
+}
+
+/**
+ * Sends a histogram of the most recent tweets to one socket as a 'timeline'
+ * event. Tweets are grouped in slices of date_levels[level] milliseconds (at
+ * most 50 slices, as limited by the query); slices without tweets between
+ * two populated ones are filled in with zero counts. Entries are ordered
+ * from oldest to newest and look like
+ * { start, end, tweets, annotations: { <name>: <count>, ... } }.
+ *
+ * `level` comes from the client: a value that is not a valid index of
+ * date_levels is logged and ignored. An empty table yields an empty timeline.
+ *
+ * @param level   Index into date_levels.
+ * @param socket  Socket that issued the request.
+ */
+function getSendTimeline(level, socket) {
+    console.log("request for timeline ("+level+") from "+socket.id);
+    // Only plain non-negative integers are accepted, so that array
+    // properties such as "length" cannot be used as a level.
+    var lvl = /^\d+$/.test(String(level)) ? date_levels[level] : undefined;
+    if (typeof lvl != 'number') {
+        console.log("Ignoring timeline request with unknown level");
+        return;
+    }
+    var requete = "SELECT COUNT(*) AS nb, "
+        + lvl
+        + "*ROUND(created_at/"
+        + lvl
+        + ") AS tranche"
+        + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
+        + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50";
+    db.execute(requete, function (err, results) {
+        if (err) throw err;
+        var tbl = [],
+            lastend,
+            start,
+            row,
+            i;
+        // Builds the per-annotation counts of the row currently in `row`.
+        function annotationCounts() {
+            return annotationMap(function (a) {
+                return row['s_'+a];
+            },{returnObject: true});
+        }
+        if (!results.length) {
+            socket.emit('timeline', tbl);
+            return;
+        }
+        // Rows arrive newest first; walk them backwards to go forward in time.
+        lastend = parseInt(results[results.length - 1].tranche, 10);
+        for (i = results.length - 1; i >= 0; i--) {
+            row = results[i];
+            start = parseInt(row.tranche, 10);
+            // Fill the gap since the previous slice with empty slices.
+            while (start > lastend) {
+                tbl.push({ "start": lastend, "end": lastend + lvl, "tweets" : 0, "annotations" : {} });
+                lastend += lvl;
+            }
+            lastend += lvl;
+            tbl.push({
+                "start" : start,
+                "end" : lastend,
+                "tweets" : row.nb,
+                "annotations" : annotationCounts()
+            });
+        }
+        socket.emit('timeline', tbl);
+    });
+}
+
+/**
+ * For every own key of `object` ending in "id" that has a "<key>_str" twin,
+ * replaces the value by the twin's value and removes the twin. Keys without
+ * a twin are left untouched, so an unrelated field that merely ends in "id"
+ * is not overwritten with undefined. The object is modified in place;
+ * null / undefined are accepted and ignored.
+ *
+ * @param {Object} object  Object to normalise (a tweet or user in this file).
+ */
+function textids(object) {
+    var has = Object.prototype.hasOwnProperty,
+        key,
+        twin;
+    for (key in object) {
+        twin = key + '_str';
+        if (key.substr(-2) == 'id' && has.call(object, key) && has.call(object, twin)) {
+            object[key] = object[twin];
+            delete object[twin];
+        }
+    }
+}
+
+/**
+ * Minimal static file server. "/conf.js" is served from this script's
+ * directory, everything else from its "client" sub-directory; a URL ending
+ * in "/" is mapped to "index.html". Replies 200 with the file content, 404
+ * when the file cannot be read and 403 when the normalised path would leave
+ * the "client" directory (e.g. through ".." segments).
+ *
+ * @param req  Incoming HTTP request.
+ * @param res  HTTP response to write to.
+ */
+function httpHandler(req, res) {
+    console.log("HTTP Request for URL "+req.url);
+    var path = require('path'),
+        // Trailing separator so that a sibling such as "client-old" is not
+        // mistaken for the client directory by the prefix test below.
+        clientRoot = path.join(__dirname, 'client', '/'),
+        url;
+    if (req.url == "/conf.js") {
+        url = __dirname + req.url;
+    } else {
+        url = path.normalize(__dirname + "/client" + req.url + ( req.url[req.url.length - 1] == "/" ? "index.html" : "" ));
+        if (url.indexOf(clientRoot) !== 0) {
+            console.log("Error 403");
+            res.writeHead(403);
+            return res.end('Forbidden');
+        }
+    }
+    fs.readFile( url, function(err, data) {
+        if (err) {
+            console.log("Error 404");
+            res.writeHead(404);
+            return res.end('File not found');
+        }
+        res.writeHead(200);
+        res.end(data);
+    });
+}
+
+/* Initialization */
+
+// Modules.
+var http = require('http');
+var https = require('https');
+var sqlite = require('sqlite');
+var socketio = require('socket.io');
+
+// Module-level state.
+var tweets = [];
+var lastpos = 0; // updated by getSendLastPos()
+var arcs = [];
+var tweet_ids = [];
+var date_struct = [];
+
+// Slice durations, in milliseconds, selectable by index in getSendTimeline().
+var date_levels = [
+    3600 * 1000,
+    15 * 60 * 1000,
+    5 * 60 * 1000,
+    60 * 1000,
+    15 * 1000
+];
+
+// Fields removed from a tweet (and from its retweeted_status) before storage.
+var keys_to_delete = [
+    'in_reply_to_screen_name',
+    'in_reply_to_user_id',
+    'retweeted',
+    'place',
+    'geo',
+    'source',
+    'contributors',
+    'coordinates',
+    'retweet_count',
+    'favorited',
+    'truncated',
+    'possibly_sensitive'
+];
+
+// Fields removed from a tweet's user object before storage.
+var user_keys_to_delete = [
+    'default_profile_image',
+    'show_all_inline_media',
+    'contributors_enabled',
+    'profile_sidebar_fill_color',
+    'created_at',
+    'lang',
+    'time_zone',
+    'profile_sidebar_border_color',
+    'follow_request_sent',
+    'profile_background_image_url',
+    'profile_background_image_url_https',
+    'followers_count',
+    'description',
+    'url',
+    'geo_enabled',
+    'profile_use_background_image',
+    'default_profile',
+    'following',
+    'profile_text_color',
+    'is_translator',
+    'favourites_count',
+    'listed_count',
+    'friends_count',
+    'profile_link_color',
+    'protected',
+    'location',
+    'notifications',
+    'profile_image_url_https',
+    'statuses_count',
+    'verified',
+    'profile_background_color',
+    'profile_background_tile',
+    'utc_offset'
+];
+
+// HTTP server, the socket.io layer attached to it, and the database handle.
+var app = http.createServer(httpHandler);
+var io = socketio.listen(app);
+var db = new sqlite.Database();
+
+/* MAIN CODE */
+
+// Start accepting HTTP / socket.io connections.
+app.listen(app_port);
+console.log("Listening on port: "+app_port);
+
+// Open the database; the tables are created once it is available.
+console.log("Opening SQLITE file: "+sqlfile);
+db.open(sqlfile, function onDatabaseOpen(err) {
+    if (err) {
+        throw err;
+    }
+    createTables();
+});
+
+// Open the Twitter filter stream for the tracked keyword(s) and pass every
+// chunk received to callBackNewTweets(). Network errors and the end of the
+// stream are logged instead of being left unhandled, so that the HTTP /
+// socket.io side keeps serving the tweets already stored.
+if (RECORD_NEW_TWEETS) {
+    console.log("Requesting Twitter to track keyword(s): "+tracking_keyword);
+    var req = https.request({
+        host: "stream.twitter.com",
+        path: "/1/statuses/filter.json",
+        method: "POST",
+        headers: {
+            'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
+            'Content-Type': 'application/x-www-form-urlencoded'
+        }
+    }, function(res) {
+        console.log('Reply from stream.twitter.com: ' + res.statusCode);
+        console.log('Headers: ' + JSON.stringify(res.headers));
+        res.setEncoding('utf8');
+        res.on('data', callBackNewTweets);
+        res.on('end', function() {
+            console.log('Connection to stream.twitter.com closed, no more tweets will be recorded');
+        });
+    });
+
+    // Without a listener, an 'error' event is thrown as an uncaught exception.
+    req.on('error', function(err) {
+        console.log('Error on the stream.twitter.com connection: ' + err.message);
+    });
+
+    req.write('track=' + encodeURIComponent(tracking_keyword));
+    req.end();
+}
+
+// socket.io wiring. Each new client immediately receives the current tweet
+// count, then may request tweets by position ('updateTweets') or a timeline
+// at a given level ('updateTimeline').
+io.set('log level', 0);
+io.sockets.on('connection', function(socket) {
+    console.log("New connection from" + socket.handshake.address.address + " with id=" + socket.id);
+    // lastpos is the value getSendLastPos() broadcasts; the `tweets` array is
+    // never filled, so its length would always report 0 here.
+    socket.emit('tweetSummary', { tweetcount : lastpos });
+    socket.on('updateTweets', function(data) {
+        // Payloads come from the client: ignore anything without a
+        // non-empty `tweets` list.
+        if (data && data.tweets && data.tweets.length) {
+            getSendTweets(data.tweets, socket);
+        }
+    });
+    socket.on('updateTimeline', function(data) {
+        if (data) {
+            getSendTimeline(data.level, socket);
+        }
+    });
+});
\ No newline at end of file