diff -r 5064d838962b -r 03c69425efa6 tweetcast/nodejs/server/node-direct.js --- a/tweetcast/nodejs/server/node-direct.js Wed Oct 19 00:05:09 2011 +0200 +++ b/tweetcast/nodejs/server/node-direct.js Mon Oct 24 18:01:55 2011 +0200 @@ -1,169 +1,160 @@ -READ_OLD_TWEETS = true; +/* CONFIGURATION */ + RECORD_NEW_TWEETS = true; -TWEET_FILE_DIR = './'; -TWEET_FILE_START = 'tweets-'; -TWEET_FILE_EXT = '.txt'; -TRACKING_KEYWORD = '#p2'; +DEFAULT_SIO_PORT = 8000; +/* Overridden by the "-p" parameter, e.g. node tweetcast.js -p 8080 */ +SQLITE_FILE_DIR = './'; +SQLITE_FILE_START = 'tweets-'; +SQLITE_FILE_EXT = '.sqlite'; +DEFAULT_TRACKING_KEYWORD = 'Bieber'; +/* Overridden by the "-T" parameter, e.g. node tweetcast.js -T "Bieber" */ +TWITTER_USER = 'materiauxnum'; +TWITTER_PASS = 'm473r14ux7w337'; -var fs = require('fs'), - https = require('https'), - io = require('socket.io') - .listen(8000), - tweets = [], - arcs = [], - tweet_ids = [], - date_struct = [], - date_levels = [ - 3600 * 1000, - 15 * 60 * 1000, - 5 * 60 * 1000, - 60 * 1000 - ], - annkw = { - 'positive' : '++', - 'negative' : '--', - 'reference' : '==', - 'question' : '??' 
- }; +/* FUNCTIONS */ + +function createTables() { -function populateDateStruct(level, start) { - if (typeof start == "object") { - start = start.valueOf(); - } - var end = start + date_levels[level], - struct = { - "level" : level, - "start" : new Date(start), - "end" : new Date(end) - }; - if (level < date_levels.length - 1) { - struct.slices = []; - var newstart = start; - while (newstart < end) { - struct.slices.push(populateDateStruct(level + 1, newstart)); - newstart += date_levels[level + 1]; - } - } else { - struct.tweets = []; - struct.annotations = {}; - } - return struct; + var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotations.map(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )"; + db.execute(requete, function(err) { + if (err) throw err; + db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) throw err; }); + getSendLastPos(); + }); + + db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) throw err; }); } -function insertIntoDateStruct(slices, tweet) { - var creadate = new Date(tweet.created_at); - for (var i in slices) { - if (creadate < slices[i].end) { - if (slices[i].slices) { - insertIntoDateStruct(slices[i].slices, tweet); - } else { - slices[i].tweets.push(tweet.pos); - if (tweet.annotations.length) { - var ann = tweet.annotations[0]; - if (slices[i].annotations[ann]) { - slices[i].annotations[ann].push(tweet.pos); - } else { - slices[i].annotations[ann] = [ tweet.pos ]; - } - } - } - break; - } - } +function commitReference(from_id, to_id, ref_type) { + db.execute( + "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? 
)", + [ from_id, to_id, ref_type ], + function(err) { if (err) throw err; } + ); } -function getSliceContent(slice) { - if (slice.slices) { - var twz = [], - annotations = {}; - for (var i in slice.slices) { - var data = getSliceContent(slice.slices[i]); - twz = twz.concat(data.tweets); - for (var j in data.annotations) { - if (annotations[j]) { - annotations[j] = annotations[j].concat(data.annotations[j]); - } else { - annotations[j] = data.annotations[j]; - } - } +function commitTweet(data) { + + var tweet = JSON.parse(data), + ann = []; + + textids(tweet); + for (var j in keys_to_delete) { + delete tweet[keys_to_delete[j]]; + } + textids(tweet.user); + for (var j in user_keys_to_delete) { + delete tweet.user[user_keys_to_delete[j]]; + } + if (tweet.retweeted_status) { + textids(tweet.retweeted_status); + for (var j in keys_to_delete) { + delete tweet.retweeted_status[keys_to_delete[j]]; } - } else { - twz = slice.tweets; - annotations = slice.annotations; } - return { "tweets" : twz, "annotations" : annotations } -}; - -function flattenDateStruct(slices, target_level) { - var current_level = slices[0].level, - result = []; - if (current_level < target_level) { - if (slices[0].slices) { - for (var i in slices) { - result = result.concat(flattenDateStruct(slices[i].slices, target_level)); + for (var i in annotations_keywords) { + for (var j in annotations_keywords[i]) { + if (tweet.text.indexOf(annotations_keywords[i][j]) != -1) { + ann.push(annotations[i]); + break; } } } - else { - for (var i in slices) { - var data = getSliceContent(slices[i]); - result.push({ - "start" : slices[i].start, - "end" : slices[i].end, - "tweets" : data.tweets, - "annotations" : data.annotations - }); + tweet.annotations = ann; + tweet.created_at = new Date(tweet.created_at); + + if (tweet.in_reply_to_status_id) { + commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" ); + } + if (tweet.retweeted_status) { + commitReference( tweet.id, tweet.retweeted_status.id, "retweet" 
); + } + db.execute( + "INSERT INTO tweets ( tweet_id, created_at, json " + + annotations.map(function(a) { return ', a_' + a }).join("") + + " ) VALUES ( ?, ?, ? " + + annotations.map(function(a) { return ', ?' }).join("") + + " )", + [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotations.map(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })), + function(err) { + if (err) throw err; + getSendLastPos(); + } + ); +} + +function callBackNewTweets(chunk) { + var newdata = chunk.split('\r\n'); + for (var i in newdata) { + if (newdata[i].length > 0) { + commitTweet(newdata[i]); } } - return result; + console.log("New tweets received"); } -function trimFDS(slices) { - while (slices[0].tweets.length == 0) { - slices.splice(0,1); - } - while (slices[slices.length - 1].tweets.length == 0) { - slices.pop(); - } - return slices; +function getSendLastPos() { + db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) { + if (err) throw err; + lastpos = results[0].lastpos ? 
results[0].lastpos : 0; + console.log("Broadcasting last pos = ",lastpos); + io.sockets.emit('tweetSummary', { + tweetcount : lastpos + }); + }); } -function addToList(tweet) { - if (tweet_ids.indexOf(tweet.id) != -1) { - console.log("Error: Tweet already in list"); - return; - } - tweet.pos = tweets.length; - tweets.push(tweet); - tweet_ids.push(tweet.id); - var creadate = new Date(tweet.created_at); - if (!date_struct.length) { - date_struct = [ populateDateStruct(0, date_levels[0] * parseInt(creadate / date_levels[0])) ] - } - while (creadate > date_struct[date_struct.length - 1].end) { - date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end.valueOf()) ); - } - insertIntoDateStruct(date_struct, tweet); - if (tweet.in_reply_to_status_id) { - var ref = tweet_ids.indexOf(tweet.in_reply_to_status_id) - if (ref != -1) { - arcs.push({ - "from" : tweet.pos, - "to" : ref, - "type" : "reply" - }); + +function getSendTweets(posList, socket) { + console.log("request for tweets ("+posList.join(',')+") from "+socket.id); + db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) { + if (err) throw err; + socket.emit('tweets', + results.map(function(line) { + var tw = JSON.parse(line.json); + tw.pos = line.pos; + return tw; + }) + ); + }); +} + +function getSendTimeline(level, socket) { + console.log("request for timeline ("+level+") from "+socket.id); + var lvl = date_levels[level], + requete = "SELECT COUNT(*) AS nb, " + + lvl + + "*ROUND(created_at/" + + lvl + + ") AS tranche" + + annotations.map(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("") + + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50"; + db.execute(requete, function (err, results) { + if (err) throw err; + var tbl = [], + lastend = parseInt(results[results.length - 1].tranche); + for (var i = results.length - 1; i >= 0; i--) { + var start = parseInt(results[i].tranche); + while (start > lastend) { + var 
struct = { "start": lastend, "tweets" : 0, "annotations" : {} }; + lastend += lvl; + struct.end = lastend; + tbl.push(struct); + } + lastend += lvl; + var struct = { + "start" : start, + "end" : lastend, + "tweets" : results[i].nb, + "annotations" : {} + } + for (var j in annotations) { + struct.annotations[annotations[j]] = results[i]['s_' + annotations[j]]; + } + tbl.push(struct); } - } - if (tweet.retweeted_status) { - var ref = tweet_ids.indexOf(tweet.retweeted_status.id_str) - if (ref != -1) { - arcs.push({ - "from" : tweet.pos, - "to" : ref, - "type" : "retweet" - }); - } - } + socket.emit('timeline', tbl); + }); } function textids(object) { @@ -175,91 +166,129 @@ } } -function readTweetsFromFile() { - var dir = fs.readdirSync(TWEET_FILE_DIR), - fileName = null; - for (var i in dir) { - if ( dir[i].indexOf( TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) ) == 0 ) { - var oldtweets = fs.readFileSync(dir[i], 'utf8').split('\n'), - tweetscopied = 0; - for (var j in oldtweets) { - if (oldtweets[j].length > 1) { - addToList(JSON.parse(oldtweets[j])); - tweetscopied++; - } - } - console.log(tweetscopied, "tweets copied from", dir[i]); - console.log(arcs); - } - } -} +/* Initialization */ -function callBackNewTweets(chunk) { - var newdata = chunk.split('\r\n'); - for (var i in newdata) { - if (newdata[i].length > 0) { - var tweet = JSON.parse(newdata[i]), - annotations = []; - for (var a in annkw) { - if (tweet.text.indexOf(annkw[a]) != -1) { - annotations.push(a); - } - } - tweet.annotations = annotations; - textids(tweet); - textids(tweet.user); - - addToList(tweet); - var txt = JSON.stringify(tweet)+'\n'; - writestream.write(txt); - } - } - console.log("New tweets received. 
We now have", tweets.length, "tweets in memory"); - io.sockets.emit('tweetSummary', { tweetcount : tweets.length }); -} +var fs = require('fs'), + https = require('https'), + sqlite = require('sqlite'), + socketio = require('socket.io'), + tweets = [], + lastpos = 0, + arcs = [], + tweet_ids = [], + date_struct = [], + date_levels = [ + 3600 * 1000, + 15 * 60 * 1000, + 5 * 60 * 1000, + 60 * 1000, + 15 * 1000 + ], + annotations = [ 'positive', 'negative', 'reference', 'question' ], + annotations_keywords = [ [ '++' ], [ '--' ], [ '==' ], [ '??' ] ], + annkw = { + 'positive' : '++', + 'negative' : '--', + 'reference' : '==', + 'question' : '??' + }, + keys_to_delete = [ + 'in_reply_to_screen_name', + 'in_reply_to_user_id', + 'retweeted', + 'place', + 'geo', + 'source', + 'contributors', + 'coordinates', + 'retweet_count', + 'favorited', + 'truncated', + 'possibly_sensitive' + ], + user_keys_to_delete = [ + 'default_profile_image', + 'show_all_inline_media', + 'contributors_enabled', + 'profile_sidebar_fill_color', + 'created_at', + 'lang', + 'time_zone', + 'profile_sidebar_border_color', + 'follow_request_sent', + 'profile_background_image_url', + 'profile_background_image_url_https', + 'followers_count', + 'description', + 'url', + 'geo_enabled', + 'profile_use_background_image', + 'default_profile', + 'following', + 'profile_text_color', + 'is_translator', + 'favourites_count', + 'listed_count', + 'friends_count', + 'profile_link_color', + 'protected', + 'location', + 'notifications', + 'profile_image_url_https', + 'statuses_count', + 'verified', + 'profile_background_color', + 'profile_background_tile', + 'utc_offset' + ], + port_flag = process.argv.indexOf("-p"), + sio_port = ( port_flag != -1 && port_flag < process.argv.length - 1 && parseInt(process.argv[port_flag + 1]) ? 
parseInt(process.argv[port_flag + 1]) : DEFAULT_SIO_PORT ) + io = socketio.listen(sio_port), + track_flag = process.argv.indexOf("-T"), + tracking_keyword = ( track_flag != -1 && track_flag < process.argv.length - 1 ? process.argv[track_flag + 1] : DEFAULT_TRACKING_KEYWORD ), + sqlfile = SQLITE_FILE_DIR + SQLITE_FILE_START + encodeURIComponent(tracking_keyword) + SQLITE_FILE_EXT, + db = new sqlite.Database(); /* MAIN CODE */ -if (READ_OLD_TWEETS) { - readTweetsFromFile(); -} +console.log("Listening on port: "+sio_port); +console.log("Opening SQLITE file: "+sqlfile); +db.open(sqlfile , function(err) { + if (err) throw err; + createTables(); +}); if (RECORD_NEW_TWEETS) { - var now = new Date(), - fileTimestamp = (1900+now.getYear())+"-"+("0"+(1+now.getMonth())).slice(-2)+"-"+("0"+now.getDate()).slice(-2)+"."+("0"+now.getHours()).slice(-2)+"-"+("0"+now.getMinutes()).slice(-2), - fileName = TWEET_FILE_DIR + TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-' + fileTimestamp + TWEET_FILE_EXT, - writestream = fs.createWriteStream(fileName, { flags: 'w', encoding: 'utf-8' }); - console.log('Writing to',fileName); + console.log("Requesting Twitter to track keyword(s): "+tracking_keyword); var req = https.request({ host: "stream.twitter.com", path: "/1/statuses/filter.json", method: "POST", headers: { - 'Authorization': 'Basic cmFwaHY6N3czMzdMZkMyM2dF', + 'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'), 'Content-Type': 'application/x-www-form-urlencoded' } }, function(res) { - console.log('STATUS: ' + res.statusCode); - console.log('HEADERS: ' + JSON.stringify(res.headers)); + console.log('Reply from stream.twitter.com: ' + res.statusCode); + console.log('Headers: ' + JSON.stringify(res.headers)); res.setEncoding('utf8'); res.on('data', callBackNewTweets); }); - req.write('track=' + encodeURIComponent(TRACKING_KEYWORD)); + req.write('track=' + encodeURIComponent(tracking_keyword)); req.end(); } io.set('log level', 0); 
io.sockets.on('connection', function(socket) { - console.log("New connection from", socket.handshake.address.address,"id :", socket.id); + console.log("New connection from" + socket.handshake.address.address + " with id=" + socket.id); socket.emit('tweetSummary', { tweetcount : tweets.length }); - socket.on('getTweets', function(data) { - console.log('getTweets from',socket.id); - data.tweets = tweets.slice(Math.max(0, data.from), data.to); - socket.emit('tweets', data); + socket.on('updateTweets', function(data) { + if (data.tweets.length) { + getSendTweets(data.tweets, socket); + } }); - socket.on('getTimeline', function(data) { - socket.emit('timeline', { "timeline" : trimFDS(flattenDateStruct(date_struct, data.level)), "arcs" : arcs }); - console.log('getTimeline from',socket.id); + socket.on('updateTimeline', function(data) { + getSendTimeline(data.level, socket); }); }); \ No newline at end of file