tweetcast/nodejs/node-direct.js
changeset 332 738594562e44
parent 331 03c69425efa6
child 334 b7f05d66b620
equal deleted inserted replaced
331:03c69425efa6 332:738594562e44
       
     1 /* CONFIGURATION */
       
     2 
       
     3 RECORD_NEW_TWEETS = true;
       
     4 DEFAULT_SIO_PORT = 8000;
       
     5 /* Overriden par the "-p" parameter, e.g. node tweetcast.js -p 8080 */
       
     6 SQLITE_FILE_DIR = __dirname + '/';
       
     7 SQLITE_FILE_START = 'tweets-';
       
     8 SQLITE_FILE_EXT = '.sqlite';
       
     9 DEFAULT_TRACKING_KEYWORD = 'Bieber';
       
    10 /* Overriden par the "-T" parameter, e.g. node tweetcast.js -T "Bieber" */
       
    11 TWITTER_USER = 'materiauxnum';
       
    12 TWITTER_PASS = 'm473r14ux7w337';
       
    13 
       
    14 /* FUNCTIONS */
       
    15 
       
    16 function createTables() {
       
    17 
       
    18     var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotations.map(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )";
       
    19     db.execute(requete, function(err) {
       
    20         if (err) throw err;
       
    21         db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) throw err; });
       
    22         getSendLastPos();
       
    23     });
       
    24     
       
    25     db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) throw err; });
       
    26 }
       
    27 
       
    28 function commitReference(from_id, to_id, ref_type) {
       
    29     db.execute(
       
    30         "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )",
       
    31         [ from_id, to_id, ref_type ],
       
    32         function(err) { if (err) throw err; }
       
    33     );
       
    34 }
       
    35 
       
    36 function commitTweet(data) {
       
    37 
       
    38     var tweet = JSON.parse(data),
       
    39         ann = [];
       
    40 
       
    41     textids(tweet);
       
    42     for (var j in keys_to_delete) {
       
    43         delete tweet[keys_to_delete[j]];
       
    44     }
       
    45     textids(tweet.user);
       
    46     for (var j in user_keys_to_delete) {
       
    47         delete tweet.user[user_keys_to_delete[j]];
       
    48     }
       
    49     if (tweet.retweeted_status) {
       
    50         textids(tweet.retweeted_status);
       
    51         for (var j in keys_to_delete) {
       
    52             delete tweet.retweeted_status[keys_to_delete[j]];
       
    53         }
       
    54     }
       
    55     for (var i in annotations_keywords) {
       
    56         for (var j in annotations_keywords[i]) {
       
    57             if (tweet.text.indexOf(annotations_keywords[i][j]) != -1) {
       
    58                 ann.push(annotations[i]);
       
    59                 break;
       
    60             }
       
    61         }
       
    62     }
       
    63     tweet.annotations = ann;
       
    64     tweet.created_at = new Date(tweet.created_at);
       
    65     
       
    66     if (tweet.in_reply_to_status_id) {
       
    67         commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
       
    68     }
       
    69     if (tweet.retweeted_status) {
       
    70         commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
       
    71     }
       
    72     db.execute(
       
    73         "INSERT INTO tweets ( tweet_id, created_at, json "
       
    74         + annotations.map(function(a) { return ', a_' + a }).join("")
       
    75         + " ) VALUES ( ?, ?, ? "
       
    76         + annotations.map(function(a) { return ', ?' }).join("")
       
    77         + " )",
       
    78         [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotations.map(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })),
       
    79         function(err) {
       
    80             if (err) throw err;
       
    81             getSendLastPos();
       
    82         }
       
    83     );
       
    84 }
       
    85 
       
    86 function callBackNewTweets(chunk) {
       
    87     var newdata = chunk.split('\r\n');
       
    88     for (var i in newdata) {
       
    89         if (newdata[i].length > 0) {
       
    90             commitTweet(newdata[i]);
       
    91         }
       
    92     }
       
    93     console.log("New tweets received");
       
    94 }
       
    95 
       
    96 function getSendLastPos() {
       
    97     db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) {
       
    98         if (err) throw err;
       
    99         lastpos = results[0].lastpos ? results[0].lastpos : 0;
       
   100         console.log("Broadcasting last pos = ",lastpos);
       
   101         io.sockets.emit('tweetSummary', {
       
   102             tweetcount : lastpos
       
   103         });
       
   104     });
       
   105 }
       
   106 
       
   107 
       
   108 function getSendTweets(posList, socket) {
       
   109     console.log("request for tweets ("+posList.join(',')+") from "+socket.id);
       
   110     db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) {
       
   111         if (err) throw err;
       
   112         socket.emit('tweets',
       
   113             results.map(function(line) {
       
   114                 var tw = JSON.parse(line.json);
       
   115                 tw.pos = line.pos;
       
   116                 return tw;
       
   117             })
       
   118         );
       
   119     });
       
   120 }
       
   121 
       
   122 function getSendTimeline(level, socket) {
       
   123     console.log("request for timeline ("+level+") from "+socket.id);
       
   124     var lvl = date_levels[level],
       
   125         requete = "SELECT COUNT(*) AS nb, "
       
   126         + lvl
       
   127         + "*ROUND(created_at/"
       
   128         + lvl
       
   129         + ") AS tranche"
       
   130         + annotations.map(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
       
   131         + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50";
       
   132     db.execute(requete, function (err, results) {
       
   133         if (err) throw err;
       
   134         var tbl = [],
       
   135             lastend = parseInt(results[results.length - 1].tranche);
       
   136         for (var i = results.length - 1; i >= 0; i--) {
       
   137             var start = parseInt(results[i].tranche);
       
   138             while (start > lastend) {
       
   139                 var struct = { "start": lastend, "tweets" : 0, "annotations" : {} };
       
   140                 lastend += lvl;
       
   141                 struct.end = lastend;
       
   142                 tbl.push(struct);
       
   143             }
       
   144             lastend += lvl;
       
   145             var struct = {
       
   146                 "start" : start,
       
   147                 "end" : lastend,
       
   148                 "tweets" : results[i].nb,
       
   149                 "annotations" : {}
       
   150             }
       
   151             for (var j in annotations) {
       
   152                 struct.annotations[annotations[j]] = results[i]['s_' + annotations[j]];
       
   153             }
       
   154             tbl.push(struct);
       
   155         }
       
   156         socket.emit('timeline', tbl);
       
   157     });
       
   158 }
       
   159 
       
   160 function textids(object) {
       
   161     for (var key in object) {
       
   162         if (key.substr(-2) == 'id') {
       
   163             object[key] = object[key + '_str'];
       
   164             delete object[key + '_str'];
       
   165         }
       
   166     }
       
   167 }
       
   168 
       
   169 function httpHandler(req, res) {
       
   170     var url = req.url + ( req.url[req.url.length - 1] == "/" ? "index.html" : "" );
       
   171     fs.readFile(__dirname + "/client" + url, function(err, data) {
       
   172         if (err) {
       
   173             res.writeHead(404);
       
   174             return res.end('File not found');
       
   175         }
       
   176         res.writeHead(200);
       
   177         res.end(data);
       
   178     });
       
   179 }
       
   180 
       
   181 /* Initialization */
       
   182 
       
   183 var fs = require('fs'),
       
   184     http = require('http'),
       
   185     https = require('https'),
       
   186     sqlite = require('sqlite'),
       
   187     socketio = require('socket.io'),
       
   188     tweets = [],
       
   189     lastpos = 0,
       
   190     arcs = [],
       
   191     tweet_ids = [],
       
   192     date_struct = [],
       
   193     date_levels = [
       
   194         3600 * 1000,
       
   195         15 * 60 * 1000,
       
   196         5 * 60 * 1000,
       
   197         60 * 1000,
       
   198         15 * 1000
       
   199     ],
       
   200     annotations = [ 'positive', 'negative', 'reference', 'question' ],
       
   201     annotations_keywords = [ [ '++' ], [ '--' ], [ '==' ], [ '??' ] ],
       
   202     annkw = {
       
   203         'positive' : '++',
       
   204         'negative' : '--',
       
   205         'reference' : '==',
       
   206         'question' : '??'
       
   207     },
       
   208     keys_to_delete = [
       
   209         'in_reply_to_screen_name',
       
   210         'in_reply_to_user_id',
       
   211         'retweeted',
       
   212         'place',
       
   213         'geo',
       
   214         'source',
       
   215         'contributors',
       
   216         'coordinates',
       
   217         'retweet_count',
       
   218         'favorited',
       
   219         'truncated',
       
   220         'possibly_sensitive'
       
   221     ],
       
   222     user_keys_to_delete = [
       
   223         'default_profile_image',
       
   224         'show_all_inline_media',
       
   225         'contributors_enabled',
       
   226         'profile_sidebar_fill_color',
       
   227         'created_at',
       
   228         'lang',
       
   229         'time_zone',
       
   230         'profile_sidebar_border_color',
       
   231         'follow_request_sent',
       
   232         'profile_background_image_url',
       
   233         'profile_background_image_url_https',
       
   234         'followers_count',
       
   235         'description',
       
   236         'url',
       
   237         'geo_enabled',
       
   238         'profile_use_background_image',
       
   239         'default_profile',
       
   240         'following',
       
   241         'profile_text_color',
       
   242         'is_translator',
       
   243         'favourites_count',
       
   244         'listed_count',
       
   245         'friends_count',
       
   246         'profile_link_color',
       
   247         'protected',
       
   248         'location',
       
   249         'notifications',
       
   250         'profile_image_url_https',
       
   251         'statuses_count',
       
   252         'verified',
       
   253         'profile_background_color',
       
   254         'profile_background_tile',
       
   255         'utc_offset'
       
   256     ],
       
   257     app = http.createServer(httpHandler),
       
   258     port_flag = process.argv.indexOf("-p"),
       
   259     sio_port = ( port_flag != -1 && port_flag < process.argv.length - 1 && parseInt(process.argv[port_flag + 1]) ? parseInt(process.argv[port_flag + 1]) : DEFAULT_SIO_PORT )
       
   260     io = socketio.listen(app),
       
   261     track_flag = process.argv.indexOf("-T"),
       
   262     tracking_keyword = ( track_flag != -1 && track_flag < process.argv.length - 1 ? process.argv[track_flag + 1] : DEFAULT_TRACKING_KEYWORD ),
       
   263     sqlfile = SQLITE_FILE_DIR + SQLITE_FILE_START + encodeURIComponent(tracking_keyword) + SQLITE_FILE_EXT,
       
   264     db = new sqlite.Database();
       
   265 
       
   266 /* MAIN CODE */
       
   267 
       
   268 app.listen(sio_port);
       
   269 
       
   270 console.log("Listening on port: "+sio_port);
       
   271 console.log("Opening SQLITE file: "+sqlfile);
       
   272 db.open(sqlfile , function(err) {
       
   273     if (err) throw err;
       
   274     createTables();
       
   275 });
       
   276 
       
   277 if (RECORD_NEW_TWEETS) {
       
   278     console.log("Requesting Twitter to track keyword(s): "+tracking_keyword);
       
   279     var req = https.request({
       
   280         host: "stream.twitter.com",
       
   281         path: "/1/statuses/filter.json",
       
   282         method: "POST",
       
   283         headers: {
       
   284             'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
       
   285             'Content-Type': 'application/x-www-form-urlencoded'
       
   286         }
       
   287     }, function(res) {
       
   288         console.log('Reply from stream.twitter.com: ' + res.statusCode);
       
   289         console.log('Headers: ' + JSON.stringify(res.headers));
       
   290         res.setEncoding('utf8');
       
   291         res.on('data', callBackNewTweets);
       
   292     });
       
   293     
       
   294     req.write('track=' + encodeURIComponent(tracking_keyword));
       
   295     req.end();
       
   296 }
       
   297 
       
   298 io.set('log level', 0);
       
   299 io.sockets.on('connection', function(socket) {
       
   300     console.log("New connection from" + socket.handshake.address.address + " with id=" + socket.id);
       
   301     socket.emit('tweetSummary', { tweetcount : tweets.length });
       
   302     socket.on('updateTweets', function(data) {
       
   303         if (data.tweets.length) {
       
   304             getSendTweets(data.tweets, socket);
       
   305         }
       
   306     });
       
   307     socket.on('updateTimeline', function(data) {
       
   308         getSendTimeline(data.level, socket);
       
   309     });
       
   310 });