tweetcast/nodejs/node-direct.js
changeset 336 d60efd677b50
parent 335 5f83c21dee69
child 338 60dff8a71024
equal deleted inserted replaced
330:1a77b44dfa7d 336:d60efd677b50
       
     /* CALLING COMMON CONFIGURATION FILE */

     console.log('Reading Configuration from conf.js');

     var fs = require('fs');
     // conf.js is evaluated in this module's scope, so whatever names it
     // declares become visible to the code below. It is also the file that
     // httpHandler serves to browsers under "/conf.js".
     // (presumably it provides tracking_keyword, annotations and app_port,
     // which this file reads but never defines - verify against conf.js)
     // SECURITY NOTE(review): eval() runs the content of conf.js with the
     // full privileges of the server process. Keep that file under the same
     // access control as this script.
     eval(fs.readFileSync(__dirname + '/conf.js','utf8'));

     /* SERVER-SIDE ONLY CONFIGURATION */

     // These are assigned without `var` on purpose-preserving the original
     // behaviour: they become process-wide globals.
     sqlfile = __dirname + '/tweets-' + encodeURIComponent(tracking_keyword) + '.sqlite';
     // Credentials can be supplied through the environment so that they do
     // not have to live in version control; the historical values remain as
     // fallback so existing deployments keep working.
     // SECURITY NOTE(review): the fallback password is committed in clear
     // text and should be rotated.
     TWITTER_USER = process.env.TWITTER_USER || 'materiauxnum';
     TWITTER_PASS = process.env.TWITTER_PASS || 'm473r14ux7w337';
     RECORD_NEW_TWEETS = true;
       
    14 
       
    15 /* FUNCTIONS */
       
    16 
       
    /**
     * Calls `callback(name, definition)` for each own entry of the global
     * `annotations` map and collects what the callback returns.
     *
     * @param {Function} callback  Invoked as callback(name, annotations[name]).
     * @param {Object} [options]
     * @param {boolean} [options.includeDefault=false]  When truthy, the entry
     *        named "default" is visited too; otherwise it is skipped.
     * @param {boolean} [options.returnObject=false]  When truthy, the result is
     *        an object keyed by annotation name instead of an array.
     * @returns {Array|Object} The collected callback results.
     */
    function annotationMap(callback, options) {
        var includeDefault = Boolean(options && options.includeDefault);
        var returnObject = Boolean(options && options.returnObject);
        // Declared locally so that the accumulator does not leak into (or get
        // clobbered through) the global scope.
        var collected = returnObject ? {} : [];

        // Object.keys only lists own enumerable keys, so properties added to
        // Object.prototype by other code are never treated as annotations.
        Object.keys(annotations).forEach(function (name) {
            if (name === 'default' && !includeDefault) {
                return;
            }
            var value = callback(name, annotations[name]);
            if (returnObject) {
                collected[name] = value;
            } else {
                collected.push(value);
            }
        });

        return collected;
    }
       
    33 
       
    /**
     * Issues the CREATE statements for the `tweets` and `tweet_refs` tables.
     * The `tweets` table gets one `a_<name> INTEGER` column per annotation
     * returned by annotationMap. Once `tweets` exists, its `created_at` index
     * is created and getSendLastPos() is called. Any database error is thrown.
     */
    function createTables() {
        // Shared completion handler: rethrow whatever the database reports.
        function failOnError(err) {
            if (err) {
                throw err;
            }
        }

        var annotationColumns = annotationMap(function (name) {
            return ', a_' + name + ' INTEGER';
        }).join("");
        var tweetsDdl = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT"
            + annotationColumns
            + " )";

        db.execute(tweetsDdl, function (err) {
            failOnError(err);
            db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", failOnError);
            getSendLastPos();
        });

        db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", failOnError);
    }
       
    45 
       
    /**
     * Inserts one row into `tweet_refs` linking two tweets.
     *
     * @param from_id   Value stored in the `from_id` column.
     * @param to_id     Value stored in the `to_id` column.
     * @param ref_type  Value stored in the `ref_type` column (callers in this
     *                  file pass "reply" or "retweet").
     * Any database error is thrown from the completion callback.
     */
    function commitReference(from_id, to_id, ref_type) {
        var insertSql = "INSERT INTO tweet_refs ( from_id, to_id, ref_type ) VALUES ( ?, ?, ? )";
        var bindings = [ from_id, to_id, ref_type ];

        db.execute(insertSql, bindings, function (err) {
            if (err) {
                throw err;
            }
        });
    }
       
    53 
       
    /**
     * Parses one message received from the stream, trims it down, tags it with
     * the matching annotations and stores it in the `tweets` table. Reply and
     * retweet relations are recorded through commitReference(). After a
     * successful insert, getSendLastPos() is called.
     *
     * Messages that are not valid JSON, or that lack a string `text` or a
     * `user` object, are logged and skipped instead of throwing.
     *
     * @param {string} data  One JSON-encoded message.
     */
    function commitTweet(data) {
        var tweet;
        try {
            tweet = JSON.parse(data);
        } catch (parseError) {
            console.log("Skipping unparseable stream message: " + parseError.message);
            return;
        }
        // Everything below dereferences tweet.text and tweet.user, so anything
        // without them cannot be stored as a tweet.
        if (!tweet || typeof tweet.text !== 'string' || !tweet.user) {
            console.log("Skipping stream message that is not a tweet");
            return;
        }

        var ann = [];

        // Removes every listed key from `target`.
        function stripKeys(target, keys) {
            keys.forEach(function (key) {
                delete target[key];
            });
        }

        textids(tweet);
        stripKeys(tweet, keys_to_delete);
        textids(tweet.user);
        stripKeys(tweet.user, user_keys_to_delete);
        if (tweet.retweeted_status) {
            textids(tweet.retweeted_status);
            stripKeys(tweet.retweeted_status, keys_to_delete);
        }

        // An annotation applies as soon as one of its keywords is found in the
        // text. String.prototype.search treats each keyword as a regular
        // expression pattern.
        annotationMap(function (name, annotation) {
            for (var j in annotation.keywords) {
                if (tweet.text.search(annotation.keywords[j]) != -1) {
                    ann.push(name);
                    break;
                }
            }
        });
        tweet.annotations = ann;
        tweet.created_at = new Date(tweet.created_at);

        if (tweet.in_reply_to_status_id) {
            commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
        }
        if (tweet.retweeted_status) {
            commitReference( tweet.id, tweet.retweeted_status.id, "retweet" );
        }

        var columnList = annotationMap(function (name) { return ', a_' + name; }).join("");
        var placeholderList = annotationMap(function () { return ', ?'; }).join("");
        // One 0/1 flag per annotation column, in the same order as columnList.
        var annotationFlags = annotationMap(function (name) {
            return ann.indexOf(name) == -1 ? 0 : 1;
        });

        db.execute(
            "INSERT INTO tweets ( tweet_id, created_at, json "
            + columnList
            + " ) VALUES ( ?, ?, ? "
            + placeholderList
            + " )",
            [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationFlags),
            function(err) {
                if (err) throw err;
                getSendLastPos();
            }
        );
    }
       
   103 
       
   /**
    * 'data' handler for the stream response: splits the incoming text on
    * "\r\n" and passes every non-empty complete line to commitTweet().
    *
    * A network chunk can end in the middle of a message, so the text after the
    * last "\r\n" is kept in `callBackNewTweets.pending` and prepended to the
    * next chunk instead of being committed as a broken fragment.
    *
    * @param {string} chunk  Text received from the stream.
    */
   function callBackNewTweets(chunk) {
       var buffered = (callBackNewTweets.pending || '') + chunk;
       var lines = buffered.split('\r\n');

       // The last element is '' when the chunk ended exactly on a delimiter,
       // otherwise it is the beginning of a message that is not complete yet.
       callBackNewTweets.pending = lines.pop();

       lines.forEach(function (line) {
           if (line.length > 0) {
               commitTweet(line);
           }
       });
       console.log("New tweets received");
   }
       
   113 
       
   /**
    * Reads the highest `pos` from the `tweets` table, stores it in the
    * module-level `lastpos` (0 when the query yields a falsy value) and
    * broadcasts it to every connected socket as a 'tweetSummary' event.
    * Any database error is thrown.
    */
   function getSendLastPos() {
       function onMaxPos(err, results) {
           if (err) {
               throw err;
           }
           lastpos = results[0].lastpos || 0;
           console.log("Broadcasting last pos = ",lastpos);
           var summary = { tweetcount : lastpos };
           io.sockets.emit('tweetSummary', summary);
       }

       db.execute("SELECT MAX(pos) lastpos FROM tweets", onMaxPos);
   }
       
   124 
       
   125 
       
   /**
    * Sends the stored tweets at the requested positions to one client as a
    * 'tweets' event. Each tweet is the parsed `json` column with the row's
    * `pos` added.
    *
    * `posList` comes straight from the client, so only whole numbers (or
    * strings made of digits) are kept before the list is placed in the SQL
    * text; everything else is dropped. When nothing valid remains, an empty
    * list is sent and the database is not queried.
    *
    * @param {Array} posList  Requested `pos` values.
    * @param {Object} socket  Client socket; `id` is logged, `emit` is called.
    */
   function getSendTweets(posList, socket) {
       // Returns the position as a number, or null when the value is not a
       // plain integer.
       function toPosition(value) {
           if (typeof value === 'number' && isFinite(value) && Math.floor(value) === value) {
               return value;
           }
           if (typeof value === 'string' && /^\d+$/.test(value)) {
               return parseInt(value, 10);
           }
           return null;
       }

       var requested = Array.isArray(posList) ? posList : [];
       var positions = requested.map(toPosition).filter(function (pos) {
           return pos !== null;
       });

       console.log("request for tweets ("+positions.join(',')+") from "+socket.id);

       if (positions.length === 0) {
           socket.emit('tweets', []);
           return;
       }

       // Safe to inline: every element is a validated integer at this point.
       db.execute("SELECT * FROM tweets WHERE pos IN ( " + positions.join(',') + " )", function (err, results) {
           if (err) throw err;
           socket.emit('tweets',
               results.map(function(line) {
                   var tw = JSON.parse(line.json);
                   tw.pos = line.pos;
                   return tw;
               })
           );
       });
   }
       
   139 
       
   /**
    * Sends a histogram of stored tweets to one client as a 'timeline' event.
    *
    * `level` selects a bucket width from `date_levels`. The query groups
    * tweets by `created_at` bucket and returns at most 50 buckets, newest
    * first. The emitted array is in chronological order; buckets missing from
    * the query result are filled in with `tweets: 0` and empty `annotations`.
    *
    * A `level` that is not a valid index into `date_levels` is logged and the
    * request is ignored. An empty table yields an empty timeline.
    *
    * @param {number|string} level  Index into `date_levels` (client supplied).
    * @param {Object} socket        Client socket; `id` is logged, `emit` is called.
    */
   function getSendTimeline(level, socket) {
       console.log("request for timeline ("+level+") from "+socket.id);

       // Accept only real array indexes: anything else (undefined, "length",
       // objects...) would otherwise end up verbatim in the SQL text.
       var isIndexLike = typeof level === 'number'
           || (typeof level === 'string' && /^\d+$/.test(level));
       var lvl = isIndexLike ? date_levels[Number(level)] : undefined;
       if (typeof lvl !== 'number') {
           console.log("Ignoring timeline request with unknown level");
           return;
       }

       var requete = "SELECT COUNT(*) AS nb, "
           + lvl
           + "*ROUND(created_at/"
           + lvl
           + ") AS tranche"
           + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
           + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0,50";

       db.execute(requete, function (err, results) {
           if (err) throw err;
           var tbl = [];

           // No rows means there is no oldest bucket to start from.
           if (results.length === 0) {
               socket.emit('timeline', tbl);
               return;
           }

           // Rows arrive newest first; walk them backwards to go forward in time.
           var lastend = parseInt(results[results.length - 1].tranche, 10);
           for (var i = results.length - 1; i >= 0; i--) {
               var row = results[i];
               var start = parseInt(row.tranche, 10);

               // Fill the gap between the previous bucket and this one.
               while (start > lastend) {
                   var gap = { "start": lastend, "tweets" : 0, "annotations" : {} };
                   lastend += lvl;
                   gap.end = lastend;
                   tbl.push(gap);
               }

               lastend += lvl;
               tbl.push({
                   "start" : start,
                   "end" : lastend,
                   "tweets" : row.nb,
                   "annotations" : annotationMap(function (a) {
                       return row['s_'+a];
                   },{returnObject: true})
               });
           }
           socket.emit('timeline', tbl);
       });
   }
       
   176 
       
   /**
    * Replaces, in place, every property whose name ends in "id" with the
    * value of its "<name>_str" companion and removes the companion.
    *
    * Properties ending in "id" that have no "<name>_str" companion are left
    * untouched, which also makes a second call on the same object a no-op.
    *
    * @param {Object} object  Object to rewrite; null/undefined are ignored.
    */
   function textids(object) {
       var hasOwn = Object.prototype.hasOwnProperty;
       for (var key in object) {
           var strKey = key + '_str';
           // Only swap when the string twin really exists; otherwise the
           // original value would be overwritten with undefined.
           if (key.slice(-2) === 'id' && hasOwn.call(object, strKey)) {
               object[key] = object[strKey];
               delete object[strKey];
           }
       }
   }
       
   185 
       
   /**
    * Minimal static file handler.
    *
    * "/conf.js" is served from the directory of this script; every other
    * path is served from its "client" sub-directory, and a path ending in "/"
    * maps to "index.html" in that directory. The query string is ignored.
    *
    * Requests whose path does not start with "/", contains a ".." segment or
    * a NUL character get a 403, so clients cannot read files outside the
    * served directories. Unreadable files get a 404.
    *
    * @param {Object} req  Incoming request; only `url` is read.
    * @param {Object} res  Response; `writeHead` and `end` are called.
    */
   function httpHandler(req, res) {
       console.log("HTTP Request for URL "+req.url);

       var pathname = req.url.split('?')[0];
       var segments = pathname.split(/[\/\\]/);
       var isUnsafe = pathname.charAt(0) !== '/'
           || segments.indexOf('..') !== -1
           || pathname.indexOf('\0') !== -1;
       if (isUnsafe) {
           console.log("Error 403");
           res.writeHead(403);
           return res.end('Forbidden');
       }

       var root = __dirname + ( pathname == "/conf.js" ? "" : "/client" );
       var indexSuffix = ( pathname.charAt(pathname.length - 1) == "/" ? "index.html" : "" );
       var url = root + pathname + indexSuffix;

       fs.readFile( url, function(err, data) {
           if (err) {
               console.log("Error 404");
               res.writeHead(404);
               return res.end('File not found');
           }
           res.writeHead(200);
           res.end(data);
       });
   }
       
   199 
       
   200 /* Initialization */
       
   201 
       
   // Modules
   var http = require('http');
   var https = require('https');
   var sqlite = require('sqlite');
   var socketio = require('socket.io');

   // Module-level state
   var tweets = [];
   var lastpos = 0;      // updated by getSendLastPos()
   var arcs = [];
   var tweet_ids = [];
   var date_struct = [];

   // Bucket widths offered to getSendTimeline(), indexed by "level".
   var date_levels = [
       3600 * 1000,
       15 * 60 * 1000,
       5 * 60 * 1000,
       60 * 1000,
       15 * 1000
   ];

   // Properties removed from a tweet (and from its retweeted_status) by
   // commitTweet() before the tweet is stored.
   var keys_to_delete = [
       'in_reply_to_screen_name',
       'in_reply_to_user_id',
       'retweeted',
       'place',
       'geo',
       'source',
       'contributors',
       'coordinates',
       'retweet_count',
       'favorited',
       'truncated',
       'possibly_sensitive'
   ];

   // Properties removed from tweet.user by commitTweet().
   var user_keys_to_delete = [
       'default_profile_image',
       'show_all_inline_media',
       'contributors_enabled',
       'profile_sidebar_fill_color',
       'created_at',
       'lang',
       'time_zone',
       'profile_sidebar_border_color',
       'follow_request_sent',
       'profile_background_image_url',
       'profile_background_image_url_https',
       'followers_count',
       'description',
       'url',
       'geo_enabled',
       'profile_use_background_image',
       'default_profile',
       'following',
       'profile_text_color',
       'is_translator',
       'favourites_count',
       'listed_count',
       'friends_count',
       'profile_link_color',
       'protected',
       'location',
       'notifications',
       'profile_image_url_https',
       'statuses_count',
       'verified',
       'profile_background_color',
       'profile_background_tile',
       'utc_offset'
   ];

   // HTTP server, socket.io bound to it, and the database handle
   // (opened later with db.open).
   var app = http.createServer(httpHandler);
   var io = socketio.listen(app);
   var db = new sqlite.Database();
       
   270 
       
   271 /* MAIN CODE */
       
   272 
       
   // Start the HTTP server, then open the database; the tables are created
   // once the database file is open.
   app.listen(app_port);
   console.log("Listening on port: "+app_port);
   console.log("Opening SQLITE file: "+sqlfile);
   db.open(sqlfile , function(err) {
       if (err) throw err;
       createTables();
   });

   if (RECORD_NEW_TWEETS) {
       console.log("Requesting Twitter to track keyword(s): "+tracking_keyword);
       // Long-lived POST to the streaming endpoint, authenticated with HTTP
       // Basic credentials; every received chunk goes to callBackNewTweets.
       var req = https.request({
           host: "stream.twitter.com",
           path: "/1/statuses/filter.json",
           method: "POST",
           headers: {
               'Authorization': 'Basic ' + new Buffer( TWITTER_USER + ":" + TWITTER_PASS ).toString('base64'),
               'Content-Type': 'application/x-www-form-urlencoded'
           }
       }, function(res) {
           console.log('Reply from stream.twitter.com: ' + res.statusCode);
           console.log('Headers: ' + JSON.stringify(res.headers));
           res.setEncoding('utf8');
           res.on('data', callBackNewTweets);
       });

       // Without a listener, a connection failure is emitted as an unhandled
       // 'error' event and terminates the whole process. Logging it keeps the
       // HTTP/socket.io side serving the tweets already stored.
       req.on('error', function(err) {
           console.log('Twitter stream request failed: ' + err.message);
       });

       req.write('track=' + encodeURIComponent(tracking_keyword));
       req.end();
   }
       
   301 
       
   // socket.io wiring: every new client immediately receives the current
   // tweet count, then may ask for tweets by position ('updateTweets') or for
   // a timeline at a given level ('updateTimeline').
   io.set('log level', 0);
   io.sockets.on('connection', function(socket) {
       console.log("New connection from" + socket.handshake.address.address + " with id=" + socket.id);
       // Same value getSendLastPos() broadcasts; the `tweets` array is never
       // filled in this file, so its length would always report 0.
       socket.emit('tweetSummary', { tweetcount : lastpos });
       socket.on('updateTweets', function(data) {
           // Payloads come from the client: ignore anything that is not a
           // non-empty list instead of throwing inside the event handler.
           if (data && Array.isArray(data.tweets) && data.tweets.length) {
               getSendTweets(data.tweets, socket);
           }
       });
       socket.on('updateTimeline', function(data) {
           if (data) {
               getSendTimeline(data.level, socket);
           }
       });
   });
       
   314 });