| author | Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> |
| Fri, 09 Sep 2011 19:58:39 +0200 | |
| changeset 272 | fe2efe3600ea |
| parent 263 | 6671e9a4c9c5 |
| child 289 | a5eff8f2b81d |
| permissions | -rw-r--r-- |
|
11
54d7f1486ac4
implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
9
diff
changeset
|
1 |
from getpass import getpass |
|
54d7f1486ac4
implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
9
diff
changeset
|
2 |
from iri_tweet import models, utils |
|
261
d84c4aa2a9eb
add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
260
diff
changeset
|
3 |
from iri_tweet.models import TweetSource, TweetLog, ProcessEvent |
|
255
500cd0405c7a
improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
254
diff
changeset
|
4 |
from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, |
|
500cd0405c7a
improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
254
diff
changeset
|
5 |
get_logger) |
|
11
54d7f1486ac4
implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
9
diff
changeset
|
6 |
from optparse import OptionParser |
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
7 |
from sqlalchemy.exc import OperationalError |
|
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
8 |
from sqlalchemy.orm import scoped_session, sessionmaker |
|
255
500cd0405c7a
improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
254
diff
changeset
|
9 |
import Queue |
|
242
cdd7d3c0549c
Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
207
diff
changeset
|
10 |
import StringIO |
|
cdd7d3c0549c
Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
207
diff
changeset
|
11 |
import anyjson |
|
199
514e0ee0c68a
add a duration. not quitewhat expected but that will do
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
82
diff
changeset
|
12 |
import datetime |
|
261
d84c4aa2a9eb
add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
260
diff
changeset
|
13 |
import inspect |
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
14 |
import logging |
|
11
54d7f1486ac4
implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
9
diff
changeset
|
15 |
import os |
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
16 |
import re |
|
242
cdd7d3c0549c
Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
207
diff
changeset
|
17 |
import shutil |
|
cdd7d3c0549c
Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
207
diff
changeset
|
18 |
import signal |
|
11
54d7f1486ac4
implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
9
diff
changeset
|
19 |
import socket |
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
20 |
import sqlalchemy.schema |
|
11
54d7f1486ac4
implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
9
diff
changeset
|
21 |
import sys |
|
255
500cd0405c7a
improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
254
diff
changeset
|
22 |
import threading |
|
206
6d642d650470
Improve tweet recorder log info
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
199
diff
changeset
|
23 |
import time |
|
242
cdd7d3c0549c
Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
207
diff
changeset
|
24 |
import traceback |
|
cdd7d3c0549c
Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
207
diff
changeset
|
25 |
import tweepy.auth |
|
9
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
26 |
import tweetstream |
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
27 |
import urllib2 |
|
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
28 |
#from iri_tweet.utils import get_logger |
|
9
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
29 |
# Set the default buffer size of socket._fileobject (a private class of the
# Python 2 socket module) to 0.
# NOTE(review): presumably done so streamed data is handed over without read
# buffering - confirm before touching.
socket._fileobject.default_bufsize = 0 |
|
11
54d7f1486ac4
implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
9
diff
changeset
|
30 |
|
|
9
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
31 |
|
|
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
32 |
|
|
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
33 |
# Field names for tweet payloads. This list supersedes an earlier, shorter
# one that was kept here as commented-out code.
# NOTE(review): where these names are consumed is outside this view.
columns_tweet = [
    u'user',
    u'favorited',
    u'contributors',
    u'truncated',
    u'text',
    u'created_at',
    u'retweeted',
    u'in_reply_to_status_id_str',
    u'coordinates',
    u'in_reply_to_user_id_str',
    u'entities',
    u'in_reply_to_status_id',
    u'place',
    u'in_reply_to_user_id',
    u'id',
    u'in_reply_to_screen_name',
    u'retweet_count',
    u'geo',
    u'id_str',
    u'source',
]

# Field names for the user object. As above, this replaces an earlier,
# shorter list.
columns_user = [
    u'follow_request_sent',
    u'profile_use_background_image',
    u'id',
    u'verified',
    u'profile_sidebar_fill_color',
    u'profile_text_color',
    u'followers_count',
    u'protected',
    u'location',
    u'profile_background_color',
    u'id_str',
    u'utc_offset',
    u'statuses_count',
    u'description',
    u'friends_count',
    u'profile_link_color',
    u'profile_image_url',
    u'notifications',
    u'show_all_inline_media',
    u'geo_enabled',
    u'profile_background_image_url',
    u'name',
    u'lang',
    u'following',
    u'profile_background_tile',
    u'favourites_count',
    u'screen_name',
    u'url',
    u'created_at',
    u'contributors_enabled',
    u'time_zone',
    u'profile_sidebar_border_color',
    u'is_translator',
    u'listed_count',
]
|
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
37 |
#just put it in a sqlite3 table |
|
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
38 |
|
|
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
39 |
|
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
40 |
def set_logging(options):
    """Configure the main-process loggers and return what was configured.

    'iri.tweet' and 'multiprocessing' are always set up through
    ``utils.set_logging``; 'sqlalchemy.engine' is added only when
    ``options.debug`` is at least 2. The other sqlalchemy loggers
    (dialects, pool, orm) are deliberately left unconfigured.

    :param options: options object; ``debug`` is read here and the whole
        object is forwarded to ``utils.set_logging``.
    :returns: list of the values returned by ``utils.set_logging``, in
        configuration order.
    """
    configured = [
        utils.set_logging(options, logging.getLogger(logger_name))
        for logger_name in ('iri.tweet', 'multiprocessing')
    ]
    if options.debug >= 2:
        sql_logger = logging.getLogger('sqlalchemy.engine')
        configured.append(utils.set_logging(options, sql_logger))
    return configured
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
51 |
|
|
255
500cd0405c7a
improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
254
diff
changeset
|
52 |
def set_logging_process(options, queue):
    """Configure and return the 'iri.tweet.p' logger for a worker process.

    :param options: options object forwarded to ``utils.set_logging``.
    :param queue: forwarded to ``utils.set_logging`` as its third argument.
    :returns: the object returned by ``utils.set_logging``, with
        propagation switched off.
    """
    process_logger = logging.getLogger('iri.tweet.p')
    configured = utils.set_logging(options, process_logger, queue)
    # Records must not bubble up to ancestor loggers.
    configured.propagate = 0
    return configured
|
500cd0405c7a
improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
254
diff
changeset
|
56 |
|
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
57 |
def get_auth(options, access_token):
    """Build the tweepy authentication handler for the stream.

    Basic authentication is used when both ``options.username`` and
    ``options.password`` are set; otherwise an OAuth handler is created from
    the consumer key/secret declared in ``models`` and given ``access_token``.

    :param options: options object providing ``username`` and ``password``.
    :param access_token: sequence unpacked into ``set_access_token``; only
        used on the OAuth path.
    :returns: a ``BasicAuthHandler`` or an ``OAuthHandler``.
    """
    if options.username and options.password:
        return tweepy.auth.BasicAuthHandler(options.username, options.password)

    oauth_handler = tweepy.auth.OAuthHandler(
        models.CONSUMER_KEY,
        models.CONSUMER_SECRET,
        secure=False,
    )
    oauth_handler.set_access_token(*access_token)
    return oauth_handler
|
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
66 |
|
|
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
67 |
|
|
207
621fa6caec0c
Merge with f093196961e770387cc7cd3e11f2b7c0881b003b
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
206
diff
changeset
|
68 |
class ReconnectingTweetStream(tweetstream.FilterStream):
    """Filter stream that automatically tries to reconnect if the
    connection goes down. Reconnecting, and waiting for reconnecting, is
    blocking.

    :param auth: authentication handler, forwarded to the parent stream as
        ``auth``.

    :param keywords: forwarded to the parent stream as ``track``.

    :keyword reconnects: Number of reconnects before a ConnectionError is
        raised. Default is 3

    :keyword error_cb: Optional callable that will be called just before
        trying to reconnect. The callback will be called with a single
        argument, the exception that caused the reconnect attempt.
        Default is None

    :keyword retry_wait: Time to wait before reconnecting in seconds.
        Default is 5

    :keyword as_text: forwarded unchanged to the parent stream.
        Default is False

    Any additional keyword argument is forwarded to the parent stream.
    """

    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
        self.max_reconnects = reconnects
        self.retry_wait = retry_wait
        # Number of connection errors seen so far.
        # NOTE(review): this class never resets the counter, so `reconnects`
        # bounds the errors over the whole life of the stream - confirm that
        # this is intended.
        self._reconnects = 0
        self._error_cb = error_cb
        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)

    def next(self):
        """Return the next item of the parent stream, retrying on errors.

        On ``tweetstream.ConnectionError`` the error counter is incremented;
        once it exceeds ``max_reconnects`` a new ConnectionError
        ("Too many retries") is raised. Otherwise ``error_cb`` is invoked
        (when callable), the call sleeps ``retry_wait`` seconds and the read
        is attempted again.

        :raises tweetstream.ConnectionError: when the retry budget is spent.
        """
        while True:
            try:
                utils.get_logger().debug("return super.next")
                return super(ReconnectingTweetStream, self).next()
            except tweetstream.ConnectionError as e:
                utils.get_logger().debug("connection error :" + str(e))
                self._reconnects += 1
                if self._reconnects > self.max_reconnects:
                    raise tweetstream.ConnectionError("Too many retries")

                # Note: error_cb is not called on the last error since we
                # raise a ConnectionError instead
                if callable(self._error_cb):
                    self._error_cb(e)

                time.sleep(self.retry_wait)
        # Only ConnectionError is handled above. Authentication errors are
        # deliberately left to propagate, since we can't reasonably
        # reconnect when we get one.
|
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
116 |
|
|
263
6671e9a4c9c5
correct model ans improve event tracking
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
261
diff
changeset
|
117 |
def add_process_event(type, args, session_maker):
    """Persist one ``ProcessEvent`` row in its own session.

    :param type: event type stored on the row (e.g. "start_worker").
    :param args: event arguments; serialized with ``anyjson`` unless None,
        in which case None is stored.
    :param session_maker: callable returning a new database session.

    The session is always closed, whether or not the commit succeeds.
    """
    session = session_maker()
    try:
        if args is None:
            payload = None
        else:
            payload = anyjson.serialize(args)
        event = ProcessEvent(args=payload, type=type)
        session.add(event)
        session.commit()
    finally:
        session.close()
|
6671e9a4c9c5
correct model ans improve event tracking
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
261
diff
changeset
|
125 |
|
|
9
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
126 |
|
|
261
d84c4aa2a9eb
add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
260
diff
changeset
|
127 |
class BaseProcess(Process):
    """Common base class for the recorder's worker processes.

    Keeps the collaborators handed over by the parent process and wraps the
    subclass-provided :meth:`do_run` with "start_worker" / "stop_worker"
    process events recorded through ``add_process_event``.
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        """Store everything the worker needs, then initialise the Process.

        :param session_maker: callable returning a database session; handed
            to ``add_process_event``.
        :param queue: stored as ``self.queue`` for subclasses.
        :param options: options object; its ``__dict__`` is recorded in the
            process events.
        :param access_token: stored for subclasses and recorded in the
            process events.
        :param stop_event: stored as ``self.stop_event`` for subclasses -
            presumably the shutdown signal; confirm against the subclasses.
        :param logger_queue: stored as ``self.logger_queue`` for subclasses.
        :param parent_pid: pid probed by :meth:`parent_is_alive`.
        """
        self.parent_pid = parent_pid
        self.session_maker = session_maker
        self.queue = queue
        self.options = options
        self.logger_queue = logger_queue
        self.stop_event = stop_event
        self.access_token = access_token

        super(BaseProcess, self).__init__()

    #
    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
    #
    def parent_is_alive(self):
        """Return True while the process ``self.parent_pid`` still exists.

        The check sends signal 0, which performs the existence and
        permission checks without delivering any signal.
        """
        import errno  # local import: the file header does not import errno

        try:
            os.kill(self.parent_pid, 0)
        except OSError as err:
            # EPERM: the process exists but we are not allowed to signal it,
            # so the parent is still alive. Any other error (typically
            # ESRCH, "no such process") means it is gone.
            return err.errno == errno.EPERM
        return True

    def __get_process_event_args(self):
        """Build the dict recorded with the start/stop process events.

        NOTE(review): this includes the full options ``__dict__`` and the
        access token, so any credential they hold ends up in the event
        record - confirm this is acceptable.
        """
        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}

    def run(self):
        """Record "start_worker", run :meth:`do_run`, always record "stop_worker"."""
        try:
            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
            self.do_run()
        finally:
            # Recorded even when do_run (or the start event) raised.
            add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)

    def do_run(self):
        """Worker body; must be overridden by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
|
6671e9a4c9c5
correct model ans improve event tracking
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
261
diff
changeset
|
167 |
|
|
9
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
168 |
|
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
169 |
|
|
261
d84c4aa2a9eb
add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
260
diff
changeset
|
170 |
class SourceProcess(BaseProcess):
    """Worker process that reads the tweet stream and stores the raw tweets.

    Every tweet yielded by the stream is saved as a ``TweetSource`` row and
    then pushed on ``self.queue`` as a ``(source_id, raw_tweet)`` tuple.
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        """Keep the stream settings found in ``options`` and initialise the base process.

        ``track``, ``reconnects`` and ``token_filename`` are copied from
        ``options``; all arguments are then forwarded unchanged to the
        ``BaseProcess`` constructor.
        """
        self.track = options.track
        self.reconnects = options.reconnects
        self.token_filename = options.token_filename
        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)

    def do_run(self):
        """Connect to the stream and record tweets until the stream ends.

        The loop ends when the stream is exhausted, when an error occurs, or
        when the parent process is no longer alive (the process then exits
        through ``sys.exit()``). In every case the session, the stream and
        the queue are released and ``self.stop_event`` is set.
        """
        self.logger = set_logging_process(self.options, self.logger_queue)
        self.auth = get_auth(self.options, self.access_token)

        self.logger.debug("SourceProcess : run")
        # self.track is a comma separated list of keywords
        track_list = self.track.split(',')

        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
        self.logger.debug("SourceProcess : after connecting to stream")
        # let the stream poll the shared stop event
        stream.muststop = lambda: self.stop_event.is_set()

        session = self.session_maker()

        try:
            for tweet in stream:
                if not self.parent_is_alive():
                    sys.exit()
                self.logger.debug("SourceProcess : tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                self.logger.debug("SourceProcess : source created")
                # the insert is attempted up to 10 times on OperationalError
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            # bare raise keeps the original traceback
                            raise

                # read the id before the commit, while the row is freshly flushed
                source_id = source.id
                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                session.commit()
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
        finally:
            try:
                session.rollback()
                stream.close()
                session.close()
                self.queue.close()
            finally:
                # must always be signalled, even when the cleanup above fails,
                # otherwise the other processes never learn that the source stopped
                self.stop_event.set()
|
9213a63fa34a
- debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
242
diff
changeset
|
231 |
|
|
9213a63fa34a
- debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
242
diff
changeset
|
232 |
|
|
255
500cd0405c7a
improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
254
diff
changeset
|
233 |
def process_tweet(tweet, source_id, session, access_token, logger):
    """Process one raw tweet and record the outcome in the database session.

    Args:
        tweet: raw JSON text of the tweet, as stored in the tweet source.
        source_id: id of the tweet source row the tweet comes from.
        session: database session used for all reads and writes.
        access_token: token handed over to ``utils.TwitterProcessor``.
        logger: logger receiving the progress and error messages.

    A payload without a ``'text'`` key is only logged in the database with
    the ``NOT_TWEET`` status (nothing is committed here in that case).
    Any exception is caught: the session is rolled back and a ``TweetLog``
    with the ``ERROR`` status, the message and the stack trace is committed.
    """
    try:
        tweet_obj = anyjson.deserialize(tweet)

        if 'text' not in tweet_obj:
            session.add(TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']))
            return

        has_screen_name = 'user' in tweet_obj and 'screen_name' in tweet_obj['user']
        screen_name = tweet_obj['user']['screen_name'] if has_screen_name else ""

        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
        logger.debug(u"Process_tweet :" + repr(tweet))
        utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None).process()
    except Exception as e:
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        logger.exception(message)
        # same text as printing the current exception into a string buffer
        error_stack = traceback.format_exc()
        session.rollback()
        failure_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(failure_log)
        session.commit()
|
9213a63fa34a
- debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
242
diff
changeset
|
260 |
|
|
9213a63fa34a
- debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
242
diff
changeset
|
261 |
|
|
242
cdd7d3c0549c
Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
207
diff
changeset
|
262 |
|
|
261
d84c4aa2a9eb
add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
260
diff
changeset
|
263 |
class TweetProcess(BaseProcess):
    """Worker process that takes stored tweets from the queue and processes them."""

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        """Forward all arguments unchanged to the ``BaseProcess`` constructor."""
        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)

    def do_run(self):
        """Process queued ``(source_id, tweet_txt)`` items until asked to stop.

        The loop runs while ``self.stop_event`` is not set and the parent
        process is alive. On exit the session is rolled back and closed and
        ``self.stop_event`` is set.
        """
        self.logger = set_logging_process(self.options, self.logger_queue)
        session = self.session_maker()
        try:
            while not self.stop_event.is_set() and self.parent_is_alive():
                try:
                    # wait at most 3 seconds so that the loop condition is re-checked regularly
                    source_id, tweet_txt = self.queue.get(True, 3)
                    self.logger.debug("Processing source id " + repr(source_id))
                except Exception as e:
                    # includes the timeout on an empty queue: log and poll again
                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
                session.commit()
        finally:
            try:
                session.rollback()
            finally:
                # both steps must happen even when the rollback itself fails
                self.stop_event.set()
                session.close()
|
254
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
287 |
|
|
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
288 |
|
|
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
289 |
def get_sessionmaker(conn_str):
    """Build the session factory for the database designated by ``conn_str``.

    The database is set up through ``models.setup_database`` with echo
    disabled and without creating the tables.

    Returns:
        A ``(Session, engine, metadata)`` tuple where ``Session`` is a scoped
        session factory bound to ``engine`` with autocommit disabled.
    """
    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
    session_factory = sessionmaker(bind=engine, autocommit=False)
    return scoped_session(session_factory), engine, metadata
|
2209e66bb50b
multiple debugging and corrections
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
243
diff
changeset
|
293 |
|
|
242
cdd7d3c0549c
Starting 'parallel_twitter' branch
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
207
diff
changeset
|
294 |
|
|
255
500cd0405c7a
improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
254
diff
changeset
|
295 |
def process_leftovers(session, access_token, logger):
    """Process the stored tweet sources that have no log entry yet.

    The sources are selected with an outer join on ``TweetLog`` keeping only
    the rows without a matching log. Each one is passed to ``process_tweet``
    and the session is committed after every source.
    """
    # "== None" is intentional: it builds the SQL "IS NULL" test
    unlogged_sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)

    for leftover in unlogged_sources:
        process_tweet(leftover.original_json, leftover.id, session, access_token, logger)
        session.commit()
|
9
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
303 |
|
|
243
9213a63fa34a
- debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
242
diff
changeset
|
304 |
|
|
15
5d552b6a0e55
add oauth authentication to tweetstream
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
11
diff
changeset
|
305 |
|
|
243
9213a63fa34a
- debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
242
diff
changeset
|
306 |
#get tweet sources that do not match any message |
|
9213a63fa34a
- debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
242
diff
changeset
|
307 |
#select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; |
|
255
500cd0405c7a
improve multi processing architecture
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
254
diff
changeset
|
308 |
def process_log(logger_queues, stop_event):
    """Dispatch the log records received from the worker processes.

    Until ``stop_event`` is set, each pass takes at most one pending record
    from every queue of ``logger_queues`` and hands it to the logger named
    in the record.

    Args:
        logger_queues: iterable of queues holding log records.
        stop_event: event whose ``is_set()`` ends the loop.
    """
    while not stop_event.is_set():
        handled = False
        for lqueue in logger_queues:
            try:
                record = lqueue.get_nowait()
                logging.getLogger(record.name).handle(record)
                handled = True
            except Queue.Empty:
                continue
            except IOError:
                # an I/O error on one queue must not stop the dispatching
                continue
        # sleep only when every queue was idle: pausing after each pass would
        # cap the throughput at about 10 records per second and per queue
        if not handled:
            time.sleep(0.1)
|
243
9213a63fa34a
- debug multithread (still database lock problem)
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
242
diff
changeset
|
319 |
|
|
11
54d7f1486ac4
implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
9
diff
changeset
|
320 |
|
|
54d7f1486ac4
implement get_oauth_token
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
9
diff
changeset
|
321 |
def get_options():
    """Declare the command line options and parse the command line.

    The logging options are added by ``utils.set_logging_options``.

    Returns:
        The result of ``OptionParser.parse_args()``, i.e. the
        ``(options, args)`` pair.
    """
    parser = OptionParser(usage="usage: %prog [options]")

    # (flags, settings) pairs, declared in the order shown by --help
    option_specs = (
        (("-f", "--file"),
         dict(dest="conn_str", help="write tweet to DATABASE. This is a connection string",
              metavar="CONNECTION_STR", default="enmi2010_twitter.db")),
        (("-u", "--user"),
         dict(dest="username", help="Twitter user", metavar="USER", default=None)),
        (("-w", "--password"),
         dict(dest="password", help="Twitter password", metavar="PASSWORD", default=None)),
        (("-T", "--track"),
         dict(dest="track", help="Twitter track", metavar="TRACK")),
        (("-n", "--new"),
         dict(dest="new", action="store_true", help="new database", default=False)),
        (("-D", "--daemon"),
         dict(dest="daemon", action="store_true", help="launch daemon", default=False)),
        (("-r", "--reconnects"),
         dict(dest="reconnects", help="Reconnects", metavar="RECONNECTS", default=10, type='int')),
        (("-t",),
         dict(dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
              help="Token file name")),
        (("-d", "--duration"),
         dict(dest="duration", help="Duration of recording in seconds", metavar="DURATION",
              default=-1, type='int')),
        (("-N", "--nb-process"),
         dict(dest="process_nb",
              help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.",
              metavar="PROCESS_NB", default=2, type='int')),
    )
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)

    utils.set_logging_options(parser)

    return parser.parse_args()
|
263
6671e9a4c9c5
correct model ans improve event tracking
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
261
diff
changeset
|
351 |
|
|
6671e9a4c9c5
correct model ans improve event tracking
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
261
diff
changeset
|
352 |
|
|
6671e9a4c9c5
correct model ans improve event tracking
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
261
diff
changeset
|
353 |
def do_run(options, session_maker):
    """Run one recording session: start the workers, wait, then shut down.

    Steps, in order:

    1. Process the leftovers already present in the database.
    2. If ``options.process_nb <= 0`` stop there.
    3. Otherwise start one ``SourceProcess`` and ``options.process_nb - 1``
       ``TweetProcess`` consumers, plus a daemon thread running
       ``process_log`` on the per-process logger queues.
    4. Poll once per second until a signal arrives, ``options.duration``
       seconds have elapsed (when ``options.duration >= 0``) or the source
       process is no longer alive.
    5. Join the processes, close the queues, process the leftovers again
       (only when ``options.process_nb > 1``) and dispose of the engines.

    Relies on the module-level ``conn_str`` that ``main`` sets before
    calling this function.

    :param options: parsed command line options; reads ``username``,
        ``password``, ``token_filename``, ``process_nb`` and ``duration``.
    :param session_maker: callable returning a new database session for the
        main process.
    :returns: ``None`` when only the leftovers were processed, otherwise a
        dict describing why the run stopped (always has a ``'message'`` key
        once the wait loop has ended).
    """

    stop_args = {}

    # Fall back on an OAuth token when no username/password pair is given.
    access_token = None
    if not options.username or not options.password:
        access_token = utils.get_oauth_token(options.token_filename)

    session = session_maker()
    try:
        process_leftovers(session, access_token, utils.get_logger())
        session.commit()
    finally:
        # After a successful commit the rollback has nothing to undo; it
        # only matters when process_leftovers or the commit raised.
        session.rollback()
        session.close()

    if options.process_nb <= 0:
        utils.get_logger().debug("Leftovers processed. Exiting.")
        return None

    queue = mQueue()
    stop_event = Event()

    # workaround for bug on using urllib2 and multiprocessing
    req = urllib2.Request('http://localhost')
    conn = None
    try:
        conn = urllib2.urlopen(req)
    except Exception:
        # Best effort only: the request just has to be attempted once in
        # the parent process, its outcome is irrelevant.
        utils.get_logger().debug("could not open localhost")
    finally:
        if conn is not None:
            conn.close()

    process_engines = []
    logger_queues = []

    # Each child process gets its own session maker/engine and its own
    # logger queue (capacity 1).
    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
    process_engines.append(engine_process)
    lqueue = mQueue(1)
    logger_queues.append(lqueue)
    pid = os.getpid()
    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)

    tweet_processes = []

    for _ in range(options.process_nb - 1):
        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
        process_engines.append(engine_process)
        lqueue = mQueue(1)
        logger_queues.append(lqueue)
        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
        tweet_processes.append(cprocess)

    def interupt_handler(signum, frame):
        # Record why we are stopping, then wake up everybody waiting on
        # the shared stop event.
        frameinfo = inspect.getframeinfo(frame, 9)
        utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(frameinfo))
        stop_args.update({'message': 'interupt', 'signum': signum, 'frameinfo': frameinfo})
        stop_event.set()

    for handled_signal in (signal.SIGINT, signal.SIGHUP, signal.SIGALRM, signal.SIGTERM):
        signal.signal(handled_signal, interupt_handler)

    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
    log_thread.daemon = True

    log_thread.start()

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    add_process_event(
        "pid",
        {
            'main': os.getpid(),
            'source': (sprocess.name, sprocess.pid),
            'consumers': dict((p.name, p.pid) for p in tweet_processes),
        },
        session_maker)

    # end_ts is only defined (and only read) when a duration was requested.
    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

    while not stop_event.is_set():
        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
            stop_args.update({'message': 'duration', 'duration': options.duration, 'end_ts': end_ts})
            stop_event.set()
            break
        if sprocess.is_alive():
            time.sleep(1)
        else:
            stop_args.update({'message': 'Source process killed'})
            stop_event.set()
            break

    # NOTE(review): Process.join(timeout) returns silently on timeout, so
    # the terminate() calls below only run when join() itself raises. A
    # process that is merely slow to stop is left running - confirm whether
    # an explicit is_alive() check is wanted here.
    utils.get_logger().debug("Joining Source Process")
    try:
        sprocess.join(10)
    except Exception:
        utils.get_logger().debug("Pb joining Source Process - terminating")
        sprocess.terminate()

    for i, cprocess in enumerate(tweet_processes):
        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
        try:
            cprocess.join(3)
        except Exception:
            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
            cprocess.terminate()

    utils.get_logger().debug("Close queues")
    try:
        queue.close()
        for lqueue in logger_queues:
            lqueue.close()
    except Exception as e:
        # Was "except exception", an undefined name: any failure here ended
        # in a NameError instead of this log line.
        utils.get_logger().error("error when closing queues %s", repr(e))

    if options.process_nb > 1:
        utils.get_logger().debug("Processing leftovers")
        session = session_maker()
        try:
            process_leftovers(session, access_token, utils.get_logger())
            session.commit()
        finally:
            session.rollback()
            session.close()

    for pengine in process_engines:
        pengine.dispose()

    return stop_args
|
261
d84c4aa2a9eb
add process event for start and shutdown
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
260
diff
changeset
|
484 |
|
|
9
bb44692e09ee
script apres traitement enmi
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
diff
changeset
|
485 |
|
|
272
fe2efe3600ea
add daemon option
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
263
diff
changeset
|
486 |
def main(options, args):
    """Prepare the database, run the recorder and log start/shutdown events.

    Sets the module-level ``conn_str`` from ``options.conn_str``. A value
    that does not look like a URL (``scheme://...``) is treated as a path
    to a sqlite file. With ``options.new`` an existing sqlite file is first
    moved aside to ``<basename>.<n><extension>``, and the run is aborted
    through ``sys.exit`` if the target database already contains tables.

    A "start" process event is recorded before ``do_run`` is called and a
    "shutdown" event afterwards, whether the run succeeded or raised.

    :param options: parsed command line options; reads ``conn_str`` and
        ``new`` and is passed on to ``do_run``.
    :param args: positional command line arguments, only recorded in the
        "start" event.
    :raises Exception: when no free backup file name can be found, or
        re-raised from ``add_process_event`` / ``do_run``.
    """

    global conn_str

    conn_str = options.conn_str.strip()
    if not re.match(r"^\w+://.+", conn_str):
        # Use the stripped value: the original concatenated the raw option,
        # so surrounding whitespace ended up inside the sqlite path.
        conn_str = 'sqlite:///' + conn_str

    if conn_str.startswith("sqlite") and options.new:
        filepath = conn_str[conn_str.find(":///") + 4:]
        if os.path.exists(filepath):
            # Look for the first unused "<basename>.<i><extension>" name.
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)

    if options.new:
        # A "new" recording must start from a database without any table.
        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
        if check_metadata.sorted_tables:
            message = "Database %s not empty exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)

    metadata.create_all(engine)
    stop_args = {}
    try:
        add_process_event(type="start", args={'options': options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
        stop_args = do_run(options, Session)
    except Exception as e:
        utils.get_logger().exception("Error in main thread")
        # Keep the details of the failure for the "shutdown" event below.
        stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace': traceback.format_exc()}
        raise
    finally:
        add_process_event(type="shutdown", args=stop_args, session_maker=Session)

    utils.get_logger().debug("Done. Exiting.")
|
272
fe2efe3600ea
add daemon option
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
263
diff
changeset
|
535 |
|
|
fe2efe3600ea
add daemon option
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
263
diff
changeset
|
536 |
|
|
fe2efe3600ea
add daemon option
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
263
diff
changeset
|
537 |
|
|
fe2efe3600ea
add daemon option
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
263
diff
changeset
|
538 |
if __name__ == '__main__':
    # Script entry point: parse the options, set up logging, then run
    # main() either directly or inside a daemon context.

    (options, args) = get_options()
    loggers = set_logging(options)
    utils.get_logger().debug("OPTIONS : " + repr(options))

    if not options.daemon:
        main(options, args)
    else:
        # Imported only when daemon mode is requested.
        import daemon
        import lockfile

        # Hand the streams of the logging handlers to the daemon context
        # through files_preserve.
        hdlr_preserve = [h.stream for logger in loggers for h in logger.handlers]

        context = daemon.DaemonContext(working_directory=os.getcwd(),
                                       files_preserve=hdlr_preserve)
        with context:
            main(options, args)
|
fe2efe3600ea
add daemon option
Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
parents:
263
diff
changeset
|
559 |