script/stream/recorder_tweetstream.py
changeset 893 10a19dd4e1c9
parent 877 41ce1c341abe
parent 891 8628c590f608
child 894 cba4554e9c03
equal deleted inserted replaced
877:41ce1c341abe 893:10a19dd4e1c9
     1 from getpass import getpass
       
     2 from iri_tweet import models, utils
       
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
       
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
       
     5     get_logger)
       
     6 from optparse import OptionParser
       
     7 from sqlalchemy.exc import OperationalError
       
     8 from sqlalchemy.orm import scoped_session
       
     9 import Queue
       
    10 import StringIO
       
    11 import anyjson
       
    12 import datetime
       
    13 import inspect
       
    14 import logging
       
    15 import os
       
    16 import re
       
    17 import shutil
       
    18 import signal
       
    19 import socket
       
    20 import sqlalchemy.schema
       
    21 import sys
       
    22 import threading
       
    23 import time
       
    24 import traceback
       
    25 import tweepy.auth
       
    26 import tweetstream
       
    27 import urllib2
       
    28 socket._fileobject.default_bufsize = 0
       
    29 
       
    30 
       
    31 
       
    32 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
       
    33 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
       
    34 #columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
       
    35 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
       
#just put it in a sqlite3 table
       
    37 
       
    38 
       
    39 def set_logging(options):
       
    40     loggers = []
       
    41     
       
    42     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
       
    43     loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
       
    44     if options.debug >= 2:
       
    45         loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
       
    46     #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
       
    47     #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
       
    48     #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
       
    49     return loggers
       
    50 
       
    51 def set_logging_process(options, queue):
       
    52     qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
       
    53     qlogger.propagate = 0
       
    54     return qlogger
       
    55 
       
    56 def get_auth(options, access_token):
       
    57     if options.username and options.password:
       
    58         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
       
    59     else:
       
    60         consumer_key = models.CONSUMER_KEY
       
    61         consumer_secret = models.CONSUMER_SECRET
       
    62         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
       
    63         auth.set_access_token(*access_token)
       
    64     return auth
       
    65 
       
    66 
       
    67 def add_process_event(type, args, session_maker):
       
    68     session = session_maker()
       
    69     try:
       
    70         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
       
    71         session.add(evt)
       
    72         session.commit()
       
    73     finally:
       
    74         session.close()
       
    75 
       
    76 
       
    77 class BaseProcess(Process):
       
    78 
       
    79     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
    80         self.parent_pid = parent_pid
       
    81         self.session_maker = session_maker
       
    82         self.queue = queue
       
    83         self.options = options
       
    84         self.logger_queue = logger_queue
       
    85         self.stop_event = stop_event
       
    86         self.access_token = access_token
       
    87 
       
    88         super(BaseProcess, self).__init__()
       
    89 
       
    90     #
       
    91     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
       
    92     #
       
    93     def parent_is_alive(self):
       
    94         try:
       
    95             # try to call Parent
       
    96             os.kill(self.parent_pid, 0)
       
    97         except OSError:
       
    98             # *beeep* oh no! The phone's disconnected!
       
    99             return False
       
   100         else:
       
   101             # *ring* Hi mom!
       
   102             return True
       
   103     
       
   104 
       
   105     def __get_process_event_args(self):
       
   106         return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
       
   107 
       
   108     def run(self):
       
   109         try:
       
   110             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
       
   111             self.do_run()
       
   112         finally:
       
   113             add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
       
   114         
       
   115     def do_run(self):
       
   116         raise NotImplementedError()
       
   117 
       
   118 
       
   119 
       
class SourceProcess(BaseProcess):
    # Reads the Twitter streaming API and persists each raw tweet as a
    # TweetSource row, then enqueues (source_id, raw_json) for the
    # consumer processes.

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        # Streaming-connection parameters lifted from the options object.
        self.track = options.track
        self.token_filename = options.token_filename
        self.catchup = options.catchup
        self.timeout = options.timeout
        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)

    def do_run(self):
        """Connect to the filter stream and pump raw tweets until stopped."""
        #import pydevd
        #pydevd.settrace(suspend=True)

        self.logger = set_logging_process(self.options, self.logger_queue)
        self.auth = get_auth(self.options, self.access_token)

        self.logger.debug("SourceProcess : run ")
        track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
        self.logger.debug("SourceProcess : track list " + track_list)

        # Comma-separated keywords -> list of stripped track terms.
        track_list = [k.strip() for k in track_list.split(',')]

        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
        stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
        self.logger.debug("SourceProcess : after connecting to stream")
        # Let the stream abort its iteration when the shared stop event fires.
        stream.muststop = lambda: self.stop_event.is_set()

        session = self.session_maker()

        try:
            for tweet in stream:
                if not self.parent_is_alive():
                    # Orphaned (parent process is gone): exit this process.
                    sys.exit()
                self.logger.debug("SourceProcess : tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                self.logger.debug("SourceProcess : source created")
                # Retry the insert up to 10 times: presumably to ride out
                # transient OperationalError (e.g. sqlite "database is
                # locked") under concurrent access — TODO confirm.
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            # Exhausted retries: give up on this tweet.
                            raise e

                source_id = source.id
                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                session.commit()
                # Hand off to the consumers (non-blocking put).
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
        finally:
            # Clean up the session, the stream and the queue, and signal
            # every other process to shut down as well.
            session.rollback()
            stream.close()
            session.close()
            self.queue.close()
            self.stop_event.set()
       
   184 
       
   185 
       
   186 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
       
   187     try:
       
   188         tweet_obj = anyjson.deserialize(tweet)
       
   189         if 'text' not in tweet_obj:
       
   190             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
       
   191             session.add(tweet_log)
       
   192             return
       
   193         screen_name = ""
       
   194         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
       
   195             screen_name = tweet_obj['user']['screen_name']
       
   196         logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
       
   197         logger.debug(u"Process_tweet :" + repr(tweet))
       
   198         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
       
   199         processor.process()
       
   200     except Exception as e:
       
   201         message = u"Error %s processing tweet %s" % (repr(e), tweet)
       
   202         logger.exception(message)
       
   203         output = StringIO.StringIO()
       
   204         try:
       
   205             traceback.print_exc(file=output)
       
   206             error_stack = output.getvalue()
       
   207         finally:
       
   208             output.close()
       
   209         session.rollback()
       
   210         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
       
   211         session.add(tweet_log)
       
   212         session.commit()
       
   213 
       
   214     
       
   215         
       
   216 class TweetProcess(BaseProcess):
       
   217     
       
   218     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
   219         super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
       
   220         self.twitter_query_user = options.twitter_query_user
       
   221 
       
   222 
       
   223     def do_run(self):
       
   224         
       
   225         self.logger = set_logging_process(self.options, self.logger_queue)
       
   226         session = self.session_maker()
       
   227         try:
       
   228             while not self.stop_event.is_set() and self.parent_is_alive():
       
   229                 try:
       
   230                     source_id, tweet_txt = self.queue.get(True, 3)
       
   231                     self.logger.debug("Processing source id " + repr(source_id))
       
   232                 except Exception as e:
       
   233                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
       
   234                     continue
       
   235                 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
       
   236                 session.commit()
       
   237         finally:
       
   238             session.rollback()
       
   239             self.stop_event.set()
       
   240             session.close()
       
   241 
       
   242 
       
   243 def get_sessionmaker(conn_str):
       
   244     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
       
   245     Session = scoped_session(Session)
       
   246     return Session, engine, metadata
       
   247 
       
   248             
       
   249 def process_leftovers(session, access_token, twitter_query_user, logger):
       
   250     
       
   251     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
       
   252     
       
   253     for src in sources:
       
   254         tweet_txt = src.original_json
       
   255         process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
       
   256         session.commit()
       
   257 
       
   258         
       
   259     
       
   260     #get tweet source that do not match any message
       
   261     #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
       
   262 def process_log(logger_queues, stop_event):
       
   263     while not stop_event.is_set():
       
   264         for lqueue in logger_queues:
       
   265             try:
       
   266                 record = lqueue.get_nowait()
       
   267                 logging.getLogger(record.name).handle(record)
       
   268             except Queue.Empty:
       
   269                 continue
       
   270             except IOError:
       
   271                 continue
       
   272         time.sleep(0.1)
       
   273 
       
   274         
       
   275 def get_options():
       
   276 
       
   277     usage = "usage: %prog [options]"
       
   278 
       
   279     parser = OptionParser(usage=usage)
       
   280 
       
   281     parser.add_option("-f", "--file", dest="conn_str",
       
   282                       help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
       
   283     parser.add_option("-u", "--user", dest="username",
       
   284                       help="Twitter user", metavar="USER", default=None)
       
   285     parser.add_option("-w", "--password", dest="password",
       
   286                       help="Twitter password", metavar="PASSWORD", default=None)
       
   287     parser.add_option("-T", "--track", dest="track",
       
   288                       help="Twitter track", metavar="TRACK")
       
   289     parser.add_option("-n", "--new", dest="new", action="store_true",
       
   290                       help="new database", default=False)
       
   291     parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
       
   292                       help="launch daemon", default=False)
       
   293     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
       
   294                       help="Token file name")
       
   295     parser.add_option("-d", "--duration", dest="duration",
       
   296                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
       
   297     parser.add_option("-N", "--nb-process", dest="process_nb",
       
   298                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
       
   299     parser.add_option("--url", dest="url",
       
   300                       help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
       
   301     parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
       
   302                       help="Query twitter for users", default=False, metavar="QUERY_USER")
       
   303     parser.add_option("--catchup", dest="catchup",
       
   304                       help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
       
   305     parser.add_option("--timeout", dest="timeout",
       
   306                       help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
       
   307     
       
   308 
       
   309 
       
   310 
       
   311     utils.set_logging_options(parser)
       
   312 
       
   313     return parser.parse_args()
       
   314 
       
   315 
       
   316 def do_run(options, session_maker):
       
   317 
       
   318     stop_args = {}
       
   319 
       
   320     access_token = None
       
   321     if not options.username or not options.password:
       
   322         access_token = utils.get_oauth_token(options.token_filename)
       
   323     
       
   324     session = session_maker()
       
   325     try:
       
   326         process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
       
   327         session.commit()
       
   328     finally:
       
   329         session.rollback()
       
   330         session.close()
       
   331     
       
   332     if options.process_nb <= 0:
       
   333         utils.get_logger().debug("Leftovers processed. Exiting.")
       
   334         return None
       
   335 
       
   336     queue = mQueue()
       
   337     stop_event = Event()
       
   338     
       
   339     #workaround for bug on using urllib2 and multiprocessing
       
   340     req = urllib2.Request('http://localhost')
       
   341     conn = None
       
   342     try:
       
   343         conn = urllib2.urlopen(req)
       
   344     except:
       
   345         utils.get_logger().debug("could not open localhost")
       
   346         #donothing
       
   347     finally:
       
   348         if conn is not None:
       
   349             conn.close()
       
   350     
       
   351     process_engines = []
       
   352     logger_queues = []
       
   353     
       
   354     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   355     process_engines.append(engine_process)
       
   356     lqueue = mQueue(1)
       
   357     logger_queues.append(lqueue)
       
   358     pid = os.getpid()
       
   359     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
       
   360     
       
   361     tweet_processes = []
       
   362     
       
   363     for i in range(options.process_nb - 1):
       
   364         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   365         process_engines.append(engine_process)
       
   366         lqueue = mQueue(1)
       
   367         logger_queues.append(lqueue)
       
   368         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
       
   369         tweet_processes.append(cprocess)
       
   370 
       
   371     def interupt_handler(signum, frame):
       
   372         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
       
   373         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
       
   374         stop_event.set()
       
   375         
       
   376     signal.signal(signal.SIGINT , interupt_handler)
       
   377     signal.signal(signal.SIGHUP , interupt_handler)
       
   378     signal.signal(signal.SIGALRM, interupt_handler)
       
   379     signal.signal(signal.SIGTERM, interupt_handler)
       
   380 
       
   381     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
       
   382     log_thread.daemon = True
       
   383 
       
   384     log_thread.start()
       
   385 
       
   386     sprocess.start()
       
   387     for cprocess in tweet_processes:
       
   388         cprocess.start()
       
   389 
       
   390     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
       
   391 
       
   392     if options.duration >= 0:
       
   393         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   394     
       
   395 
       
   396     while not stop_event.is_set():
       
   397         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   398             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
       
   399             stop_event.set()
       
   400             break
       
   401         if sprocess.is_alive():            
       
   402             time.sleep(1)
       
   403         else:
       
   404             stop_args.update({'message': 'Source process killed'})
       
   405             stop_event.set()
       
   406             break
       
   407     utils.get_logger().debug("Joining Source Process")
       
   408     try:
       
   409         sprocess.join(10)
       
   410     except:
       
   411         utils.get_logger().debug("Pb joining Source Process - terminating")
       
   412         sprocess.terminate()
       
   413         
       
   414     for i, cprocess in enumerate(tweet_processes):
       
   415         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
       
   416         try:
       
   417             cprocess.join(3)
       
   418         except:
       
   419             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
       
   420             cprocess.terminate()
       
   421 
       
   422     
       
   423     utils.get_logger().debug("Close queues")
       
   424     try:
       
   425         queue.close()
       
   426         for lqueue in logger_queues:
       
   427             lqueue.close()
       
   428     except exception as e:
       
   429         utils.get_logger().error("error when closing queues %s", repr(e))
       
   430         #do nothing
       
   431         
       
   432     
       
   433     if options.process_nb > 1:
       
   434         utils.get_logger().debug("Processing leftovers")
       
   435         session = session_maker()
       
   436         try:
       
   437             process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
       
   438             session.commit()
       
   439         finally:
       
   440             session.rollback()
       
   441             session.close()
       
   442 
       
   443     for pengine in process_engines:
       
   444         pengine.dispose()
       
   445 
       
   446     return stop_args
       
   447 
       
   448 
       
def main(options, args):
    """Prepare the database, then launch and record one run.

    Sets the module-global ``conn_str`` used by do_run to build the
    per-process session makers.
    """
    global conn_str

    conn_str = options.conn_str.strip()
    # A bare path (no scheme://) is treated as a sqlite file path.
    if not re.match("^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + options.conn_str

    # With --new on sqlite, rotate an existing database file to a numbered
    # backup name (db.1, db.2, ...) instead of overwriting it.
    if conn_str.startswith("sqlite") and options.new:
        filepath = conn_str[conn_str.find(":///") + 4:]
        if os.path.exists(filepath):
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            else:
                shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)

    if options.new:
        # Refuse --new against a database that already has tables.
        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
        if len(check_metadata.sorted_tables) > 0:
            message = "Database %s not empty exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)

    metadata.create_all(engine)
    session = Session()
    try:
        models.add_model_version(session)
    finally:
        session.close()

    stop_args = {}
    try:
        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
        stop_args = do_run(options, Session)
    except Exception as e:
        # Capture the stack trace so it is stored with the shutdown event,
        # then re-raise for the normal error reporting.
        utils.get_logger().exception("Error in main thread")
        outfile = StringIO.StringIO()
        try:
            traceback.print_exc(file=outfile)
            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
        finally:
            outfile.close()
        raise
    finally:
        # Always record the shutdown, successful or not.
        add_process_event(type="shutdown", args=stop_args, session_maker=Session)

    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
       
   504 
       
   505 
       
   506 
       
if __name__ == '__main__':

    (options, args) = get_options()

    loggers = set_logging(options)

    utils.get_logger().debug("OPTIONS : " + repr(options))

    if options.daemon:
        # Daemon mode: detach from the terminal, but keep the already-open
        # log handler streams alive across the daemonization fork.
        import daemon
        import lockfile

        hdlr_preserve = []
        for logger in loggers:
            hdlr_preserve.extend([h.stream for h in logger.handlers])

        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
        with context:
            main(options, args)
    else:
        main(options, args)
       
   528