76 return value |
76 return value |
77 return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) |
77 return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) |
78 |
78 |
79 |
79 |
80 class ObjectBufferProxy(object): |
80 class ObjectBufferProxy(object): |
81 def __init__(self, klass, args, kwargs, must_flush): |
81 def __init__(self, klass, args, kwargs, must_flush, instance=None): |
82 self.klass= klass |
82 self.klass= klass |
83 self.args = args |
83 self.args = args |
84 self.kwargs = kwargs |
84 self.kwargs = kwargs |
85 self.must_flush = must_flush |
85 self.must_flush = must_flush |
86 self.instance = None |
86 self.instance = instance |
87 |
87 |
88 def persists(self, session): |
88 def persists(self, session): |
89 new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] |
89 new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] |
90 new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {} |
90 new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {} |
91 |
91 |
112 def add_object(self, klass, args, kwargs, must_flush): |
112 def add_object(self, klass, args, kwargs, must_flush): |
113 new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush) |
113 new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush) |
114 self.__bufferlist.append(new_proxy) |
114 self.__bufferlist.append(new_proxy) |
115 return new_proxy |
115 return new_proxy |
116 |
116 |
|
117 def get(self, klass, **kwargs): |
|
118 for proxy in self.__bufferlist: |
|
119 if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass: |
|
120 continue |
|
121 found = True |
|
122 for k,v in kwargs.items(): |
|
123 if (k not in proxy.kwargs) or v != proxy.kwargs[k]: |
|
124 found = False |
|
125 break |
|
126 if found: |
|
127 return proxy |
|
128 |
|
129 return None |
|
130 |
|
131 |
117 |
132 |
118 |
133 |
119 |
134 |
120 class TwitterProcessorException(Exception): |
135 class TwitterProcessorException(Exception): |
121 pass |
136 pass |
143 self.source_id = source_id |
158 self.source_id = source_id |
144 self.session = session |
159 self.session = session |
145 self.token_filename = token_filename |
160 self.token_filename = token_filename |
146 self.obj_buffer = ObjectsBuffer() |
161 self.obj_buffer = ObjectsBuffer() |
147 |
162 |
|
163 |
148 def __get_user(self, user_dict): |
164 def __get_user(self, user_dict): |
149 logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable |
165 logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable |
150 |
166 |
151 user_id = user_dict.get("id",None) |
167 user_id = user_dict.get("id",None) |
152 user_name = user_dict.get("screen_name", user_dict.get("name", None)) |
168 user_name = user_dict.get("screen_name", user_dict.get("name", None)) |
153 |
169 |
154 if user_id is None and user_name is None: |
170 if user_id is None and user_name is None: |
155 return None |
171 return None |
156 |
172 |
|
173 user = None |
157 if user_id: |
174 if user_id: |
158 user = self.session.query(User).filter(User.id == user_id).first() |
175 user = self.obj_buffer.get(User, id=user_id) |
159 else: |
176 else: |
160 user = self.session.query(User).filter(User.screen_name == user_name).first() |
177 user = self.obj_buffer.get(User, screen_name=user_name) |
161 |
178 |
162 if user is not None: |
179 if user is not None: |
|
180 return user |
|
181 |
|
182 #todo : add methods to objectbuffer to get buffer user |
|
183 user_obj = None |
|
184 if user_id: |
|
185 user_obj = self.session.query(User).filter(User.id == user_id).first() |
|
186 else: |
|
187 user_obj = self.session.query(User).filter(User.screen_name == user_name).first() |
|
188 |
|
189 if user_obj is not None: |
|
190 user = ObjectBufferProxy(User, None, None, False, user_obj) |
163 return user |
191 return user |
164 |
192 |
165 user_created_at = user_dict.get("created_at", None) |
193 user_created_at = user_dict.get("created_at", None) |
166 |
194 |
167 if user_created_at is None: |
195 if user_created_at is None: |
178 |
206 |
179 user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) |
207 user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) |
180 if "id" not in user_dict: |
208 if "id" not in user_dict: |
181 return None |
209 return None |
182 |
210 |
183 user = User(**user_dict) |
211 user = self.obj_buffer.add_object(User, None, user_dict, True) |
184 |
212 |
185 self.session.add(user) |
213 return user |
186 self.session.flush() |
214 |
187 |
|
188 return user |
|
189 |
215 |
190 def __process_entity(self, ind, ind_type): |
216 def __process_entity(self, ind, ind_type): |
191 logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable |
217 logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable |
192 |
218 |
193 ind = clean_keys(ind) |
219 ind = clean_keys(ind) |
194 |
220 |
195 entity_dict = { |
221 entity_dict = { |
196 "indice_start": ind["indices"][0], |
222 "indice_start": ind["indices"][0], |
197 "indice_end" : ind["indices"][1], |
223 "indice_end" : ind["indices"][1], |
198 "tweet_id" : self.tweet.id, |
224 "tweet_id" : self.tweet.id, |
199 "tweet" : self.tweet |
|
200 } |
225 } |
201 |
226 |
202 def process_hashtags(): |
227 def process_hashtags(): |
203 text = ind.get("text", ind.get("hashtag", None)) |
228 text = ind.get("text", ind.get("hashtag", None)) |
204 if text is None: |
229 if text is None: |
205 return None |
230 return None |
206 hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first() |
231 hashtag = self.obj_buffer.get(Hashtag, text=text) |
|
232 if hashtag is None: |
|
233 hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text == text).first() |
|
234 if hashtag_obj is not None: |
|
235 hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj) |
|
236 |
207 if not hashtag: |
237 if not hashtag: |
208 ind["text"] = text |
238 ind["text"] = text |
209 hashtag = Hashtag(**ind) |
239 hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True) |
210 self.session.add(hashtag) |
|
211 self.session.flush() |
|
212 entity_dict['hashtag'] = hashtag |
|
213 entity_dict['hashtag_id'] = hashtag.id |
240 entity_dict['hashtag_id'] = hashtag.id |
214 entity = EntityHashtag(**entity_dict) |
241 return EntityHashtag, entity_dict |
215 return entity |
|
216 |
242 |
217 def process_user_mentions(): |
243 def process_user_mentions(): |
218 user_mention = self.__get_user(ind) |
244 user_mention = self.__get_user(ind) |
219 if user_mention is None: |
245 if user_mention is None: |
220 entity_dict['user'] = None |
|
221 entity_dict['user_id'] = None |
246 entity_dict['user_id'] = None |
222 else: |
247 else: |
223 entity_dict['user'] = user_mention |
|
224 entity_dict['user_id'] = user_mention.id |
248 entity_dict['user_id'] = user_mention.id |
225 entity = EntityUser(**entity_dict) |
249 return EntityUser, entity_dict |
226 return entity |
|
227 |
250 |
228 def process_urls(): |
251 def process_urls(): |
229 url = self.session.query(Url).filter(Url.url == ind["url"]).first() |
252 url = self.obj_buffer.get(Url, url=ind["url"]) |
230 if url is None: |
253 if url is None: |
231 url = Url(**ind) |
254 url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first() |
232 self.session.add(url) |
255 if url_obj is not None: |
233 self.session.flush() |
256 url = ObjectBufferProxy(Url, None, None, False, url_obj) |
234 entity_dict['url'] = url |
257 if url is None: |
|
258 url = self.obj_buffer.add_object(Url, None, ind, True) |
235 entity_dict['url_id'] = url.id |
259 entity_dict['url_id'] = url.id |
236 entity = EntityUrl(**entity_dict) |
260 return EntityUrl, entity_dict |
237 return entity |
|
238 |
261 |
239 #{'': lambda } |
262 #{'': lambda } |
240 entity = { |
263 entity_klass, entity_dict = { |
241 'hashtags': process_hashtags, |
264 'hashtags': process_hashtags, |
242 'user_mentions' : process_user_mentions, |
265 'user_mentions' : process_user_mentions, |
243 'urls' : process_urls |
266 'urls' : process_urls |
244 }[ind_type]() |
267 }[ind_type]() |
245 |
268 |
246 logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable |
269 logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable |
247 if entity: |
270 if entity_klass: |
248 self.session.add(entity) |
271 self.obj_buffer.add_object(entity_klass, None, entity_dict, False) |
249 self.session.flush() |
|
250 |
272 |
251 |
273 |
252 def __process_twitter_stream(self): |
274 def __process_twitter_stream(self): |
253 |
275 |
254 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
276 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
259 |
281 |
260 # get or create user |
282 # get or create user |
261 user = self.__get_user(self.json_dict["user"]) |
283 user = self.__get_user(self.json_dict["user"]) |
262 if user is None: |
284 if user is None: |
263 logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable |
285 logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable |
264 ts_copy["user"] = None |
|
265 ts_copy["user_id"] = None |
286 ts_copy["user_id"] = None |
266 else: |
287 else: |
267 ts_copy["user"] = user |
288 ts_copy["user_id"] = user.id |
268 ts_copy["user_id"] = ts_copy["user"].id |
289 |
269 |
290 del(ts_copy['user']) |
270 ts_copy["tweet_source_id"] = self.source_id |
291 ts_copy["tweet_source_id"] = self.source_id |
271 |
292 |
272 self.tweet = Tweet(**ts_copy) |
293 self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) |
273 self.session.add(self.tweet) |
|
274 self.session.flush() |
|
275 |
294 |
276 # get entities |
295 # get entities |
277 for ind_type, entity_list in self.json_dict["entities"].items(): |
296 for ind_type, entity_list in self.json_dict["entities"].items(): |
278 for ind in entity_list: |
297 for ind in entity_list: |
279 self.__process_entity(ind, ind_type) |
298 self.__process_entity(ind, ind_type) |
312 if user is None: |
331 if user is None: |
313 logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable |
332 logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable |
314 tweet_fields["user"] = None |
333 tweet_fields["user"] = None |
315 tweet_fields["user_id"] = None |
334 tweet_fields["user_id"] = None |
316 else: |
335 else: |
317 tweet_fields["user"] = user |
|
318 tweet_fields["user_id"] = user.id |
336 tweet_fields["user_id"] = user.id |
319 |
337 |
320 tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) |
338 tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) |
321 self.tweet = Tweet(**tweet_fields) |
339 self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) |
322 self.session.add(self.tweet) |
|
323 |
340 |
324 text = self.tweet.text |
341 text = self.tweet.text |
325 |
342 |
326 extractor = twitter_text.Extractor(text) |
343 extractor = twitter_text.Extractor(text) |
327 |
344 |
337 |
354 |
338 |
355 |
339 def process(self): |
356 def process(self): |
340 |
357 |
341 if self.source_id is None: |
358 if self.source_id is None: |
342 tweet_source = TweetSource(original_json=self.json_txt); |
359 tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True) |
343 self.session.add(tweet_source) |
|
344 self.session.flush() |
|
345 self.source_id = tweet_source.id |
360 self.source_id = tweet_source.id |
346 |
361 |
347 if "metadata" in self.json_dict: |
362 if "metadata" in self.json_dict: |
348 self.__process_twitter_rest() |
363 self.__process_twitter_rest() |
349 else: |
364 else: |
350 self.__process_twitter_stream() |
365 self.__process_twitter_stream() |
351 |
366 |
352 tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK']) |
367 self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False) |
353 self.session.add(tweet_log) |
368 |
354 |
369 self.obj_buffer.persists(self.session) |
|
370 |
355 |
371 |
356 def set_logging(options, plogger=None): |
372 def set_logging(options, plogger=None): |
357 |
373 |
358 logging_config = { |
374 logging_config = { |
359 "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s', |
375 "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s', |