// Improve user management: try to complete user information whenever possible.
/* DEFAULT CONFIGURATION */
// Twitter credentials. Environment variables take precedence over the built-in
// defaults, and the configuration file evaluated below may override both.
// SECURITY(review): credentials are committed here in plain text; rotate them and
// move them out of the source tree (environment variables or the conf file).
// These stay `var` on purpose: the eval'd configuration file may redeclare them,
// which would be a SyntaxError against let/const bindings in the same scope.
var TWITTER_USER = process.env.TWITTER_USER || 'raphv',
    TWITTER_PASS = process.env.TWITTER_PASS || '7w337LfC23gE',
    RECORD_NEW_TWEETS = true,
    // SQL statements queued by commitTweet/commitReference and flushed by commitTweets.
    commit_script = '';
/* CALLING COMMON CONFIGURATION FILE */
// Configuration file path: the argument following `-c`, or conf.js by default.
var conf_file = flagOption('-c', 'conf.js');
myLog('Reading Configuration from ' + conf_file);
var fs = require('fs');
// The configuration file is JavaScript evaluated in this scope; presumably it defines
// tracking_keyword, app_port, annotations and optionally sqlfile (TODO confirm).
// NOTE(review): eval runs arbitrary code - only point -c at trusted files.
eval(fs.readFileSync(conf_file, 'utf8'));
if (typeof sqlfile == "undefined") {
    // Default database file name derived from the tracked keyword(s).
    sqlfile = __dirname + '/tweets-' + encodeURIComponent(tracking_keyword) + '.sqlite';
}
/* FUNCTIONS */
// Return the command-line argument that immediately follows `flag`, or
// `defaultValue` when the flag is absent or is the last argument.
function flagOption(flag, defaultValue) {
    var argv = process.argv,
        index = argv.indexOf(flag);
    if (index == -1 || index >= argv.length - 1) {
        return defaultValue;
    }
    return argv[index + 1];
}
// console.log wrapper that prefixes every line with the local time, rendered in
// bold blue via ANSI escape codes. Accepts any number of arguments.
function myLog() {
    // "\x1b" is ESC. The former octal spelling "\033" is a legacy escape that is a
    // SyntaxError in strict-mode code.
    var prefix = "\x1b[1;34m[" + new Date().toLocaleTimeString() + "]\x1b[0m";
    console.log.apply(console, [prefix].concat(Array.prototype.slice.call(arguments)));
}
// Apply `callback(name, annotation)` to every entry of the global `annotations`
// object, skipping the "default" entry unless options.includeDefault is set.
// Returns an array of the callback results, or, with options.returnObject, an
// object mapping each annotation name to its result.
function annotationMap(callback, options) {
    var includeDefault = ( options && options.includeDefault ? options.includeDefault : false );
    var returnObject = ( options && options.returnObject ? options.returnObject : false );
    // Local accumulator: previously an implicit global, shared by every call and a
    // ReferenceError in strict mode.
    var res = (returnObject ? {} : []);
    for (var name in annotations) {
        if (!Object.prototype.hasOwnProperty.call(annotations, name)) {
            continue;
        }
        if (name != "default" || includeDefault) {
            var el = callback(name, annotations[name]);
            if (returnObject) {
                res[name] = el;
            } else {
                res.push(el);
            }
        }
    }
    return res;
}
// Create the tweets table (one extra INTEGER column a_<name> per annotation),
// the tweet_refs table and the created_at index if they do not exist yet, then
// refresh and broadcast the tweet count.
function createTables() {
    var annotationColumns = annotationMap(function(name) {
        return ', a_' + name + ' INTEGER';
    }).join("");
    var statements = [
        "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationColumns + " );",
        "CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT );",
        "CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at );"
    ];
    db.executeScript(statements.join("\n"), function(err) {
        if (err) {
            myLog("SQLITE error", err.stack);
        }
        // Runs on failure too, exactly as before.
        getSendLastPos();
    });
}
// Queue an INSERT into tweet_refs recording that tweet `from_id` refers to tweet
// `to_id` with the given `ref_type` (e.g. "reply", "retweet"). The statement is
// appended to commit_script and written later by commitTweets.
function commitReference(from_id, to_id, ref_type) {
    // Standard single-quoted SQL literal with embedded quotes doubled. Double-quoted
    // text is an identifier in SQLite and is only accepted as a string as a fallback,
    // and the values were previously not escaped at all.
    function sqlString(value) {
        return "'" + String(value).replace(/'/g, "''") + "'";
    }
    commit_script += 'INSERT OR IGNORE INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( '
        + sqlString(from_id) + ', '
        + sqlString(to_id) + ', '
        + sqlString(ref_type) + ' );\n';
}
// Parse one line received from the streaming API and queue its INSERT (plus any
// reply/retweet references) on commit_script.
// Before storing, the tweet is normalised:
// - ids are replaced by their *_str twins and non-ASCII text is escaped (textids);
// - keys_to_delete / user_keys_to_delete are stripped;
// - annotations are matched against the text;
// - created_at is converted to a Date.
// Lines that cannot be parsed or processed are logged and skipped. Queuing a
// half-processed tweet would emit invalid SQL and make the whole batched script fail.
function commitTweet(data) {
    var tweet,
        ann = [];

    // Delete every key listed in `keys` from `object`.
    function stripKeys(object, keys) {
        for (var k = 0; k < keys.length; k++) {
            delete object[keys[k]];
        }
    }

    // Standard single-quoted SQL literal with embedded quotes doubled.
    function sqlString(value) {
        return "'" + String(value).replace(/'/g, "''") + "'";
    }

    try {
        tweet = JSON.parse(data);
    } catch (err) {
        // Partial or garbled line: log it instead of throwing out of the stream handler.
        myLog("Error: Could not parse data", data);
        return;
    }
    // Stream messages without an id (presumably deletion/limit notices) are not stored.
    if (!tweet || !tweet.id) {
        myLog("Error: Could not parse data", data);
        return;
    }
    try {
        textids(tweet);
        stripKeys(tweet, keys_to_delete);
        if (tweet.user) {
            textids(tweet.user);
            stripKeys(tweet.user, user_keys_to_delete);
        }
        if (tweet.retweeted_status) {
            textids(tweet.retweeted_status);
            stripKeys(tweet.retweeted_status, keys_to_delete);
            // Apply the same id/Unicode normalisation to the retweeted author, but keep
            // their user fields.
            if (tweet.retweeted_status.user) {
                textids(tweet.retweeted_status.user);
            }
        }
        var text = typeof tweet.text == "string" ? tweet.text : "";
        annotationMap(function(i, annotation) {
            // Shape of `keywords` comes from the configuration file; for..in works for
            // both arrays and objects.
            for (var j in annotation.keywords) {
                if (text.search(annotation.keywords[j]) != -1) {
                    ann.push(i);
                    break;
                }
            }
        });
        tweet.annotations = ann;
        tweet.created_at = new Date(tweet.created_at);
    } catch (err) {
        myLog("Error while processing tweet", err.stack);
        return;
    }
    // An unparsable date would put NaN into the SQL text.
    if (isNaN(tweet.created_at.valueOf())) {
        myLog("Error: Invalid created_at", data);
        return;
    }
    if (tweet.in_reply_to_status_id) {
        commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
    }
    if (tweet.retweeted_status) {
        commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
    }
    commit_script += 'INSERT INTO tweets ( tweet_id, created_at, json '
        + annotationMap(function(a) { return ', a_' + a; }).join("")
        + ' ) VALUES ( '
        + sqlString(tweet.id)
        + ', '
        + tweet.created_at.valueOf()
        + ', '
        + sqlString(JSON.stringify(tweet))
        + annotationMap(function(a) {
            return ann.indexOf(a) == -1 ? ', 0' : ', 1';
        }).join("")
        + ' );\n';
}
// Text received after the last "\r\n" of the stream: an incomplete line waiting for
// the rest of its data in a later chunk.
var pendingStreamData = '';
// 'data' handler for the Twitter stream. Chunks are not aligned to message
// boundaries, so only complete "\r\n"-terminated lines are handed to commitTweet,
// and the trailing fragment is kept for the next chunk. Empty lines are skipped;
// presumably these are keep-alive newlines (TODO confirm against the stream docs).
function callBackNewTweets(chunk) {
    var lines = (pendingStreamData + chunk).split('\r\n');
    // The last element is either "" (chunk ended on a delimiter) or a partial line.
    pendingStreamData = lines.pop();
    for (var i = 0; i < lines.length; i++) {
        if (lines[i].length > 0) {
            // One bad line must not abort the remaining lines of the chunk.
            try {
                commitTweet(lines[i]);
            } catch (err) {
                myLog("Error while committing tweet", err.stack);
            }
        }
    }
}
// Open a streaming POST to stream.twitter.com tracking `tracking_keyword`, and feed
// the response to callBackNewTweets. Whenever the connection ends, errors or stays
// idle for 60 s, it is aborted and exactly one replacement connection is scheduled.
function requestTwitter() {
    myLog("Requesting Twitter to track keyword(s): "+tracking_keyword);
    // 'end', 'error' and the idle timeout can all fire for the same connection.
    // Without this guard each would open its own parallel stream.
    var reconnecting = false;
    var req = https.request({
        host: "stream.twitter.com",
        path: "/1/statuses/filter.json",
        method: "POST",
        headers: {
            'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    }, function(res) {
        myLog('Reply from stream.twitter.com: ' + res.statusCode);
        myLog('Headers: ' + JSON.stringify(res.headers));
        res.setEncoding('utf8');
        res.on('data', callBackNewTweets);
        res.on('end', function() {
            // Back off when Twitter refused the request instead of hammering it.
            reconnect('End Twitter Connection — Trying to reconnect', res.statusCode == 200 ? 0 : 10000);
        });
    });

    // Tear this connection down and schedule a fresh one after `delay` ms (once).
    function reconnect(message, delay) {
        if (reconnecting) {
            return;
        }
        reconnecting = true;
        myLog(message);
        req.abort();
        setTimeout(requestTwitter, delay);
    }

    // Without an 'error' listener a network failure is emitted as an unhandled error.
    req.on('error', function(err) {
        reconnect('Twitter connection error (' + err.message + ') - Trying to reconnect', 10000);
    });
    // req.socket may not be assigned yet at this point; ClientRequest#setTimeout
    // applies the idle timeout once the socket exists.
    req.setTimeout(60000, function() {
        reconnect('TimeOut - Trying to reconnect', 0);
    });
    req.write('track=' + encodeURIComponent(tracking_keyword));
    req.end();
}
// Read the largest stored tweet position and, when it differs from the cached
// `lastpos`, update the cache and broadcast it to every socket.io client as
// 'tweetSummary'.
function getSendLastPos() {
    db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) {
        if (err) {
            // `results` is not usable after a failed query.
            myLog("SQLITE error",err.stack);
            return;
        }
        if (!results || !results.length) {
            return;
        }
        if (results[0].lastpos != lastpos) {
            // MAX() yields NULL on an empty table; report that as 0 tweets.
            lastpos = results[0].lastpos ? results[0].lastpos : 0;
            try {
                io.sockets.emit('tweetSummary', {
                    tweetcount : lastpos
                });
            } catch(err) {
                myLog("SOCKET.IO error while Broadcasting tweetSummary",err.stack);
            }
        }
    });
}
// Flush the queued SQL in commit_script to SQLite as one script, then refresh the
// tweet count. Called periodically; does nothing when the queue is empty.
function commitTweets() {
    if (commit_script == '') {
        return;
    }
    // Swap the queue out before the asynchronous write so statements queued while
    // it runs end up in the next batch.
    var batch = commit_script;
    commit_script = '';
    db.executeScript(batch, function (err) {
        if (err) {
            myLog("SQLITE error", err.stack);
        }
        getSendLastPos();
    });
}
// Find the stored tweet whose created_at (ms timestamp) is closest to `date`, and
// send its position and date to `socket` as 'tweetPosByDate'.
function getSendTweetPosByDate(date, socket) {
    // `date` arrives from a socket.io client, so it is validated as a number and bound
    // as a query parameter instead of being concatenated into the SQL text.
    var target = Number(date);
    if (!isFinite(target)) {
        myLog("Ignoring tweetPosByDate request with invalid date", date);
        return;
    }
    db.execute("SELECT pos, created_at, ABS(created_at - ?) AS dist FROM tweets ORDER BY dist ASC LIMIT 1", [ target ], function (err, results) {
        if (err) {
            myLog("SQLITE error",err.stack);
            return;
        }
        if (results && results.length) {
            try {
                socket.emit('tweetPosByDate', {
                    tweetpos : results[0].pos,
                    date : results[0].created_at
                });
            } catch(err) {
                myLog("SOCKET.IO error while sending tweetPosByDate",err.stack);
            }
        }
    });
}
// Look up up to 10 references (nearest in time first) involving the tweet at
// position `pos`, and send them to `socket` as 'linkedTweets':
// - "referencing": rows where `pos` is the referring tweet (from_id side);
// - "referenced_by": rows where `pos` is the referred tweet.
function getSendLinkedTweets(pos, socket) {
    var query = "SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10";
    db.execute(query, [ pos, pos ], function(err, results) {
        if (err) {
            myLog("SQLITE error: ", err.stack);
        }
        var referencing = [],
            referencedBy = [];
        // for..in tolerates an undefined `results` after a failed query.
        for (var key in results) {
            var row = results[key],
                outgoing = row.pos_a == pos;
            (outgoing ? referencing : referencedBy).push({
                "pos" : outgoing ? row.pos_b : row.pos_a,
                "ref_type" : row.ref_type
            });
        }
        try {
            socket.emit('linkedTweets', {
                "tweetpos" : pos,
                "referencing" : referencing,
                "referenced_by" : referencedBy
            });
        } catch(err) {
            myLog("SOCKET.IO error while sending linkedTweets: ", err.stack);
        }
    });
}
// Load the tweets stored at the given positions and send them to `socket` as
// 'tweets': each entry is the stored JSON object with a `pos` property added.
function getSendTweets(posList, socket) {
    // posList comes straight from a socket.io client. Keep only integer positions so
    // nothing but digits can reach the SQL text.
    var positions = [];
    var count = posList ? posList.length : 0;
    for (var i = 0; i < count; i++) {
        var p = Number(posList[i]);
        if (isFinite(p) && Math.floor(p) === p) {
            positions.push(p);
        }
    }
    if (!positions.length) {
        return;
    }
    db.execute("SELECT * FROM tweets WHERE pos IN ( " + positions.join(',') + " )", function (err, results) {
        if (err) {
            myLog("SQLITE error",err.stack);
            return;
        }
        try {
            socket.emit('tweets',
                results.map(function(line) {
                    var tw = JSON.parse(line.json);
                    tw.pos = line.pos;
                    return tw;
                })
            );
        } catch (err) {
            myLog("SOCKET.IO error while sending tweets",err.stack);
        }
    });
}
// Build a histogram of tweet counts and of per-annotation sums, bucketed by
// date_levels[data.level] milliseconds. It covers the latest 50 buckets when
// data.full is truthy, else only the latest one. Empty buckets between results are
// filled in. The result is sent to `socket` as 'timeline', oldest bucket first.
function getSendTimeline(data, socket) {
    // data.level comes from a socket.io client. Accept only a plain non-negative
    // integer index into date_levels, so that e.g. "length" or "__proto__" cannot
    // select a non-numeric value that would be concatenated into the SQL below.
    var level = data ? data.level : undefined,
        lvl = (typeof level == "number" || typeof level == "string") && /^\d+$/.test(String(level))
            ? date_levels[level]
            : undefined;
    if (typeof lvl != "number") {
        myLog("Ignoring timeline request with invalid level", level);
        return;
    }
    var requete = "SELECT COUNT(*) AS nb, "
        + lvl
        + "*ROUND(created_at/"
        + lvl
        + ") AS tranche"
        + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
        + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" );
    db.execute(requete, function (err, results) {
        if (err) {
            // `results` is not usable after a failed query.
            myLog("SQLITE error",err.stack);
            return;
        }
        if (!results || !results.length) {
            return;
        }
        // Rows arrive newest first; walk them oldest first, padding gaps with
        // zero-count buckets.
        var tbl = [],
            lastend = parseInt(results[results.length - 1].tranche, 10);
        for (var i = results.length - 1; i >= 0; i--) {
            var start = parseInt(results[i].tranche, 10);
            while (start > lastend) {
                var gap = { "start": lastend, "tweets" : 0, "annotations" : {} };
                lastend += lvl;
                gap.end = lastend;
                tbl.push(gap);
            }
            lastend += lvl;
            tbl.push({
                "start" : start,
                "end" : lastend,
                "tweets" : results[i].nb,
                "annotations" : annotationMap(function (a) {
                    return results[i]['s_'+a];
                },{returnObject: true})
            });
        }
        try {
            socket.emit('timeline', {
                "full" : data.full,
                "level" : data.level,
                "data" : tbl
            });
        } catch (err) {
            myLog("SOCKET.IO error while sending timeline",err.stack);
        }
    });
}
// Normalise the top-level properties of `object` in place:
// - every string has its non-ASCII characters replaced by HTML numeric character
//   references (workaround for a Unicode bug in socket.io);
// - every key ending in "id" that has a "<key>_str" twin takes the twin's value, and
//   the twin is removed. Presumably this is because numeric ids can lose precision
//   as JS numbers (TODO confirm).
// Nested objects are not visited.
function textids(object) {
    // Escape characters >= 128 as "&#<code point>;". A UTF-16 surrogate pair (e.g.
    // an emoji) is combined into a single code point: a reference to a lone
    // surrogate such as "&#55357;" is not a valid character reference.
    function escapeNonAscii(str) {
        var out = '';
        for (var i = 0; i < str.length; i++) {
            var code = str.charCodeAt(i);
            if (code < 128) {
                out += str.charAt(i);
                continue;
            }
            if (code >= 0xD800 && code <= 0xDBFF && i + 1 < str.length) {
                var low = str.charCodeAt(i + 1);
                if (low >= 0xDC00 && low <= 0xDFFF) {
                    code = (code - 0xD800) * 0x400 + (low - 0xDC00) + 0x10000;
                    i++;
                }
            }
            out += "&#" + code + ";";
        }
        return out;
    }
    for (var key in object) {
        // Workaround for Unicode bug in socket.io.
        if (typeof object[key] == "string") {
            object[key] = escapeNonAscii(object[key]);
        }
        // Only swap when the *_str twin exists; otherwise the id would become undefined.
        if (key.substr(-2) == 'id' && (key + '_str') in object) {
            object[key] = object[key + '_str'];
            delete object[key + '_str'];
        }
    }
}
// HTTP request handler:
// - "/config" serves the configuration file;
// - any other URL is served from the "client" directory next to this script, with
//   index.html appended to URLs ending in "/";
// - missing files, and paths that would escape that directory, get a 404.
function httpHandler(req, res) {
    myLog("HTTP Request for URL "+req.url);
    var path = require('path');
    var clientRoot = path.resolve(__dirname, "client");

    // Answer with a plain-text 404.
    function notFound() {
        myLog("Error 404");
        res.writeHead(404);
        return res.end('File not found');
    }

    // Map a request URL to a file path, or null when it must not be served.
    function resolveFile(rawUrl) {
        if (rawUrl == "/config") {
            return conf_file;
        }
        // Ignore the query string / fragment and decode percent-escapes before
        // checking containment, so "%2e%2e" cannot smuggle in "..".
        var pathname = rawUrl.split('?')[0].split('#')[0];
        try {
            pathname = decodeURIComponent(pathname);
        } catch (err) {
            return null;
        }
        if (pathname.indexOf('\0') != -1) {
            return null;
        }
        if (pathname.charAt(pathname.length - 1) == "/") {
            pathname += "index.html";
        }
        var file = path.resolve(clientRoot, "." + pathname);
        // Reject anything that resolved outside the client directory (path traversal).
        return file.indexOf(clientRoot + path.sep) === 0 ? file : null;
    }

    var file = resolveFile(req.url);
    if (file === null) {
        return notFound();
    }
    fs.readFile(file, function(err, data) {
        if (err) {
            return notFound();
        }
        res.writeHead(200);
        res.end(data);
    });
}
/* Initialization */
// Core and third-party modules.
var http = require('http');
var https = require('https');
var sqlite = require('sqlite');
var socketio = require('socket.io');

// Module-level state. `lastpos` caches the largest stored tweet position
// (maintained by getSendLastPos).
// NOTE(review): tweets, arcs, tweet_ids and date_struct are not read anywhere in
// this file as seen; possibly dead.
var tweets = [];
var lastpos = 0;
var arcs = [];
var tweet_ids = [];
var date_struct = [];

// Timeline bucket widths in milliseconds, indexed by the client's zoom level:
// 1 hour, 15 minutes, 5 minutes, 1 minute, 15 seconds.
var date_levels = [
    3600 * 1000,
    15 * 60 * 1000,
    5 * 60 * 1000,
    60 * 1000,
    15 * 1000
];

// Tweet properties dropped before a tweet is stored.
var keys_to_delete = [
    'in_reply_to_screen_name', 'in_reply_to_user_id', 'retweeted',
    'place', 'geo', 'source',
    'contributors', 'coordinates', 'retweet_count',
    'favorited', 'truncated', 'possibly_sensitive'
];

// User properties dropped from tweet.user before a tweet is stored.
var user_keys_to_delete = [
    'default_profile_image', 'show_all_inline_media', 'contributors_enabled',
    'profile_sidebar_fill_color', 'created_at', 'lang',
    'time_zone', 'profile_sidebar_border_color', 'follow_request_sent',
    'profile_background_image_url', 'profile_background_image_url_https', 'followers_count',
    'description', 'url', 'geo_enabled',
    'profile_use_background_image', 'default_profile', 'following',
    'profile_text_color', 'is_translator', 'favourites_count',
    'listed_count', 'friends_count', 'profile_link_color',
    'protected', 'location', 'notifications',
    'profile_image_url_https', 'statuses_count', 'verified',
    'profile_background_color', 'profile_background_tile', 'utc_offset'
];

// HTTP server for the web client, with socket.io attached to it.
var app = http.createServer(httpHandler);
var io = socketio.listen(app);
// SQLite handle; opened by the main code with db.open(sqlfile, ...).
var db = new sqlite.Database();
/* MAIN CODE */
app.listen(app_port);
myLog("Listening on port: "+app_port);
myLog("Opening SQLITE file: "+sqlfile);
db.open(sqlfile , function(err) {
    if (err) { myLog("SQLITE error",err.stack); }
    createTables();
});
// Flush queued INSERT statements to SQLite every 500 ms.
setInterval(commitTweets,500);
setInterval(function(){myLog("Still alive, tweet count",lastpos)}, 60000);
if (RECORD_NEW_TWEETS) {
    requestTwitter();
}
io.set('log level', 0);
io.sockets.on('connection', function(socket) {
    myLog("New connection from", socket.handshake.address.address, "with id=", socket.id);
    try {
        socket.emit('tweetSummary', { tweetcount : lastpos });
    } catch (err) {
        myLog("SOCKET.IO error while sending tweetSummary",err.stack);
    }
    // Event payloads come from browser clients and cannot be trusted. A missing or
    // malformed payload is ignored instead of throwing a TypeError inside the handler.
    socket.on('updateTweets', function(data) {
        if (data && Array.isArray(data.tweets) && data.tweets.length) {
            getSendTweets(data.tweets, socket);
        }
    });
    socket.on('updateTimeline', function(data) {
        if (data) {
            getSendTimeline(data, socket);
        }
    });
    socket.on('tweetPosByDate', function(data) {
        if (data) {
            getSendTweetPosByDate(data.date, socket);
        }
    });
    socket.on('linkedTweets', function(data) {
        if (data) {
            getSendLinkedTweets(data.tweetpos, socket);
        }
    });
});