/* CONFIGURATION */
RECORD_NEW_TWEETS = true;
DEFAULT_SIO_PORT = 8000;
/* Overriden par the "-p" parameter, e.g. node tweetcast.js -p 8080 */
SQLITE_FILE_DIR = './';
SQLITE_FILE_START = 'tweets-';
SQLITE_FILE_EXT = '.sqlite';
DEFAULT_TRACKING_KEYWORD = 'Bieber';
/* Overriden par the "-T" parameter, e.g. node tweetcast.js -T "Bieber" */
TWITTER_USER = 'materiauxnum';
TWITTER_PASS = 'm473r14ux7w337';
/* FUNCTIONS */
/**
 * Creates the database schema if it is not there yet: the `tweets` table
 * (with one integer `a_<name>` column per annotation), an index on
 * `tweets.created_at`, and the `tweet_refs` table.
 * Once the `tweets` table exists, the last position is broadcast through
 * getSendLastPos(). Any SQL error is rethrown.
 */
function createTables() {
    // Shared completion handler: an SQL failure is rethrown as is.
    function failOnError(err) {
        if (err) {
            throw err;
        }
    }

    var annotationColumns = '',
        k;
    for (k = 0; k < annotations.length; k++) {
        annotationColumns += ', a_' + annotations[k] + ' INTEGER';
    }

    db.execute(
        "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationColumns + " )",
        function(tableError) {
            failOnError(tableError);
            // The index can only be created once the table is known to exist.
            db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", failOnError);
            getSendLastPos();
        }
    );
    db.execute(
        "CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )",
        failOnError
    );
}
/**
 * Stores one link between two tweets in the `tweet_refs` table.
 * @param from_id  id of the referring tweet
 * @param to_id    id of the tweet being referred to
 * @param ref_type kind of link (callers in this file pass "reply" or "retweet")
 * An SQL error is rethrown from the completion callback.
 */
function commitReference(from_id, to_id, ref_type) {
    var insertSql = "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )",
        bindings = [ from_id, to_id, ref_type ];

    db.execute(insertSql, bindings, function(insertError) {
        if (insertError) {
            throw insertError;
        }
    });
}
/**
 * Parses one message from the Twitter stream and, when it is a tweet,
 * stores it in the `tweets` table.
 *
 * Steps: numeric ids are replaced by their string form (textids), keys
 * listed in keys_to_delete / user_keys_to_delete are dropped, annotations
 * are derived from the keywords found in the text, reply and retweet links
 * are recorded through commitReference, and the trimmed tweet is inserted.
 * After a successful insert the last position is broadcast.
 *
 * Messages that are not valid JSON, or that carry no `text`/`user`
 * (anything that is not a tweet), are logged and skipped instead of
 * crashing the process.
 *
 * @param data one JSON-encoded stream message (string)
 * An SQL error on insert is rethrown from the completion callback.
 */
function commitTweet(data) {
    var tweet,
        ann = [],
        i,
        j,
        keywords;

    try {
        tweet = JSON.parse(data);
    } catch (parseError) {
        console.log("Skipping unparseable stream message: " + parseError.message);
        return;
    }
    // Only real tweets have both a text and a user; everything below relies on them.
    if (!tweet || typeof tweet.text !== 'string' || !tweet.user) {
        console.log("Skipping non-tweet stream message");
        return;
    }

    textids(tweet);
    for (i = 0; i < keys_to_delete.length; i++) {
        delete tweet[keys_to_delete[i]];
    }
    textids(tweet.user);
    for (i = 0; i < user_keys_to_delete.length; i++) {
        delete tweet.user[user_keys_to_delete[i]];
    }
    if (tweet.retweeted_status) {
        textids(tweet.retweeted_status);
        for (i = 0; i < keys_to_delete.length; i++) {
            delete tweet.retweeted_status[keys_to_delete[i]];
        }
    }

    // An annotation applies as soon as one of its keywords occurs in the text.
    for (i = 0; i < annotations_keywords.length; i++) {
        keywords = annotations_keywords[i];
        for (j = 0; j < keywords.length; j++) {
            if (tweet.text.indexOf(keywords[j]) !== -1) {
                ann.push(annotations[i]);
                break;
            }
        }
    }
    tweet.annotations = ann;
    tweet.created_at = new Date(tweet.created_at);

    if (tweet.in_reply_to_status_id) {
        commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
    }
    if (tweet.retweeted_status) {
        commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
    }

    db.execute(
        "INSERT INTO tweets ( tweet_id, created_at, json "
            + annotations.map(function(a) { return ', a_' + a; }).join("")
            + " ) VALUES ( ?, ?, ? "
            + annotations.map(function() { return ', ?'; }).join("")
            + " )",
        // One 0/1 flag per annotation column, in the order of `annotations`.
        [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(
            annotations.map(function(a) { return ann.indexOf(a) === -1 ? 0 : 1; })
        ),
        function(err) {
            if (err) throw err;
            getSendLastPos();
        }
    );
}
/**
 * 'data' handler for the Twitter stream response.
 *
 * Messages are separated by "\r\n", but a network chunk may end in the
 * middle of a message. The text after the last separator is therefore kept
 * in `callBackNewTweets.pending` and prepended to the next chunk, so that
 * only complete messages are passed to commitTweet. Empty lines are
 * ignored.
 *
 * @param chunk text received from the stream (string)
 */
function callBackNewTweets(chunk) {
    var lines = ((callBackNewTweets.pending || '') + chunk).split('\r\n'),
        committed = 0,
        i;

    // The last element is either '' (chunk ended on a separator) or an incomplete message.
    callBackNewTweets.pending = lines.pop();

    for (i = 0; i < lines.length; i++) {
        if (lines[i].length > 0) {
            commitTweet(lines[i]);
            committed++;
        }
    }
    // Stay silent for chunks that completed no message.
    if (committed > 0) {
        console.log("New tweets received");
    }
}
/**
 * Reads the highest `pos` stored in the `tweets` table, keeps it in the
 * shared `lastpos` variable (0 when the query yields no value) and
 * broadcasts it to every connected socket as a 'tweetSummary' event.
 * An SQL error is rethrown from the completion callback.
 */
function getSendLastPos() {
    var maxPosSql = "SELECT MAX(pos) lastpos FROM tweets";

    db.execute(maxPosSql, function (queryError, rows) {
        if (queryError) {
            throw queryError;
        }
        // MAX(pos) has no value on an empty table; fall back to 0.
        lastpos = rows[0].lastpos || 0;
        console.log("Broadcasting last pos = ", lastpos);
        var summary = { tweetcount : lastpos };
        io.sockets.emit('tweetSummary', summary);
    });
}
/**
 * Sends the stored tweets at the requested positions to one client as a
 * 'tweets' event. Each tweet is the parsed `json` column with its `pos`
 * added.
 *
 * `posList` comes from the client, so it is not trusted: only entries that
 * are non-negative integers (numbers or digit-only strings) are kept before
 * the list is placed in the SQL text. Anything else is dropped, and a
 * non-array value is treated as an empty list. When nothing valid remains,
 * an empty 'tweets' event is sent without querying the database.
 *
 * @param posList array of `pos` values requested by the client
 * @param socket  socket.io socket of the requesting client
 * An SQL error is rethrown from the completion callback.
 */
function getSendTweets(posList, socket) {
    var requested = Array.isArray(posList) ? posList : [],
        ids = [],
        i,
        candidate;

    for (i = 0; i < requested.length; i++) {
        candidate = requested[i];
        // Digits only: nothing that could alter the SQL statement gets through.
        if ((typeof candidate === 'number' || typeof candidate === 'string') && /^\d+$/.test(String(candidate))) {
            ids.push(String(candidate));
        }
    }

    console.log("request for tweets ("+ids.join(',')+") from "+socket.id);
    if (ids.length === 0) {
        socket.emit('tweets', []);
        return;
    }

    db.execute("SELECT * FROM tweets WHERE pos IN ( " + ids.join(',') + " )", function (err, results) {
        if (err) throw err;
        socket.emit('tweets',
            results.map(function(line) {
                var tw = JSON.parse(line.json);
                tw.pos = line.pos;
                return tw;
            })
        );
    });
}
/**
 * Sends one client a 'timeline' event: the tweet counts and per-annotation
 * sums of the 50 most recent non-empty time slices, oldest first, with
 * empty slices inserted wherever consecutive results leave a gap.
 *
 * `level` comes from the client and selects the slice width from
 * date_levels. A value that is not a valid index into date_levels is
 * logged and ignored. When the table holds no tweets an empty timeline is
 * sent.
 *
 * @param level  index into date_levels (number or digit-only string)
 * @param socket socket.io socket of the requesting client
 * An SQL error is rethrown from the completion callback.
 */
function getSendTimeline(level, socket) {
    console.log("request for timeline ("+level+") from "+socket.id);
    var isIndex = (typeof level === 'number' || typeof level === 'string') && /^\d+$/.test(String(level)),
        lvl = isIndex ? date_levels[Number(level)] : undefined,
        requete;

    // Never build SQL from a slice width that is not one of the configured numbers.
    if (typeof lvl !== 'number') {
        console.log("Ignoring timeline request with unknown level");
        return;
    }

    requete = "SELECT COUNT(*) AS nb, "
        + lvl
        + "*ROUND(created_at/"
        + lvl
        + ") AS tranche"
        + annotations.map(function (a) { return " , SUM(a_" + a + ") AS s_" + a; }).join("")
        + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50";

    db.execute(requete, function (err, results) {
        if (err) throw err;
        var tbl = [],
            lastend,
            start,
            bucket,
            i,
            j;

        // No rows: there is no oldest slice to start from.
        if (!results || results.length === 0) {
            socket.emit('timeline', tbl);
            return;
        }

        // Rows arrive newest first; walk them backwards to build the list oldest first.
        lastend = parseInt(results[results.length - 1].tranche, 10);
        for (i = results.length - 1; i >= 0; i--) {
            start = parseInt(results[i].tranche, 10);
            // Fill the gap up to this slice with empty slices.
            while (start > lastend) {
                tbl.push({ "start" : lastend, "end" : lastend + lvl, "tweets" : 0, "annotations" : {} });
                lastend += lvl;
            }
            lastend += lvl;
            bucket = {
                "start" : start,
                "end" : lastend,
                "tweets" : results[i].nb,
                "annotations" : {}
            };
            for (j = 0; j < annotations.length; j++) {
                bucket.annotations[annotations[j]] = results[i]['s_' + annotations[j]];
            }
            tbl.push(bucket);
        }
        socket.emit('timeline', tbl);
    });
}
/**
 * Replaces, in place, every property whose name ends in "id" by the value
 * of its "<name>_str" sibling and removes that sibling.
 * A property ending in "id" that has no "<name>_str" sibling is left
 * untouched rather than being overwritten with undefined.
 * Passing null or undefined does nothing.
 *
 * @param object object to rewrite (modified in place)
 */
function textids(object) {
    var key,
        strKey;
    for (key in object) {
        if (key.slice(-2) !== 'id') {
            continue;
        }
        strKey = key + '_str';
        // Only swap when the string twin is really there.
        if (Object.prototype.hasOwnProperty.call(object, strKey)) {
            object[key] = object[strKey];
            delete object[strKey];
        }
    }
}
/* Initialization */
var fs = require('fs'),
https = require('https'),
sqlite = require('sqlite'),
socketio = require('socket.io'),
/* Shared state. `lastpos` is refreshed by getSendLastPos(). */
/* NOTE(review): tweets, arcs, tweet_ids and date_struct are never filled in the code visible here. */
tweets = [],
lastpos = 0,
arcs = [],
tweet_ids = [],
date_struct = [],
/* Timeline slice widths in milliseconds, indexed by the "level" a client asks for: 1 h, 15 min, 5 min, 1 min, 15 s. */
date_levels = [
3600 * 1000,
15 * 60 * 1000,
5 * 60 * 1000,
60 * 1000,
15 * 1000
],
/* Annotation names and, at the same index, the keywords that trigger each of them. */
annotations = [ 'positive', 'negative', 'reference', 'question' ],
annotations_keywords = [ [ '++' ], [ '--' ], [ '==' ], [ '??' ] ],
/* NOTE(review): annkw is not read anywhere in the code visible here. */
annkw = {
'positive' : '++',
'negative' : '--',
'reference' : '==',
'question' : '??'
},
/* Tweet properties removed before a tweet is stored. */
keys_to_delete = [
'in_reply_to_screen_name',
'in_reply_to_user_id',
'retweeted',
'place',
'geo',
'source',
'contributors',
'coordinates',
'retweet_count',
'favorited',
'truncated',
'possibly_sensitive'
],
/* Properties of the tweet's user removed before a tweet is stored. */
user_keys_to_delete = [
'default_profile_image',
'show_all_inline_media',
'contributors_enabled',
'profile_sidebar_fill_color',
'created_at',
'lang',
'time_zone',
'profile_sidebar_border_color',
'follow_request_sent',
'profile_background_image_url',
'profile_background_image_url_https',
'followers_count',
'description',
'url',
'geo_enabled',
'profile_use_background_image',
'default_profile',
'following',
'profile_text_color',
'is_translator',
'favourites_count',
'listed_count',
'friends_count',
'profile_link_color',
'protected',
'location',
'notifications',
'profile_image_url_https',
'statuses_count',
'verified',
'profile_background_color',
'profile_background_tile',
'utc_offset'
],
/* "-p <port>": use the value after the flag when it parses to a non-zero decimal integer, else the default port. */
port_flag = process.argv.indexOf("-p"),
sio_port = ( port_flag != -1 && port_flag < process.argv.length - 1 && parseInt(process.argv[port_flag + 1], 10) ? parseInt(process.argv[port_flag + 1], 10) : DEFAULT_SIO_PORT ),
io = socketio.listen(sio_port),
/* "-T <keyword>": use the value after the flag, else the default keyword. */
track_flag = process.argv.indexOf("-T"),
tracking_keyword = ( track_flag != -1 && track_flag < process.argv.length - 1 ? process.argv[track_flag + 1] : DEFAULT_TRACKING_KEYWORD ),
/* One database file per tracked keyword. */
sqlfile = SQLITE_FILE_DIR + SQLITE_FILE_START + encodeURIComponent(tracking_keyword) + SQLITE_FILE_EXT,
db = new sqlite.Database();
/* MAIN CODE */
console.log("Listening on port: " + sio_port);
console.log("Opening SQLITE file: " + sqlfile);
/* The tables are created as soon as the database file is open. */
db.open(sqlfile, function(openError) {
    if (openError) {
        throw openError;
    }
    createTables();
});
/* When recording is enabled, open the streaming request and feed every received chunk to callBackNewTweets. */
if (RECORD_NEW_TWEETS) {
    console.log("Requesting Twitter to track keyword(s): " + tracking_keyword);
    var req = https.request(
        {
            host: "stream.twitter.com",
            path: "/1/statuses/filter.json",
            method: "POST",
            headers: {
                'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
                'Content-Type': 'application/x-www-form-urlencoded'
            }
        },
        function(response) {
            console.log('Reply from stream.twitter.com: ' + response.statusCode);
            console.log('Headers: ' + JSON.stringify(response.headers));
            /* Receive text rather than raw buffers. */
            response.setEncoding('utf8');
            response.on('data', callBackNewTweets);
        }
    );
    /* The tracked keyword is sent as the form-encoded request body. */
    req.write('track=' + encodeURIComponent(tracking_keyword));
    req.end();
}
/* Set socket.io's log level to 0. */
io.set('log level', 0);
/*
 * Per-client wiring: greet each new client with the current tweet count,
 * then answer its 'updateTweets' and 'updateTimeline' requests.
 * Request payloads come from the client and are checked before use.
 */
io.sockets.on('connection', function(socket) {
    console.log("New connection from " + socket.handshake.address.address + " with id=" + socket.id);
    /* Same value as the 'tweetSummary' broadcast by getSendLastPos(). */
    socket.emit('tweetSummary', { tweetcount : lastpos });
    socket.on('updateTweets', function(data) {
        /* Ignore requests without a non-empty array of positions. */
        if (data && Array.isArray(data.tweets) && data.tweets.length) {
            getSendTweets(data.tweets, socket);
        }
    });
    socket.on('updateTimeline', function(data) {
        /* Ignore requests that carry no payload at all. */
        if (data) {
            getSendTimeline(data.level, socket);
        }
    });
});