--- a/tweetcast/nodejs/server/node-direct.js Mon Oct 17 17:40:58 2011 +0200
+++ b/tweetcast/nodejs/server/node-direct.js Tue Oct 18 16:19:08 2011 +0200
@@ -1,13 +1,16 @@
READ_OLD_TWEETS = true;
RECORD_NEW_TWEETS = true;
-TWEET_FILE_PATH = 'tweets.txt';
-TRACKING_KEYWORD = 'Bieber';
+TWEET_FILE_DIR = './';
+TWEET_FILE_START = 'tweets-';
+TWEET_FILE_EXT = '.txt';
+TRACKING_KEYWORD = '#p2';
var fs = require('fs'),
https = require('https'),
io = require('socket.io')
.listen(8000),
tweets = [],
+ arcs = [],
tweet_ids = [],
date_struct = [],
date_levels = [
@@ -21,9 +24,12 @@
'negative' : '--',
'reference' : '==',
'question' : '??'
- }
+ };
function populateDateStruct(level, start) {
+ if (typeof start == "object") {
+ start = start.valueOf();
+ }
var end = start + date_levels[level],
struct = {
"level" : level,
@@ -39,7 +45,7 @@
}
} else {
struct.tweets = [];
- struct.annotations = [];
+ struct.annotations = {};
}
return struct;
}
@@ -52,13 +58,13 @@
insertIntoDateStruct(slices[i].slices, tweet);
} else {
slices[i].tweets.push(tweet.pos);
- for (var j in tweet.annotations) {
- if (slices[i].annotations[j]) {
- slices[i].annotations[j]++;
+ if (tweet.annotations.length) {
+ var ann = tweet.annotations[0];
+ if (slices[i].annotations[ann]) {
+ slices[i].annotations[ann].push(tweet.pos);
} else {
- slices[i].annotations[j] = 1;
+ slices[i].annotations[ann] = [ tweet.pos ];
}
- break;
}
}
break;
@@ -66,6 +72,62 @@
}
}
+// Gather everything recorded under one slice of the date structure.
+// Returns { tweets: [positions], annotations: { kind: [positions] } };
+// for a slice that has sub-slices, their contents are merged recursively.
+function getSliceContent(slice) {
+    if (!slice.slices) {
+        // Leaf slice: hand back its own containers as they are.
+        return { "tweets" : slice.tweets, "annotations" : slice.annotations };
+    }
+    var positions = [],
+        merged = {},
+        children = slice.slices;
+    for (var c = 0; c < children.length; c++) {
+        var part = getSliceContent(children[c]);
+        positions = positions.concat(part.tweets);
+        for (var kind in part.annotations) {
+            // First sighting of a kind adopts the child's list; later ones append via concat, which copies.
+            merged[kind] = merged[kind] ? merged[kind].concat(part.annotations[kind]) : part.annotations[kind];
+        }
+    }
+    return { "tweets" : positions, "annotations" : merged };
+}
+
+// Flatten the nested date structure into a flat list of { start, end, tweets, annotations } entries at target_level (empty if the structure has no such depth).
+function flattenDateStruct(slices, target_level) {
+    var result = [];
+    if (!slices || !slices.length) { return result; } // nothing recorded yet: slices[0] would be undefined
+    if (slices[0].level < target_level) {
+        if (slices[0].slices) {
+            for (var i = 0; i < slices.length; i++) {
+                result = result.concat(flattenDateStruct(slices[i].slices, target_level));
+            }
+        }
+    } else {
+        for (var k = 0; k < slices.length; k++) {
+            var data = getSliceContent(slices[k]);
+            result.push({
+                "start" : slices[k].start,
+                "end" : slices[k].end,
+                "tweets" : data.tweets,
+                "annotations" : data.annotations
+            });
+        }
+    }
+    return result;
+}
+
+// Strip tweet-less slices from both ends of a flattened timeline (in place) and return the same array.
+function trimFDS(slices) {
+    var first = 0, last = slices.length;
+    while (first < last && slices[first].tweets.length == 0) { first++; }
+    while (last > first && slices[last - 1].tweets.length == 0) { last--; }
+    slices.splice(last, slices.length - last); // the bounds checks keep an empty or all-quiet timeline from throwing
+    slices.splice(0, first);
+    return slices;
+}
+
function addToList(tweet) {
if (tweet_ids.indexOf(tweet.id) != -1) {
console.log("Error: Tweet already in list");
@@ -79,9 +141,29 @@
date_struct = [ populateDateStruct(0, date_levels[0] * parseInt(creadate / date_levels[0])) ]
}
while (creadate > date_struct[date_struct.length - 1].end) {
- date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end) );
+ date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end.valueOf()) );
}
insertIntoDateStruct(date_struct, tweet);
+ if (tweet.in_reply_to_status_id) {
+ var ref = tweet_ids.indexOf(tweet.in_reply_to_status_id)
+ if (ref != -1) {
+ arcs.push({
+ "from" : tweet.pos,
+ "to" : ref,
+ "type" : "reply"
+ });
+ }
+ }
+ if (tweet.retweeted_status) {
+ var ref = tweet_ids.indexOf(tweet.retweeted_status.id_str)
+ if (ref != -1) {
+ arcs.push({
+ "from" : tweet.pos,
+ "to" : ref,
+ "type" : "retweet"
+ });
+ }
+ }
}
function textids(object) {
@@ -93,33 +175,61 @@
}
}
-if (READ_OLD_TWEETS) {
- try {
- var filebuff = "";
- readstream = fs.createReadStream(TWEET_FILE_PATH, { flags: 'r', encoding: 'utf-8' });
- readstream.on("data", function(data) {
- console.log("data");
- filebuff += data;
- });
- readstream.on("end", function() {
- console.log("end");
- oldtweets = filebuff.split('\r\n');
- var tweetscopied = 0;
- for (var i in oldtweets) {
- if (oldtweets[i].length > 0) {
- addToList(JSON.parse(oldtweets[i]));
+function readTweetsFromFile() { // Replay every archived tweet file recorded for TRACKING_KEYWORD through addToList().
+ var dir = fs.readdirSync(TWEET_FILE_DIR).sort(), // names embed the recording timestamp, so sorting replays the oldest file first
+ prefix = TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD);
+ for (var i = 0; i < dir.length; i++) {
+ if ( dir[i].indexOf(prefix) == 0 ) {
+ var oldtweets = fs.readFileSync(TWEET_FILE_DIR + dir[i], 'utf8').split('\n'), // entries are relative to TWEET_FILE_DIR, not the cwd
+ tweetscopied = 0;
+ for (var j = 0; j < oldtweets.length; j++) {
+ if (oldtweets[j].length > 1) {
+ addToList(JSON.parse(oldtweets[j]));
tweetscopied++;
}
}
- console.log(tweetscopied, "tweets copied from", TWEET_FILE_PATH);
- });
- } catch(err) {
- console.log(err);
+ console.log(tweetscopied, "tweets copied from", dir[i]);
+ console.log(arcs);
+ }
}
}
+function callBackNewTweets(chunk) { // 'data' handler for the streaming response: parse, annotate, store and archive each received tweet.
+    var newdata = ((callBackNewTweets.pending || '') + chunk).split('\r\n');
+    callBackNewTweets.pending = newdata.pop(); // a chunk may stop mid-message: keep the unterminated tail for the next call
+    for (var i = 0; i < newdata.length; i++) {
+        if (newdata[i].length > 0) {
+            try { // one bad line (unparsable, or without a text field) must not take the server down
+                var tweet = JSON.parse(newdata[i]);
+                tweet.annotations = [];
+                for (var a in annkw) {
+                    if (tweet.text.indexOf(annkw[a]) != -1) {
+                        tweet.annotations.push(a);
+                    }
+                }
+                textids(tweet);
+                textids(tweet.user);
+                addToList(tweet);
+                writestream.write(JSON.stringify(tweet)+'\n');
+            } catch (err) { console.log(err); }
+        }
+    }
+    console.log("New tweets received. We now have", tweets.length, "tweets in memory");
+    io.sockets.emit('tweetSummary', { tweetcount : tweets.length });
+}
+
+/* MAIN CODE */
+
+if (READ_OLD_TWEETS) {
+ readTweetsFromFile();
+}
+
if (RECORD_NEW_TWEETS) {
- var writestream = null;
+ var now = new Date(),
+ fileTimestamp = (1900+now.getYear())+"-"+("0"+(1+now.getMonth())).slice(-2)+"-"+("0"+now.getDate()).slice(-2)+"."+("0"+now.getHours()).slice(-2)+"-"+("0"+now.getMinutes()).slice(-2),
+ fileName = TWEET_FILE_DIR + TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-' + fileTimestamp + TWEET_FILE_EXT,
+ writestream = fs.createWriteStream(fileName, { flags: 'w', encoding: 'utf-8' });
+ console.log('Writing to',fileName);
var req = https.request({
host: "stream.twitter.com",
path: "/1/statuses/filter.json",
@@ -132,34 +242,7 @@
console.log('STATUS: ' + res.statusCode);
console.log('HEADERS: ' + JSON.stringify(res.headers));
res.setEncoding('utf8');
- res.on('data', function(chunk) {
- var newdata = chunk.split('\r\n');
- try {
- for (var i in newdata) {
- if (newdata[i].length > 0) {
- var tweet = JSON.parse(newdata[i]),
- annotations = [];
- for (var a in annkw) {
- if (tweet.text.indexOf(annkw[a]) != -1) {
- annotations.push(a);
- }
- }
- tweet.annotations = annotations;
- textids(tweet);
- addToList(tweet);
- }
- }
- if (!writestream) {
- writestream = fs.createWriteStream(TWEET_FILE_PATH, { flags: 'a', encoding: 'utf-8' });
- }
- writestream.write(chunk);
- io.sockets.emit('tweetSummary', { tweetcount : tweets.length });
- console.log("New tweets received. We now have", tweets.length, "tweets in memory");
- }
- catch(err) {
- console.log(err);
- }
- });
+ res.on('data', callBackNewTweets);
});
req.write('track=' + encodeURIComponent(TRACKING_KEYWORD));
@@ -168,9 +251,15 @@
io.set('log level', 0);
io.sockets.on('connection', function(socket) {
- console.log("New connection", socket);
+ console.log("New connection from", socket.handshake.address.address,"id :", socket.id); // log only the peer address and socket id
socket.emit('tweetSummary', { tweetcount : tweets.length });
socket.on('getTweets', function(data) {
- socket.emit('tweets', tweets.slice(Math.max(0, data.from), data.to));
+ console.log('getTweets from',socket.id); // NOTE(review): data is used unchecked below; a request sent without a payload would throw on data.from
+ data.tweets = tweets.slice(Math.max(0, data.from), data.to); // attach the requested range to the request object itself
+ socket.emit('tweets', data); // reply echoes the request fields (from, to) alongside the tweets
+ });
+ socket.on('getTimeline', function(data) {
+ socket.emit('timeline', { "timeline" : trimFDS(flattenDateStruct(date_struct, data.level)), "arcs" : arcs }); // date structure flattened to data.level and trimmed, plus every arc
+ console.log('getTimeline from',socket.id);
});
});
\ No newline at end of file