62 auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
62 auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
63 auth.set_access_token(*access_token) |
63 auth.set_access_token(*access_token) |
64 return auth |
64 return auth |
65 |
65 |
66 |
66 |
class ReconnectingTweetStream(tweetstream.FilterStream):
    """FilterStream that automatically tries again when the connection goes
    down. Retrying, and waiting before retrying, is blocking.

    :param auth: Authentication object, forwarded to the parent class as
        ``auth``.

    :param keywords: Keywords to follow, forwarded to the parent class as
        ``track``.

    :keyword reconnects: Number of reconnects before a ConnectionError is
        raised. Default is 3. The counter is only ever incremented by this
        class (it is never reset after a successful read), so the limit
        applies to the whole lifetime of the stream object.

    :keyword error_cb: Optional callable that will be called just before
        trying to reconnect. The callback will be called with a single
        argument, the exception that caused the reconnect attempt.
        Default is None.

    :keyword retry_wait: Time to wait before reconnecting in seconds.
        Default is 5.

    :keyword as_text: Forwarded unchanged to the parent class.
        Default is False.

    Any other keyword argument (for instance ``url``) is forwarded unchanged
    to the parent class constructor.
    """

    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
        self.max_reconnects = reconnects
        self.retry_wait = retry_wait
        # Number of connection errors seen so far by next().
        self._reconnects = 0
        self._error_cb = error_cb
        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)

    def next(self):
        """Return the next item of the parent stream, retrying on
        connection errors.

        :raises tweetstream.ConnectionError: once more than
            ``max_reconnects`` connection errors have been seen.
        """
        while True:
            try:
                utils.get_logger().debug("return super.next")
                return super(ReconnectingTweetStream, self).next()
            # "as" form: valid on Python 2.6+ and Python 3, unlike the
            # Python-2-only "except X, e" spelling.
            except tweetstream.ConnectionError as e:
                utils.get_logger().debug("connection error :" + str(e))
                self._reconnects += 1
                if self._reconnects > self.max_reconnects:
                    raise tweetstream.ConnectionError("Too many retries")

                # Note: error_cb is not called on the last error since we
                # raise a ConnectionError instead
                if callable(self._error_cb):
                    self._error_cb(e)

                # Blocking pause before the loop asks the parent again.
                time.sleep(self.retry_wait)
        # Only ConnectionError is caught above. Don't listen to auth error,
        # since we can't reasonably reconnect when we get one.
|
115 |
|
116 def add_process_event(type, args, session_maker): |
67 def add_process_event(type, args, session_maker): |
117 session = session_maker() |
68 session = session_maker() |
118 try: |
69 try: |
119 evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) |
70 evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) |
120 session.add(evt) |
71 session.add(evt) |
168 |
119 |
169 class SourceProcess(BaseProcess): |
120 class SourceProcess(BaseProcess): |
170 |
121 |
171 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
122 def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): |
172 self.track = options.track |
123 self.track = options.track |
173 self.reconnects = options.reconnects |
|
174 self.token_filename = options.token_filename |
124 self.token_filename = options.token_filename |
|
125 self.catchup = options.catchup |
|
126 self.timeout = options.timeout |
175 super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
127 super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) |
176 |
128 |
177 def do_run(self): |
129 def do_run(self): |
178 |
130 |
179 #import pydevd |
131 #import pydevd |
180 #pydevd.settrace(suspend=False) |
132 #pydevd.settrace(suspend=True) |
181 |
133 |
182 self.logger = set_logging_process(self.options, self.logger_queue) |
134 self.logger = set_logging_process(self.options, self.logger_queue) |
183 self.auth = get_auth(self.options, self.access_token) |
135 self.auth = get_auth(self.options, self.access_token) |
184 |
136 |
185 self.logger.debug("SourceProcess : run") |
137 self.logger.debug("SourceProcess : run ") |
186 track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() |
138 track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() |
187 track_list = [k for k in track_list.split(',')] |
139 self.logger.debug("SourceProcess : track list " + track_list) |
|
140 |
|
141 track_list = [k.strip() for k in track_list.split(',')] |
188 |
142 |
189 self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) |
143 self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) |
190 stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True, url=self.options.url) |
144 stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout) |
191 self.logger.debug("SourceProcess : after connecting to stream") |
145 self.logger.debug("SourceProcess : after connecting to stream") |
192 stream.muststop = lambda: self.stop_event.is_set() |
146 stream.muststop = lambda: self.stop_event.is_set() |
193 |
147 |
194 session = self.session_maker() |
148 session = self.session_maker() |
195 |
149 |
290 engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) |
244 engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False) |
291 Session = scoped_session(Session) |
245 Session = scoped_session(Session) |
292 return Session, engine, metadata |
246 return Session, engine, metadata |
293 |
247 |
294 |
248 |
295 def process_leftovers(session, access_token, logger): |
249 def process_leftovers(session, access_token, twitter_query_user, logger): |
296 |
250 |
297 sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) |
251 sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) |
298 |
252 |
299 for src in sources: |
253 for src in sources: |
300 tweet_txt = src.original_json |
254 tweet_txt = src.original_json |
301 process_tweet(tweet_txt, src.id, session, access_token, logger) |
255 process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger) |
302 session.commit() |
256 session.commit() |
303 |
257 |
304 |
258 |
305 |
259 |
306 #get tweet source that do not match any message |
260 #get tweet source that do not match any message |
334 help="Twitter track", metavar="TRACK") |
288 help="Twitter track", metavar="TRACK") |
335 parser.add_option("-n", "--new", dest="new", action="store_true", |
289 parser.add_option("-n", "--new", dest="new", action="store_true", |
336 help="new database", default=False) |
290 help="new database", default=False) |
337 parser.add_option("-D", "--daemon", dest="daemon", action="store_true", |
291 parser.add_option("-D", "--daemon", dest="daemon", action="store_true", |
338 help="launch daemon", default=False) |
292 help="launch daemon", default=False) |
339 parser.add_option("-r", "--reconnects", dest="reconnects", |
|
340 help="Reconnects", metavar="RECONNECTS", default=10, type='int') |
|
341 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
293 parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
342 help="Token file name") |
294 help="Token file name") |
343 parser.add_option("-d", "--duration", dest="duration", |
295 parser.add_option("-d", "--duration", dest="duration", |
344 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
296 help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') |
345 parser.add_option("-N", "--nb-process", dest="process_nb", |
297 parser.add_option("-N", "--nb-process", dest="process_nb", |
346 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
298 help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') |
347 parser.add_option("--url", dest="url", |
299 parser.add_option("--url", dest="url", |
348 help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url) |
300 help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url) |
349 parser.add_option("--query-user", dest="twitter_query_user", action="store_true", |
301 parser.add_option("--query-user", dest="twitter_query_user", action="store_true", |
350 help="Query twitter for users", default=False, metavar="QUERY_USER") |
302 help="Query twitter for users", default=False, metavar="QUERY_USER") |
|
303 parser.add_option("--catchup", dest="catchup", |
|
304 help="catchup count for tweets", default=None, metavar="CATCHUP", type='int') |
|
305 parser.add_option("--timeout", dest="timeout", |
|
306 help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int') |
|
307 |
351 |
308 |
352 |
309 |
353 |
310 |
354 utils.set_logging_options(parser) |
311 utils.set_logging_options(parser) |
355 |
312 |