/*
 * tweetcast/nodejs/node-direct.js
 *
 * Records tweets matching `tracking_keyword` from the Twitter streaming API
 * into an SQLite database, and serves them (together with the static client
 * found in ./client) to browsers over HTTP and socket.io.
 *
 * This file is deliberately ES5 and must NOT be switched to strict mode or to
 * an ES module: the configuration file is loaded with a direct `eval`, which
 * only defines variables in this scope in non-strict code.
 */

/* DEFAULT CONFIGURATION */

// Credentials come from the environment instead of being committed with the
// source. Do not move them into the configuration file either: that file is
// served verbatim to every client at /config (see resolveRequestedFile).
var TWITTER_USER = process.env.TWITTER_USER || '',
    TWITTER_PASS = process.env.TWITTER_PASS || '',
    RECORD_NEW_TWEETS = true,
    commit_script = '';

/* CALLING COMMON CONFIGURATION FILE */

var conf_file = flagOption('-c', 'conf.js');

myLog('Reading Configuration from ' + conf_file);

var fs = require('fs');

// SECURITY NOTE(review): the configuration file is executed with full
// privileges in this module's scope. It is expected to define
// `tracking_keyword`, `annotations` and `app_port`, may define `sqlfile`, and
// may override the defaults above. Only ever point -c at a trusted file.
eval(fs.readFileSync(conf_file, 'utf8'));

// Fail fast with a readable message instead of a ReferenceError deep inside a
// database or network callback.
if (typeof tracking_keyword === 'undefined'
        || typeof annotations === 'undefined'
        || typeof app_port === 'undefined') {
    throw new Error('Configuration file ' + conf_file
        + ' must define tracking_keyword, annotations and app_port');
}

// Path of the SQLite database: taken from the configuration when it defines
// `sqlfile`, otherwise derived from the tracking keyword.
var database_file = (typeof sqlfile !== 'undefined')
    ? sqlfile
    : __dirname + '/tweets-' + encodeURIComponent(tracking_keyword) + '.sqlite';

/* FUNCTIONS */

/**
 * Returns the command-line argument that follows `flag`, or `defaultValue`
 * when the flag is absent or is the last argument.
 */
function flagOption(flag, defaultValue) {
    var flagPos = process.argv.indexOf(flag);
    return (flagPos !== -1 && flagPos < process.argv.length - 1)
        ? process.argv[flagPos + 1]
        : defaultValue;
}

/**
 * console.log with a bold blue "[time]" prefix (ANSI escape codes).
 * Accepts any number of arguments, like console.log.
 */
function myLog() {
    var prefix = '\x1b[1;34m[' + new Date().toLocaleTimeString() + ']\x1b[0m';
    console.log.apply(console, [prefix].concat(Array.prototype.slice.call(arguments)));
}

/**
 * Calls `callback(name, annotation)` for every entry of the configured
 * `annotations` object and collects the results.
 *
 * options.includeDefault - also visit the entry named "default" (skipped
 *                          otherwise).
 * options.returnObject   - return { name: result } instead of an array.
 */
function annotationMap(callback, options) {
    var includeDefault = !!(options && options.includeDefault),
        returnObject = !!(options && options.returnObject),
        res = returnObject ? {} : [],
        name,
        el;
    for (name in annotations) {
        if (!Object.prototype.hasOwnProperty.call(annotations, name)) {
            continue;
        }
        if (name === 'default' && !includeDefault) {
            continue;
        }
        el = callback(name, annotations[name]);
        if (returnObject) {
            res[name] = el;
        } else {
            res.push(el);
        }
    }
    return res;
}

/**
 * Quotes a value as an SQL string literal (single quotes, embedded quotes
 * doubled). Needed because inserts are batched as one SQL script, which
 * cannot take bound parameters.
 */
function sqlString(value) {
    return "'" + String(value).replace(/'/g, "''") + "'";
}

/**
 * Creates the tables and index if they do not exist yet (one a_<name> column
 * per annotation), then refreshes the tweet count.
 */
function createTables() {
    var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT"
        + annotationMap(function (a) { return ', a_' + a + ' INTEGER'; }).join("")
        + " );\n"
        + "CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT );\n"
        + "CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at );";
    db.executeScript(requete, function (err) {
        if (err) { myLog("SQLITE error", err.stack); }
        getSendLastPos();
    });
}

/**
 * Queues the insertion of a reference ("reply" or "retweet") between two
 * tweets; it is written on the next commitTweets() run.
 */
function commitReference(from_id, to_id, ref_type) {
    commit_script += 'INSERT OR IGNORE INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( '
        + sqlString(from_id) + ', ' + sqlString(to_id) + ', ' + sqlString(ref_type) + ' );\n';
}

/** Deletes every property named in `keys` from `object`. */
function deleteKeys(object, keys) {
    for (var i = 0; i < keys.length; i++) {
        delete object[keys[i]];
    }
}

/**
 * Parses one message from the Twitter stream, trims and annotates the tweet
 * and queues its insertion (plus reply/retweet references) in commit_script.
 * Messages that cannot be parsed or processed are logged and dropped.
 */
function commitTweet(data) {
    var tweet,
        ann = [];

    // The stream also carries non-tweet and possibly malformed messages: a
    // parse failure must not take the whole process down.
    try {
        tweet = JSON.parse(data);
    } catch (err) {
        myLog("Error: Could not parse data", data);
        return;
    }

    if (!tweet || !tweet.id) {
        myLog("Error: Could not parse data", data);
        return;
    }

    try {
        textids(tweet);
        deleteKeys(tweet, keys_to_delete);
        textids(tweet.user);
        deleteKeys(tweet.user, user_keys_to_delete);
        if (tweet.retweeted_status) {
            textids(tweet.retweeted_status);
            deleteKeys(tweet.retweeted_status, keys_to_delete);
        }
        annotationMap(function (name, annotation) {
            var keywords = annotation.keywords || {},
                matched = Object.keys(keywords).some(function (k) {
                    // String.prototype.search: the keyword is used as a
                    // regular expression.
                    return tweet.text.search(keywords[k]) !== -1;
                });
            if (matched) {
                ann.push(name);
            }
        });
        tweet.annotations = ann;
        tweet.created_at = new Date(tweet.created_at);
        if (isNaN(tweet.created_at.valueOf())) {
            throw new Error("Invalid created_at date");
        }
    } catch (err) {
        // A half-processed tweet would produce an invalid INSERT and spoil
        // the whole batch, so it is dropped here.
        myLog("Error while processing tweet", err.stack);
        return;
    }

    if (tweet.in_reply_to_status_id) {
        commitReference(tweet.id, tweet.in_reply_to_status_id, "reply");
    }
    if (tweet.retweeted_status) {
        commitReference(tweet.id, tweet.retweeted_status.id, "retweet");
    }
    // OR IGNORE (as for tweet_refs): tweet_id is UNIQUE, so a tweet that is
    // delivered twice is skipped instead of raising a constraint error in the
    // middle of the batch.
    commit_script += 'INSERT OR IGNORE INTO tweets ( tweet_id, created_at, json '
        + annotationMap(function (a) { return ', a_' + a; }).join("")
        + ' ) VALUES ( '
        + sqlString(tweet.id)
        + ', '
        + tweet.created_at.valueOf()
        + ', '
        + sqlString(JSON.stringify(tweet))
        + annotationMap(function (a) {
            return ann.indexOf(a) === -1 ? ', 0' : ', 1';
        }).join("")
        + ' );\n';
}

/**
 * 'data' handler of the Twitter stream. Messages are separated by "\r\n" and
 * a message may be split over several chunks, so the trailing incomplete part
 * is kept in stream_buffer until the rest arrives.
 */
function callBackNewTweets(chunk) {
    var messages = (stream_buffer + chunk).split('\r\n');
    stream_buffer = messages.pop();
    for (var i = 0; i < messages.length; i++) {
        // Empty lines are skipped.
        if (messages[i].length > 0) {
            commitTweet(messages[i]);
        }
    }
}

/**
 * Opens the streaming connection to Twitter for `tracking_keyword` and feeds
 * incoming data to callBackNewTweets. When the connection ends, fails or
 * stays silent for 60 seconds, it is aborted and reopened after a delay that
 * doubles on every consecutive failure (reset once Twitter answers 200).
 */
function requestTwitter() {
    myLog("Requesting Twitter to track keyword(s): " + tracking_keyword);

    var reconnecting = false,
        req;

    stream_buffer = '';

    // Schedules exactly one reconnection for this request, whichever of
    // 'end', 'error' or 'timeout' fires first, and drops the old connection
    // so that two streams never run side by side.
    function reconnect(reason) {
        if (reconnecting) {
            return;
        }
        reconnecting = true;
        myLog(reason);
        req.abort();
        setTimeout(requestTwitter, reconnect_delay);
        reconnect_delay = Math.min(reconnect_delay * 2, RECONNECT_MAX_DELAY);
    }

    function armTimeout(socket) {
        socket.setTimeout(60000);
        socket.on('timeout', function () {
            reconnect('TimeOut - Trying to reconnect');
        });
    }

    req = https.request({
        host: "stream.twitter.com",
        path: "/1/statuses/filter.json",
        method: "POST",
        headers: {
            'Authorization': 'Basic ' + new Buffer(TWITTER_USER + ":" + TWITTER_PASS).toString('base64'),
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    }, function (res) {
        myLog('Reply from stream.twitter.com: ' + res.statusCode);
        myLog('Headers: ' + JSON.stringify(res.headers));
        if (res.statusCode === 200) {
            reconnect_delay = RECONNECT_MIN_DELAY;
        }
        res.setEncoding('utf8');
        res.on('data', callBackNewTweets);
        res.on('end', function () {
            reconnect('End Twitter Connection — Trying to reconnect');
        });
    });

    // Without a listener, an 'error' event (DNS failure, connection reset...)
    // is thrown and kills the process.
    req.on('error', function (err) {
        reconnect('Twitter connection error (' + err.message + ') - Trying to reconnect');
    });

    req.write('track=' + encodeURIComponent(tracking_keyword));

    // Depending on the Node version the socket is either already attached to
    // the request or announced later through the 'socket' event.
    if (req.socket) {
        armTimeout(req.socket);
    } else {
        req.on('socket', armTimeout);
    }
    req.end();
}

/**
 * Reads the highest tweet position from the database and, when it changed,
 * stores it in `lastpos` and broadcasts it to all clients as 'tweetSummary'.
 */
function getSendLastPos() {
    db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) {
        if (err) {
            myLog("SQLITE error", err.stack);
            return;
        }
        // MAX(pos) is NULL while the table is empty.
        var newpos = (results && results[0] && results[0].lastpos) ? results[0].lastpos : 0;
        if (newpos !== lastpos) {
            lastpos = newpos;
            try {
                io.sockets.emit('tweetSummary', {
                    tweetcount : lastpos
                });
            } catch (e) {
                myLog("SOCKET.IO error while Broadcasting tweetSummary", e.stack);
            }
        }
    });
}

/**
 * Writes all queued inserts to the database in one script, then refreshes
 * the tweet count. Called every 500 ms.
 */
function commitTweets() {
    if (commit_script === '') {
        return;
    }
    var requete = commit_script;
    commit_script = '';
    db.executeScript(requete, function (err) {
        if (err) { myLog("SQLITE error", err.stack); }
        getSendLastPos();
    });
}

/**
 * Sends to `socket` ('tweetPosByDate') the position and date of the stored
 * tweet whose created_at is closest to `date` (milliseconds since the epoch).
 */
function getSendTweetPosByDate(date, socket) {
    // `date` comes from the client: only a finite number may reach the query.
    var timestamp = Number(date);
    if (date === null || date === '' || !isFinite(timestamp)) {
        myLog("Ignoring tweetPosByDate request with invalid date", date);
        return;
    }
    db.execute("SELECT pos, created_at, ABS(created_at-" + timestamp + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 1", function (err, results) {
        if (err) {
            myLog("SQLITE error", err.stack);
            return;
        }
        if (results && results.length) {
            try {
                socket.emit('tweetPosByDate', {
                    tweetpos : results[0].pos,
                    date : results[0].created_at
                });
            } catch (e) {
                myLog("SOCKET.IO error while sending tweetPosByDate", e.stack);
            }
        }
    });
}

/**
 * Sends to `socket` ('linkedTweets') up to 10 tweets linked to the tweet at
 * position `pos`: those it references ("referencing") and those referencing
 * it ("referenced_by"). Both lists are empty when the query fails.
 */
function getSendLinkedTweets(pos, socket) {
    db.execute("SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10", [ pos, pos ], function (err, results) {
        if (err) { myLog("SQLITE error: ", err.stack); }
        var rows = results || [],
            struct = {
                "tweetpos" : pos,
                "referencing" : [],
                "referenced_by" : []
            };
        for (var i = 0; i < rows.length; i++) {
            if (rows[i].pos_a == pos) {
                struct.referencing.push({
                    "pos" : rows[i].pos_b,
                    "ref_type" : rows[i].ref_type
                });
            } else {
                struct.referenced_by.push({
                    "pos" : rows[i].pos_a,
                    "ref_type" : rows[i].ref_type
                });
            }
        }
        try {
            socket.emit('linkedTweets', struct);
        } catch (e) {
            myLog("SOCKET.IO error while sending linkedTweets: ", e.stack);
        }
    });
}

/**
 * Sends to `socket` ('tweets') the stored tweets whose positions are listed
 * in `posList`, each with its `pos` added.
 */
function getSendTweets(posList, socket) {
    // `posList` comes from the client: keep integers only, so that nothing
    // but numbers is ever interpolated into the query.
    var positions = [],
        i,
        pos;
    if (!Array.isArray(posList)) {
        return;
    }
    for (i = 0; i < posList.length; i++) {
        pos = Number(posList[i]);
        if (posList[i] !== null && posList[i] !== '' && isFinite(pos) && Math.floor(pos) === pos) {
            positions.push(pos);
        }
    }
    if (!positions.length) {
        return;
    }
    db.execute("SELECT * FROM tweets WHERE pos IN ( " + positions.join(',') + " )", function (err, results) {
        if (err) {
            myLog("SQLITE error", err.stack);
            return;
        }
        try {
            socket.emit('tweets',
                results.map(function (line) {
                    var tw = JSON.parse(line.json);
                    tw.pos = line.pos;
                    return tw;
                })
            );
        } catch (e) {
            myLog("SOCKET.IO error while sending tweets", e.stack);
        }
    });
}

/**
 * Sends to `socket` ('timeline') tweet counts per time slice.
 *
 * data.level - index into date_levels (slice duration).
 * data.full  - truthy: the 50 most recent non-empty slices, with empty
 *              slices inserted between them; otherwise only the latest one.
 */
function getSendTimeline(data, socket) {
    var level = data ? Number(data.level) : NaN,
        lvl,
        requete;

    // `data.level` comes from the client: it must be a valid index.
    if (!data || data.level === null || data.level === ''
            || Math.floor(level) !== level || level < 0 || level >= date_levels.length) {
        myLog("Ignoring updateTimeline request with invalid level", data && data.level);
        return;
    }

    lvl = date_levels[level];
    requete = "SELECT COUNT(*) AS nb, "
        + lvl
        + "*ROUND(created_at/"
        + lvl
        + ") AS tranche"
        + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a; }).join("")
        + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + (data.full ? "50" : "1");

    db.execute(requete, function (err, results) {
        if (err) {
            myLog("SQLITE error", err.stack);
            return;
        }
        if (!results || !results.length) {
            return;
        }
        var tbl = [],
            lastend = parseInt(results[results.length - 1].tranche, 10),
            i,
            start;

        function annotationCounts(row) {
            return annotationMap(function (a) {
                return row['s_' + a];
            }, { returnObject: true });
        }

        // Rows arrive newest first; walk them oldest first.
        // NOTE(review): a long period without tweets yields one empty slice
        // per `lvl` milliseconds, which can make this list very large.
        for (i = results.length - 1; i >= 0; i--) {
            start = parseInt(results[i].tranche, 10);
            // Fill the gap since the previous slice with empty slices.
            while (start > lastend) {
                tbl.push({
                    "start" : lastend,
                    "end" : lastend + lvl,
                    "tweets" : 0,
                    "annotations" : {}
                });
                lastend += lvl;
            }
            lastend += lvl;
            tbl.push({
                "start" : start,
                "end" : lastend,
                "tweets" : results[i].nb,
                "annotations" : annotationCounts(results[i])
            });
        }
        try {
            socket.emit('timeline', {
                "full" : data.full,
                "level" : data.level,
                "data" : tbl
            });
        } catch (e) {
            myLog("SOCKET.IO error while sending timeline", e.stack);
        }
    });
}

/**
 * Replaces every non-ASCII character of `text` with a decimal HTML character
 * reference. A surrogate pair becomes ONE reference to its code point, since
 * references to the two halves are not valid characters.
 */
function encodeNonAscii(text) {
    var out = '',
        i,
        code,
        low;
    for (i = 0; i < text.length; i++) {
        code = text.charCodeAt(i);
        if (code >= 0xD800 && code <= 0xDBFF && i + 1 < text.length) {
            low = text.charCodeAt(i + 1);
            if (low >= 0xDC00 && low <= 0xDFFF) {
                code = (code - 0xD800) * 0x400 + (low - 0xDC00) + 0x10000;
                i++;
            }
        }
        out += (code < 128 ? String.fromCharCode(code) : "&#" + code + ";");
    }
    return out;
}

/**
 * Normalizes the direct properties of `object` in place:
 *  - string values get their non-ASCII characters replaced with HTML
 *    character references (workaround for a Unicode bug in socket.io);
 *  - a key ending in "id" that has a "<key>_str" twin takes the twin's value
 *    and the twin is removed.
 * Does nothing when `object` is null or undefined.
 */
function textids(object) {
    if (!object) {
        return;
    }
    // Iterate over a snapshot of the keys: properties are deleted on the way.
    Object.keys(object).forEach(function (key) {
        if (typeof object[key] === "string") {
            object[key] = encodeNonAscii(object[key]);
        }
        if (key.substr(-2) === 'id' && Object.prototype.hasOwnProperty.call(object, key + '_str')) {
            object[key] = object[key + '_str'];
            delete object[key + '_str'];
        }
    });
}

/**
 * Maps a request URL to the file to serve, or returns null when the URL is
 * malformed or points outside the ./client directory.
 * "/config" maps to the configuration file; a path ending in "/" maps to the
 * index.html of that directory.
 */
function resolveRequestedFile(requestUrl) {
    var pathname,
        root,
        file;

    if (requestUrl === "/config") {
        return conf_file;
    }
    try {
        pathname = decodeURIComponent(url.parse(requestUrl).pathname || "/");
    } catch (err) {
        return null;
    }
    if (pathname.indexOf('\0') !== -1) {
        return null;
    }
    if (pathname.charAt(pathname.length - 1) === "/") {
        pathname += "index.html";
    }
    root = path.join(__dirname, "client");
    // path.join() resolves "." and ".." segments; anything that ends up
    // outside the client directory is rejected.
    file = path.join(root, pathname);
    if (file.indexOf(root + (path.sep || '/')) !== 0) {
        return null;
    }
    return file;
}

/**
 * HTTP request handler: serves the configuration file and the static client.
 * Answers 404 for anything that cannot be read.
 */
function httpHandler(req, res) {
    myLog("HTTP Request for URL " + req.url);

    function notFound() {
        myLog("Error 404");
        res.writeHead(404);
        res.end('File not found');
    }

    var file = resolveRequestedFile(req.url);
    if (!file) {
        return notFound();
    }
    fs.readFile(file, function (err, data) {
        if (err) {
            return notFound();
        }
        res.writeHead(200);
        res.end(data);
    });
}

/* Initialization */

var path = require('path'),
    url = require('url'),
    http = require('http'),
    https = require('https'),
    sqlite = require('sqlite'),
    socketio = require('socket.io'),
    // Highest tweet position known to be in the database.
    lastpos = 0,
    // Incomplete trailing message of the Twitter stream.
    stream_buffer = '',
    // Delay before reopening the Twitter stream, in milliseconds.
    RECONNECT_MIN_DELAY = 1000,
    RECONNECT_MAX_DELAY = 320 * 1000,
    reconnect_delay = RECONNECT_MIN_DELAY,
    // Timeline slice durations in milliseconds, indexed by `level`.
    date_levels = [
        3600 * 1000,
        15 * 60 * 1000,
        5 * 60 * 1000,
        60 * 1000,
        15 * 1000
    ],
    // Properties removed from tweets (and retweeted tweets) before storage.
    keys_to_delete = [
        'in_reply_to_screen_name',
        'in_reply_to_user_id',
        'retweeted',
        'place',
        'geo',
        'source',
        'contributors',
        'coordinates',
        'retweet_count',
        'favorited',
        'truncated',
        'possibly_sensitive'
    ],
    // Properties removed from the tweet's user before storage.
    user_keys_to_delete = [
        'default_profile_image',
        'show_all_inline_media',
        'contributors_enabled',
        'profile_sidebar_fill_color',
        'created_at',
        'lang',
        'time_zone',
        'profile_sidebar_border_color',
        'follow_request_sent',
        'profile_background_image_url',
        'profile_background_image_url_https',
        'followers_count',
        'description',
        'url',
        'geo_enabled',
        'profile_use_background_image',
        'default_profile',
        'following',
        'profile_text_color',
        'is_translator',
        'favourites_count',
        'listed_count',
        'friends_count',
        'profile_link_color',
        'protected',
        'location',
        'notifications',
        'profile_image_url_https',
        'statuses_count',
        'verified',
        'profile_background_color',
        'profile_background_tile',
        'utc_offset'
    ],
    app = http.createServer(httpHandler),
    io = socketio.listen(app),
    db = new sqlite.Database();

/* MAIN CODE */

app.listen(app_port);
myLog("Listening on port: " + app_port);
myLog("Opening SQLITE file: " + database_file);
db.open(database_file, function (err) {
    if (err) {
        // Nothing can be created in a database that failed to open.
        myLog("SQLITE error", err.stack);
        return;
    }
    createTables();
});

setInterval(commitTweets, 500);
setInterval(function () { myLog("Still alive, tweet count", lastpos); }, 60000);

if (RECORD_NEW_TWEETS) {
    if (TWITTER_USER && TWITTER_PASS) {
        requestTwitter();
    } else {
        myLog("No Twitter credentials (TWITTER_USER / TWITTER_PASS): new tweets will not be recorded");
    }
}

io.set('log level', 0);
io.sockets.on('connection', function (socket) {
    var handshake = socket.handshake || {},
        address = handshake.address ? handshake.address.address : undefined;
    myLog("New connection from", address, "with id=", socket.id);
    try {
        socket.emit('tweetSummary', { tweetcount : lastpos });
    } catch (err) {
        myLog("SOCKET.IO error while sending tweetSummary", err.stack);
    }
    // Payloads come from the client: never assume they have the right shape.
    socket.on('updateTweets', function (data) {
        if (data && Array.isArray(data.tweets) && data.tweets.length) {
            getSendTweets(data.tweets, socket);
        }
    });
    socket.on('updateTimeline', function (data) {
        getSendTimeline(data, socket);
    });
    socket.on('tweetPosByDate', function (data) {
        if (data) {
            getSendTweetPosByDate(data.date, socket);
        }
    });
    socket.on('linkedTweets', function (data) {
        if (data) {
            getSendLinkedTweets(data.tweetpos, socket);
        }
    });
});