diff -r 7d9c576bfaac -r c28048fb63b4 tweetcast/nodejs/server/node-direct.js --- a/tweetcast/nodejs/server/node-direct.js Mon Oct 17 17:40:58 2011 +0200 +++ b/tweetcast/nodejs/server/node-direct.js Tue Oct 18 16:19:08 2011 +0200 @@ -1,13 +1,16 @@ READ_OLD_TWEETS = true; RECORD_NEW_TWEETS = true; -TWEET_FILE_PATH = 'tweets.txt'; -TRACKING_KEYWORD = 'Bieber'; +TWEET_FILE_DIR = './'; +TWEET_FILE_START = 'tweets-'; +TWEET_FILE_EXT = '.txt'; +TRACKING_KEYWORD = '#p2'; var fs = require('fs'), https = require('https'), io = require('socket.io') .listen(8000), tweets = [], + arcs = [], tweet_ids = [], date_struct = [], date_levels = [ @@ -21,9 +24,12 @@ 'negative' : '--', 'reference' : '==', 'question' : '??' - } + }; function populateDateStruct(level, start) { + if (typeof start == "object") { + start = start.valueOf(); + } var end = start + date_levels[level], struct = { "level" : level, @@ -39,7 +45,7 @@ } } else { struct.tweets = []; - struct.annotations = []; + struct.annotations = {}; } return struct; } @@ -52,13 +58,13 @@ insertIntoDateStruct(slices[i].slices, tweet); } else { slices[i].tweets.push(tweet.pos); - for (var j in tweet.annotations) { - if (slices[i].annotations[j]) { - slices[i].annotations[j]++; + if (tweet.annotations.length) { + var ann = tweet.annotations[0]; + if (slices[i].annotations[ann]) { + slices[i].annotations[ann].push(tweet.pos); } else { - slices[i].annotations[j] = 1; + slices[i].annotations[ann] = [ tweet.pos ]; } - break; } } break; @@ -66,6 +72,62 @@ } } +function getSliceContent(slice) { + if (slice.slices) { + var twz = [], + annotations = {}; + for (var i in slice.slices) { + var data = getSliceContent(slice.slices[i]); + twz = twz.concat(data.tweets); + for (var j in data.annotations) { + if (annotations[j]) { + annotations[j] = annotations[j].concat(data.annotations[j]); + } else { + annotations[j] = data.annotations[j]; + } + } + } + } else { + twz = slice.tweets; + annotations = slice.annotations; + } + return { "tweets" : twz, "annotations" : annotations } +}; + +function flattenDateStruct(slices, target_level) { + var current_level = slices[0].level, + result = []; + if (current_level < target_level) { + if (slices[0].slices) { + for (var i in slices) { + result = result.concat(flattenDateStruct(slices[i].slices, target_level)); + } + } + } + else { + for (var i in slices) { + var data = getSliceContent(slices[i]); + result.push({ + "start" : slices[i].start, + "end" : slices[i].end, + "tweets" : data.tweets, + "annotations" : data.annotations + }); + } + } + return result; +} + +function trimFDS(slices) { + while (slices[0].tweets.length == 0) { + slices.splice(0,1); + } + while (slices[slices.length - 1].tweets.length == 0) { + slices.pop(); + } + return slices; +} + function addToList(tweet) { if (tweet_ids.indexOf(tweet.id) != -1) { console.log("Error: Tweet already in list"); @@ -79,9 +141,29 @@ date_struct = [ populateDateStruct(0, date_levels[0] * parseInt(creadate / date_levels[0])) ] } while (creadate > date_struct[date_struct.length - 1].end) { - date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end) ); + date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end.valueOf()) ); } insertIntoDateStruct(date_struct, tweet); + if (tweet.in_reply_to_status_id) { + var ref = tweet_ids.indexOf(tweet.in_reply_to_status_id) + if (ref != -1) { + arcs.push({ + "from" : tweet.pos, + "to" : ref, + "type" : "reply" + }); + } + } + if (tweet.retweeted_status) { + var ref = tweet_ids.indexOf(tweet.retweeted_status.id_str) + if (ref != -1) { + arcs.push({ + "from" : tweet.pos, + "to" : ref, + "type" : "retweet" + }); + } + } } function textids(object) { @@ -93,33 +175,61 @@ } } -if (READ_OLD_TWEETS) { - try { - var filebuff = ""; - readstream = fs.createReadStream(TWEET_FILE_PATH, { flags: 'r', encoding: 'utf-8' }); - readstream.on("data", function(data) { - console.log("data"); - filebuff += data; - }); - readstream.on("end", function() { - console.log("end"); - oldtweets = filebuff.split('\r\n'); - var tweetscopied = 0; - for (var i in oldtweets) { - if (oldtweets[i].length > 0) { - addToList(JSON.parse(oldtweets[i])); +function readTweetsFromFile() { + var dir = fs.readdirSync(TWEET_FILE_DIR), + fileName = null; + for (var i in dir) { + if ( dir[i].indexOf( TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) ) == 0 ) { + var oldtweets = fs.readFileSync(dir[i], 'utf8').split('\n'), + tweetscopied = 0; + for (var j in oldtweets) { + if (oldtweets[j].length > 1) { + addToList(JSON.parse(oldtweets[j])); tweetscopied++; } } - console.log(tweetscopied, "tweets copied from", TWEET_FILE_PATH); - }); - } catch(err) { - console.log(err); + console.log(tweetscopied, "tweets copied from", dir[i]); + console.log(arcs); + } } } +function callBackNewTweets(chunk) { + var newdata = chunk.split('\r\n'); + for (var i in newdata) { + if (newdata[i].length > 0) { + var tweet = JSON.parse(newdata[i]), + annotations = []; + for (var a in annkw) { + if (tweet.text.indexOf(annkw[a]) != -1) { + annotations.push(a); + } + } + tweet.annotations = annotations; + textids(tweet); + textids(tweet.user); + + addToList(tweet); + var txt = JSON.stringify(tweet)+'\n'; + writestream.write(txt); + } + } + console.log("New tweets received. We now have", tweets.length, "tweets in memory"); + io.sockets.emit('tweetSummary', { tweetcount : tweets.length }); +} + +/* MAIN CODE */ + +if (READ_OLD_TWEETS) { + readTweetsFromFile(); +} + if (RECORD_NEW_TWEETS) { - var writestream = null; + var now = new Date(), + fileTimestamp = (1900+now.getYear())+"-"+("0"+(1+now.getMonth())).slice(-2)+"-"+("0"+now.getDate()).slice(-2)+"."+("0"+now.getHours()).slice(-2)+"-"+("0"+now.getMinutes()).slice(-2), + fileName = TWEET_FILE_DIR + TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-' + fileTimestamp + TWEET_FILE_EXT, + writestream = fs.createWriteStream(fileName, { flags: 'w', encoding: 'utf-8' }); + console.log('Writing to',fileName); var req = https.request({ host: "stream.twitter.com", path: "/1/statuses/filter.json", @@ -132,34 +242,7 @@ console.log('STATUS: ' + res.statusCode); console.log('HEADERS: ' + JSON.stringify(res.headers)); res.setEncoding('utf8'); - res.on('data', function(chunk) { - var newdata = chunk.split('\r\n'); - try { - for (var i in newdata) { - if (newdata[i].length > 0) { - var tweet = JSON.parse(newdata[i]), - annotations = []; - for (var a in annkw) { - if (tweet.text.indexOf(annkw[a]) != -1) { - annotations.push(a); - } - } - tweet.annotations = annotations; - textids(tweet); - addToList(tweet); - } - } - if (!writestream) { - writestream = fs.createWriteStream(TWEET_FILE_PATH, { flags: 'a', encoding: 'utf-8' }); - } - writestream.write(chunk); - io.sockets.emit('tweetSummary', { tweetcount : tweets.length }); - console.log("New tweets received. We now have", tweets.length, "tweets in memory"); - } - catch(err) { - console.log(err); - } - }); + res.on('data', callBackNewTweets); }); req.write('track=' + encodeURIComponent(TRACKING_KEYWORD)); @@ -168,9 +251,15 @@ io.set('log level', 0); io.sockets.on('connection', function(socket) { - console.log("New connection", socket); + console.log("New connection from", socket.handshake.address.address,"id :", socket.id); socket.emit('tweetSummary', { tweetcount : tweets.length }); socket.on('getTweets', function(data) { - socket.emit('tweets', tweets.slice(Math.max(0, data.from), data.to)); + console.log('getTweets from',socket.id); + data.tweets = tweets.slice(Math.max(0, data.from), data.to); + socket.emit('tweets', data); + }); + socket.on('getTimeline', function(data) { + socket.emit('timeline', { "timeline" : trimFDS(flattenDateStruct(date_struct, data.level)), "arcs" : arcs }); + console.log('getTimeline from',socket.id); }); }); \ No newline at end of file