tweetcast/nodejs/node-direct.js
changeset 404 89968844eb7d
parent 403 dd1686ae5506
child 405 6626b728b142
equal deleted inserted replaced
403:dd1686ae5506 404:89968844eb7d
     1 /* DEFAULT CONFIGURATION */
       
     2 
       
     3 //var TWITTER_USER = 'materiauxnum',
       
     4 //    TWITTER_PASS = 'm473r14ux7w337',
       
     5 var TWITTER_USER = 'raphv',
       
     6     TWITTER_PASS = '7w337LfC23gE',
       
     7     RECORD_NEW_TWEETS = true,
       
     8     commit_script = '';
       
     9 
       
    10 /* CALLING COMMON CONFIGURATION FILE */
       
    11 
       
    12 var conf_file = flagOption('-c', 'conf.js');
       
    13 
       
    14 myLog('Reading Configuration from ' + conf_file);
       
    15 
       
    16 var fs = require('fs');
       
    17 eval(fs.readFileSync(conf_file,'utf8'));
       
    18 
       
    19 if (typeof sqlfile == "undefined") {
       
    20     sqlfile = __dirname + '/tweets-' + encodeURIComponent(tracking_keyword) + '.sqlite'
       
    21     }
       
    22 
       
    23 /* FUNCTIONS */
       
    24 
       
    25 function flagOption(flag, defaultValue) {
       
    26     var flagPos = process.argv.indexOf(flag);
       
    27     return ( flagPos != -1 && flagPos < process.argv.length - 1) ? process.argv[flagPos + 1] : defaultValue;
       
    28 }
       
    29 
       
    30 function myLog() {
       
    31     var args = ["\033[1;34m["+new Date().toLocaleTimeString()+"]\033[0m"];
       
    32     for (var i in arguments) {
       
    33         args.push(arguments[i]);
       
    34     }
       
    35     console.log.apply(console, args);
       
    36 }
       
    37 
       
    38 function annotationMap(callback, options) {
       
    39     var includeDefault = ( options && options.includeDefault ? options.includeDefault : false );
       
    40     var returnObject = ( options && options.returnObject ? options.returnObject : false );
       
    41     res = (returnObject ? {} : []);
       
    42     for (var i in annotations) {
       
    43         if (i != "default" || includeDefault) {
       
    44             var el = callback(i, annotations[i])
       
    45             if (returnObject) {
       
    46                 res[i] = el;
       
    47             } else {
       
    48                 res.push(el);
       
    49             }
       
    50         }
       
    51     }
       
    52     return res;
       
    53 }
       
    54 
       
    55 function createTables() {
       
    56 
       
    57     var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT"
       
    58         + annotationMap(function(a) { return ', a_' + a + ' INTEGER' }).join("")
       
    59         + " );\n"
       
    60         + "CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT );\n"
       
    61         + "CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at );";
       
    62     db.executeScript(requete, function(err) {
       
    63         if (err) { myLog("SQLITE error",err.stack); }
       
    64         getSendLastPos();
       
    65     });
       
    66 }
       
    67 
       
    68 function commitReference(from_id, to_id, ref_type) {
       
    69     commit_script += 'INSERT OR IGNORE INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( "' + from_id + '", "' + to_id + '", "' + ref_type + '" );\n';
       
    70 }
       
    71 
       
    72 function commitTweet(data) {
       
    73 
       
    74     var tweet = JSON.parse(data),
       
    75         ann = [];
       
    76     
       
    77     if (!tweet.id) {
       
    78         myLog("Error: Could not parse data",data);
       
    79         return;
       
    80     }
       
    81     
       
    82     try {
       
    83         textids(tweet);
       
    84         for (var j in keys_to_delete) {
       
    85             delete tweet[keys_to_delete[j]];
       
    86         }
       
    87         textids(tweet.user);
       
    88         for (var j in user_keys_to_delete) {
       
    89             delete tweet.user[user_keys_to_delete[j]];
       
    90         }
       
    91         if (tweet.retweeted_status) {
       
    92             textids(tweet.retweeted_status);
       
    93             for (var j in keys_to_delete) {
       
    94                 delete tweet.retweeted_status[keys_to_delete[j]];
       
    95             }
       
    96         }
       
    97         annotationMap(function(i, annotation) {
       
    98             for (var j in annotation.keywords) {
       
    99                 if (tweet.text.search(annotation.keywords[j]) != -1) {
       
   100                     ann.push(i);
       
   101                     break;
       
   102                 }
       
   103             }
       
   104         });
       
   105         tweet.annotations = ann;
       
   106         tweet.created_at = new Date(tweet.created_at);
       
   107 //        myLog("Time delta :",(new Date() - tweet.created_at) / 1000);
       
   108     } catch (err) {
       
   109         myLog("Error while processing tweet",err.stack);
       
   110     }
       
   111     
       
   112     if (tweet.in_reply_to_status_id) {
       
   113         commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
       
   114     }
       
   115     if (tweet.retweeted_status) {
       
   116         commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
       
   117     }
       
   118     commit_script += 'INSERT INTO tweets ( tweet_id, created_at, json '
       
   119         + annotationMap(function(a) { return ', a_' + a }).join("")
       
   120         + ' ) VALUES ( "'
       
   121         + tweet.id
       
   122         + '", '
       
   123         + tweet.created_at.valueOf()
       
   124         + ', "'
       
   125         + JSON.stringify(tweet).replace(/\"/g, '""')
       
   126         + '"'
       
   127         + annotationMap(function(a) {
       
   128             return ann.indexOf(a) == -1 ? ', 0' : ', 1'
       
   129         }).join("")
       
   130         + ' );\n';
       
   131 }
       
   132 
       
   133 function callBackNewTweets(chunk) {
       
   134     var newdata = chunk.split('\r\n');
       
   135     for (var i in newdata) {
       
   136         if (newdata[i].length > 0) {
       
   137             commitTweet(newdata[i]);
       
   138         }
       
   139     }
       
   140 //    myLog("Data received - length :",chunk.length);
       
   141 }
       
   142 
       
   143 function requestTwitter() {
       
   144     myLog("Requesting Twitter to track keyword(s): "+tracking_keyword);
       
   145     var req = https.request({
       
   146         host: "stream.twitter.com",
       
   147         path: "/1/statuses/filter.json",
       
   148         method: "POST",
       
   149         headers: {
       
   150             'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
       
   151             'Content-Type': 'application/x-www-form-urlencoded'
       
   152         }
       
   153     }, function(res) {
       
   154         myLog('Reply from stream.twitter.com: ' + res.statusCode);
       
   155         myLog('Headers: ' + JSON.stringify(res.headers));
       
   156         res.setEncoding('utf8');
       
   157         res.on('data', callBackNewTweets);
       
   158         res.on('end', function() {
       
   159             myLog('End Twitter Connection — Trying to reconnect');
       
   160             requestTwitter();
       
   161         });
       
   162     });
       
   163     
       
   164     req.write('track=' + encodeURIComponent(tracking_keyword));
       
   165     req.socket.setTimeout(60000);
       
   166     req.socket.on('timeout', function() {
       
   167         myLog('TimeOut - Trying to reconnect');
       
   168         requestTwitter();
       
   169     });
       
   170     req.end();
       
   171 }
       
   172 
       
   173 function getSendLastPos() {
       
   174     db.execute("SELECT MAX(pos) lastpos FROM tweets", function (err, results) {
       
   175         if (err) { myLog("SQLITE error",err.stack); }
       
   176         if (results[0].lastpos != lastpos) {
       
   177             lastpos = results[0].lastpos ? results[0].lastpos : 0;
       
   178             try {
       
   179                 io.sockets.emit('tweetSummary', {
       
   180                     tweetcount : lastpos
       
   181                 });
       
   182             } catch(err) {
       
   183                 myLog("SOCKET.IO error while Broadcasting tweetSummary",err.stack);
       
   184             }
       
   185         }
       
   186     });
       
   187 }
       
   188 
       
   189 function commitTweets() {
       
   190     if (commit_script != '') {
       
   191         var requete = commit_script;
       
   192         commit_script = '';
       
   193       //  console.log(requete);
       
   194       //  var reqd = new Date();
       
   195         db.executeScript(requete, function (err) {
       
   196             if (err) { myLog("SQLITE error",err.stack); }
       
   197         //    myLog("Commit took",(new Date() - reqd),"ms");
       
   198             getSendLastPos();
       
   199         });
       
   200     }
       
   201 }
       
   202 
       
   203 function getSendTweetPosByDate(date, socket) {
       
   204     db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,9", function (err, results) {
       
   205         if (err) { myLog("SQLITE error",err.stack); }
       
   206         if (results.length) {
       
   207             try {
       
   208                 socket.emit('tweetPosByDate', {
       
   209                     tweetpos : results[0].pos,
       
   210                     date : results[0].created_at
       
   211                 });
       
   212             } catch(err) {
       
   213                 myLog("SOCKET.IO error while sending tweetPosByDate",err.stack);
       
   214             }
       
   215         }
       
   216     });
       
   217 }
       
   218 
       
   219 function getSendLinkedTweets(pos, socket) {
       
   220 //    myLog("request for tweets linked to",pos);
       
   221     db.execute("SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10", [ pos, pos ], function(err, results) {
       
   222         if (err) { myLog("SQLITE error: ",err.stack); }
       
   223         var struct = {
       
   224             "tweetpos" : pos,
       
   225             "referencing" : [],
       
   226             "referenced_by" : []
       
   227         };
       
   228         for (var i in results) {
       
   229             if (results[i].pos_a == pos) {
       
   230                 struct.referencing.push({
       
   231                     "pos" : results[i].pos_b,
       
   232                     "ref_type" : results[i].ref_type
       
   233                 });
       
   234             } else {
       
   235                 struct.referenced_by.push({
       
   236                     "pos" : results[i].pos_a,
       
   237                     "ref_type" : results[i].ref_type
       
   238                 });
       
   239             }
       
   240         }
       
   241         try {
       
   242             socket.emit('linkedTweets', struct);
       
   243         } catch(err) {
       
   244             myLog("SOCKET.IO error while sending linkedTweets: ",err.stack);
       
   245         }
       
   246     });
       
   247 }
       
   248 
       
   249 function getSendTweets(posList, socket) {
       
   250 //    myLog("request for tweets ("+posList.join(',')+") from "+socket.id);
       
   251     db.execute("SELECT * FROM tweets WHERE pos IN ( " + posList.join(',') + " )", function (err, results) {
       
   252         if (err) { myLog("SQLITE error",err.stack); }
       
   253         try {
       
   254             socket.emit('tweets',
       
   255                 results.map(function(line) {
       
   256                     var tw = JSON.parse(line.json);
       
   257                     tw.pos = line.pos;
       
   258                     return tw;
       
   259                 })
       
   260             );
       
   261         } catch (err) {
       
   262             myLog("SOCKET.IO error while sending tweets",err.stack);
       
   263         }
       
   264     });
       
   265 }
       
   266 
       
   267 function getSendTimeline(data, socket) {
       
   268 //    myLog("request for timeline (",data.level, data.full,") from "+socket.id);
       
   269     var lvl = date_levels[data.level],
       
   270         requete = "SELECT COUNT(*) AS nb, "
       
   271         + lvl
       
   272         + "*ROUND(created_at/"
       
   273         + lvl
       
   274         + ") AS tranche"
       
   275         + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
       
   276         + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" );
       
   277     db.execute(requete, function (err, results) {
       
   278         if (err) { myLog("SQLITE error",err.stack); }
       
   279         if (!results.length) {
       
   280             return;
       
   281         }
       
   282         var tbl = [],
       
   283             lastend = parseInt(results[results.length - 1].tranche);
       
   284         for (var i = results.length - 1; i >= 0; i--) {
       
   285             var start = parseInt(results[i].tranche);
       
   286             while (start > lastend) {
       
   287                 var struct = { "start": lastend, "tweets" : 0, "annotations" : {} };
       
   288                 lastend += lvl;
       
   289                 struct.end = lastend;
       
   290                 tbl.push(struct);
       
   291             }
       
   292             lastend += lvl;
       
   293             var struct = {
       
   294                 "start" : start,
       
   295                 "end" : lastend,
       
   296                 "tweets" : results[i].nb,
       
   297                 "annotations" : annotationMap(function (a) {
       
   298                     return results[i]['s_'+a];
       
   299                 },{returnObject: true})
       
   300             }
       
   301             tbl.push(struct);
       
   302         }
       
   303         try {
       
   304             socket.emit('timeline', {
       
   305                 "full" : data.full,
       
   306                 "level" : data.level,
       
   307                 "data" : tbl
       
   308             });
       
   309         } catch (err) {
       
   310             myLog("SOCKET.IO error while sending timeline",err.stack);
       
   311         }
       
   312     });
       
   313 }
       
   314 
       
   315 function textids(object) {
       
   316     for (var key in object) {
       
   317         // Workaround for Unicode bug in socket.io.
       
   318         
       
   319         if (typeof object[key] == "string") {
       
   320             var tmp = '';
       
   321             for (var i = 0; i < object[key].length; i++) {
       
   322                 tmp += ( object[key].charCodeAt(i) < 128 ? object[key].charAt(i) : "&#" + object[key].charCodeAt(i) + ";" );
       
   323             }
       
   324             object[key] = tmp;
       
   325         }
       
   326         
       
   327         if (key.substr(-2) == 'id') {
       
   328             object[key] = object[key + '_str'];
       
   329             delete object[key + '_str'];
       
   330         }
       
   331         
       
   332     }
       
   333 }
       
   334 
       
   335 function httpHandler(req, res) {
       
   336     myLog("HTTP Request for URL "+req.url);
       
   337     var url = ( req.url == "/config" ? conf_file : __dirname + "/client" + req.url + ( req.url[req.url.length - 1] == "/" ? "index.html" : "" ) );
       
   338     fs.readFile( url, function(err, data) {
       
   339         if (err) {
       
   340             myLog("Error 404");
       
   341             res.writeHead(404);
       
   342             return res.end('File not found');
       
   343         }
       
   344         res.writeHead(200);
       
   345         res.end(data);
       
   346     });
       
   347 }
       
   348 
       
   349 /* Initialization */
       
   350 
       
   351 var http = require('http'),
       
   352     https = require('https'),
       
   353     sqlite = require('sqlite'),
       
   354     socketio = require('socket.io'),
       
   355     tweets = [],
       
   356     lastpos = 0,
       
   357     arcs = [],
       
   358     tweet_ids = [],
       
   359     date_struct = [],
       
   360     date_levels = [
       
   361         3600 * 1000,
       
   362         15 * 60 * 1000,
       
   363         5 * 60 * 1000,
       
   364         60 * 1000,
       
   365         15 * 1000
       
   366     ],
       
   367     keys_to_delete = [
       
   368         'in_reply_to_screen_name',
       
   369         'in_reply_to_user_id',
       
   370         'retweeted',
       
   371         'place',
       
   372         'geo',
       
   373         'source',
       
   374         'contributors',
       
   375         'coordinates',
       
   376         'retweet_count',
       
   377         'favorited',
       
   378         'truncated',
       
   379         'possibly_sensitive'
       
   380     ],
       
   381     user_keys_to_delete = [
       
   382         'default_profile_image',
       
   383         'show_all_inline_media',
       
   384         'contributors_enabled',
       
   385         'profile_sidebar_fill_color',
       
   386         'created_at',
       
   387         'lang',
       
   388         'time_zone',
       
   389         'profile_sidebar_border_color',
       
   390         'follow_request_sent',
       
   391         'profile_background_image_url',
       
   392         'profile_background_image_url_https',
       
   393         'followers_count',
       
   394         'description',
       
   395         'url',
       
   396         'geo_enabled',
       
   397         'profile_use_background_image',
       
   398         'default_profile',
       
   399         'following',
       
   400         'profile_text_color',
       
   401         'is_translator',
       
   402         'favourites_count',
       
   403         'listed_count',
       
   404         'friends_count',
       
   405         'profile_link_color',
       
   406         'protected',
       
   407         'location',
       
   408         'notifications',
       
   409         'profile_image_url_https',
       
   410         'statuses_count',
       
   411         'verified',
       
   412         'profile_background_color',
       
   413         'profile_background_tile',
       
   414         'utc_offset'
       
   415     ],
       
   416     app = http.createServer(httpHandler),
       
   417     io = socketio.listen(app),
       
   418     db = new sqlite.Database();
       
   419 
       
   420 /* MAIN CODE */
       
   421 
       
   422 app.listen(app_port);
       
   423 myLog("Listening on port: "+app_port);
       
   424 myLog("Opening SQLITE file: "+sqlfile);
       
   425 db.open(sqlfile , function(err) {
       
   426     if (err) { myLog("SQLITE error",err.stack); }
       
   427     createTables();
       
   428 });
       
   429 
       
   430 setInterval(commitTweets,500);
       
   431 setInterval(function(){myLog("Still alive, tweet count",lastpos)}, 60000);
       
   432 
       
   433 if (RECORD_NEW_TWEETS) {
       
   434     requestTwitter();
       
   435 }
       
   436 
       
   437 io.set('log level', 0);
       
   438 io.sockets.on('connection', function(socket) {
       
   439     myLog("New connection from", socket.handshake.address.address, "with id=", socket.id);
       
   440     try {
       
   441         socket.emit('tweetSummary', { tweetcount : lastpos });
       
   442     } catch (err) {
       
   443         myLog("SOCKET.IO error while sending tweetSummary",err.stack);
       
   444     }
       
   445     socket.on('updateTweets', function(data) {
       
   446         if (data.tweets.length) {
       
   447             getSendTweets(data.tweets, socket);
       
   448         }
       
   449     });
       
   450     socket.on('updateTimeline', function(data) {
       
   451         getSendTimeline(data, socket);
       
   452     });
       
   453     socket.on('tweetPosByDate', function(data) {
       
   454         getSendTweetPosByDate(data.date, socket);
       
   455     });
       
   456     socket.on('linkedTweets', function(data) {
       
   457         getSendLinkedTweets(data.tweetpos, socket);
       
   458     });
       
   459 });