script/stream/recorder_tweetstream.py
changeset 254 2209e66bb50b
parent 243 9213a63fa34a
child 255 500cd0405c7a
from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog
from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
from optparse import OptionParser
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session, sessionmaker
import StringIO
import anyjson
import datetime
import logging
import os
import re
import shutil
import signal
import socket
import sqlalchemy.schema
import sys
import time
import traceback
import tweepy.auth
import tweetstream
import urllib2
#from iri_tweet.utils import get_logger
socket._fileobject.default_bufsize = 0


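# recorder_tweetstream: records tweets matching a set of keywords from the
# Twitter streaming (filter) API into a database. A single SourceProcess
# produces raw tweets; one or more TweetProcess consumers post-process them.
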
#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
#just put it in a sqlite3 table


def set_logging(options):
    utils.set_logging(options, logging.getLogger('iri_tweet'))
    utils.set_logging(options, logging.getLogger('multiprocessing'))
    if options.debug >= 2:
        utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))


def get_auth(options, access_token):
    if options.username and options.password:
        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
    else:
        consumer_key = models.CONSUMER_KEY
        consumer_secret = models.CONSUMER_SECRET
        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
        auth.set_access_token(*access_token)
    return auth


class ReconnectingTweetStream(tweetstream.FilterStream):
    """TweetStream class that automatically tries to reconnect if the
    connection goes down. Reconnecting, and waiting for reconnecting, is
    # [...] (lines unchanged by this changeset are not shown)

        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)

    def next(self):
        while True:
            try:
                utils.get_logger().debug("return super.next")
                return super(ReconnectingTweetStream, self).next()
            except tweetstream.ConnectionError as e:
                utils.get_logger().debug("connection error :" + str(e))
                self._reconnects += 1
                if self._reconnects > self.max_reconnects:
                    raise tweetstream.ConnectionError("Too many retries")

                # Note: error_cb is not called on the last error since we
                # [...]
        # Don't listen to auth error, since we can't reasonably reconnect
        # when we get one.


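# Producer: SourceProcess connects to the streaming API with the configured
# keywords, stores every raw tweet as a TweetSource row, and pushes the pair
# (source_id, raw json) onto the shared queue for the consumer processes.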
class SourceProcess(Process):

    def __init__(self, session_maker, queue, options, access_token, stop_event):
        self.session_maker = session_maker
        self.queue = queue
        self.track = options.track
        self.reconnects = options.reconnects
        self.token_filename = options.token_filename
        self.stop_event = stop_event
        self.options = options
        self.access_token = access_token
        super(SourceProcess, self).__init__()

    def run(self):
        #import pydevd
        #pydevd.settrace(suspend=False)

        set_logging(self.options)
        self.auth = get_auth(self.options, self.access_token)

        utils.get_logger().debug("SourceProcess : run")
        track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
        track_list = [k for k in track_list.split(',')]

        utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
        utils.get_logger().debug("SourceProcess : after connecting to stream")
        stream.muststop = lambda: self.stop_event.is_set()

        session = self.session_maker()

        try:
            for tweet in stream:
                utils.get_logger().debug("tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                utils.get_logger().debug("source created")
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            raise e

                source_id = source.id
                utils.get_logger().debug("before queue + source id " + repr(source_id))
                utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                session.commit()
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            utils.get_logger().error("Error when processing tweet " + repr(e))
        finally:
            session.rollback()
            stream.close()
            session.close()
            self.queue.close()
            self.stop_event.set()


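# process_tweet deserializes one raw tweet and hands it to
# utils.TwitterProcessor, which expands it into the relational model; errors
# are logged and the session is rolled back so a bad payload does not stop
# the consumer.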
def process_tweet(tweet, source_id, session, access_token):
    try:
        tweet_obj = anyjson.deserialize(tweet)
        screen_name = ""
        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
            screen_name = tweet_obj['user']['screen_name']
        utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
        utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
        processor.process()
    except Exception as e:
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        utils.get_logger().error(message)
        output = StringIO.StringIO()
        traceback.print_exception(Exception, e, None, None, output)
        error_stack = output.getvalue()
        output.close()
        session.rollback()
        # [...]


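# Consumer: TweetProcess pulls (source_id, raw json) pairs off the queue and
# post-processes them until stop_event is set; the short get() timeout lets
# the loop notice the stop flag.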
class TweetProcess(Process):

    def __init__(self, session_maker, queue, options, access_token, stop_event):
        self.session_maker = session_maker
        self.queue = queue
        self.stop_event = stop_event
        self.options = options
        self.access_token = access_token
        super(TweetProcess, self).__init__()


    def run(self):

        set_logging(self.options)
        session = self.session_maker()
        try:
            while not self.stop_event.is_set():
                try:
                    source_id, tweet_txt = self.queue.get(True, 3)
                    utils.get_logger().debug("Processing source id " + repr(source_id))
                except Exception as e:
                    utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.access_token)
                session.commit()
        finally:
            session.rollback()
            self.stop_event.set()
            session.close()


def get_sessionmaker(conn_str):
    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
    Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
    return Session, engine, metadata


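# process_leftovers picks up TweetSource rows that have no TweetLog entry yet
# (tweets that were recorded but never post-processed, e.g. after an
# interrupted run) and runs them through process_tweet.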
def process_leftovers(session, access_token):

    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)

    for src in sources:
        tweet_txt = src.original_json
        process_tweet(tweet_txt, src.id, session, access_token)
        session.commit()

    # get tweet sources that do not match any message:
    # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;


def get_options():
    parser = OptionParser()
    parser.add_option("-f", "--file", dest="conn_str",
                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
    parser.add_option("-u", "--user", dest="username",
                      help="Twitter user", metavar="USER", default=None)
    parser.add_option("-w", "--password", dest="password",
                      help="Twitter password", metavar="PASSWORD", default=None)
    parser.add_option("-T", "--track", dest="track",
                      # [...]
                      help="Reconnects", metavar="RECONNECTS", default=10, type='int')
    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                      help="Token file name")
    parser.add_option("-d", "--duration", dest="duration",
                      help="Duration of recording in seconds", metavar="DURATION", default=-1, type='int')
    parser.add_option("-N", "--nb-process", dest="process_nb",
                      help="number of processes.\nIf 0, only the leftovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')

    utils.set_logging_options(parser)

    # [...]


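# Example invocation (illustrative values only):
#   python recorder_tweetstream.py -f postgresql://user:pwd@localhost/tweets \
#       -T "keyword1,keyword2" -N 3 -d 3600
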
if __name__ == '__main__':

    (options, args) = get_options()

    set_logging(options)

    if options.debug:
        print "OPTIONS : "
        print repr(options)

    conn_str = options.conn_str.strip()
    if not re.match("^\w+://.+", conn_str):
        # a bare file name was given: treat it as a sqlite database file
        conn_str = 'sqlite:///' + conn_str

    if conn_str.startswith("sqlite") and options.new:
        # path part of a sqlite:///... URL
        filepath = conn_str[len("sqlite:///"):]
        if os.path.exists(filepath):
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            else:
                shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)

    if options.new:
        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
        if len(check_metadata.sorted_tables) > 0:
            message = "Database %s not empty, exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)

    metadata.create_all(engine)

    access_token = None
    if not options.username or not options.password:
        access_token = utils.get_oauth_token(options.token_filename)

    session = Session()
    try:
        process_leftovers(session, access_token)
        session.commit()
    finally:
        session.rollback()
        session.close()

    if options.process_nb <= 0:
        utils.get_logger().debug("Leftovers processed. Exiting.")
        sys.exit()

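    # Recording phase: a JoinableQueue carries (source_id, raw json) pairs from
    # the SourceProcess to the TweetProcess consumers, and a shared Event is
    # used to ask every process to stop. The urllib2 warm-up request below is
    # presumably meant to initialise urllib2's module-level state in the parent
    # before the child processes are forked (see the workaround comment below).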
    queue = JoinableQueue()
    stop_event = Event()

    #workaround for bug on using urllib2 and multiprocessing
    req = urllib2.Request('http://localhost')
    conn = None
    try:
        conn = urllib2.urlopen(req)
    except:
        pass
        #donothing
    finally:
        if conn is not None:
            conn.close()

    sprocess = SourceProcess(Session, queue, options, access_token, stop_event)

    tweet_processes = []

    for i in range(options.process_nb - 1):
        Session, engine, metadata = get_sessionmaker(conn_str)
        cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
        tweet_processes.append(cprocess)

    def interupt_handler(signum, frame):
        stop_event.set()

    # [...]
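    # The parent process then just watches the clock and the source process:
    # recording stops once the requested duration has elapsed (end_ts) or when
    # the source process dies.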
    while not stop_event.is_set():
        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
            stop_event.set()
            break
        if sprocess.is_alive():
            time.sleep(1)
        else:
            stop_event.set()
            break

    utils.get_logger().debug("Joining Source Process")
    sprocess.join(10)
    if sprocess.is_alive():
        # join timed out and the source process is stuck: terminate it
        utils.get_logger().debug("Pb joining Source Process - terminating")
        sprocess.terminate()

    utils.get_logger().debug("Joining Queue")
    #queue.join()
    for i, cprocess in enumerate(tweet_processes):
        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
        cprocess.join(3)
        if cprocess.is_alive():
            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
            cprocess.terminate()

    utils.get_logger().debug("Processing leftovers")
    session = Session()
    try:
        process_leftovers(session, access_token)
        session.commit()
    finally:
        session.rollback()
        session.close()

    utils.get_logger().debug("Done. Exiting.")