| changeset 289 | a5eff8f2b81d |
| parent 272 | fe2efe3600ea |
| child 290 | 2ddd11ec2da2 |
| 288:4c870c767d3e | 289:a5eff8f2b81d |
|---|---|
1 from getpass import getpass |
1 from getpass import getpass |
2 from iri_tweet import models, utils |
2 from iri_tweet import models, utils |
3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent |
3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent |
4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, |
4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, |
5 get_logger) |
5 get_logger) |
6 from optparse import OptionParser |
6 from optparse import OptionParser |
7 from sqlalchemy.exc import OperationalError |
7 from sqlalchemy.exc import OperationalError |
8 from sqlalchemy.orm import scoped_session, sessionmaker |
8 from sqlalchemy.orm import scoped_session |
9 import Queue |
9 import Queue |
10 import StringIO |
10 import StringIO |
11 import anyjson |
11 import anyjson |
12 import datetime |
12 import datetime |
13 import inspect |
13 import inspect |
23 import time |
23 import time |
24 import traceback |
24 import traceback |
25 import tweepy.auth |
25 import tweepy.auth |
26 import tweetstream |
26 import tweetstream |
27 import urllib2 |
27 import urllib2 |
28 #from iri_tweet.utils import get_logger |
|
29 socket._fileobject.default_bufsize = 0 |
28 socket._fileobject.default_bufsize = 0 |
30 |
29 |
31 |
30 |
32 |
31 |
33 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user'] |
32 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user'] |
158 def run(self): |
157 def run(self): |
159 try: |
158 try: |
160 add_process_event("start_worker", self.__get_process_event_args(), self.session_maker) |
159 add_process_event("start_worker", self.__get_process_event_args(), self.session_maker) |
161 self.do_run() |
160 self.do_run() |
162 finally: |
161 finally: |
163 add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker) |
162 add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker) |
164 |
163 |
165 def do_run(self): |
164 def do_run(self): |
166 raise NotImplementedError() |
165 raise NotImplementedError() |
167 |
166 |
168 |
167 |
285 self.stop_event.set() |
284 self.stop_event.set() |
286 session.close() |
285 session.close() |
287 |
286 |
288 |
287 |
289 def get_sessionmaker(conn_str): |
288 def get_sessionmaker(conn_str): |
290 engine, metadata = models.setup_database(conn_str, echo=False, create_all=False) |
289 engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) |
291 Session = scoped_session(sessionmaker(bind=engine, autocommit=False)) |
290 Session = scoped_session(Session) |
292 return Session, engine, metadata |
291 return Session, engine, metadata |
293 |
292 |
294 |
293 |
295 def process_leftovers(session, access_token, logger): |
294 def process_leftovers(session, access_token, logger): |
296 |
295 |
422 |
421 |
423 sprocess.start() |
422 sprocess.start() |
424 for cprocess in tweet_processes: |
423 for cprocess in tweet_processes: |
425 cprocess.start() |
424 cprocess.start() |
426 |
425 |
427 add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker) |
426 add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) |
428 |
427 |
429 if options.duration >= 0: |
428 if options.duration >= 0: |
430 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
429 end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) |
431 |
430 |
432 |
431 |
513 message = "Database %s not empty exiting" % conn_str |
512 message = "Database %s not empty exiting" % conn_str |
514 utils.get_logger().error(message) |
513 utils.get_logger().error(message) |
515 sys.exit(message) |
514 sys.exit(message) |
516 |
515 |
517 metadata.create_all(engine) |
516 metadata.create_all(engine) |
517 session = Session() |
|
518 try: |
|
519 models.add_model_version(session) |
|
520 finally: |
|
521 session.close() |
|
522 |
|
518 stop_args = {} |
523 stop_args = {} |
519 try: |
524 try: |
520 add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) |
525 add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) |
521 stop_args = do_run(options, Session) |
526 stop_args = do_run(options, Session) |
522 except Exception as e: |
527 except Exception as e: |