script/stream/recorder_tweetstream.py
changeset 289 a5eff8f2b81d
parent 272 fe2efe3600ea
child 290 2ddd11ec2da2
equal deleted inserted replaced
288:4c870c767d3e 289:a5eff8f2b81d
     1 from getpass import getpass
     1 from getpass import getpass
     2 from iri_tweet import models, utils
     2 from iri_tweet import models, utils
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
     5     get_logger)
     5     get_logger)
     6 from optparse import OptionParser
     6 from optparse import OptionParser
     7 from sqlalchemy.exc import OperationalError
     7 from sqlalchemy.exc import OperationalError
     8 from sqlalchemy.orm import scoped_session, sessionmaker
     8 from sqlalchemy.orm import scoped_session
     9 import Queue
     9 import Queue
    10 import StringIO
    10 import StringIO
    11 import anyjson
    11 import anyjson
    12 import datetime
    12 import datetime
    13 import inspect
    13 import inspect
    23 import time
    23 import time
    24 import traceback
    24 import traceback
    25 import tweepy.auth
    25 import tweepy.auth
    26 import tweetstream
    26 import tweetstream
    27 import urllib2
    27 import urllib2
    28 #from iri_tweet.utils import get_logger
       
    29 socket._fileobject.default_bufsize = 0
    28 socket._fileobject.default_bufsize = 0
    30 
    29 
    31 
    30 
    32 
    31 
    33 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    32 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
   158     def run(self):
   157     def run(self):
   159         try:
   158         try:
   160             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
   159             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
   161             self.do_run()
   160             self.do_run()
   162         finally:
   161         finally:
   163             add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
   162             add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
   164         
   163         
   165     def do_run(self):
   164     def do_run(self):
   166         raise NotImplementedError()
   165         raise NotImplementedError()
   167 
   166 
   168 
   167 
   285             self.stop_event.set()
   284             self.stop_event.set()
   286             session.close()
   285             session.close()
   287 
   286 
   288 
   287 
   289 def get_sessionmaker(conn_str):
   288 def get_sessionmaker(conn_str):
   290     engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
   289     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   291     Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
   290     Session = scoped_session(Session)
   292     return Session, engine, metadata
   291     return Session, engine, metadata
   293 
   292 
   294             
   293             
   295 def process_leftovers(session, access_token, logger):
   294 def process_leftovers(session, access_token, logger):
   296     
   295     
   422 
   421 
   423     sprocess.start()
   422     sprocess.start()
   424     for cprocess in tweet_processes:
   423     for cprocess in tweet_processes:
   425         cprocess.start()
   424         cprocess.start()
   426 
   425 
   427     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
   426     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
   428 
   427 
   429     if options.duration >= 0:
   428     if options.duration >= 0:
   430         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
   429         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
   431     
   430     
   432 
   431 
   513             message = "Database %s not empty exiting" % conn_str
   512             message = "Database %s not empty exiting" % conn_str
   514             utils.get_logger().error(message)
   513             utils.get_logger().error(message)
   515             sys.exit(message)
   514             sys.exit(message)
   516     
   515     
   517     metadata.create_all(engine)
   516     metadata.create_all(engine)
       
   517     session = Session()
       
   518     try:
       
   519         models.add_model_version(session)
       
   520     finally:
       
   521         session.close()
       
   522     
   518     stop_args = {}
   523     stop_args = {}
   519     try:
   524     try:
   520         add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
   525         add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
   521         stop_args = do_run(options, Session)
   526         stop_args = do_run(options, Session)
   522     except Exception as e:
   527     except Exception as e: