script/stream/recorder_tweetstream.py
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Tue, 09 Aug 2011 12:41:37 +0200
changeset 245 4c953ca2aa1d
parent 243 9213a63fa34a
child 254 2209e66bb50b
permissions -rw-r--r--
commit .project
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
     1
from getpass import getpass
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
     2
from iri_tweet import models, utils
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
     3
from iri_tweet.models import TweetSource, TweetLog
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
     4
from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
     5
from optparse import OptionParser
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
     6
from sqlalchemy.orm import sessionmaker
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
     7
import StringIO
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
     8
import logging
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
     9
import anyjson
199
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
    10
import datetime
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
    11
import os
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    12
import shutil
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    13
import signal
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
    14
import socket
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
    15
import sys
206
6d642d650470 Improve tweet recorder log info
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 199
diff changeset
    16
import time
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    17
import traceback
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    18
import tweepy.auth
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    19
import tweetstream
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
    20
from iri_tweet.utils import logger
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
    21
from sqlalchemy.exc import OperationalError
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    22
socket._fileobject.default_bufsize = 0
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
    23
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    24
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    25
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    26
#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    27
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    28
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    29
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    30
#just put it in a sqlite3 tqble
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    31
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    32
207
621fa6caec0c Merge with f093196961e770387cc7cd3e11f2b7c0881b003b
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 206
diff changeset
    33
class ReconnectingTweetStream(tweetstream.FilterStream):
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    34
    """TweetStream class that automatically tries to reconnect if the
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    35
    connecting goes down. Reconnecting, and waiting for reconnecting, is
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    36
    blocking.
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    37
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    38
    :param username: See :TweetStream:
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    39
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    40
    :param password: See :TweetStream:
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    41
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    42
    :keyword url: See :TweetStream:
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    43
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    44
    :keyword reconnects: Number of reconnects before a ConnectionError is
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    45
        raised. Default is 3
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    46
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    47
    :error_cb: Optional callable that will be called just before trying to
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    48
        reconnect. The callback will be called with a single argument, the
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    49
        exception that caused the reconnect attempt. Default is None
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    50
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    51
    :retry_wait: Time to wait before reconnecting in seconds. Default is 5
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    52
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    53
    """
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    54
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    55
    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    56
        self.max_reconnects = reconnects
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    57
        self.retry_wait = retry_wait
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    58
        self._reconnects = 0
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    59
        self._error_cb = error_cb
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    60
        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    61
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    62
    def next(self):
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    63
        while True:
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    64
            try:
199
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
    65
                return super(ReconnectingTweetStream, self).next()
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    66
            except tweetstream.ConnectionError, e:
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
    67
                logging.debug("connection error :" + str(e))
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    68
                self._reconnects += 1
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    69
                if self._reconnects > self.max_reconnects:
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
    70
                    raise tweetstream.ConnectionError("Too many retries")
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    71
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    72
                # Note: error_cb is not called on the last error since we
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    73
                # raise a ConnectionError instead
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    74
                if  callable(self._error_cb):
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    75
                    self._error_cb(e)
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    76
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    77
                time.sleep(self.retry_wait)
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    78
        # Don't listen to auth error, since we can't reasonably reconnect
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    79
        # when we get one.
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    80
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    81
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
    82
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    83
class SourceProcess(Process):
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    84
    
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    85
    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    86
        self.session_maker = session_maker
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    87
        self.queue = queue
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    88
        self.auth = auth
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    89
        self.track = track
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    90
        self.debug = debug
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    91
        self.reconnects = reconnects
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    92
        self.token_filename = token_filename
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    93
        self.stop_event = stop_event
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
    94
        super(SourceProcess, self).__init__()
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    95
#        self.stop_event = 
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    96
    
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    97
    def run(self):
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
    98
        
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
    99
        get_logger().debug("SourceProcess : run")
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   100
        track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   101
        track_list = [k for k in track_list.split(',')]
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   102
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   103
        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   104
        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   105
        get_logger().debug("SourceProcess : after connecting to stream")
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   106
        stream.muststop = lambda: self.stop_event.is_set()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   107
        
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   108
        session = self.session_maker()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   109
        
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   110
        try:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   111
            for tweet in stream:
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   112
                get_logger().debug("tweet " + repr(tweet))
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   113
                source = TweetSource(original_json=tweet)
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   114
                get_logger().debug("source created")
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   115
                add_retries = 0
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   116
                while add_retries < 10:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   117
                    try:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   118
                        add_retries += 1
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   119
                        session.add(source)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   120
                        session.flush()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   121
                        break
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   122
                    except OperationalError as e:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   123
                        session.rollback()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   124
                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   125
                        if add_retries==10:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   126
                            raise e
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   127
                     
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   128
                source_id = source.id
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   129
                get_logger().debug("before queue + source id " + repr(source_id))
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   130
                self.queue.put((source_id, tweet), False)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   131
                #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   132
                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   133
                session.commit()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   134
#                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   135
#                    print "Stop recording after %d seconds." % (duration)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   136
#                    break
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   137
        except Exception as e:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   138
            get_logger().error("Error when processing tweet " + repr(e))
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   139
        finally:
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   140
            session.rollback()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   141
            stream.close()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   142
            session.close()
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   143
            self.queue.close()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   144
            self.stop_event.set()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   145
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   146
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   147
def process_tweet(tweet, source_id, session, token_filename):
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   148
    try:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   149
        tweet_obj = anyjson.deserialize(tweet)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   150
        screen_name = ""
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   151
        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   152
            screen_name = tweet_obj['user']['screen_name']
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   153
        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   154
        get_logger().debug(u"Process_tweet :" + repr(tweet))
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   155
        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   156
        processor.process()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   157
    except Exception as e:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   158
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   159
        get_logger().error(message)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   160
        output = StringIO.StringIO()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   161
        traceback.print_exception(Exception, e, None, None, output)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   162
        error_stack = output.getvalue()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   163
        output.close()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   164
        session.rollback()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   165
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   166
        session.add(tweet_log)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   167
        session.commit()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   168
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   169
    
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   170
        
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   171
class TweetProcess(Process):
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   172
    
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   173
    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   174
        self.session_maker = session_maker
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   175
        self.queue = queue
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   176
        self.debug = debug
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   177
        self.stop_event = stop_event
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   178
        self.token_filename = token_filename
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   179
        super(TweetProcess, self).__init__()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   180
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   181
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   182
    def run(self):
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   183
        
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   184
        session = self.session_maker()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   185
        try:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   186
            while not self.stop_event.is_set():
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   187
                try:
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   188
                    source_id, tweet_txt = queue.get(True, 10)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   189
                    get_logger().debug("Processing source id " + repr(source_id))
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   190
                except Exception as e:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   191
                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   192
                    continue
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   193
                process_tweet(tweet_txt, source_id, session, self.token_filename)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   194
                session.commit()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   195
        except:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   196
            raise
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   197
        finally:
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   198
            session.rollback()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   199
            self.stop_event.set()
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   200
            session.close()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   201
            
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   202
def process_leftovers(session, token_filename):
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   203
    
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   204
    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   205
    
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   206
    for src in sources:
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   207
        tweet_txt = src.original_json
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   208
        process_tweet(tweet_txt, src.id, session, token_filename)
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   209
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   210
        
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   211
    
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   212
    #get tweet source that do not match any message
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   213
    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   214
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   215
        
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   216
def get_options():
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   217
    parser = OptionParser()
199
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
   218
    parser.add_option("-f", "--file", dest="filename",
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   219
                      help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   220
    parser.add_option("-u", "--user", dest="username",
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   221
                      help="Twitter user", metavar="USER", default=None)
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   222
    parser.add_option("-w", "--password", dest="password",
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   223
                      help="Twitter password", metavar="PASSWORD", default=None)
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   224
    parser.add_option("-T", "--track", dest="track",
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   225
                      help="Twitter track", metavar="TRACK")
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   226
    parser.add_option("-n", "--new", dest="new", action="store_true",
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   227
                      help="new database", default=False)
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   228
    parser.add_option("-r", "--reconnects", dest="reconnects",
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   229
                      help="Reconnects", metavar="RECONNECTS", default=10, type='int')
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   230
    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   231
                      help="Token file name")
199
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
   232
    parser.add_option("-d", "--duration", dest="duration",
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
   233
                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   234
    parser.add_option("-N", "--consumer", dest="consumer_nb",
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   235
                      help="number of consumer", metavar="CONSUMER", default=1, type='int')
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   236
199
514e0ee0c68a add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 82
diff changeset
   237
15
5d552b6a0e55 add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 11
diff changeset
   238
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   239
    utils.set_logging_options(parser)
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   240
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   241
    return parser.parse_args()
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   242
    
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   243
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   244
if __name__ == '__main__':
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   245
    
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   246
    (options, args) = get_options()
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   247
    
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   248
    utils.set_logging(options, get_logger())
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   249
        
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   250
    if options.debug:
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   251
        print "OPTIONS : "
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   252
        print repr(options)
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   253
    
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   254
    if options.new and os.path.exists(options.filename):
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   255
        i = 1
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   256
        basename, extension = os.path.splitext(options.filename)
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   257
        new_path = '%s.%d%s' % (basename, i, extension)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   258
        while i < 1000000 and os.path.exists(new_path):
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   259
            i += 1
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   260
            new_path = '%s.%d%s' % (basename, i, extension)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   261
        if i >= 1000000:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   262
            raise Exception("Unable to find new filename for " + options.filename)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   263
        else:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   264
            shutil.move(options.filename, new_path)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   265
9
bb44692e09ee script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff changeset
   266
    
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   267
    queue = JoinableQueue()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   268
    stop_event = Event()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   269
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   270
    if options.username and options.password:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   271
        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   272
    else:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   273
        consumer_key = models.CONSUMER_KEY
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   274
        consumer_secret = models.CONSUMER_SECRET
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   275
        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   276
        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
11
54d7f1486ac4 implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 9
diff changeset
   277
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   278
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   279
    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   280
    Session = sessionmaker(bind=engine)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   281
    
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   282
    session = Session()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   283
    process_leftovers(session, options.token_filename)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   284
    session.commit()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   285
    session.close()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   286
         
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   287
    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   288
    
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   289
    tweet_processes = []
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   290
    
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   291
    for i in range(options.consumer_nb):
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   292
        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   293
        Session = sessionmaker(bind=engine)
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   294
        cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   295
        tweet_processes.append(cprocess)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   296
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   297
    def interupt_handler(signum, frame):
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   298
        stop_event.set()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   299
        
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   300
    signal.signal(signal.SIGINT, interupt_handler)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   301
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   302
    sprocess.start()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   303
    for cprocess in tweet_processes:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   304
        cprocess.start()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   305
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   306
    if options.duration >= 0:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   307
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   308
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   309
    while not stop_event.is_set():
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   310
        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   311
            stop_event.set()
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   312
            break
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   313
        if sprocess.is_alive():
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   314
            time.sleep(0.1)
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   315
        else:
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   316
            break
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   317
    
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   318
    get_logger().debug("Joining Source Process")
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   319
    sprocess.join()
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   320
    get_logger().debug("Joining Queue")
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   321
    #queue.join()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   322
    for i,cprocess in enumerate(tweet_processes):
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   323
        get_logger().debug("Joining consumer process Nb %d" % (i+1))
242
cdd7d3c0549c Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 207
diff changeset
   324
        cprocess.join()
243
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   325
    
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   326
    get_logger().debug("Processing leftovers")
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   327
    session = Session()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   328
    process_leftovers(session, options.token_filename)
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   329
    session.commit()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   330
    session.close()
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   331
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   332
    get_logger().debug("Done. Exiting.")
9213a63fa34a - debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents: 242
diff changeset
   333