75 else: |
75 else: |
76 return value |
76 return value |
77 return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) |
77 return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) |
78 |
78 |
79 |
79 |
|
class ObjectBufferProxy(object):
    """Deferred construction of an object that is added to a session later.

    The proxy records a class and its constructor arguments. Nothing is
    instantiated until ``persists`` is called. Any argument that is callable
    is called at that moment and replaced by its result, which lets one
    proxy's attribute accessor (see ``__getattr__``) feed another proxy's
    constructor once the first object exists.
    """

    def __init__(self, klass, args, kwargs, must_flush):
        """Record what to build.

        klass      -- callable used to create the instance.
        args       -- iterable of positional arguments, or None for none.
        kwargs     -- dict of keyword arguments, or None for none.
        must_flush -- when true, ``session.flush()`` is called right after
                      the instance has been added.
        """
        self.klass = klass
        self.args = args
        self.kwargs = kwargs
        self.must_flush = must_flush
        # Stays None until persists() has built the object.
        self.instance = None

    def persists(self, session):
        """Build the instance, add it to ``session`` and flush if requested.

        ``session`` only needs ``add(obj)`` and ``flush()`` methods
        (presumably a SQLAlchemy session -- confirm against the callers).
        """
        def resolve(value):
            # Deferred values are callables; plain values pass through.
            return value() if callable(value) else value

        raw_args = self.args if self.args is not None else ()
        raw_kwargs = self.kwargs if self.kwargs is not None else {}

        new_args = [resolve(arg) for arg in raw_args]
        new_kwargs = dict((key, resolve(val)) for key, val in raw_kwargs.items())

        self.instance = self.klass(*new_args, **new_kwargs)
        session.add(self.instance)
        if self.must_flush:
            session.flush()

    def __getattr__(self, name):
        """Return a zero-argument accessor for ``name`` on the built instance.

        The accessor returns None while nothing has been persisted yet, and
        ``getattr(instance, name)`` afterwards.
        """
        # Special-method probes (hasattr, copy.deepcopy, pickle, ...) must see
        # a missing attribute rather than a lambda they cannot call properly.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        # Compare with None explicitly: a persisted instance may be falsy
        # (empty container, zero-like value) and must still be read.
        return lambda: getattr(self.instance, name) if self.instance is not None else None
|
99 |
|
100 |
|
101 |
|
102 |
|
class ObjectsBuffer(object):
    """Ordered list of ObjectBufferProxy entries that are persisted together."""

    def __init__(self):
        # Proxies are kept in insertion order.
        self.__bufferlist = []

    def add_object(self, klass, args, kwargs, must_flush):
        """Queue a deferred ``klass(*args, **kwargs)`` and return its proxy."""
        proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
        self.__bufferlist.append(proxy)
        return proxy

    def persists(self, session):
        """Call ``persists(session)`` on every queued proxy, in queue order."""
        for queued in self.__bufferlist:
            queued.persists(session)
|
116 |
|
117 |
|
118 |
80 |
119 |
class TwitterProcessorException(Exception):
    """Error raised by TwitterProcessor (for example when the json has no id)."""
83 |
122 |
84 class TwitterProcessor(object): |
123 class TwitterProcessor(object): |
102 raise TwitterProcessorException("No id in json") |
141 raise TwitterProcessorException("No id in json") |
103 |
142 |
104 self.source_id = source_id |
143 self.source_id = source_id |
105 self.session = session |
144 self.session = session |
106 self.token_filename = token_filename |
145 self.token_filename = token_filename |
|
146 self.obj_buffer = ObjectsBuffer() |
107 |
147 |
108 def __get_user(self, user_dict): |
148 def __get_user(self, user_dict): |
109 logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable |
149 logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable |
110 |
150 |
111 user_id = user_dict.get("id",None) |
151 user_id = user_dict.get("id",None) |
112 user_name = user_dict.get("screen_name", user_dict.get("name", None)) |
152 user_name = user_dict.get("screen_name", user_dict.get("name", None)) |
113 |
153 |
114 if user_id is None and user_name is None: |
154 if user_id is None and user_name is None: |
285 |
325 |
286 extractor = twitter_text.Extractor(text) |
326 extractor = twitter_text.Extractor(text) |
287 |
327 |
288 for ind in extractor.extract_hashtags_with_indices(): |
328 for ind in extractor.extract_hashtags_with_indices(): |
289 self.__process_entity(ind, "hashtags") |
329 self.__process_entity(ind, "hashtags") |
290 |
330 |
|
331 for ind in extractor.extract_urls_with_indices(): |
|
332 self.__process_entity(ind, "urls") |
|
333 |
291 for ind in extractor.extract_mentioned_screen_names_with_indices(): |
334 for ind in extractor.extract_mentioned_screen_names_with_indices(): |
292 self.__process_entity(ind, "user_mentions") |
335 self.__process_entity(ind, "user_mentions") |
293 |
336 |
294 for ind in extractor.extract_urls_with_indices(): |
|
295 self.__process_entity(ind, "urls") |
|
296 |
|
297 self.session.flush() |
|
298 |
337 |
299 |
338 |
def process(self):
    """Store the raw tweet if needed, dispatch it, then log it as OK.

    When ``self.source_id`` is None a TweetSource row holding the original
    json text is created first and its id is kept as the source id.
    """
    if self.source_id is None:
        source_row = TweetSource(original_json=self.json_txt)
        self.session.add(source_row)
        # Flush before reading the id (presumably so the database assigns
        # the primary key -- confirm).
        self.session.flush()
        self.source_id = source_row.id

    # Payloads carrying a "metadata" key take the REST handler; all the
    # others take the stream handler.
    if "metadata" not in self.json_dict:
        self.__process_twitter_stream()
    else:
        self.__process_twitter_rest()

    ok_status = TweetLog.TWEET_STATUS['OK']
    self.session.add(TweetLog(tweet_source_id=self.source_id, status=ok_status))
def set_logging(options, plogger=None):
    """Attach a handler to a logger according to command-line options.

    options -- object exposing ``verbose``, ``quiet`` and ``logfile``. Its
               ``debug`` attribute is set to True when ``verbose`` exceeds
               ``quiet``, False otherwise.
    plogger -- logger to configure; the root logger is used when None.

    ``logfile`` may be "stdout" or "stderr" to log to that stream; any other
    non-empty value is used as a file name opened in append mode. A handler
    is only installed when the logger has none yet.
    """
    # Threshold starts at WARNING, moves by 10 per verbose/quiet unit, and is
    # clamped to [NOTSET, CRITICAL].
    requested = logging.WARNING - 10 * options.verbose + 10 * options.quiet #@UndefinedVariable
    level = max(logging.NOTSET, min(logging.CRITICAL, requested)) #@UndefinedVariable

    stream = None
    filename = None
    destination = options.logfile
    if destination == "stdout":
        stream = sys.stdout
    elif destination == "stderr":
        stream = sys.stderr
    else:
        filename = destination

    target = logging.getLogger() if plogger is None else plogger #@UndefinedVariable

    if not target.handlers:
        if filename:
            handler = logging.FileHandler(filename, 'a') #@UndefinedVariable
        else:
            # stream is None when no usable logfile was given; StreamHandler
            # then picks its own default stream.
            handler = logging.StreamHandler(stream) #@UndefinedVariable
        formatter = logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s', None) #@UndefinedVariable
        handler.setFormatter(formatter)
        target.addHandler(handler)
        # NOTE(review): the source view lost its indentation; the level is
        # applied inside this branch, following the statement order of the
        # original -- confirm it was not meant to run unconditionally.
        target.setLevel(level)

    options.debug = (options.verbose - options.quiet > 0)
335 |
392 |
336 def set_logging_options(parser): |
393 def set_logging_options(parser): |
337 parser.add_option("-l", "--log", dest="logfile", |
394 parser.add_option("-l", "--log", dest="logfile", |