80 |
81 |
81 |
82 |
class SourceProcess(Process):
    """Producer process: reads tweets from the stream and stores/enqueues them.

    Each raw tweet is saved as a ``TweetSource`` row, then the tuple
    ``(source_id, tweet)`` is put on ``queue`` for the consumer processes.
    ``stop_event`` is used by the stream's ``muststop`` hook, and it is always
    set when this process exits so that the other processes stop as well.
    """

    # Number of attempts to insert a TweetSource row when the database raises
    # OperationalError (presumably a locked SQLite database -- confirm).
    MAX_ADD_RETRIES = 10

    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
        """Store the configuration; the work itself happens in :meth:`run`.

        :param session_maker: callable returning a new database session.
        :param queue: queue receiving ``(source_id, tweet)`` tuples.
        :param auth: auth handler passed to ``ReconnectingTweetStream``.
        :param track: comma-separated keywords passed to the stream.
        :param debug: debug level (stored, not read by this class).
        :param reconnects: forwarded to ``ReconnectingTweetStream``.
        :param token_filename: OAuth token file (stored, not read by this class).
        :param stop_event: shared event checked by the stream and set on exit.
        """
        self.session_maker = session_maker
        self.queue = queue
        self.auth = auth
        self.track = track
        self.debug = debug
        self.reconnects = reconnects
        self.token_filename = token_filename
        self.stop_event = stop_event
        super(SourceProcess, self).__init__()

    def run(self):
        """Stream tweets, store each one and enqueue it for the consumers.

        Errors raised while handling streamed tweets are logged, not
        propagated. Whatever happens (including a failure while building the
        stream or the session), resources are released and ``stop_event`` is
        set on exit.
        """
        get_logger().debug("SourceProcess : run")
        stream = None
        session = None
        try:
            track_list = self.track.split(',')
            get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
            stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
            get_logger().debug("SourceProcess : after connecting to stream")
            # Presumably polled by the stream to decide when to stop -- confirm.
            stream.muststop = lambda: self.stop_event.is_set()

            session = self.session_maker()

            try:
                for tweet in stream:
                    get_logger().debug("tweet " + repr(tweet))
                    source = TweetSource(original_json=tweet)
                    get_logger().debug("source created")
                    self._add_source(session, source)

                    source_id = source.id
                    get_logger().debug("before queue + source id " + repr(source_id))
                    # Non-blocking put: the consumers do the heavy processing.
                    self.queue.put((source_id, tweet), False)
                    get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                    session.commit()
            except Exception as e:
                get_logger().error("Error when processing tweet " + repr(e))
        finally:
            self._shutdown(stream, session)

    def _add_source(self, session, source):
        """Add and flush ``source``, retrying after an OperationalError.

        The session is rolled back after each failed attempt; the last
        OperationalError is re-raised after ``MAX_ADD_RETRIES`` attempts.
        """
        for attempt in range(1, self.MAX_ADD_RETRIES + 1):
            try:
                session.add(source)
                session.flush()
                return
            except OperationalError as e:
                session.rollback()
                get_logger().debug("Operational Error %s nb %d" % (repr(e), attempt))
                if attempt == self.MAX_ADD_RETRIES:
                    # Bare raise keeps the original traceback.
                    raise

    def _shutdown(self, stream, session):
        """Release the stream, session and queue, then set ``stop_event``.

        ``stop_event`` is set even if one of the close calls raises, so the
        consumer processes are never left waiting for a dead producer.
        """
        try:
            if session is not None:
                session.rollback()
            if stream is not None:
                stream.close()
            if session is not None:
                session.close()
            self.queue.close()
        finally:
            self.stop_event.set()
|
145 |
|
146 |
|
def process_tweet(tweet, source_id, session, token_filename):
    """Parse one raw tweet and run it through ``utils.TwitterProcessor``.

    If parsing or processing raises, the session is rolled back and a
    ``TweetLog`` row with status ``ERROR`` (message plus full stack trace) is
    added and committed for ``source_id``; that exception is not propagated.

    :param tweet: raw JSON text of the tweet.
    :param source_id: id of the ``TweetSource`` row holding ``tweet``.
    :param session: database session used for processing and error logging.
    :param token_filename: OAuth token file passed to the processor.
    """
    try:
        tweet_obj = anyjson.deserialize(tweet)
        screen_name = ""
        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
            screen_name = tweet_obj['user']['screen_name']
        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
        get_logger().debug(u"Process_tweet :" + repr(tweet))
        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
        processor.process()
    except Exception as e:
        # Capture first, before any other call: format_exc() records the
        # actual exception type and the full traceback of the error being
        # handled.
        error_stack = traceback.format_exc()
        # NOTE(review): under Python 2, a non-ASCII byte-string ``tweet`` would
        # make this unicode formatting raise UnicodeDecodeError -- confirm the
        # type of ``tweet``.
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        get_logger().error(message)
        session.rollback()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()
|
168 |
|
169 |
125 |
170 |
class TweetProcess(Process):
    """Consumer process: processes ``(source_id, tweet)`` tuples from ``queue``.

    Runs until ``stop_event`` is set, and sets it itself on exit so that the
    producer and the other consumers stop too.
    """

    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
        """Store the configuration; the work itself happens in :meth:`run`.

        :param session_maker: callable returning a new database session.
        :param queue: queue providing ``(source_id, tweet)`` tuples.
        :param debug: debug level (stored, not read by this class).
        :param token_filename: OAuth token file passed to ``process_tweet``.
        :param stop_event: shared event checked each loop and set on exit.
        """
        self.session_maker = session_maker
        self.queue = queue
        self.debug = debug
        self.stop_event = stop_event
        self.token_filename = token_filename
        super(TweetProcess, self).__init__()

    def run(self):
        """Process queued tweets until ``stop_event`` is set.

        Each tweet is processed with ``process_tweet`` and committed. On exit
        (normal or not) the session is rolled back and closed, and
        ``stop_event`` is set.
        """
        session = None
        try:
            session = self.session_maker()
            while not self.stop_event.is_set():
                try:
                    # Use the queue given to this process, not a module-level
                    # name; block at most 10 s so stop_event is re-checked.
                    source_id, tweet_txt = self.queue.get(True, 10)
                    get_logger().debug("Processing source id " + repr(source_id))
                except Exception as e:
                    # e.g. the get() timeout on an empty queue: log, re-check
                    # stop_event and try again.
                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.token_filename)
                session.commit()
        finally:
            if session is not None:
                session.rollback()
            self.stop_event.set()
            if session is not None:
                session.close()
153 |
201 |
154 |
def process_leftovers(session, token_filename):
    """Process every stored ``TweetSource`` that has no ``TweetLog`` row yet.

    Selects tweet sources that do not match any log message; equivalent SQL:

        select * from tweet_tweet_source ts
        left join tweet_tweet_log tl on ts.id = tl.tweet_source_id
        where tl.id isnull;

    Each tweet is committed on its own. ``process_tweet`` rolls the session
    back when a tweet fails, so without a per-tweet commit one failure would
    discard the work done on all the previous leftovers.

    :param session: database session used for the query and the processing.
    :param token_filename: OAuth token file passed to ``process_tweet``.
    """
    # Fetch the full list first so the per-tweet commits/rollbacks below
    # cannot interfere with an in-progress result iteration.
    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None).all()  # noqa: E711 -- SQLAlchemy needs "== None" to emit IS NULL

    for src in sources:
        tweet_txt = src.original_json
        process_tweet(tweet_txt, src.id, session, token_filename)
        session.commit()
|
173 |
|
174 |
|
175 |
|
176 #def main_source(username, password, track, session, debug, reconnects, token_filename, duration): |
|
177 |
|
178 #username = username or raw_input('Twitter username: ') |
|
179 #password = password or getpass('Twitter password: ') |
|
180 |
|
181 # track_list = track or raw_input('Keywords to track (comma seperated): ').strip() |
|
182 # track_list = [k for k in track_list.split(',')] |
|
183 |
|
184 # if username and password: |
|
185 # auth = tweepy.auth.BasicAuthHandler(username, password) |
|
186 # else: |
|
187 # consumer_key = models.CONSUMER_KEY |
|
188 # consumer_secret = models.CONSUMER_SECRET |
|
189 # auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
|
190 # auth.set_access_token(*(utils.get_oauth_token(token_filename))) |
|
191 |
|
192 # if duration >= 0: |
|
193 # end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration) |
|
194 |
|
195 # stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True) |
|
196 # try: |
|
197 # for tweet in stream: |
|
198 # source = TweetSource(original_json=tweet) |
|
199 # session.add(source) |
|
200 # session.flush() |
|
201 # source_id = source.id |
|
202 # process_tweet(tweet, source_id, session, debug, token_filename) |
|
203 # logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) |
|
204 # session.commit() |
|
205 # if duration >= 0 and datetime.datetime.utcnow() >= end_ts: |
|
206 # print "Stop recording after %d seconds." % (duration) |
|
207 # break |
|
208 # finally: |
|
209 # stream.close() |
|
210 |
215 |
211 def get_options(): |
216 def get_options(): |
212 parser = OptionParser() |
217 parser = OptionParser() |
213 parser.add_option("-f", "--file", dest="filename", |
218 parser.add_option("-f", "--file", dest="filename", |
214 help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") |
219 help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") |
270 consumer_key = models.CONSUMER_KEY |
273 consumer_key = models.CONSUMER_KEY |
271 consumer_secret = models.CONSUMER_SECRET |
274 consumer_secret = models.CONSUMER_SECRET |
272 auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
275 auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) |
273 auth.set_access_token(*(utils.get_oauth_token(options.token_filename))) |
276 auth.set_access_token(*(utils.get_oauth_token(options.token_filename))) |
274 |
277 |
275 |
278 |
|
279 engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) |
|
280 Session = sessionmaker(bind=engine) |
|
281 |
|
282 session = Session() |
|
283 process_leftovers(session, options.token_filename) |
|
284 session.commit() |
|
285 session.close() |
|
286 |
276 sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event) |
287 sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event) |
277 |
288 |
278 tweet_processes = [] |
289 tweet_processes = [] |
279 |
290 |
280 for i in range(options.consumer_nb): |
291 for i in range(options.consumer_nb): |
|
292 engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) |
|
293 Session = sessionmaker(bind=engine) |
281 cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event) |
294 cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event) |
282 tweet_processes.append(cprocess) |
295 tweet_processes.append(cprocess) |
283 |
296 |
284 def interupt_handler(signum, frame): |
297 def interupt_handler(signum, frame): |
285 stop_event.set() |
298 stop_event.set() |