| changeset 340 | a99a04556e3b |
| parent 339 | 6a073c4a8578 |
| child 341 | cab5c9e10f90 |
| 339:6a073c4a8578 | 340:a99a04556e3b |
|---|---|
53 |
53 |
54 var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationMap(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )"; |
54 var requete = "CREATE TABLE IF NOT EXISTS tweets ( pos INTEGER PRIMARY KEY, tweet_id TEXT UNIQUE, created_at INTEGER, json TEXT" + annotationMap(function(a) { return ', a_' + a + ' INTEGER' }).join("") + " )"; |
55 db.execute(requete, function(err) { |
55 db.execute(requete, function(err) { |
56 if (err) { myLog("SQLITE error",err.stack); } |
56 if (err) { myLog("SQLITE error",err.stack); } |
57 db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) { myLog("SQLITE error",err.stack); } }); |
57 db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON tweets ( created_at )", function(err) { if (err) { myLog("SQLITE error",err.stack); } }); |
58 getSendLastPos(); |
|
59 }); |
58 }); |
60 |
59 |
61 db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) { myLog("SQLITE error",err.stack); } }); |
60 db.execute("CREATE TABLE IF NOT EXISTS tweet_refs ( id INTEGER PRIMARY KEY, from_id TEXT, to_id TEXT, ref_type TEXT )", function(err) { if (err) { myLog("SQLITE error",err.stack); } }); |
62 } |
61 } |
63 |
62 |
71 |
70 |
72 function commitTweet(data) { |
71 function commitTweet(data) { |
73 |
72 |
74 var tweet = JSON.parse(data), |
73 var tweet = JSON.parse(data), |
75 ann = []; |
74 ann = []; |
76 |
75 |
77 textids(tweet); |
76 if (!tweet.id) { |
78 for (var j in keys_to_delete) { |
77 myLog("Error: Could not parse data",data); |
79 delete tweet[keys_to_delete[j]]; |
78 return; |
80 } |
79 } |
81 textids(tweet.user); |
80 |
82 for (var j in user_keys_to_delete) { |
81 try { |
83 delete tweet.user[user_keys_to_delete[j]]; |
82 textids(tweet); |
84 } |
|
85 if (tweet.retweeted_status) { |
|
86 textids(tweet.retweeted_status); |
|
87 for (var j in keys_to_delete) { |
83 for (var j in keys_to_delete) { |
88 delete tweet.retweeted_status[keys_to_delete[j]]; |
84 delete tweet[keys_to_delete[j]]; |
89 } |
85 } |
90 } |
86 textids(tweet.user); |
91 annotationMap(function(i, annotation) { |
87 for (var j in user_keys_to_delete) { |
92 for (var j in annotation.keywords) { |
88 delete tweet.user[user_keys_to_delete[j]]; |
93 if (tweet.text.search(annotation.keywords[j]) != -1) { |
89 } |
94 ann.push(i); |
90 if (tweet.retweeted_status) { |
95 break; |
91 textids(tweet.retweeted_status); |
96 } |
92 for (var j in keys_to_delete) { |
97 } |
93 delete tweet.retweeted_status[keys_to_delete[j]]; |
98 }); |
94 } |
99 tweet.annotations = ann; |
95 } |
100 tweet.created_at = new Date(tweet.created_at); |
96 annotationMap(function(i, annotation) { |
97 for (var j in annotation.keywords) { |
|
98 if (tweet.text.search(annotation.keywords[j]) != -1) { |
|
99 ann.push(i); |
|
100 break; |
|
101 } |
|
102 } |
|
103 }); |
|
104 tweet.annotations = ann; |
|
105 tweet.created_at = new Date(tweet.created_at); |
|
106 // myLog("Time delta :",(new Date() - tweet.created_at) / 1000); |
|
107 } catch (err) { |
|
108 myLog("Error while processing tweet",err.stack); |
|
109 } |
|
101 |
110 |
102 if (tweet.in_reply_to_status_id) { |
111 if (tweet.in_reply_to_status_id) { |
103 commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" ); |
112 commitReference( tweet.id, tweet.in_reply_to_status_id, "reply" ); |
104 } |
113 } |
105 if (tweet.retweeted_status) { |
114 if (tweet.retweeted_status) { |
112 + annotationMap(function(a) { return ', ?' }).join("") |
121 + annotationMap(function(a) { return ', ?' }).join("") |
113 + " )", |
122 + " )", |
114 [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })), |
123 [ tweet.id, tweet.created_at.valueOf(), JSON.stringify(tweet) ].concat(annotationMap(function(a) { return ann.indexOf(a) == -1 ? 0 : 1 })), |
115 function(err) { |
124 function(err) { |
116 if (err) { myLog("SQLITE error",err.stack); } |
125 if (err) { myLog("SQLITE error",err.stack); } |
117 getSendLastPos(); |
|
118 } |
126 } |
119 ); |
127 ); |
120 } |
128 } |
121 |
129 |
122 function callBackNewTweets(chunk) { |
130 function callBackNewTweets(chunk) { |
124 for (var i in newdata) { |
132 for (var i in newdata) { |
125 if (newdata[i].length > 0) { |
133 if (newdata[i].length > 0) { |
126 commitTweet(newdata[i]); |
134 commitTweet(newdata[i]); |
127 } |
135 } |
128 } |
136 } |
129 // myLog("New tweets received"); |
137 myLog("Data received - length :",chunk.length); |
130 } |
138 } |
131 |
139 |
132 function requestTwitter() { |
140 function requestTwitter() { |
133 myLog("Requesting Twitter to track keyword(s): "+tracking_keyword); |
141 myLog("Requesting Twitter to track keyword(s): "+tracking_keyword); |
134 var req = https.request({ |
142 var req = https.request({ |
151 }); |
159 }); |
152 |
160 |
153 req.write('track=' + encodeURIComponent(tracking_keyword)); |
161 req.write('track=' + encodeURIComponent(tracking_keyword)); |
154 req.socket.setTimeout(60000); |
162 req.socket.setTimeout(60000); |
155 req.socket.on('timeout', function() { |
163 req.socket.on('timeout', function() { |
156 myLog('TimeOut — Trying to reconnect'); |
164 myLog('TimeOut - Trying to reconnect'); |
157 requestTwitter(); |
165 requestTwitter(); |
158 }); |
166 }); |
159 req.end(); |
167 req.end(); |
160 } |
168 } |
161 |
169 |
175 } |
183 } |
176 }); |
184 }); |
177 } |
185 } |
178 |
186 |
179 function getSendTweetPosByDate(date, socket) { |
187 function getSendTweetPosByDate(date, socket) { |
180 db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,1", function (err, results) { |
188 db.execute("SELECT pos, created_at, ABS(created_at-" + date + ") AS dist FROM tweets ORDER BY dist ASC LIMIT 0,9", function (err, results) { |
181 if (err) { myLog("SQLITE error",err.stack); } |
189 if (err) { myLog("SQLITE error",err.stack); } |
182 if (results.length) { |
190 if (results.length) { |
183 try { |
191 try { |
184 socket.emit('tweetPosByDate', { |
192 socket.emit('tweetPosByDate', { |
185 tweetpos : results[0].pos, |
193 tweetpos : results[0].pos, |
191 } |
199 } |
192 }); |
200 }); |
193 } |
201 } |
194 |
202 |
195 function getSendLinkedTweets(pos, socket) { |
203 function getSendLinkedTweets(pos, socket) { |
196 myLog("request for tweets linked to",pos); |
204 // myLog("request for tweets linked to",pos); |
197 db.execute("SELECT A.pos pos_a, A.tweet_id id_a, A.json json_a, B.pos pos_b, B.tweet_id id_b, B.json json_b, ref_type FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?)", [ pos, pos ], function(err, results) { |
205 db.execute("SELECT A.pos pos_a, A.tweet_id id_a, B.pos pos_b, B.tweet_id id_b, ref_type, ABS(B.created_at - A.created_at) delta FROM tweets A, tweets B, tweet_refs WHERE id_a = from_id AND id_b = to_id AND (pos_a = ? OR pos_b = ?) ORDER BY delta ASC LIMIT 0, 10", [ pos, pos ], function(err, results) { |
198 if (err) { myLog("SQLITE error: ",err.stack); } |
206 if (err) { myLog("SQLITE error: ",err.stack); } |
199 var struct = { |
207 var struct = { |
200 "tweetpos" : pos, |
208 "tweetpos" : pos, |
201 "referencing" : [], |
209 "referencing" : [], |
202 "referenced_by" : [] |
210 "referenced_by" : [] |
203 }; |
211 }; |
204 for (var i in results) { |
212 for (var i in results) { |
205 if (results[i].pos_a == pos) { |
213 if (results[i].pos_a == pos) { |
206 var tw = JSON.parse(results[i].json_b); |
|
207 tw.pos = results[i].pos_b; |
|
208 struct.referencing.push({ |
214 struct.referencing.push({ |
209 "tweet" : tw, |
215 "pos" : results[i].pos_b, |
210 "ref_type" : results[i].ref_type |
216 "ref_type" : results[i].ref_type |
211 }); |
217 }); |
212 } else { |
218 } else { |
213 var tw = JSON.parse(results[i].json_a); |
|
214 tw.pos = results[i].pos_a; |
|
215 struct.referenced_by.push({ |
219 struct.referenced_by.push({ |
216 "tweet" : tw, |
220 "pos" : results[i].pos_a, |
217 "ref_type" : results[i].ref_type |
221 "ref_type" : results[i].ref_type |
218 }); |
222 }); |
219 } |
223 } |
220 } |
224 } |
221 try { |
225 try { |
243 } |
247 } |
244 }); |
248 }); |
245 } |
249 } |
246 |
250 |
247 function getSendTimeline(data, socket) { |
251 function getSendTimeline(data, socket) { |
248 myLog("request for timeline (",data.level, data.full,") from "+socket.id); |
252 // myLog("request for timeline (",data.level, data.full,") from "+socket.id); |
249 var lvl = date_levels[data.level], |
253 var lvl = date_levels[data.level], |
250 requete = "SELECT COUNT(*) AS nb, " |
254 requete = "SELECT COUNT(*) AS nb, " |
251 + lvl |
255 + lvl |
252 + "*ROUND(created_at/" |
256 + "*ROUND(created_at/" |
253 + lvl |
257 + lvl |
254 + ") AS tranche" |
258 + ") AS tranche" |
255 + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("") |
259 + annotationMap(function (a) { return " , SUM(a_" + a + ") AS s_" + a }).join("") |
256 + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" ); |
260 + " FROM tweets GROUP BY tranche ORDER BY tranche DESC LIMIT 0," + ( data.full ? "50" : "1" ); |
257 db.execute(requete, function (err, results) { |
261 db.execute(requete, function (err, results) { |
258 if (err) { myLog("SQLITE error",err.stack); } |
262 if (err) { myLog("SQLITE error",err.stack); } |
263 if (!results.length) { |
|
264 return; |
|
265 } |
|
259 var tbl = [], |
266 var tbl = [], |
260 lastend = parseInt(results[results.length - 1].tranche); |
267 lastend = parseInt(results[results.length - 1].tranche); |
261 for (var i = results.length - 1; i >= 0; i--) { |
268 for (var i = results.length - 1; i >= 0; i--) { |
262 var start = parseInt(results[i].tranche); |
269 var start = parseInt(results[i].tranche); |
263 while (start > lastend) { |
270 while (start > lastend) { |
402 db.open(sqlfile , function(err) { |
409 db.open(sqlfile , function(err) { |
403 if (err) { myLog("SQLITE error",err.stack); } |
410 if (err) { myLog("SQLITE error",err.stack); } |
404 createTables(); |
411 createTables(); |
405 }); |
412 }); |
406 |
413 |
414 setInterval(getSendLastPos,300); |
|
415 |
|
407 if (RECORD_NEW_TWEETS) { |
416 if (RECORD_NEW_TWEETS) { |
408 requestTwitter(); |
417 requestTwitter(); |
409 } |
418 } |
410 |
419 |
411 io.set('log level', 0); |
420 io.set('log level', 0); |