script/stream/recorder_tweetstream.py
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Thu, 25 Aug 2011 02:20:08 +0200
changeset 261 d84c4aa2a9eb
parent 260 b97a72ab59a2
child 263 6671e9a4c9c5
permissions -rw-r--r--
add process event for start and shutdown
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
     1
from getpass import getpass
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
     2
from iri_tweet import models, utils
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
     3
from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
     4
from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
     5
    get_logger)
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
     6
from optparse import OptionParser
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
     7
from sqlalchemy.exc import OperationalError
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
     8
from sqlalchemy.orm import scoped_session, sessionmaker
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
     9
import Queue
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    10
import StringIO
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    11
import anyjson
199
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
    12
import datetime
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
    13
import inspect
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    14
import logging
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
    15
import os
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    16
import re
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    17
import shutil
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    18
import signal
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
    19
import socket
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    20
import sqlalchemy.schema
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
    21
import sys
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
    22
import threading
206
6d642d650470 Improve tweet recorder log info
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 199
diff changeset
    23
import time
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    24
import traceback
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    25
import tweepy.auth
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    26
import tweetstream
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    27
import urllib2
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    28
#from iri_tweet.utils import get_logger
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    29
socket._fileobject.default_bufsize = 0
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
    30
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    31
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    32
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    33
#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    34
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    35
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    36
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    37
#just put it in a sqlite3 tqble
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    38
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    39
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    40
def set_logging(options):
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
    41
    utils.set_logging(options, logging.getLogger('iri.tweet'))
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    42
    utils.set_logging(options, logging.getLogger('multiprocessing'))
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    43
    if options.debug >= 2:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    44
        utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    45
    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    46
    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    47
    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    48
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
    49
def set_logging_process(options, queue):
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
    50
    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
    51
    qlogger.propagate = 0
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
    52
    return qlogger
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
    53
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    54
def get_auth(options, access_token):
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    55
    if options.username and options.password:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    56
        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    57
    else:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    58
        consumer_key = models.CONSUMER_KEY
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    59
        consumer_secret = models.CONSUMER_SECRET
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    60
        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    61
        auth.set_access_token(*access_token)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    62
    return auth
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    63
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    64
207
621fa6caec0c Merge with f093196961e770387cc7cd3e11f2b7c0881b003b
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 206
diff changeset
    65
class ReconnectingTweetStream(tweetstream.FilterStream):
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    66
    """TweetStream class that automatically tries to reconnect if the
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    67
    connecting goes down. Reconnecting, and waiting for reconnecting, is
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    68
    blocking.
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    69
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    70
    :param username: See :TweetStream:
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    71
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    72
    :param password: See :TweetStream:
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    73
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    74
    :keyword url: See :TweetStream:
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    75
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    76
    :keyword reconnects: Number of reconnects before a ConnectionError is
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    77
        raised. Default is 3
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    78
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    79
    :error_cb: Optional callable that will be called just before trying to
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    80
        reconnect. The callback will be called with a single argument, the
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    81
        exception that caused the reconnect attempt. Default is None
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    82
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    83
    :retry_wait: Time to wait before reconnecting in seconds. Default is 5
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    84
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    85
    """
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    86
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    87
    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    88
        self.max_reconnects = reconnects
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    89
        self.retry_wait = retry_wait
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    90
        self._reconnects = 0
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    91
        self._error_cb = error_cb
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    92
        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    93
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    94
    def next(self):
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    95
        while True:
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    96
            try:
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
    97
                utils.get_logger().debug("return super.next")
199
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
    98
                return super(ReconnectingTweetStream, self).next()
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    99
            except tweetstream.ConnectionError, e:
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   100
                utils.get_logger().debug("connection error :" + str(e))
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   101
                self._reconnects += 1
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   102
                if self._reconnects > self.max_reconnects:
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   103
                    raise tweetstream.ConnectionError("Too many retries")
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   104
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   105
                # Note: error_cb is not called on the last error since we
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   106
                # raise a ConnectionError instead
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   107
                if  callable(self._error_cb):
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   108
                    self._error_cb(e)
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   109
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   110
                time.sleep(self.retry_wait)
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   111
        # Don't listen to auth error, since we can't reasonably reconnect
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   112
        # when we get one.
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   113
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   114
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   115
class BaseProcess(Process):
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   116
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   117
    def __init__(self, parent_pid):
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   118
        self.parent_pid = parent_pid
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   119
        super(BaseProcess, self).__init__()
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   120
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   121
    #
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   122
    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   123
    #
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   124
    def parent_is_alive(self):
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   125
        try:
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   126
            # try to call Parent
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   127
            os.kill(self.parent_pid, 0)
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   128
        except OSError:
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   129
            # *beeep* oh no! The phone's disconnected!
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   130
            return False
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   131
        else:
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   132
            # *ring* Hi mom!
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   133
            return True
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   134
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   135
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   136
class SourceProcess(BaseProcess):
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   137
    
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   138
    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   139
        self.session_maker = session_maker
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   140
        self.queue = queue
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   141
        self.track = options.track
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   142
        self.reconnects = options.reconnects
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   143
        self.token_filename = options.token_filename
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   144
        self.stop_event = stop_event
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   145
        self.options = options
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   146
        self.access_token = access_token
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   147
        self.logger_queue = logger_queue
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   148
        super(SourceProcess, self).__init__(parent_pid)
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   149
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   150
    def run(self):
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   151
        
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   152
        #import pydevd
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   153
        #pydevd.settrace(suspend=False)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   154
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   155
        self.logger = set_logging_process(self.options, self.logger_queue)
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   156
        self.auth = get_auth(self.options, self.access_token) 
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   157
        
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   158
        self.logger.debug("SourceProcess : run")
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   159
        track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   160
        track_list = [k for k in track_list.split(',')]
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   161
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   162
        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   163
        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   164
        self.logger.debug("SourceProcess : after connecting to stream")
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   165
        stream.muststop = lambda: self.stop_event.is_set()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   166
        
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   167
        session = self.session_maker()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   168
        
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   169
        try:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   170
            for tweet in stream:
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   171
                if not self.parent_is_alive():
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   172
                    sys.exit()
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   173
                self.logger.debug("SourceProcess : tweet " + repr(tweet))
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   174
                source = TweetSource(original_json=tweet)
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   175
                self.logger.debug("SourceProcess : source created")
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   176
                add_retries = 0
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   177
                while add_retries < 10:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   178
                    try:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   179
                        add_retries += 1
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   180
                        session.add(source)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   181
                        session.flush()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   182
                        break
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   183
                    except OperationalError as e:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   184
                        session.rollback()
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   185
                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   186
                        if add_retries == 10:
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   187
                            raise e
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   188
                     
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   189
                source_id = source.id
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   190
                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   191
                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   192
                session.commit()
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   193
                self.queue.put((source_id, tweet), False)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   194
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   195
        except Exception as e:
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   196
            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   197
        finally:
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   198
            session.rollback()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   199
            stream.close()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   200
            session.close()
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   201
            self.queue.close()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   202
            self.stop_event.set()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   203
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   204
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   205
def process_tweet(tweet, source_id, session, access_token, logger):
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   206
    try:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   207
        tweet_obj = anyjson.deserialize(tweet)
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   208
        if 'text' not in tweet_obj:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   209
            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   210
            session.add(tweet_log)
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   211
            return
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   212
        screen_name = ""
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   213
        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   214
            screen_name = tweet_obj['user']['screen_name']
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   215
        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   216
        logger.debug(u"Process_tweet :" + repr(tweet))
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   217
        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   218
        processor.process()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   219
    except Exception as e:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   220
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   221
        logger.error(message)
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   222
        output = StringIO.StringIO()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   223
        traceback.print_exception(Exception, e, None, None, output)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   224
        error_stack = output.getvalue()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   225
        output.close()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   226
        session.rollback()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   227
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   228
        session.add(tweet_log)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   229
        session.commit()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   230
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   231
    
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   232
        
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   233
class TweetProcess(BaseProcess):
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   234
    
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   235
    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   236
        self.session_maker = session_maker
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   237
        self.queue = queue
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   238
        self.stop_event = stop_event
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   239
        self.options = options
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   240
        self.access_token = access_token
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   241
        self.logger_queue = logger_queue
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   242
        super(TweetProcess, self).__init__(parent_pid)
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   243
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   244
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   245
    def run(self):
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   246
        
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   247
        self.logger = set_logging_process(self.options, self.logger_queue)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   248
        session = self.session_maker()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   249
        try:
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   250
            while not self.stop_event.is_set() and self.parent_is_alive():
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   251
                try:
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   252
                    source_id, tweet_txt = queue.get(True, 3)
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   253
                    self.logger.debug("Processing source id " + repr(source_id))
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   254
                except Exception as e:
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   255
                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   256
                    continue
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   257
                process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   258
                session.commit()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   259
        finally:
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   260
            session.rollback()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   261
            self.stop_event.set()
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   262
            session.close()
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   263
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   264
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   265
def get_sessionmaker(conn_str):
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   266
    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   267
    Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   268
    return Session, engine, metadata
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   269
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   270
            
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   271
def process_leftovers(session, access_token, logger):
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   272
    
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   273
    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   274
    
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   275
    for src in sources:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   276
        tweet_txt = src.original_json
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   277
        process_tweet(tweet_txt, src.id, session, access_token, logger)
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   278
        session.commit()
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   279
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   280
        
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   281
    
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   282
    #get tweet source that do not match any message
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   283
    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   284
def process_log(logger_queues, stop_event):
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   285
    while not stop_event.is_set():
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   286
        for lqueue in logger_queues:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   287
            try:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   288
                record = lqueue.get_nowait()
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   289
                logging.getLogger(record.name).handle(record)
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   290
            except Queue.Empty:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   291
                continue
256
2f335337ff64 Do not stop on IOErrors
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 255
diff changeset
   292
            except IOError:
2f335337ff64 Do not stop on IOErrors
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 255
diff changeset
   293
                continue
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   294
        time.sleep(0.1)
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   295
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   296
        
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   297
def get_options():
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   298
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   299
    usage = "usage: %prog [options]"
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   300
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   301
    parser = OptionParser(usage=usage)
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   302
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   303
    parser.add_option("-f", "--file", dest="conn_str",
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   304
                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   305
    parser.add_option("-u", "--user", dest="username",
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   306
                      help="Twitter user", metavar="USER", default=None)
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   307
    parser.add_option("-w", "--password", dest="password",
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   308
                      help="Twitter password", metavar="PASSWORD", default=None)
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   309
    parser.add_option("-T", "--track", dest="track",
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   310
                      help="Twitter track", metavar="TRACK")
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   311
    parser.add_option("-n", "--new", dest="new", action="store_true",
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   312
                      help="new database", default=False)
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   313
    parser.add_option("-r", "--reconnects", dest="reconnects",
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   314
                      help="Reconnects", metavar="RECONNECTS", default=10, type='int')
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   315
    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   316
                      help="Token file name")
199
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
   317
    parser.add_option("-d", "--duration", dest="duration",
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
   318
                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   319
    parser.add_option("-N", "--nb-process", dest="process_nb",
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   320
                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   321
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   322
    utils.set_logging_options(parser)
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   323
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   324
    return parser.parse_args()
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   325
    
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   326
def add_process_event(type, args, session_maker):
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   327
    session = session_maker()
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   328
    try:
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   329
        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   330
        session.add(evt)
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   331
        session.commit()
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   332
    finally:
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   333
        session.close()
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   334
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   335
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   336
if __name__ == '__main__':
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   337
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   338
    stop_args = {}
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   339
    (options, args) = get_options()
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   340
    
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   341
    set_logging(options)
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   342
    
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   343
    utils.get_logger().debug("OPTIONS : " + repr(options))    
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   344
    
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   345
    conn_str = options.conn_str.strip()
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   346
    if not re.match("^\w+://.+", conn_str):
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   347
        conn_str = 'sqlite:///' + options.conn_str
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   348
        
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   349
    if conn_str.startswith("sqlite") and options.new:
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   350
        filepath = conn_str[conn_str.find(":///") + 4:]
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   351
        if os.path.exists(filepath):
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   352
            i = 1
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   353
            basename, extension = os.path.splitext(filepath)
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   354
            new_path = '%s.%d%s' % (basename, i, extension)
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   355
            while i < 1000000 and os.path.exists(new_path):
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   356
                i += 1
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   357
                new_path = '%s.%d%s' % (basename, i, extension)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   358
            if i >= 1000000:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   359
                raise Exception("Unable to find new filename for " + filepath)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   360
            else:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   361
                shutil.move(filepath, new_path)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   362
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   363
    Session, engine, metadata = get_sessionmaker(conn_str)
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   364
    
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   365
    if options.new:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   366
        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   367
        if len(check_metadata.sorted_tables) > 0:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   368
            message = "Database %s not empty exiting" % conn_str
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   369
            utils.get_logger().error(message)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   370
            sys.exit(message)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   371
    
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   372
    metadata.create_all(engine)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   373
    
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   374
    add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   375
    
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   376
    access_token = None
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   377
    if not options.username or not options.password:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   378
        access_token = utils.get_oauth_token(options.token_filename)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   379
    
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   380
    session = Session()
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   381
    try:
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   382
        process_leftovers(session, access_token, utils.get_logger())
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   383
        session.commit()
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   384
    finally:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   385
        session.rollback()
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   386
        session.close()
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   387
    
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   388
    if options.process_nb <= 0:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   389
        utils.get_logger().debug("Leftovers processed. Exiting.")
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   390
        add_process_event(type="shutdown", args=None, session_maker=Session)
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   391
        sys.exit()
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   392
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   393
    queue = mQueue()
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   394
    stop_event = Event()
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   395
    
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   396
    #workaround for bug on using urllib2 and multiprocessing
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   397
    req = urllib2.Request('http://localhost')
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   398
    conn = None
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   399
    try:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   400
        conn = urllib2.urlopen(req)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   401
    except:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   402
        pass
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   403
        #donothing
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   404
    finally:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   405
        if conn is not None:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   406
            conn.close()
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   407
    
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   408
    process_engines = []
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   409
    logger_queues = []
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   410
    
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   411
    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   412
    process_engines.append(engine_process)
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   413
    lqueue = mQueue(1)
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   414
    logger_queues.append(lqueue)
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   415
    pid = os.getpid()
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   416
    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   417
    
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   418
    tweet_processes = []
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   419
    
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   420
    for i in range(options.process_nb - 1):
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   421
        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   422
        process_engines.append(engine_process)
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   423
        lqueue = mQueue(1)
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   424
        logger_queues.append(lqueue)
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   425
        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   426
        tweet_processes.append(cprocess)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   427
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   428
    def interupt_handler(signum, frame):
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   429
        global stop_args
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   430
        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   431
        stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   432
        stop_event.set()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   433
        
260
b97a72ab59a2 add more signals to shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 256
diff changeset
   434
    signal.signal(signal.SIGINT , interupt_handler)
b97a72ab59a2 add more signals to shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 256
diff changeset
   435
    signal.signal(signal.SIGHUP , interupt_handler)
b97a72ab59a2 add more signals to shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 256
diff changeset
   436
    signal.signal(signal.SIGALRM, interupt_handler)
b97a72ab59a2 add more signals to shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 256
diff changeset
   437
    signal.signal(signal.SIGTERM, interupt_handler)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   438
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   439
    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   440
    log_thread.daemon = True
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   441
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   442
    log_thread.start()
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   443
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   444
    sprocess.start()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   445
    for cprocess in tweet_processes:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   446
        cprocess.start()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   447
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   448
    add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session)
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   449
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   450
    if options.duration >= 0:
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   451
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   452
    
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   453
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   454
    while not stop_event.is_set():
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   455
        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   456
            stop_event.set()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   457
            break
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   458
        if sprocess.is_alive():            
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   459
            time.sleep(1)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   460
        else:
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   461
            stop_event.set()
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   462
            break
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   463
    utils.get_logger().debug("Joining Source Process")
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   464
    try:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   465
        sprocess.join(10)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   466
    except:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   467
        utils.get_logger().debug("Pb joining Source Process - terminating")
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   468
        sprocess.terminate()
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   469
        
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   470
    for i, cprocess in enumerate(tweet_processes):
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   471
        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   472
        try:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   473
            cprocess.join(3)
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   474
        except:
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   475
            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   476
            cprocess.terminate()
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   477
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   478
    
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   479
    utils.get_logger().debug("Close queues")
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   480
    try:
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   481
        queue.close()
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   482
        for lqueue in logger_queues:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   483
            lqueue.close()
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   484
    except exception as e:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   485
        utils.get_logger().error("error when closing queues %s", repr(e))
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   486
        #do nothing
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   487
        
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   488
    
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   489
    if options.process_nb > 1:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   490
        utils.get_logger().debug("Processing leftovers")
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   491
        session = Session()
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   492
        try:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   493
            process_leftovers(session, access_token, utils.get_logger())
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   494
            session.commit()
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   495
        finally:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   496
            session.rollback()
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   497
            session.close()
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   498
255
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   499
    for pengine in process_engines:
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   500
        pengine.dispose()
500cd0405c7a improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 254
diff changeset
   501
        
261
d84c4aa2a9eb add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 260
diff changeset
   502
    add_process_event(type="shutdown", args=stop_args, session_maker=Session)
254
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   503
    utils.get_logger().debug("Done. Exiting.")
2209e66bb50b multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 243
diff changeset
   504