diff -r 03c69425efa6 -r 738594562e44 tweetcast/nodejs/server/node-direct.js --- a/tweetcast/nodejs/server/node-direct.js Mon Oct 24 18:01:55 2011 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,294 +0,0 @@ -/* CONFIGURATION */ - -RECORD_NEW_TWEETS = true; -DEFAULT_SIO_PORT = 8000; -/* Overriden par the "-p" parameter, e.g. node tweetcast.js -p 8080 */ -SQLITE_FILE_DIR = './'; -SQLITE_FILE_START = 'tweets-'; -SQLITE_FILE_EXT = '.sqlite'; -DEFAULT_TRACKING_KEYWORD = 'Bieber'; -/* Overriden par the "-T" parameter, e.g. node tweetcast.js -T "Bieber" */ -TWITTER_USER = 'materiauxnum'; -TWITTER_PASS = 'm473r14ux7w337'; - -/* FUNCTIONS */ - -function createTables() { - - var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotations.map(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )"; - db.execute(requete, function(err) { - if (err) throw err; - db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) throw err; }); - getSendLastPos(); - }); - - db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) throw err; }); -} - -function commitReference(from_id, to_id, ref_type) { - db.execute( - "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )", - [ from_id, to_id, ref_type ], - function(err) { if (err) throw err; } - ); -} - -function commitTweet(data) { - - var tweet = JSON.parse(data), - ann = []; - - textids(tweet); - for (var j in keys_to_delete) { - delete tweet[keys_to_delete[j]]; - } - textids(tweet.user); - for (var j in user_keys_to_delete) { - delete tweet.user[user_keys_to_delete[j]]; - } - if (tweet.retweeted_status) { - textids(tweet.retweeted_status); - for (var j in keys_to_delete) { - delete tweet.retweeted_status[keys_to_delete[j]]; - } - } - for (var i in annotations_keywords) { - for (var j in annotations_keywords[i]) { - if (tweet.text.indexOf(annotations_keywords[i][j]) != -1) { - ann.push(annotations[i]); - break; - } - } - } - tweet.annotations = ann; - tweet.created_at = new Date(tweet.created_at); - - if (tweet.in_reply_to_status_id) { - commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" ); - } - if (tweet.retweeted_status) { - commitReference( tweet.id, tweet.retweeted_status.id, "retweet" ); - } - db.execute( - "INSERT INTO tweets ( tweet_id, created_at, json " - + annotations.map(function(a) { return ', a_' + a }).join("") - + " ) VALUES ( ?, ?, ? " - + annotations.map(function(a) { return ', ?' }).join("") - + " )", - [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotations.map(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })), - function(err) { - if (err) throw err; - getSendLastPos(); - } - ); -} - -function callBackNewTweets(chunk) { - var newdata = chunk.split('\r\n'); - for (var i in newdata) { - if (newdata[i].length > 0) { - commitTweet(newdata[i]); - } - } - console.log("New tweets received"); -} - -function getSendLastPos() { - db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) { - if (err) throw err; - lastpos = results[0].lastpos ? results[0].lastpos : 0; - console.log("Broadcasting last pos = ",lastpos); - io.sockets.emit('tweetSummary', { - tweetcount : lastpos - }); - }); -} - - -function getSendTweets(posList, socket) { - console.log("request for tweets ("+posList.join(',')+") from "+socket.id); - db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) { - if (err) throw err; - socket.emit('tweets', - results.map(function(line) { - var tw = JSON.parse(line.json); - tw.pos = line.pos; - return tw; - }) - ); - }); -} - -function getSendTimeline(level, socket) { - console.log("request for timeline ("+level+") from "+socket.id); - var lvl = date_levels[level], - requete = "SELECT COUNT(*) AS nb, " - + lvl - + "*ROUND(created_at/" - + lvl - + ") AS tranche" - + annotations.map(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("") - + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50"; - db.execute(requete, function (err, results) { - if (err) throw err; - var tbl = [], - lastend = parseInt(results[results.length - 1].tranche); - for (var i = results.length - 1; i >= 0; i--) { - var start = parseInt(results[i].tranche); - while (start > lastend) { - var struct = { "start": lastend, "tweets" : 0, "annotations" : {} }; - lastend += lvl; - struct.end = lastend; - tbl.push(struct); - } - lastend += lvl; - var struct = { - "start" : start, - "end" : lastend, - "tweets" : results[i].nb, - "annotations" : {} - } - for (var j in annotations) { - struct.annotations[annotations[j]] = results[i]['s_' + annotations[j]]; - } - tbl.push(struct); - } - socket.emit('timeline', tbl); - }); -} - -function textids(object) { - for (var key in object) { - if (key.substr(-2) == 'id') { - object[key] = object[key + '_str']; - delete object[key + '_str']; - } - } -} - -/* Initialization */ - -var fs = require('fs'), - https = require('https'), - sqlite = require('sqlite'), - socketio = require('socket.io'), - tweets = [], - lastpos = 0, - arcs = [], - tweet_ids = [], - date_struct = [], - date_levels = [ - 3600 * 1000, - 15 * 60 * 1000, - 5 * 60 * 1000, - 60 * 1000, - 15 * 1000 - ], - annotations = [ 'positive', 'negative', 'reference', 'question' ], - annotations_keywords = [ [ '++' ], [ '--' ], [ '==' ], [ '??' ] ], - annkw = { - 'positive' : '++', - 'negative' : '--', - 'reference' : '==', - 'question' : '??' - }, - keys_to_delete = [ - 'in_reply_to_screen_name', - 'in_reply_to_user_id', - 'retweeted', - 'place', - 'geo', - 'source', - 'contributors', - 'coordinates', - 'retweet_count', - 'favorited', - 'truncated', - 'possibly_sensitive' - ], - user_keys_to_delete = [ - 'default_profile_image', - 'show_all_inline_media', - 'contributors_enabled', - 'profile_sidebar_fill_color', - 'created_at', - 'lang', - 'time_zone', - 'profile_sidebar_border_color', - 'follow_request_sent', - 'profile_background_image_url', - 'profile_background_image_url_https', - 'followers_count', - 'description', - 'url', - 'geo_enabled', - 'profile_use_background_image', - 'default_profile', - 'following', - 'profile_text_color', - 'is_translator', - 'favourites_count', - 'listed_count', - 'friends_count', - 'profile_link_color', - 'protected', - 'location', - 'notifications', - 'profile_image_url_https', - 'statuses_count', - 'verified', - 'profile_background_color', - 'profile_background_tile', - 'utc_offset' - ], - port_flag = process.argv.indexOf("-p"), - sio_port = ( port_flag != -1 && port_flag < process.argv.length - 1 && parseInt(process.argv[port_flag + 1]) ? parseInt(process.argv[port_flag + 1]) : DEFAULT_SIO_PORT ) - io = socketio.listen(sio_port), - track_flag = process.argv.indexOf("-T"), - tracking_keyword = ( track_flag != -1 && track_flag < process.argv.length - 1 ? process.argv[track_flag + 1] : DEFAULT_TRACKING_KEYWORD ), - sqlfile = SQLITE_FILE_DIR + SQLITE_FILE_START + encodeURIComponent(tracking_keyword) + SQLITE_FILE_EXT, - db = new sqlite.Database(); - -/* MAIN CODE */ - -console.log("Listening on port: "+sio_port); -console.log("Opening SQLITE file: "+sqlfile); -db.open(sqlfile , function(err) { - if (err) throw err; - createTables(); -}); - -if (RECORD_NEW_TWEETS) { - console.log("Requesting Twitter to track keyword(s): "+tracking_keyword); - var req = https.request({ - host: "stream.twitter.com", - path: "/1/statuses/filter.json", - method: "POST", - headers: { - 'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'), - 'Content-Type': 'application/x-www-form-urlencoded' - } - }, function(res) { - console.log('Reply from stream.twitter.com: ' + res.statusCode); - console.log('Headers: ' + JSON.stringify(res.headers)); - res.setEncoding('utf8'); - res.on('data', callBackNewTweets); - }); - - req.write('track=' + encodeURIComponent(tracking_keyword)); - req.end(); -} - -io.set('log level', 0); -io.sockets.on('connection', function(socket) { - console.log("New connection from" + socket.handshake.address.address + " with id=" + socket.id); - socket.emit('tweetSummary', { tweetcount : tweets.length }); - socket.on('updateTweets', function(data) { - if (data.tweets.length) { - getSendTweets(data.tweets, socket); - } - }); - socket.on('updateTimeline', function(data) { - getSendTimeline(data.level, socket); - }); -}); \ No newline at end of file