--- a/script/lib/iri_tweet/utils.py Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/utils.py Mon Aug 08 09:01:40 2011 +0200
@@ -77,6 +77,45 @@
return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
+class ObjectBufferProxy(object):
+ def __init__(self, klass, args, kwargs, must_flush):
+ self.klass= klass
+ self.args = args
+ self.kwargs = kwargs
+ self.must_flush = must_flush
+ self.instance = None
+
+ def persists(self, session):
+ new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
+ new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
+
+ self.instance = self.klass(*new_args, **new_kwargs)
+ session.add(self.instance)
+ if self.must_flush:
+ session.flush()
+
+ def __getattr__(self, name):
+ return lambda : getattr(self.instance, name) if self.instance else None
+
+
+
+
+class ObjectsBuffer(object):
+
+ def __init__(self):
+ self.__bufferlist = []
+
+ def persists(self, session):
+ for object_proxy in self.__bufferlist:
+ object_proxy.persists(session)
+
+ def add_object(self, klass, args, kwargs, must_flush):
+ new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
+ self.__bufferlist.append(new_proxy)
+ return new_proxy
+
+
+
class TwitterProcessorException(Exception):
pass
@@ -104,9 +143,10 @@
self.source_id = source_id
self.session = session
self.token_filename = token_filename
+ self.obj_buffer = ObjectsBuffer()
def __get_user(self, user_dict):
- logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+ logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
user_id = user_dict.get("id",None)
user_name = user_dict.get("screen_name", user_dict.get("name", None))
@@ -133,8 +173,8 @@
else:
user_dict = t.users.show(screen_name=user_name)
except Exception as e:
- logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
- logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+ logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+ logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
if "id" not in user_dict:
@@ -148,7 +188,7 @@
return user
def __process_entity(self, ind, ind_type):
- logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+ logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
ind = clean_keys(ind)
@@ -203,7 +243,7 @@
'urls' : process_urls
}[ind_type]()
- logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+ logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
if entity:
self.session.add(entity)
self.session.flush()
@@ -220,7 +260,7 @@
# get or create user
user = self.__get_user(self.json_dict["user"])
if user is None:
- logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+ logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
ts_copy["user"] = None
ts_copy["user_id"] = None
else:
@@ -270,7 +310,7 @@
user = self.__get_user(user_fields)
if user is None:
- logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
+ logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
tweet_fields["user"] = None
tweet_fields["user_id"] = None
else:
@@ -287,14 +327,13 @@
for ind in extractor.extract_hashtags_with_indices():
self.__process_entity(ind, "hashtags")
-
- for ind in extractor.extract_mentioned_screen_names_with_indices():
- self.__process_entity(ind, "user_mentions")
-
+
for ind in extractor.extract_urls_with_indices():
self.__process_entity(ind, "urls")
- self.session.flush()
+ for ind in extractor.extract_mentioned_screen_names_with_indices():
+ self.__process_entity(ind, "user_mentions")
+
def process(self):
@@ -305,21 +344,21 @@
self.session.flush()
self.source_id = tweet_source.id
- try:
- if "metadata" in self.json_dict:
- self.__process_twitter_rest()
- else:
- self.__process_twitter_stream()
-
- tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
- except:
-
- raise
+ if "metadata" in self.json_dict:
+ self.__process_twitter_rest()
+ else:
+ self.__process_twitter_stream()
+
+ tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
+ self.session.add(tweet_log)
-def set_logging(options):
+def set_logging(options, plogger=None):
- logging_config = {}
+ logging_config = {
+ "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
+ "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
+ }
if options.logfile == "stdout":
logging_config["stream"] = sys.stdout
@@ -327,9 +366,27 @@
logging_config["stream"] = sys.stderr
else:
logging_config["filename"] = options.logfile
-
- logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable
- logging.basicConfig(**logging_config) #@UndefinedVariable
+
+ logger = plogger
+ if logger is None:
+ logger = logging.getLogger() #@UndefinedVariable
+
+ if len(logger.handlers) == 0:
+ filename = logging_config.get("filename")
+ if filename:
+ mode = logging_config.get("filemode", 'a')
+ hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
+ else:
+ stream = logging_config.get("stream")
+ hdlr = logging.StreamHandler(stream) #@UndefinedVariable
+ fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
+ dfs = logging_config.get("datefmt", None)
+ fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
+ hdlr.setFormatter(fmt)
+ logger.addHandler(hdlr)
+ level = logging_config.get("level")
+ if level is not None:
+ logger.setLevel(level)
options.debug = (options.verbose-options.quiet > 0)
@@ -384,4 +441,4 @@
return query.distinct()
-
+logger = logging.getLogger() #@UndefinedVariable