--- a/tweetcast/nodejs/server/node-direct.js Wed Oct 19 00:05:09 2011 +0200
+++ b/tweetcast/nodejs/server/node-direct.js Mon Oct 24 18:01:55 2011 +0200
@@ -1,169 +1,160 @@
-READ_OLD_TWEETS = true;
+/* CONFIGURATION */
+
RECORD_NEW_TWEETS = true;
-TWEET_FILE_DIR = './';
-TWEET_FILE_START = 'tweets-';
-TWEET_FILE_EXT = '.txt';
-TRACKING_KEYWORD = '#p2';
+DEFAULT_SIO_PORT = 8000;
+/* Overriden par the "-p" parameter, e.g. node tweetcast.js -p 8080 */
+SQLITE_FILE_DIR = './';
+SQLITE_FILE_START = 'tweets-';
+SQLITE_FILE_EXT = '.sqlite';
+DEFAULT_TRACKING_KEYWORD = 'Bieber';
+/* Overriden par the "-T" parameter, e.g. node tweetcast.js -T "Bieber" */
+TWITTER_USER = 'materiauxnum';
+TWITTER_PASS = 'm473r14ux7w337';
-var fs = require('fs'),
- https = require('https'),
- io = require('socket.io')
- .listen(8000),
- tweets = [],
- arcs = [],
- tweet_ids = [],
- date_struct = [],
- date_levels = [
- 3600 * 1000,
- 15 * 60 * 1000,
- 5 * 60 * 1000,
- 60 * 1000
- ],
- annkw = {
- 'positive' : '++',
- 'negative' : '--',
- 'reference' : '==',
- 'question' : '??'
- };
+/* FUNCTIONS */
+
+function createTables() {
-function populateDateStruct(level, start) {
- if (typeof start == "object") {
- start = start.valueOf();
- }
- var end = start + date_levels[level],
- struct = {
- "level" : level,
- "start" : new Date(start),
- "end" : new Date(end)
- };
- if (level < date_levels.length - 1) {
- struct.slices = [];
- var newstart = start;
- while (newstart < end) {
- struct.slices.push(populateDateStruct(level + 1, newstart));
- newstart += date_levels[level + 1];
- }
- } else {
- struct.tweets = [];
- struct.annotations = {};
- }
- return struct;
+ var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotations.map(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )";
+ db.execute(requete, function(err) {
+ if (err) throw err;
+ db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) throw err; });
+ getSendLastPos();
+ });
+
+ db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) throw err; });
}
-function insertIntoDateStruct(slices, tweet) {
- var creadate = new Date(tweet.created_at);
- for (var i in slices) {
- if (creadate < slices[i].end) {
- if (slices[i].slices) {
- insertIntoDateStruct(slices[i].slices, tweet);
- } else {
- slices[i].tweets.push(tweet.pos);
- if (tweet.annotations.length) {
- var ann = tweet.annotations[0];
- if (slices[i].annotations[ann]) {
- slices[i].annotations[ann].push(tweet.pos);
- } else {
- slices[i].annotations[ann] = [ tweet.pos ];
- }
- }
- }
- break;
- }
- }
+function commitReference(from_id, to_id, ref_type) {
+ db.execute(
+ "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )",
+ [ from_id, to_id, ref_type ],
+ function(err) { if (err) throw err; }
+ );
}
-function getSliceContent(slice) {
- if (slice.slices) {
- var twz = [],
- annotations = {};
- for (var i in slice.slices) {
- var data = getSliceContent(slice.slices[i]);
- twz = twz.concat(data.tweets);
- for (var j in data.annotations) {
- if (annotations[j]) {
- annotations[j] = annotations[j].concat(data.annotations[j]);
- } else {
- annotations[j] = data.annotations[j];
- }
- }
+function commitTweet(data) {
+
+ var tweet = JSON.parse(data),
+ ann = [];
+
+ textids(tweet);
+ for (var j in keys_to_delete) {
+ delete tweet[keys_to_delete[j]];
+ }
+ textids(tweet.user);
+ for (var j in user_keys_to_delete) {
+ delete tweet.user[user_keys_to_delete[j]];
+ }
+ if (tweet.retweeted_status) {
+ textids(tweet.retweeted_status);
+ for (var j in keys_to_delete) {
+ delete tweet.retweeted_status[keys_to_delete[j]];
}
- } else {
- twz = slice.tweets;
- annotations = slice.annotations;
}
- return { "tweets" : twz, "annotations" : annotations }
-};
-
-function flattenDateStruct(slices, target_level) {
- var current_level = slices[0].level,
- result = [];
- if (current_level < target_level) {
- if (slices[0].slices) {
- for (var i in slices) {
- result = result.concat(flattenDateStruct(slices[i].slices, target_level));
+ for (var i in annotations_keywords) {
+ for (var j in annotations_keywords[i]) {
+ if (tweet.text.indexOf(annotations_keywords[i][j]) != -1) {
+ ann.push(annotations[i]);
+ break;
}
}
}
- else {
- for (var i in slices) {
- var data = getSliceContent(slices[i]);
- result.push({
- "start" : slices[i].start,
- "end" : slices[i].end,
- "tweets" : data.tweets,
- "annotations" : data.annotations
- });
+ tweet.annotations = ann;
+ tweet.created_at = new Date(tweet.created_at);
+
+ if (tweet.in_reply_to_status_id) {
+ commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
+ }
+ if (tweet.retweeted_status) {
+ commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
+ }
+ db.execute(
+ "INSERT INTO tweets ( tweet_id, created_at, json "
+ + annotations.map(function(a) { return ', a_' + a }).join("")
+ + " ) VALUES ( ?, ?, ? "
+ + annotations.map(function(a) { return ', ?' }).join("")
+ + " )",
+ [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotations.map(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })),
+ function(err) {
+ if (err) throw err;
+ getSendLastPos();
+ }
+ );
+}
+
+function callBackNewTweets(chunk) {
+ var newdata = chunk.split('\r\n');
+ for (var i in newdata) {
+ if (newdata[i].length > 0) {
+ commitTweet(newdata[i]);
}
}
- return result;
+ console.log("New tweets received");
}
-function trimFDS(slices) {
- while (slices[0].tweets.length == 0) {
- slices.splice(0,1);
- }
- while (slices[slices.length - 1].tweets.length == 0) {
- slices.pop();
- }
- return slices;
+function getSendLastPos() {
+ db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) {
+ if (err) throw err;
+ lastpos = results[0].lastpos ? results[0].lastpos : 0;
+ console.log("Broadcasting last pos = ",lastpos);
+ io.sockets.emit('tweetSummary', {
+ tweetcount : lastpos
+ });
+ });
}
-function addToList(tweet) {
- if (tweet_ids.indexOf(tweet.id) != -1) {
- console.log("Error: Tweet already in list");
- return;
- }
- tweet.pos = tweets.length;
- tweets.push(tweet);
- tweet_ids.push(tweet.id);
- var creadate = new Date(tweet.created_at);
- if (!date_struct.length) {
- date_struct = [ populateDateStruct(0, date_levels[0] * parseInt(creadate / date_levels[0])) ]
- }
- while (creadate > date_struct[date_struct.length - 1].end) {
- date_struct.push( populateDateStruct(0, date_struct[date_struct.length - 1].end.valueOf()) );
- }
- insertIntoDateStruct(date_struct, tweet);
- if (tweet.in_reply_to_status_id) {
- var ref = tweet_ids.indexOf(tweet.in_reply_to_status_id)
- if (ref != -1) {
- arcs.push({
- "from" : tweet.pos,
- "to" : ref,
- "type" : "reply"
- });
+
+function getSendTweets(posList, socket) {
+ console.log("request for tweets ("+posList.join(',')+") from "+socket.id);
+ db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) {
+ if (err) throw err;
+ socket.emit('tweets',
+ results.map(function(line) {
+ var tw = JSON.parse(line.json);
+ tw.pos = line.pos;
+ return tw;
+ })
+ );
+ });
+}
+
+function getSendTimeline(level, socket) {
+ console.log("request for timeline ("+level+") from "+socket.id);
+ var lvl = date_levels[level],
+ requete = "SELECT COUNT(*) AS nb, "
+ + lvl
+ + "*ROUND(created_at/"
+ + lvl
+ + ") AS tranche"
+ + annotations.map(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
+ + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50";
+ db.execute(requete, function (err, results) {
+ if (err) throw err;
+ var tbl = [],
+ lastend = parseInt(results[results.length - 1].tranche);
+ for (var i = results.length - 1; i >= 0; i--) {
+ var start = parseInt(results[i].tranche);
+ while (start > lastend) {
+ var struct = { "start": lastend, "tweets" : 0, "annotations" : {} };
+ lastend += lvl;
+ struct.end = lastend;
+ tbl.push(struct);
+ }
+ lastend += lvl;
+ var struct = {
+ "start" : start,
+ "end" : lastend,
+ "tweets" : results[i].nb,
+ "annotations" : {}
+ }
+ for (var j in annotations) {
+ struct.annotations[annotations[j]] = results[i]['s_' + annotations[j]];
+ }
+ tbl.push(struct);
}
- }
- if (tweet.retweeted_status) {
- var ref = tweet_ids.indexOf(tweet.retweeted_status.id_str)
- if (ref != -1) {
- arcs.push({
- "from" : tweet.pos,
- "to" : ref,
- "type" : "retweet"
- });
- }
- }
+ socket.emit('timeline', tbl);
+ });
}
function textids(object) {
@@ -175,91 +166,129 @@
}
}
-function readTweetsFromFile() {
- var dir = fs.readdirSync(TWEET_FILE_DIR),
- fileName = null;
- for (var i in dir) {
- if ( dir[i].indexOf( TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) ) == 0 ) {
- var oldtweets = fs.readFileSync(dir[i], 'utf8').split('\n'),
- tweetscopied = 0;
- for (var j in oldtweets) {
- if (oldtweets[j].length > 1) {
- addToList(JSON.parse(oldtweets[j]));
- tweetscopied++;
- }
- }
- console.log(tweetscopied, "tweets copied from", dir[i]);
- console.log(arcs);
- }
- }
-}
+/* Initialization */
-function callBackNewTweets(chunk) {
- var newdata = chunk.split('\r\n');
- for (var i in newdata) {
- if (newdata[i].length > 0) {
- var tweet = JSON.parse(newdata[i]),
- annotations = [];
- for (var a in annkw) {
- if (tweet.text.indexOf(annkw[a]) != -1) {
- annotations.push(a);
- }
- }
- tweet.annotations = annotations;
- textids(tweet);
- textids(tweet.user);
-
- addToList(tweet);
- var txt = JSON.stringify(tweet)+'\n';
- writestream.write(txt);
- }
- }
- console.log("New tweets received. We now have", tweets.length, "tweets in memory");
- io.sockets.emit('tweetSummary', { tweetcount : tweets.length });
-}
+var fs = require('fs'),
+ https = require('https'),
+ sqlite = require('sqlite'),
+ socketio = require('socket.io'),
+ tweets = [],
+ lastpos = 0,
+ arcs = [],
+ tweet_ids = [],
+ date_struct = [],
+ date_levels = [
+ 3600 * 1000,
+ 15 * 60 * 1000,
+ 5 * 60 * 1000,
+ 60 * 1000,
+ 15 * 1000
+ ],
+ annotations = [ 'positive', 'negative', 'reference', 'question' ],
+ annotations_keywords = [ [ '++' ], [ '--' ], [ '==' ], [ '??' ] ],
+ annkw = {
+ 'positive' : '++',
+ 'negative' : '--',
+ 'reference' : '==',
+ 'question' : '??'
+ },
+ keys_to_delete = [
+ 'in_reply_to_screen_name',
+ 'in_reply_to_user_id',
+ 'retweeted',
+ 'place',
+ 'geo',
+ 'source',
+ 'contributors',
+ 'coordinates',
+ 'retweet_count',
+ 'favorited',
+ 'truncated',
+ 'possibly_sensitive'
+ ],
+ user_keys_to_delete = [
+ 'default_profile_image',
+ 'show_all_inline_media',
+ 'contributors_enabled',
+ 'profile_sidebar_fill_color',
+ 'created_at',
+ 'lang',
+ 'time_zone',
+ 'profile_sidebar_border_color',
+ 'follow_request_sent',
+ 'profile_background_image_url',
+ 'profile_background_image_url_https',
+ 'followers_count',
+ 'description',
+ 'url',
+ 'geo_enabled',
+ 'profile_use_background_image',
+ 'default_profile',
+ 'following',
+ 'profile_text_color',
+ 'is_translator',
+ 'favourites_count',
+ 'listed_count',
+ 'friends_count',
+ 'profile_link_color',
+ 'protected',
+ 'location',
+ 'notifications',
+ 'profile_image_url_https',
+ 'statuses_count',
+ 'verified',
+ 'profile_background_color',
+ 'profile_background_tile',
+ 'utc_offset'
+ ],
+ port_flag = process.argv.indexOf("-p"),
+ sio_port = ( port_flag != -1 && port_flag < process.argv.length - 1 && parseInt(process.argv[port_flag + 1]) ? parseInt(process.argv[port_flag + 1]) : DEFAULT_SIO_PORT )
+ io = socketio.listen(sio_port),
+ track_flag = process.argv.indexOf("-T"),
+ tracking_keyword = ( track_flag != -1 && track_flag < process.argv.length - 1 ? process.argv[track_flag + 1] : DEFAULT_TRACKING_KEYWORD ),
+ sqlfile = SQLITE_FILE_DIR + SQLITE_FILE_START + encodeURIComponent(tracking_keyword) + SQLITE_FILE_EXT,
+ db = new sqlite.Database();
/* MAIN CODE */
-if (READ_OLD_TWEETS) {
- readTweetsFromFile();
-}
+console.log("Listening on port: "+sio_port);
+console.log("Opening SQLITE file: "+sqlfile);
+db.open(sqlfile , function(err) {
+ if (err) throw err;
+ createTables();
+});
if (RECORD_NEW_TWEETS) {
- var now = new Date(),
- fileTimestamp = (1900+now.getYear())+"-"+("0"+(1+now.getMonth())).slice(-2)+"-"+("0"+now.getDate()).slice(-2)+"."+("0"+now.getHours()).slice(-2)+"-"+("0"+now.getMinutes()).slice(-2),
- fileName = TWEET_FILE_DIR + TWEET_FILE_START + encodeURIComponent(TRACKING_KEYWORD) + '-' + fileTimestamp + TWEET_FILE_EXT,
- writestream = fs.createWriteStream(fileName, { flags: 'w', encoding: 'utf-8' });
- console.log('Writing to',fileName);
+ console.log("Requesting Twitter to track keyword(s): "+tracking_keyword);
var req = https.request({
host: "stream.twitter.com",
path: "/1/statuses/filter.json",
method: "POST",
headers: {
- 'Authorization': 'Basic cmFwaHY6N3czMzdMZkMyM2dF',
+ 'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
'Content-Type': 'application/x-www-form-urlencoded'
}
}, function(res) {
- console.log('STATUS: ' + res.statusCode);
- console.log('HEADERS: ' + JSON.stringify(res.headers));
+ console.log('Reply from stream.twitter.com: ' + res.statusCode);
+ console.log('Headers: ' + JSON.stringify(res.headers));
res.setEncoding('utf8');
res.on('data', callBackNewTweets);
});
- req.write('track=' + encodeURIComponent(TRACKING_KEYWORD));
+ req.write('track=' + encodeURIComponent(tracking_keyword));
req.end();
}
io.set('log level', 0);
io.sockets.on('connection', function(socket) {
- console.log("New connection from", socket.handshake.address.address,"id :", socket.id);
+ console.log("New connection from" + socket.handshake.address.address + " with id=" + socket.id);
socket.emit('tweetSummary', { tweetcount : tweets.length });
- socket.on('getTweets', function(data) {
- console.log('getTweets from',socket.id);
- data.tweets = tweets.slice(Math.max(0, data.from), data.to);
- socket.emit('tweets', data);
+ socket.on('updateTweets', function(data) {
+ if (data.tweets.length) {
+ getSendTweets(data.tweets, socket);
+ }
});
- socket.on('getTimeline', function(data) {
- socket.emit('timeline', { "timeline" : trimFDS(flattenDateStruct(date_struct, data.level)), "arcs" : arcs });
- console.log('getTimeline from',socket.id);
+ socket.on('updateTimeline', function(data) {
+ getSendTimeline(data.level, socket);
});
});
\ No newline at end of file