// Runtime configuration. Declared with `var` so these are ordinary module
// variables rather than implicit globals (an undeclared assignment throws a
// ReferenceError under strict mode).
var READ_OLD_TWEETS = true;       // replay previously recorded tweet files at startup
var RECORD_NEW_TWEETS = true;     // open the stream and record new tweets to a file
var TWEET_FILE_DIR = './';        // directory of recorded files; joined by plain concatenation, so keep the trailing slash
var TWEET_FILE_START = 'tweets-'; // file-name prefix of recorded tweet files
var TWEET_FILE_EXT = '.txt';      // file-name extension of recorded tweet files
var TRACKING_KEYWORD = '#p2';     // value sent as the stream request's `track` parameter
// Node core modules and the socket.io server (listening on port 8000).
var fs = require('fs');
var https = require('https');
var io = require('socket.io').listen(8000);

// In-memory state shared by the functions below.
var tweets = [];      // every tweet kept, in arrival order (tweet.pos is its index)
var arcs = [];        // {from, to, type} links between tweet positions
var tweet_ids = [];   // tweet ids, parallel to `tweets`
var date_struct = []; // top-level nodes of the time-slice tree

// Length in milliseconds of one slice at each tree level:
// one hour, fifteen minutes, five minutes, one minute.
var date_levels = [
  60 * 60 * 1000,
  15 * 60 * 1000,
  5 * 60 * 1000,
  60 * 1000
];

// Annotation name -> marker text searched for inside a tweet's text.
var annkw = {
  'positive' : '++',
  'negative' : '--',
  'reference' : '==',
  'question' : '??'
};
/**
 * Build one node of the time-slice tree spanning
 * [start, start + date_levels[level]).
 *
 * Nodes above the deepest level receive a `slices` array of child nodes, one
 * per date_levels[level + 1] step; nodes at the deepest level receive empty
 * `tweets` and `annotations` buckets instead.
 *
 * @param {number} level - index into date_levels.
 * @param {number|Date} start - slice start as epoch milliseconds or a Date.
 * @returns {object} {level, start, end, slices} or
 *     {level, start, end, tweets, annotations}; start/end are Dates.
 */
function populateDateStruct(level, start) {
  var startMs = (typeof start == "object") ? start.valueOf() : start;
  var endMs = startMs + date_levels[level];

  var node = {};
  node.level = level;
  node.start = new Date(startMs);
  node.end = new Date(endMs);

  var hasChildren = level < date_levels.length - 1;
  if (!hasChildren) {
    // Deepest level: this is where tweets get filed.
    node.tweets = [];
    node.annotations = {};
    return node;
  }

  var step = date_levels[level + 1];
  node.slices = [];
  for (var childStart = startMs; childStart < endMs; childStart += step) {
    node.slices.push(populateDateStruct(level + 1, childStart));
  }
  return node;
}
/**
 * File a tweet in the deepest slice of the time-slice tree containing its
 * creation time.
 *
 * At every level the first slice whose `end` is later than the tweet's
 * `created_at` is chosen; if no slice qualifies, the tweet is not filed.
 * At the leaf, tweet.pos is appended to `tweets` and, when the tweet has any
 * annotations, to the bucket named by its FIRST annotation only.
 *
 * @param {object[]} slices - sibling nodes built by populateDateStruct.
 * @param {object} tweet - needs `created_at`, `pos` and an `annotations` array.
 */
function insertIntoDateStruct(slices, tweet) {
  var createdAt = new Date(tweet.created_at);

  var target = null;
  var k = 0;
  while (k < slices.length) {
    if (createdAt < slices[k].end) {
      target = slices[k];
      break;
    }
    k++;
  }
  if (!target) {
    return;
  }

  if (target.slices) {
    insertIntoDateStruct(target.slices, tweet);
    return;
  }

  target.tweets.push(tweet.pos);
  if (!tweet.annotations.length) {
    return;
  }
  var firstAnnotation = tweet.annotations[0];
  var bucket = target.annotations[firstAnnotation];
  if (bucket) {
    bucket.push(tweet.pos);
  } else {
    target.annotations[firstAnnotation] = [ tweet.pos ];
  }
}
/**
 * Gather the tweet positions and annotation buckets stored under one node of
 * the time-slice tree.
 *
 * A leaf node's own `tweets` array and `annotations` object are returned
 * as-is (not copied). For an inner node, the children's results are
 * concatenated in child order into new arrays.
 *
 * @param {object} slice - node built by populateDateStruct.
 * @returns {{tweets: Array, annotations: Object}}
 */
function getSliceContent(slice) {
  if (!slice.slices) {
    return { "tweets" : slice.tweets, "annotations" : slice.annotations };
  }

  var merged = { "tweets" : [], "annotations" : {} };
  slice.slices.forEach(function (child) {
    var part = getSliceContent(child);
    merged.tweets = merged.tweets.concat(part.tweets);
    Object.keys(part.annotations).forEach(function (name) {
      var existing = merged.annotations[name];
      merged.annotations[name] = existing
        ? existing.concat(part.annotations[name])
        : part.annotations[name];
    });
  });
  return merged;
}
/**
 * Flatten the time-slice tree into the consecutive list of slices found at
 * `target_level` (an index into date_levels). Each entry carries the tweets
 * and annotations of everything beneath that slice.
 *
 * An empty `slices` array yields []. If `target_level` is deeper than the
 * tree goes, the deepest available level is returned instead of an empty list.
 *
 * @param {object[]} slices - sibling nodes built by populateDateStruct.
 * @param {number} target_level - tree level to flatten to.
 * @returns {{start: Date, end: Date, tweets: Array, annotations: Object}[]}
 */
function flattenDateStruct(slices, target_level) {
  // Nothing recorded yet (e.g. date_struct before the first tweet).
  if (!slices || !slices.length) {
    return [];
  }

  var result = [];
  // Descend while above the requested level and children exist; stopping
  // at a leaf clamps an over-deep request to the finest level available.
  if (slices[0].level < target_level && slices[0].slices) {
    slices.forEach(function (slice) {
      result = result.concat(flattenDateStruct(slice.slices, target_level));
    });
    return result;
  }

  slices.forEach(function (slice) {
    var data = getSliceContent(slice);
    result.push({
      "start" : slice.start,
      "end" : slice.end,
      "tweets" : data.tweets,
      "annotations" : data.annotations
    });
  });
  return result;
}
/**
 * Remove the slices without tweets from both ends of a flattened timeline,
 * keeping empty slices that lie between non-empty ones.
 *
 * The array is modified in place and also returned. An empty array, or one
 * in which every slice is empty, becomes [] instead of throwing.
 *
 * @param {{tweets: Array}[]} slices - output of flattenDateStruct.
 * @returns {{tweets: Array}[]} the same array, trimmed.
 */
function trimFDS(slices) {
  var first = 0;
  while (first < slices.length && slices[first].tweets.length === 0) {
    first++;
  }
  var last = slices.length;
  while (last > first && slices[last - 1].tweets.length === 0) {
    last--;
  }
  // Cut the tail first so the head cut does not shift the tail indexes.
  slices.splice(last, slices.length - last);
  slices.splice(0, first);
  return slices;
}
/**
 * Append a tweet to the in-memory list and index it.
 *
 * Sets tweet.pos to its index in `tweets`, records tweet.id in `tweet_ids`,
 * files the tweet in the time-slice tree `date_struct` and records
 * "reply"/"retweet" arcs to tweets already in the list. A tweet whose id is
 * already listed is rejected with a log message and left untouched.
 *
 * The tree grows by whole top-level slices in either direction so that every
 * valid creation date is covered. This includes tweets older than the first
 * slice and tweets falling exactly on the end of the last slice (slice ends
 * are exclusive). A tweet whose created_at does not parse is kept in the
 * list but left out of the tree.
 *
 * @param {object} tweet - parsed tweet with `id`, `created_at`, the
 *     `annotations` array read by insertIntoDateStruct, and optionally
 *     `in_reply_to_status_id` / `retweeted_status.id_str`.
 */
function addToList(tweet) {
  if (tweet_ids.indexOf(tweet.id) != -1) {
    console.log("Error: Tweet already in list");
    return;
  }
  tweet.pos = tweets.length;
  tweets.push(tweet);
  tweet_ids.push(tweet.id);

  var creadate = new Date(tweet.created_at),
      topSpan = date_levels[0];
  if (isNaN(creadate.valueOf())) {
    // NaN slice bounds would stop the tree from ever growing again.
    console.log("Error: Tweet", tweet.id, "has an unparsable created_at:", tweet.created_at);
  } else {
    if (!date_struct.length) {
      // Start the tree on the top-level boundary at or before this tweet.
      date_struct = [ populateDateStruct(0, topSpan * Math.floor(creadate.valueOf() / topSpan)) ];
    }
    // Older than the first slice (e.g. recorded files read out of order).
    while (creadate < date_struct[0].start) {
      date_struct.unshift(populateDateStruct(0, date_struct[0].start.valueOf() - topSpan));
    }
    // `>=` because a slice's end is exclusive: a tweet exactly on the last
    // end would otherwise match no slice and be dropped.
    while (creadate >= date_struct[date_struct.length - 1].end) {
      date_struct.push(populateDateStruct(0, date_struct[date_struct.length - 1].end.valueOf()));
    }
    insertIntoDateStruct(date_struct, tweet);
  }

  // tweet_ids[i] is the id of tweets[i], so an index found there is a position.
  var linkTo = function (targetId, type) {
    var ref = tweet_ids.indexOf(targetId);
    if (ref != -1) {
      arcs.push({
        "from" : tweet.pos,
        "to" : ref,
        "type" : type
      });
    }
  };
  if (tweet.in_reply_to_status_id) {
    linkTo(tweet.in_reply_to_status_id, "reply");
  }
  if (tweet.retweeted_status) {
    linkTo(tweet.retweeted_status.id_str, "retweet");
  }
}
/**
 * Replace id fields with their string twins, in place.
 *
 * For every own key ending in "id" that has a companion "<key>_str"
 * property (e.g. id / id_str), the "_str" value is copied into "<key>" and
 * the "_str" key is deleted. Presumably this avoids precision loss on large
 * numeric ids; the string form is kept verbatim. Only the top level of
 * `object` is processed. Keys ending in "id" that have no "_str" companion
 * (e.g. "valid") are left untouched. null/undefined is ignored.
 *
 * @param {?object} object - parsed tweet or user object.
 */
function textids(object) {
  if (object == null) {
    return;
  }
  Object.keys(object).forEach(function (key) {
    var strKey = key + '_str';
    if (key.slice(-2) == 'id' && Object.prototype.hasOwnProperty.call(object, strKey)) {
      object[key] = object[strKey];
      delete object[strKey];
    }
  });
}
/**
 * Replay tweets recorded by earlier runs.
 *
 * Every file in TWEET_FILE_DIR whose name starts with
 * TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) is read as one JSON
 * tweet per line, and each tweet is passed to addToList. Files are processed
 * in name order, which for the zero-padded "YYYY-MM-DD.HH-MM" names written
 * by this script is chronological. A line that is not valid JSON (e.g. a
 * line cut short when an earlier run was killed mid-write) is reported and
 * skipped instead of aborting startup.
 */
function readTweetsFromFile() {
  var prefix = TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD);
  var names = fs.readdirSync(TWEET_FILE_DIR).filter(function (name) {
    return name.indexOf(prefix) == 0;
  }).sort(); // readdir order is not guaranteed

  names.forEach(function (name) {
    // readdirSync returns bare names: resolve them against TWEET_FILE_DIR
    // (same concatenation the writer uses), not the working directory.
    var lines = fs.readFileSync(TWEET_FILE_DIR + name, 'utf8').split('\n'),
        tweetscopied = 0;
    lines.forEach(function (line, index) {
      if (line.length <= 1) {
        return;
      }
      var tweet;
      try {
        tweet = JSON.parse(line);
      } catch (err) {
        console.log("Skipping unparsable line", index + 1, "of", name + ":", err.message);
        return;
      }
      addToList(tweet);
      tweetscopied++;
    });
    console.log(tweetscopied, "tweets copied from", name);
    console.log(arcs.length, "arcs so far");
  });
}
// Unterminated tail of the previous 'data' chunk: stream chunks are not
// aligned with message boundaries, so a message may span several chunks.
var pendingStreamData = '';

/**
 * 'data' handler for the streaming response.
 *
 * Messages are separated by "\r\n". The unterminated tail of each chunk is
 * buffered in pendingStreamData and completed by the next chunk. Each
 * complete message that parses as a status (has a string `text`) goes
 * through four steps:
 *   - it is tagged with the annkw names whose markers its text contains;
 *   - its ids are converted by textids;
 *   - it is added with addToList;
 *   - it is appended to writestream as one JSON line.
 * Finally the tweet count is logged and broadcast as 'tweetSummary'.
 *
 * @param {string} chunk - text from the response (setEncoding('utf8')).
 */
function callBackNewTweets(chunk) {
  var messages = (pendingStreamData + chunk).split('\r\n');
  // The last piece is '' when the chunk ended on a delimiter, otherwise an
  // incomplete message: keep it for the next call instead of parsing it.
  pendingStreamData = messages.pop();

  messages.forEach(function (message) {
    if (message.length == 0) {
      return; // blank line between messages (presumably a keep-alive)
    }
    var tweet;
    try {
      tweet = JSON.parse(message);
    } catch (err) {
      console.log("Skipping unparsable stream message:", err.message);
      return;
    }
    if (!tweet || typeof tweet.text != 'string') {
      // Not a status (presumably a stream control message); nothing to index.
      return;
    }
    tweet.annotations = Object.keys(annkw).filter(function (name) {
      return tweet.text.indexOf(annkw[name]) != -1;
    });
    textids(tweet);
    textids(tweet.user);
    addToList(tweet);
    writestream.write(JSON.stringify(tweet) + '\n');
  });

  console.log("New tweets received. We now have", tweets.length, "tweets in memory");
  io.sockets.emit('tweetSummary', { tweetcount : tweets.length });
}
/* MAIN CODE */
// Replay earlier recordings first, so that new tweets can link to them.
if (READ_OLD_TWEETS) {
  readTweetsFromFile();
}
if (RECORD_NEW_TWEETS) {
  var twoDigits = function (n) {
    return ("0" + n).slice(-2);
  };
  var now = new Date();
  // Local time "YYYY-MM-DD.HH-MM"; zero-padded so file names sort chronologically.
  var fileTimestamp = now.getFullYear() + "-" + twoDigits(now.getMonth() + 1) + "-" + twoDigits(now.getDate()) +
      "." + twoDigits(now.getHours()) + "-" + twoDigits(now.getMinutes());
  var fileName = TWEET_FILE_DIR + TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-' + fileTimestamp + TWEET_FILE_EXT;
  var writestream = fs.createWriteStream(fileName, { flags: 'w', encoding: 'utf-8' });
  // Without a listener, a write error would be thrown and kill the process.
  writestream.on('error', function (err) {
    console.log('Cannot write to', fileName + ':', err.message);
  });
  console.log('Writing to',fileName);

  // NOTE(review): a Basic-auth credential is hard-coded here and leaks with
  // the source. It should be revoked and supplied only through the
  // TWITTER_AUTHORIZATION environment variable.
  var authorization = process.env.TWITTER_AUTHORIZATION || 'Basic cmFwaHY6N3czMzdMZkMyM2dF';
  var req = https.request({
    host: "stream.twitter.com",
    path: "/1/statuses/filter.json",
    method: "POST",
    headers: {
      'Authorization': authorization,
      'Content-Type': 'application/x-www-form-urlencoded'
    }
  }, function(res) {
    console.log('STATUS: ' + res.statusCode);
    console.log('HEADERS: ' + JSON.stringify(res.headers));
    res.setEncoding('utf8');
    res.on('data', callBackNewTweets);
    res.on('end', function () {
      console.log('Stream closed by server; no further tweets will be recorded');
    });
  });
  // Network/TLS failures are emitted as 'error'; unhandled, they crash the server.
  req.on('error', function (err) {
    console.log('Streaming request failed:', err.message);
  });
  req.write('track=' + encodeURIComponent(TRACKING_KEYWORD));
  req.end();
}
io.set('log level', 0);
// Per-client protocol: a 'tweetSummary' on connect, then 'getTweets' and
// 'getTimeline' requests answered with 'tweets' and 'timeline'.
io.sockets.on('connection', function(socket) {
  console.log("New connection from", socket.handshake.address.address,"id :", socket.id);
  socket.emit('tweetSummary', { tweetcount : tweets.length });

  // Echo the request object back with `tweets` = tweets[from, to).
  socket.on('getTweets', function(data) {
    console.log('getTweets from',socket.id);
    data = data || {}; // a client may emit without a payload
    data.tweets = tweets.slice(Math.max(0, data.from), data.to);
    socket.emit('tweets', data);
  });

  socket.on('getTimeline', function(data) {
    data = data || {};
    var timeline;
    try {
      timeline = trimFDS(flattenDateStruct(date_struct, data.level));
    } catch (err) {
      // An exception here would escape the event handler and take the whole
      // server down; answer this client with an empty timeline instead.
      console.log('getTimeline failed for', socket.id + ':', err.message);
      timeline = [];
    }
    socket.emit('timeline', { "timeline" : timeline, "arcs" : arcs });
    console.log('getTimeline from',socket.id);
  });
});