diff -r 77ccba188e08 -r 7d9c576bfaac tweetcast/nodejs/server/node-direct.js --- a/tweetcast/nodejs/server/node-direct.js Fri Oct 14 17:36:34 2011 +0200 +++ b/tweetcast/nodejs/server/node-direct.js Mon Oct 17 17:40:58 2011 +0200 @@ -1,10 +1,21 @@ +READ_OLD_TWEETS = true; +RECORD_NEW_TWEETS = true; +TWEET_FILE_PATH = 'tweets.txt'; +TRACKING_KEYWORD = 'Bieber'; + var fs = require('fs'), https = require('https'), io = require('socket.io') - .listen(8000) - keyword = "Bieber", + .listen(8000), tweets = [], tweet_ids = [], + date_struct = [], + date_levels = [ + 3600 * 1000, + 15 * 60 * 1000, + 5 * 60 * 1000, + 60 * 1000 + ], annkw = { 'positive' : '++', 'negative' : '--', @@ -12,6 +23,67 @@ 'question' : '??' } +function populateDateStruct(level, start) { + var end = start + date_levels[level], + struct = { + "level" : level, + "start" : new Date(start), + "end" : new Date(end) + }; + if (level < date_levels.length - 1) { + struct.slices = []; + var newstart = start; + while (newstart < end) { + struct.slices.push(populateDateStruct(level + 1, newstart)); + newstart += date_levels[level + 1]; + } + } else { + struct.tweets = []; + struct.annotations = []; + } + return struct; +} + +function insertIntoDateStruct(slices, tweet) { + var creadate = new Date(tweet.created_at); + for (var i in slices) { + if (creadate < slices[i].end) { + if (slices[i].slices) { + insertIntoDateStruct(slices[i].slices, tweet); + } else { + slices[i].tweets.push(tweet.pos); + for (var j in tweet.annotations) { + if (slices[i].annotations[j]) { + slices[i].annotations[j]++; + } else { + slices[i].annotations[j] = 1; + } + break; + } + } + break; + } + } +} + +function addToList(tweet) { + if (tweet_ids.indexOf(tweet.id) != -1) { + console.log("Error: Tweet already in list"); + return; + } + tweet.pos = tweets.length; + tweets.push(tweet); + tweet_ids.push(tweet.id); + var creadate = new Date(tweet.created_at); + if (!date_struct.length) { + date_struct = [ populateDateStruct(0, date_levels[0] * parseInt(creadate / date_levels[0])) ] + } + while (creadate > date_struct[date_struct.length - 1].end) { + date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end) ); + } + insertIntoDateStruct(date_struct, tweet); +} + function textids(object) { for (var key in object) { if (key.substr(-2) == 'id') { @@ -21,54 +93,84 @@ } } -var fd = fs.createWriteStream('tweets.txt'); - -req = https.request({ - host: "stream.twitter.com", - path: "/1/statuses/filter.json", - method: "POST", - headers: { - 'Authorization': 'Basic cmFwaHY6N3czMzdMZkMyM2dF', - 'Content-Type': 'application/x-www-form-urlencoded' - } -}, function(res) { - console.log('STATUS: ' + res.statusCode); - console.log('HEADERS: ' + JSON.stringify(res.headers)); - res.setEncoding('utf8'); - res.on('data', function(chunk) { - newdata = chunk.split('\r\n'); - try { - for (var i in newdata) { - if (newdata[i].length > 0) { - tweet = JSON.parse(newdata[i]); - annotations = []; - for (var a in annkw) { - if (tweet.text.indexOf(annkw[a]) != -1) { - annotations.push(a); - } - } - tweet.annotations = annotations; - tweets.push(tweet); - textids(tweet); - tweet_ids.push(tweet.id_str); - io.sockets.emit('newtweet', tweet); +if (READ_OLD_TWEETS) { + try { + var filebuff = ""; + readstream = fs.createReadStream(TWEET_FILE_PATH, { flags: 'r', encoding: 'utf-8' }); + readstream.on("data", function(data) { + console.log("data"); + filebuff += data; + }); + readstream.on("end", function() { + console.log("end"); + oldtweets = filebuff.split('\r\n'); + var tweetscopied = 0; + for (var i in oldtweets) { + if (oldtweets[i].length > 0) { + addToList(JSON.parse(oldtweets[i])); + tweetscopied++; } } - fd.write(chunk); - } - catch(err) { - console.log(err); + console.log(tweetscopied, "tweets copied from", TWEET_FILE_PATH); + }); + } catch(err) { + console.log(err); + } +} + +if (RECORD_NEW_TWEETS) { + var writestream = null; + var req = https.request({ + host: "stream.twitter.com", + path: "/1/statuses/filter.json", + method: "POST", + headers: { + 'Authorization': 'Basic cmFwaHY6N3czMzdMZkMyM2dF', + 'Content-Type': 'application/x-www-form-urlencoded' } + }, function(res) { + console.log('STATUS: ' + res.statusCode); + console.log('HEADERS: ' + JSON.stringify(res.headers)); + res.setEncoding('utf8'); + res.on('data', function(chunk) { + var newdata = chunk.split('\r\n'); + try { + for (var i in newdata) { + if (newdata[i].length > 0) { + var tweet = JSON.parse(newdata[i]), + annotations = []; + for (var a in annkw) { + if (tweet.text.indexOf(annkw[a]) != -1) { + annotations.push(a); + } + } + tweet.annotations = annotations; + textids(tweet); + addToList(tweet); + } + } + if (!writestream) { + writestream = fs.createWriteStream(TWEET_FILE_PATH, { flags: 'a', encoding: 'utf-8' }); + } + writestream.write(chunk); + io.sockets.emit('tweetSummary', { tweetcount : tweets.length }); + console.log("New tweets received. We now have", tweets.length, "tweets in memory"); + } + catch(err) { + console.log(err); + } + }); }); -}); + + req.write('track=' + encodeURIComponent(TRACKING_KEYWORD)); + req.end(); +} -req.write('track=' + encodeURIComponent(keyword)); -req.end(); io.set('log level', 0); io.sockets.on('connection', function(socket) { - socket.emit('tweets', tweets.slice(-10)); - socket.on('tweetsbefore', function(data) { - tweetpos = tweet_ids.indexOf(data); - socket.emit('oldtweets', tweets.slice(0, tweetpos).slice(-10)); + console.log("New connection", socket); + socket.emit('tweetSummary', { tweetcount : tweets.length }); + socket.on('getTweets', function(data) { + socket.emit('tweets', tweets.slice(Math.max(0, data.from), data.to)); }); }); \ No newline at end of file