improve multi processing architecture
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Wed, 17 Aug 2011 18:32:07 +0200
changeset 255 500cd0405c7a
parent 254 2209e66bb50b
child 256 2f335337ff64
improve multi processing architecture
script/lib/iri_tweet/export_twitter_alchemy.py
script/lib/iri_tweet/models.py
script/lib/iri_tweet/tweet_twitter_user.py
script/lib/iri_tweet/utils.py
script/rest/search_twitter.py
script/stream/recorder_tweetstream.py
script/virtualenv/res/psycopg2-2.4.2.tar.gz
--- a/script/lib/iri_tweet/export_twitter_alchemy.py	Fri Aug 12 18:17:27 2011 +0200
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py	Wed Aug 17 18:32:07 2011 +0200
@@ -5,7 +5,7 @@
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy import Table, Column, BigInteger, MetaData
 from sqlalchemy.orm import sessionmaker
-from utils import parse_date, set_logging_options, set_logging, get_filter_query, logger
+from utils import parse_date, set_logging_options, set_logging, get_filter_query, get_logger
 from models import setup_database
 import datetime
 import os.path
@@ -100,13 +100,17 @@
     
     set_logging(options)
         
-    logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+    get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable
     
     if len(sys.argv) == 1 or options.database is None:
         parser.print_help()
         sys.exit(1)
     
-    engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = False)        
+    conn_str = options.database.strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + conn_str
+
+    engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)        
     
     Session = sessionmaker()
     conn = engine.connect()
@@ -158,7 +162,7 @@
             
             for params in parameters:
                 
-                logger.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
+                get_logger().debug("PARAMETERS " + repr(params)) #@UndefinedVariable
                 
                 start_date_str = params.get("start_date",None)
                 end_date_str = params.get("end_date", None)
@@ -191,12 +195,12 @@
                 
                 if content_file and content_file.find("http") == 0:
                     
-                    logger.debug("url : " + content_file) #@UndefinedVariable
+                    get_logger().debug("url : " + content_file) #@UndefinedVariable
                     
                     h = httplib2.Http()
                     resp, content = h.request(content_file)
                     
-                    logger.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
+                    get_logger().debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
                     
                     project = anyjson.deserialize(content)
                     root = etree.fromstring(project["ldt"])
@@ -253,7 +257,7 @@
                     
                     
                 if ensemble_parent is None:
-                    logger.error("Can not process file") #@UndefinedVariable
+                    get_logger().error("Can not process file") #@UndefinedVariable
                     sys.exit()
             
                 if options.replace:
@@ -308,18 +312,18 @@
                     
                     project["ldt"] = output_data
                     body = anyjson.serialize(project)
-                    logger.debug("write http " + content_file) #@UndefinedVariable
-                    logger.debug("write http " + repr(body)) #@UndefinedVariable
+                    get_logger().debug("write http " + content_file) #@UndefinedVariable
+                    get_logger().debug("write http " + repr(body)) #@UndefinedVariable
                     h = httplib2.Http()
                     resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body)
-                    logger.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
+                    get_logger().debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
                 else:
                     if content_file and os.path.exists(content_file):
                         dest_file_name = content_file 
                     else:
                         dest_file_name = options.filename
             
-                    logger.debug("WRITE : " + dest_file_name) #@UndefinedVariable
+                    get_logger().debug("WRITE : " + dest_file_name) #@UndefinedVariable
                     output = open(dest_file_name, "w")
                     output.write(output_data)
                     output.flush()
--- a/script/lib/iri_tweet/models.py	Fri Aug 12 18:17:27 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Wed Aug 17 18:32:07 2011 +0200
@@ -66,6 +66,7 @@
     TWEET_STATUS = {
         'OK' : 1,
         'ERROR' : 2,
+        'NOT_TWEET': 3,
     }
     
     __tablename__ = 'tweet_tweet_log'
@@ -158,7 +159,7 @@
     profile_text_color = Column(String)
     profile_use_background_image = Column(Boolean)
     protected = Column(Boolean)
-    screen_name = Column(String, index=True, unique=True)
+    screen_name = Column(String, index=True)
     show_all_inline_media = Column(Boolean)
     statuses_count = Column(Integer)
     time_zone = Column(String)
--- a/script/lib/iri_tweet/tweet_twitter_user.py	Fri Aug 12 18:17:27 2011 +0200
+++ b/script/lib/iri_tweet/tweet_twitter_user.py	Wed Aug 17 18:32:07 2011 +0200
@@ -1,6 +1,6 @@
 from iri_tweet.models import setup_database, Message, UserMessage, User
 from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options, 
-    set_logging, parse_date, logger)
+    set_logging, parse_date, get_logger)
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy import BigInteger
 from sqlalchemy.orm import sessionmaker
@@ -10,6 +10,7 @@
 import sys
 import time
 import twitter
+import re
 
 APPLICATION_NAME = "Tweet recorder user"
 CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
@@ -58,7 +59,11 @@
     if not options.message or len(options.message) == 0:
         sys.exit()
 
-    engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = True)        
+    conn_str = options.database.strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + conn_str
+
+    engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)        
     
     Session = sessionmaker()
     conn = engine.connect()
@@ -107,7 +112,7 @@
                 screen_name = user.screen_name
                 
                 message = u"@%s: %s" % (screen_name, base_message)
-                logger.debug("new status : " + message) #@UndefinedVariable
+                get_logger().debug("new status : " + message) #@UndefinedVariable
                 if not options.simulate:
                     t.statuses.update(status=message)
                     user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
--- a/script/lib/iri_tweet/utils.py	Fri Aug 12 18:17:27 2011 +0200
+++ b/script/lib/iri_tweet/utils.py	Wed Aug 17 18:32:07 2011 +0200
@@ -3,10 +3,11 @@
     ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, 
     Media, EntityMedia, Entity, EntityType)
 from sqlalchemy.sql import select, or_ #@UnresolvedImport
+import Queue #@UnresolvedImport
 import anyjson #@UnresolvedImport
 import datetime
 import email.utils
-import logging #@UnresolvedImport
+import logging
 import os.path
 import sys
 import twitter.oauth #@UnresolvedImport
@@ -14,7 +15,6 @@
 import twitter_text #@UnresolvedImport
 
 
-
 CACHE_ACCESS_TOKEN = {}
 
 def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
@@ -448,7 +448,7 @@
         self.obj_buffer.persists(self.session)
 
 
-def set_logging(options, plogger=None):
+def set_logging(options, plogger=None, queue=None):
     
     logging_config = {
         "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
@@ -466,14 +466,17 @@
     if logger is None:
         logger = get_logger() #@UndefinedVariable
     
-    if len(logger.handlers) == 0:    
+    if len(logger.handlers) == 0:
         filename = logging_config.get("filename")
-        if filename:
+        if queue is not None:
+            hdlr = QueueHandler(queue, True)
+        elif filename:
             mode = logging_config.get("filemode", 'a')
             hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
         else:
             stream = logging_config.get("stream")
             hdlr = logging.StreamHandler(stream) #@UndefinedVariable
+            
         fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
         dfs = logging_config.get("datefmt", None)
         fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
@@ -484,6 +487,7 @@
             logger.setLevel(level)
     
     options.debug = (options.verbose-options.quiet > 0)
+    return logger
 
 def set_logging_options(parser):
     parser.add_option("-l", "--log", dest="logfile",
@@ -536,5 +540,47 @@
     
     return query.distinct()
 
+logger_name = "iri.tweet"
+
 def get_logger():
-    return logging.getLogger("iri_tweet") #@UndefinedVariable
+    global logger_name
+    return logging.getLogger(logger_name) #@UndefinedVariable
+
+
+# QueueHandler adapted from the Python logging cookbook multiprocessing example.
+
+class QueueHandler(logging.Handler): #@UndefinedVariable
+    """
+    This is a logging handler which sends events to a multiprocessing queue.    
+    """
+
+    def __init__(self, queue, ignore_full):
+        """
+        Initialise an instance, using the passed queue.
+        """
+        logging.Handler.__init__(self) #@UndefinedVariable
+        self.queue = queue
+        self.ignore_full = ignore_full
+        
+    def emit(self, record):
+        """
+        Emit a record.
+
+        Writes the LogRecord to the queue.
+        """
+        try:
+            ei = record.exc_info
+            if ei:
+                dummy = self.format(record) # just to get traceback text into record.exc_text
+                record.exc_info = None  # not needed any more
+            if not self.ignore_full or not self.queue.full():
+                self.queue.put_nowait(record)
+        except Queue.Full:
+            if self.ignore_full:
+                pass
+            else:
+                raise
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except:
+            self.handleError(record)
--- a/script/rest/search_twitter.py	Fri Aug 12 18:17:27 2011 +0200
+++ b/script/rest/search_twitter.py	Wed Aug 17 18:32:07 2011 +0200
@@ -36,7 +36,13 @@
     (options, args) = get_option()
 
     twitter = twitter.Twitter(domain="search.twitter.com")
-    engine, metadata = models.setup_database('sqlite:///'+args[0], echo=((options.verbose-options.quiet)>0))
+    
+    conn_str = args[0].strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + conn_str
+
+    
+    engine, metadata = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
     Session = sessionmaker(bind=engine)
     session = Session()
     try:
@@ -54,7 +60,7 @@
                 print tweet
                 tweet_str = anyjson.serialize(tweet)
                 #invalidate user id
-                processor = utils.TwitterProcessor(tweet, tweet_str, None, session, options.token_filename)
+                processor = utils.TwitterProcessor(tweet, tweet_str, None, session, None, options.token_filename)
                 processor.process()
                 session.flush()
                 session.commit()
--- a/script/stream/recorder_tweetstream.py	Fri Aug 12 18:17:27 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Wed Aug 17 18:32:07 2011 +0200
@@ -1,10 +1,12 @@
 from getpass import getpass
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
+    get_logger)
 from optparse import OptionParser
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import scoped_session, sessionmaker
+import Queue
 import StringIO
 import anyjson
 import datetime
@@ -16,6 +18,7 @@
 import socket
 import sqlalchemy.schema
 import sys
+import threading
 import time
 import traceback
 import tweepy.auth
@@ -34,7 +37,7 @@
 
 
 def set_logging(options):
-    utils.set_logging(options, logging.getLogger('iri_tweet'))
+    utils.set_logging(options, logging.getLogger('iri.tweet'))
     utils.set_logging(options, logging.getLogger('multiprocessing'))
     if options.debug >= 2:
         utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
@@ -42,6 +45,11 @@
     #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
     #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
 
+def set_logging_process(options, queue):
+    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
+    qlogger.propagate = 0
+    return qlogger
+
 def get_auth(options, access_token):
     if options.username and options.password:
         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
@@ -107,7 +115,7 @@
 
 class SourceProcess(Process):
     
-    def __init__(self, session_maker, queue, options, access_token, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
         self.session_maker = session_maker
         self.queue = queue
         self.track = options.track
@@ -116,31 +124,33 @@
         self.stop_event = stop_event
         self.options = options
         self.access_token = access_token
+        self.logger_queue = logger_queue
         super(SourceProcess, self).__init__()
     
     def run(self):
+        
         #import pydevd
         #pydevd.settrace(suspend=False)
 
-        set_logging(self.options)
+        self.logger = set_logging_process(self.options, self.logger_queue)
         self.auth = get_auth(self.options, self.access_token) 
         
-        utils.get_logger().debug("SourceProcess : run")
+        self.logger.debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
         track_list = [k for k in track_list.split(',')]
 
-        utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
+        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
-        utils.get_logger().debug("SourceProcess : after connecting to stream")
+        self.logger.debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
         
         session = self.session_maker()
         
         try:
             for tweet in stream:
-                utils.get_logger().debug("tweet " + repr(tweet))
+                self.logger.debug("tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                utils.get_logger().debug("source created")
+                self.logger.debug("source created")
                 add_retries = 0
                 while add_retries < 10:
                     try:
@@ -150,18 +160,18 @@
                         break
                     except OperationalError as e:
                         session.rollback()
-                        utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
                         if add_retries == 10:
                             raise e
                      
                 source_id = source.id
-                utils.get_logger().debug("before queue + source id " + repr(source_id))
-                utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                self.logger.debug("before queue + source id " + repr(source_id))
+                self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
                 self.queue.put((source_id, tweet), False)
 
         except Exception as e:
-            utils.get_logger().error("Error when processing tweet " + repr(e))
+            self.logger.error("Error when processing tweet " + repr(e))
         finally:
             session.rollback()
             stream.close()
@@ -170,19 +180,23 @@
             self.stop_event.set()
 
 
-def process_tweet(tweet, source_id, session, access_token):
+def process_tweet(tweet, source_id, session, access_token, logger):
     try:
         tweet_obj = anyjson.deserialize(tweet)
+        if 'text' not in tweet_obj:
+            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+            session.add(tweet_log)
+            return
         screen_name = ""
         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
             screen_name = tweet_obj['user']['screen_name']
-        utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
+        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        logger.debug(u"Process_tweet :" + repr(tweet))
         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
         processor.process()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        utils.get_logger().error(message)
+        logger.error(message)
         output = StringIO.StringIO()
         traceback.print_exception(Exception, e, None, None, output)
         error_stack = output.getvalue()
@@ -196,28 +210,29 @@
         
 class TweetProcess(Process):
     
-    def __init__(self, session_maker, queue, options, access_token, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
         self.session_maker = session_maker
         self.queue = queue
         self.stop_event = stop_event
         self.options = options
         self.access_token = access_token
+        self.logger_queue = logger_queue
         super(TweetProcess, self).__init__()
 
 
     def run(self):
         
-        set_logging(self.options)
+        self.logger = set_logging_process(self.options, self.logger_queue)
         session = self.session_maker()
         try:
             while not self.stop_event.is_set():
                 try:
                     source_id, tweet_txt = queue.get(True, 3)
-                    utils.get_logger().debug("Processing source id " + repr(source_id))
+                    self.logger.debug("Processing source id " + repr(source_id))
                 except Exception as e:
-                    utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
+                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session, self.access_token)
+                process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
                 session.commit()
         finally:
             session.rollback()
@@ -231,19 +246,28 @@
     return Session, engine, metadata
 
             
-def process_leftovers(session, access_token):
+def process_leftovers(session, access_token, logger):
     
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
     
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, access_token)
+        process_tweet(tweet_txt, src.id, session, access_token, logger)
         session.commit()
 
         
     
     #get tweet source that do not match any message
     #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+def process_log(logger_queues, stop_event):
+    while not stop_event.is_set():
+        for lqueue in logger_queues:
+            try:
+                record = lqueue.get_nowait()
+                logging.getLogger(record.name).handle(record)
+            except Queue.Empty:
+                continue
+        time.sleep(0.1)
 
         
 def get_options():
@@ -275,7 +299,7 @@
     
 
 if __name__ == '__main__':
-    
+
     (options, args) = get_options()
     
     set_logging(options)
@@ -287,10 +311,10 @@
     
     conn_str = options.conn_str.strip()
     if not re.match("^\w+://.+", conn_str):
-        conn_str = 'sqlite://' + options.conn_str
+        conn_str = 'sqlite:///' + options.conn_str
         
     if conn_str.startswith("sqlite") and options.new:
-        filepath = conn_str[conn_str.find("://"):]
+        filepath = conn_str[conn_str.find(":///")+4:]
         if os.path.exists(filepath):
             i = 1
             basename, extension = os.path.splitext(filepath)
@@ -320,7 +344,7 @@
     
     session = Session()
     try:
-        process_leftovers(session, access_token)
+        process_leftovers(session, access_token, utils.get_logger())
         session.commit()
     finally:
         session.rollback()
@@ -330,7 +354,7 @@
         utils.get_logger().debug("Leftovers processed. Exiting.")
         sys.exit()
 
-    queue = JoinableQueue()
+    queue = mQueue()
     stop_event = Event()
     
     #workaround for bug on using urllib2 and multiprocessing
@@ -344,15 +368,24 @@
     finally:
         if conn is not None:
             conn.close()
-        
+    
+    process_engines = []
+    logger_queues = []
     
-    sprocess = SourceProcess(Session, queue, options, access_token, stop_event)    
+    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+    process_engines.append(engine_process)
+    lqueue = mQueue(1)
+    logger_queues.append(lqueue)
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)    
     
     tweet_processes = []
     
     for i in range(options.process_nb - 1):
-        Session, engine, metadata = get_sessionmaker(conn_str)
-        cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
+        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+        process_engines.append(engine_process)
+        lqueue = mQueue(1)
+        logger_queues.append(lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
         tweet_processes.append(cprocess)
 
     def interupt_handler(signum, frame):
@@ -360,23 +393,28 @@
         
     signal.signal(signal.SIGINT, interupt_handler)
 
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
+    log_thread.daemon = True
+
     sprocess.start()
     for cprocess in tweet_processes:
         cprocess.start()
 
+    log_thread.start()
+
     if options.duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+    
 
     while not stop_event.is_set():
         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
             stop_event.set()
             break
-        if sprocess.is_alive():
+        if sprocess.is_alive():            
             time.sleep(1)
         else:
             stop_event.set()
             break
-    
     utils.get_logger().debug("Joining Source Process")
     try:
         sprocess.join(10)
@@ -384,8 +422,6 @@
         utils.get_logger().debug("Pb joining Source Process - terminating")
         sprocess.terminate()
         
-    utils.get_logger().debug("Joining Queue")
-    #queue.join()
     for i, cprocess in enumerate(tweet_processes):
         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
         try:
@@ -393,15 +429,30 @@
         except:
             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
             cprocess.terminate()
+
     
-    utils.get_logger().debug("Processing leftovers")
-    session = Session()
+    utils.get_logger().debug("Close queues")
     try:
-        process_leftovers(session, access_token)
-        session.commit()
-    finally:
-        session.rollback()
-        session.close()
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        #do nothing
+        
+    
+    if options.process_nb > 1:
+        utils.get_logger().debug("Processing leftovers")
+        session = Session()
+        try:
+            process_leftovers(session, access_token, utils.get_logger())
+            session.commit()
+        finally:
+            session.rollback()
+            session.close()
 
+    for pengine in process_engines:
+        pengine.dispose()
+        
     utils.get_logger().debug("Done. Exiting.")
         
Binary file script/virtualenv/res/psycopg2-2.4.2.tar.gz has changed