227 session.close() |
227 session.close() |
228 self.queue.close() |
228 self.queue.close() |
229 self.stop_event.set() |
229 self.stop_event.set() |
230 |
230 |
231 |
231 |
232 def process_tweet(tweet, source_id, session, access_token, logger): |
232 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger): |
233 try: |
233 try: |
234 tweet_obj = anyjson.deserialize(tweet) |
234 tweet_obj = anyjson.deserialize(tweet) |
235 if 'text' not in tweet_obj: |
235 if 'text' not in tweet_obj: |
236 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) |
236 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) |
237 session.add(tweet_log) |
237 session.add(tweet_log) |
239 screen_name = "" |
239 screen_name = "" |
240 if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: |
240 if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: |
241 screen_name = tweet_obj['user']['screen_name'] |
241 screen_name = tweet_obj['user']['screen_name'] |
242 logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) |
242 logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) |
243 logger.debug(u"Process_tweet :" + repr(tweet)) |
243 logger.debug(u"Process_tweet :" + repr(tweet)) |
244 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) |
244 processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user) |
245 processor.process() |
245 processor.process() |
246 except Exception as e: |
246 except Exception as e: |
247 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
247 message = u"Error %s processing tweet %s" % (repr(e), tweet) |
248 logger.exception(message) |
248 logger.exception(message) |
249 output = StringIO.StringIO() |
249 output = StringIO.StringIO() |
261 |
261 |
262 class TweetProcess(BaseProcess): |
262 class TweetProcess(BaseProcess): |
263 |
263 |
264 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
264 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
265 super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
265 super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
|
266 self.twitter_query_user = options.twitter_query_user |
266 |
267 |
267 |
268 |
268 def do_run(self): |
269 def do_run(self): |
269 |
270 |
270 self.logger = set_logging_process(self.options, self.logger_queue) |
271 self.logger = set_logging_process(self.options, self.logger_queue) |
275 source_id, tweet_txt = self.queue.get(True, 3) |
276 source_id, tweet_txt = self.queue.get(True, 3) |
276 self.logger.debug("Processing source id " + repr(source_id)) |
277 self.logger.debug("Processing source id " + repr(source_id)) |
277 except Exception as e: |
278 except Exception as e: |
278 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
279 self.logger.debug('Process tweet exception in loop : ' + repr(e)) |
279 continue |
280 continue |
280 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger) |
281 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger) |
281 session.commit() |
282 session.commit() |
282 finally: |
283 finally: |
283 session.rollback() |
284 session.rollback() |
284 self.stop_event.set() |
285 self.stop_event.set() |
285 session.close() |
286 session.close() |
343 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
344 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
344 parser.add_option("-N", "--nb-process", dest="process_nb", |
345 parser.add_option("-N", "--nb-process", dest="process_nb", |
345 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
346 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
346 parser.add_option("--url", dest="url", |
347 parser.add_option("--url", dest="url", |
347 help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url) |
348 help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url) |
|
349 parser.add_option("--query-user", dest="twitter_query_user", action="store_true", |
|
350 help="Query twitter for users", default=False, metavar="QUERY_USER") |
|
351 |
348 |
352 |
349 |
353 |
350 utils.set_logging_options(parser) |
354 utils.set_logging_options(parser) |
351 |
355 |
352 return parser.parse_args() |
356 return parser.parse_args() |