tweetcast/nodejs/node-direct.js
changeset 340 a99a04556e3b
parent 339 6a073c4a8578
child 341 cab5c9e10f90
equal deleted inserted replaced
339:6a073c4a8578 340:a99a04556e3b
    53 
    53 
    54     var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationMap(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )";
    54     var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationMap(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )";
    55     db.execute(requete, function(err) {
    55     db.execute(requete, function(err) {
    56         if (err) { myLog("SQLITE error",err.stack); }
    56         if (err) { myLog("SQLITE error",err.stack); }
    57         db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
    57         db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
    58         getSendLastPos();
       
    59     });
    58     });
    60     
    59     
    61     db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
    60     db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) { myLog("SQLITE error",err.stack); } });
    62 }
    61 }
    63 
    62 
    71 
    70 
    72 function commitTweet(data) {
    71 function commitTweet(data) {
    73 
    72 
    74     var tweet = JSON.parse(data),
    73     var tweet = JSON.parse(data),
    75         ann = [];
    74         ann = [];
    76 
    75     
    77     textids(tweet);
    76     if (!tweet.id) {
    78     for (var j in keys_to_delete) {
    77         myLog("Error: Could not parse data",data);
    79         delete tweet[keys_to_delete[j]];
    78         return;
    80     }
    79     }
    81     textids(tweet.user);
    80     
    82     for (var j in user_keys_to_delete) {
    81     try {
    83         delete tweet.user[user_keys_to_delete[j]];
    82         textids(tweet);
    84     }
       
    85     if (tweet.retweeted_status) {
       
    86         textids(tweet.retweeted_status);
       
    87         for (var j in keys_to_delete) {
    83         for (var j in keys_to_delete) {
    88             delete tweet.retweeted_status[keys_to_delete[j]];
    84             delete tweet[keys_to_delete[j]];
    89         }
    85         }
    90     }
    86         textids(tweet.user);
    91     annotationMap(function(i, annotation) {
    87         for (var j in user_keys_to_delete) {
    92         for (var j in annotation.keywords) {
    88             delete tweet.user[user_keys_to_delete[j]];
    93             if (tweet.text.search(annotation.keywords[j]) != -1) {
    89         }
    94                 ann.push(i);
    90         if (tweet.retweeted_status) {
    95                 break;
    91             textids(tweet.retweeted_status);
    96             }
    92             for (var j in keys_to_delete) {
    97         }
    93                 delete tweet.retweeted_status[keys_to_delete[j]];
    98     });
    94             }
    99     tweet.annotations = ann;
    95         }
   100     tweet.created_at = new Date(tweet.created_at);
    96         annotationMap(function(i, annotation) {
       
    97             for (var j in annotation.keywords) {
       
    98                 if (tweet.text.search(annotation.keywords[j]) != -1) {
       
    99                     ann.push(i);
       
   100                     break;
       
   101                 }
       
   102             }
       
   103         });
       
   104         tweet.annotations = ann;
       
   105         tweet.created_at = new Date(tweet.created_at);
       
   106 //        myLog("Time delta :",(new Date() - tweet.created_at) / 1000);
       
   107     } catch (err) {
       
   108         myLog("Error while processing tweet",err.stack);
       
   109     }
   101     
   110     
   102     if (tweet.in_reply_to_status_id) {
   111     if (tweet.in_reply_to_status_id) {
   103         commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
   112         commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" );
   104     }
   113     }
   105     if (tweet.retweeted_status) {
   114     if (tweet.retweeted_status) {
   112         + annotationMap(function(a) { return ', ?' }).join("")
   121         + annotationMap(function(a) { return ', ?' }).join("")
   113         + " )",
   122         + " )",
   114         [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })),
   123         [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })),
   115         function(err) {
   124         function(err) {
   116             if (err) { myLog("SQLITE error",err.stack); }
   125             if (err) { myLog("SQLITE error",err.stack); }
   117             getSendLastPos();
       
   118         }
   126         }
   119     );
   127     );
   120 }
   128 }
   121 
   129 
   122 function callBackNewTweets(chunk) {
   130 function callBackNewTweets(chunk) {
   124     for (var i in newdata) {
   132     for (var i in newdata) {
   125         if (newdata[i].length > 0) {
   133         if (newdata[i].length > 0) {
   126             commitTweet(newdata[i]);
   134             commitTweet(newdata[i]);
   127         }
   135         }
   128     }
   136     }
   129 //    myLog("New tweets received");
   137     myLog("Data received - length :",chunk.length);
   130 }
   138 }
   131 
   139 
   132 function requestTwitter() {
   140 function requestTwitter() {
   133     myLog("Requesting Twitter to track keyword(s): "+tracking_keyword);
   141     myLog("Requesting Twitter to track keyword(s): "+tracking_keyword);
   134     var req = https.request({
   142     var req = https.request({
   151     });
   159     });
   152     
   160     
   153     req.write('track=' + encodeURIComponent(tracking_keyword));
   161     req.write('track=' + encodeURIComponent(tracking_keyword));
   154     req.socket.setTimeout(60000);
   162     req.socket.setTimeout(60000);
   155     req.socket.on('timeout', function() {
   163     req.socket.on('timeout', function() {
   156         myLog('TimeOut — Trying to reconnect');
   164         myLog('TimeOut - Trying to reconnect');
   157         requestTwitter();
   165         requestTwitter();
   158     });
   166     });
   159     req.end();
   167     req.end();
   160 }
   168 }
   161 
   169 
   175         }
   183         }
   176     });
   184     });
   177 }
   185 }
   178 
   186 
   179 function getSendTweetPosByDate(date, socket) {
   187 function getSendTweetPosByDate(date, socket) {
   180     db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,1", function (err, results) {
   188     db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,9", function (err, results) {
   181         if (err) { myLog("SQLITE error",err.stack); }
   189         if (err) { myLog("SQLITE error",err.stack); }
   182         if (results.length) {
   190         if (results.length) {
   183             try {
   191             try {
   184                 socket.emit('tweetPosByDate', {
   192                 socket.emit('tweetPosByDate', {
   185                     tweetpos : results[0].pos,
   193                     tweetpos : results[0].pos,
   191         }
   199         }
   192     });
   200     });
   193 }
   201 }
   194 
   202 
   195 function getSendLinkedTweets(pos, socket) {
   203 function getSendLinkedTweets(pos, socket) {
   196     myLog("request for tweets linked to",pos);
   204 //    myLog("request for tweets linked to",pos);
   197     db.execute("SELECT A.pos pos_a, A.tweet_id id_a, A.json json_a, B.pos pos_b, B.tweet_id id_b, B.json json_b, ref_type FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?)", [ pos, pos ], function(err, results) {
   205     db.execute("SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10", [ pos, pos ], function(err, results) {
   198         if (err) { myLog("SQLITE error: ",err.stack); }
   206         if (err) { myLog("SQLITE error: ",err.stack); }
   199         var struct = {
   207         var struct = {
   200             "tweetpos" : pos,
   208             "tweetpos" : pos,
   201             "referencing" : [],
   209             "referencing" : [],
   202             "referenced_by" : []
   210             "referenced_by" : []
   203         };
   211         };
   204         for (var i in results) {
   212         for (var i in results) {
   205             if (results[i].pos_a == pos) {
   213             if (results[i].pos_a == pos) {
   206                 var tw = JSON.parse(results[i].json_b);
       
   207                 tw.pos = results[i].pos_b;
       
   208                 struct.referencing.push({
   214                 struct.referencing.push({
   209                     "tweet" : tw,
   215                     "pos" : results[i].pos_b,
   210                     "ref_type" : results[i].ref_type
   216                     "ref_type" : results[i].ref_type
   211                 });
   217                 });
   212             } else {
   218             } else {
   213                 var tw = JSON.parse(results[i].json_a);
       
   214                 tw.pos = results[i].pos_a;
       
   215                 struct.referenced_by.push({
   219                 struct.referenced_by.push({
   216                     "tweet" : tw,
   220                     "pos" : results[i].pos_a,
   217                     "ref_type" : results[i].ref_type
   221                     "ref_type" : results[i].ref_type
   218                 });
   222                 });
   219             }
   223             }
   220         }
   224         }
   221         try {
   225         try {
   243         }
   247         }
   244     });
   248     });
   245 }
   249 }
   246 
   250 
   247 function getSendTimeline(data, socket) {
   251 function getSendTimeline(data, socket) {
   248     myLog("request for timeline (",data.level, data.full,") from "+socket.id);
   252 //    myLog("request for timeline (",data.level, data.full,") from "+socket.id);
   249     var lvl = date_levels[data.level],
   253     var lvl = date_levels[data.level],
   250         requete = "SELECT COUNT(*) AS nb, "
   254         requete = "SELECT COUNT(*) AS nb, "
   251         + lvl
   255         + lvl
   252         + "*ROUND(created_at/"
   256         + "*ROUND(created_at/"
   253         + lvl
   257         + lvl
   254         + ") AS tranche"
   258         + ") AS tranche"
   255         + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
   259         + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("")
   256         + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" );
   260         + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" );
   257     db.execute(requete, function (err, results) {
   261     db.execute(requete, function (err, results) {
   258         if (err) { myLog("SQLITE error",err.stack); }
   262         if (err) { myLog("SQLITE error",err.stack); }
       
   263         if (!results.length) {
       
   264             return;
       
   265         }
   259         var tbl = [],
   266         var tbl = [],
   260             lastend = parseInt(results[results.length - 1].tranche);
   267             lastend = parseInt(results[results.length - 1].tranche);
   261         for (var i = results.length - 1; i >= 0; i--) {
   268         for (var i = results.length - 1; i >= 0; i--) {
   262             var start = parseInt(results[i].tranche);
   269             var start = parseInt(results[i].tranche);
   263             while (start > lastend) {
   270             while (start > lastend) {
   402 db.open(sqlfile , function(err) {
   409 db.open(sqlfile , function(err) {
   403     if (err) { myLog("SQLITE error",err.stack); }
   410     if (err) { myLog("SQLITE error",err.stack); }
   404     createTables();
   411     createTables();
   405 });
   412 });
   406 
   413 
       
   414 setInterval(getSendLastPos,300);
       
   415 
   407 if (RECORD_NEW_TWEETS) {
   416 if (RECORD_NEW_TWEETS) {
   408     requestTwitter();
   417     requestTwitter();
   409 }
   418 }
   410 
   419 
   411 io.set('log level', 0);
   420 io.set('log level', 0);