43 return adapter_mapping[field](value) |
59 return adapter_mapping[field](value) |
44 else: |
60 else: |
45 return value |
61 return value |
46 return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) |
62 return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) |
47 |
63 |
48 def get_user(user_dict, session): |
64 |
49 |
65 |
50 logging.debug("Get user : " + repr(user_dict)) |
66 class TwitterProcessorException(Exception): |
51 |
67 pass |
52 user_id = user_dict.get("id",None) |
68 |
53 user_name = user_dict.get("screen_name", user_dict.get("name", None)) |
69 class TwitterProcessor(object): |
54 |
70 |
55 if user_id is None and user_name is None: |
71 def __init__(self, json_dict, json_txt, session): |
56 return None |
72 |
57 |
73 if json_dict is None and json_txt is None: |
58 if user_id: |
74 raise TwitterProcessorException("No json") |
59 user = session.query(User).filter(User.id == user_id).first() |
75 |
|
76 if json_dict is None: |
|
77 self.json_dict = json.loads(json_txt) |
|
78 else: |
|
79 self.json_dict = json_dict |
|
80 |
|
81 if not json_txt: |
|
82 self.json_txt = json.dumps(json_dict) |
|
83 else: |
|
84 self.json_txt = json_txt |
|
85 |
|
86 if "id" not in self.json_dict: |
|
87 raise TwitterProcessorException("No id in json") |
|
88 |
|
89 self.session = session |
|
90 |
|
91 def __get_user(self, user_dict): |
|
92 logging.debug("Get user : " + repr(user_dict)) |
|
93 |
|
94 user_id = user_dict.get("id",None) |
|
95 user_name = user_dict.get("screen_name", user_dict.get("name", None)) |
|
96 |
|
97 if user_id is None and user_name is None: |
|
98 return None |
|
99 |
|
100 if user_id: |
|
101 user = self.session.query(User).filter(User.id == user_id).first() |
|
102 else: |
|
103 user = self.session.query(User).filter(User.screen_name == user_name).first() |
|
104 |
|
105 if user is not None: |
|
106 return user |
|
107 |
|
108 user_created_at = user_dict.get("created_at", None) |
|
109 |
|
110 if user_created_at is None: |
|
111 acess_token_key, access_token_secret = get_oauth_token() |
|
112 t = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET)) |
|
113 try: |
|
114 if user_id: |
|
115 user_dict = t.users.show(user_id=user_id) |
|
116 else: |
|
117 user_dict = t.users.show(screen_name=user_name) |
|
118 except Exception as e: |
|
119 logging.info("get_user : TWITTER ERROR : " + repr(e)) |
|
120 logging.info("get_user : TWITTER ERROR : " + str(e)) |
|
121 |
|
122 user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) |
|
123 if "id" not in user_dict: |
|
124 return None |
|
125 |
|
126 user = User(**user_dict) |
|
127 |
|
128 self.session.add(user) |
|
129 self.session.flush() |
|
130 |
|
131 return user |
|
132 |
|
133 def __process_entity(self, ind, ind_type): |
|
134 logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) |
|
135 |
|
136 entity_dict = { |
|
137 "indice_start": ind["indices"][0], |
|
138 "indice_end" : ind["indices"][1], |
|
139 "tweet_id" : self.tweet.id, |
|
140 "tweet" : self.tweet |
|
141 } |
|
142 |
|
143 def process_hashtags(): |
|
144 text = ind.get("text", ind.get("hashtag", None)) |
|
145 if text is None: |
|
146 return None |
|
147 hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first() |
|
148 if not hashtag: |
|
149 ind["text"] = text |
|
150 hashtag = Hashtag(**ind) |
|
151 self.session.add(hashtag) |
|
152 self.session.flush() |
|
153 entity_dict['hashtag'] = hashtag |
|
154 entity_dict['hashtag_id'] = hashtag.id |
|
155 entity = EntityHashtag(**entity_dict) |
|
156 return entity |
|
157 |
|
158 def process_user_mentions(): |
|
159 user_mention = self.__get_user(ind) |
|
160 if user_mention is None: |
|
161 entity_dict['user'] = None |
|
162 entity_dict['user_id'] = None |
|
163 else: |
|
164 entity_dict['user'] = user_mention |
|
165 entity_dict['user_id'] = user_mention.id |
|
166 entity = EntityUser(**entity_dict) |
|
167 return entity |
|
168 |
|
169 def process_urls(): |
|
170 url = self.session.query(Url).filter(Url.url == ind["url"]).first() |
|
171 if url is None: |
|
172 url = Url(**ind) |
|
173 self.session.add(url) |
|
174 self.session.flush() |
|
175 entity_dict['url'] = url |
|
176 entity_dict['url_id'] = url.id |
|
177 entity = EntityUrl(**entity_dict) |
|
178 return entity |
|
179 |
|
180 #{'': lambda } |
|
181 entity = { |
|
182 'hashtags': process_hashtags, |
|
183 'user_mentions' : process_user_mentions, |
|
184 'urls' : process_urls |
|
185 }[ind_type]() |
|
186 |
|
187 logging.debug("Process_entity entity_dict: " + repr(entity_dict)) |
|
188 if entity: |
|
189 self.session.add(entity) |
|
190 self.session.flush() |
|
191 |
|
192 |
|
193 def __process_twitter_stream(self): |
|
194 |
|
195 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
|
196 if tweet_nb > 0: |
|
197 return |
|
198 |
|
199 ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) |
|
200 |
|
201 # get or create user |
|
202 user = self.__get_user(self.json_dict["user"]) |
|
203 if user is None: |
|
204 log.warning("USER not found " + repr(ts["user"])) |
|
205 ts_copy["user"] = None |
|
206 ts_copy["user_id"] = None |
|
207 else: |
|
208 ts_copy["user"] = user |
|
209 ts_copy["user_id"] = ts_copy["user"].id |
|
210 ts_copy["original_json"] = self.json_txt |
|
211 |
|
212 self.tweet = Tweet(**ts_copy) |
|
213 self.session.add(self.tweet) |
|
214 self.session.flush() |
|
215 |
|
216 # get entities |
|
217 for ind_type, entity_list in self.json_dict["entities"].items(): |
|
218 for ind in entity_list: |
|
219 self.__process_entity(ind, ind_type) |
|
220 |
|
221 |
|
222 def __process_twitter_rest(self): |
|
223 tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() |
|
224 if tweet_nb > 0: |
|
225 return |
|
226 |
|
227 tweet_fields = { |
|
228 'created_at': self.json_dict["created_at"], |
|
229 'favorited': False, |
|
230 'id': self.json_dict["id"], |
|
231 'id_str': self.json_dict["id_str"], |
|
232 #'in_reply_to_screen_name': ts["to_user"], |
|
233 'in_reply_to_user_id': self.json_dict["to_user_id"], |
|
234 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"], |
|
235 #'place': ts["place"], |
|
236 'source': self.json_dict["source"], |
|
237 'text': self.json_dict["text"], |
|
238 'truncated': False, |
|
239 'original_json' : self.json_txt, |
|
240 } |
|
241 |
|
242 #user |
|
243 |
|
244 user_fields = { |
|
245 'id' : self.json_dict['from_user_id'], |
|
246 'id_str' : self.json_dict['from_user_id_str'], |
|
247 'lang' : self.json_dict['iso_language_code'], |
|
248 'profile_image_url' : self.json_dict["profile_image_url"], |
|
249 'screen_name' : self.json_dict["from_user"], |
|
250 } |
|
251 |
|
252 user = self.__get_user(user_fields) |
|
253 if user is None: |
|
254 log.warning("USER not found " + repr(user_fields)) |
|
255 tweet_fields["user"] = None |
|
256 tweet_fields["user_id"] = None |
|
257 else: |
|
258 tweet_fields["user"] = user |
|
259 tweet_fields["user_id"] = user.id |
|
260 |
|
261 tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) |
|
262 self.tweet = Tweet(**tweet_fields) |
|
263 session.add(self.tweet) |
|
264 |
|
265 text = self.tweet.text |
|
266 |
|
267 extractor = twitter_text.Extractor(text) |
|
268 |
|
269 for ind in extractor.extract_hashtags_with_indices(): |
|
270 self.__process_entity(ind, "hashtags") |
|
271 |
|
272 for ind in extractor.extract_mentioned_screen_names_with_indices(): |
|
273 self.__process_entity(ind, "user_mentions") |
|
274 |
|
275 for ind in extractor.extract_urls_with_indices(): |
|
276 self.__process_entity(ind, "urls") |
|
277 |
|
278 self.session.flush() |
|
279 |
|
280 |
|
281 def process(self): |
|
282 if "metadata" in self.json_dict: |
|
283 self.__process_twitter_rest() |
|
284 else: |
|
285 self.__process_twitter_stream() |
|
286 |
|
287 |
|
288 def set_logging(options): |
|
289 |
|
290 logging_config = {} |
|
291 |
|
292 if options.logfile == "stdout": |
|
293 logging_config["stream"] = sys.stdout |
|
294 elif options.logfile == "stderr": |
|
295 logging_config["stream"] = sys.stderr |
60 else: |
296 else: |
61 user = session.query(User).filter(User.screen_name == user_name).first() |
297 logging_config["filename"] = options.logfile |
62 |
298 |
63 if user is not None: |
299 logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) |
64 return user |
300 logging.basicConfig(**logging_config) |
65 |
301 |
66 user_created_at = user_dict.get("created_at", None) |
302 options.debug = (options.verbose-options.quiet > 0) |
67 |
303 |
68 if user_created_at is None: |
304 def set_logging_options(parser): |
69 t = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET)) |
305 parser.add_option("-l", "--log", dest="logfile", |
70 try: |
306 help="log to file", metavar="LOG", default="stderr") |
71 if user_id: |
307 parser.add_option("-v", dest="verbose", action="count", |
72 user_dict = t.users.show(user_id=user_id) |
308 help="verbose", metavar="VERBOSE", default=0) |
73 else: |
309 parser.add_option("-q", dest="quiet", action="count", |
74 user_dict = t.users.show(screen_name=user_name) |
310 help="quiet", metavar="QUIET", default=0) |
75 except Exception as e: |
|
76 logging.info("get_user : TWITTER ERROR : " + repr(e)) |
|
77 logging.info("get_user : TWITTER ERROR : " + str(e)) |
|
78 |
|
79 user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) |
|
80 if "id" not in user_dict: |
|
81 return None |
|
82 |
|
83 user = User(**user_dict) |
|
84 |
|
85 session.add(user) |
|
86 session.flush() |
|
87 |
|
88 return user |
|
89 # if not, if needed get info from twitter |
|
90 # create user |
|
91 # return it |
|
92 |
|
93 def process_entity(ind, ind_type, tweet, session): |
|
94 |
|
95 logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) |
|
96 |
|
97 entity_dict = { |
|
98 "indice_start": ind["indices"][0], |
|
99 "indice_end" : ind["indices"][1], |
|
100 "tweet_id" : tweet.id, |
|
101 "tweet" : tweet |
|
102 } |
|
103 |
|
104 def process_hashtags(): |
|
105 text = ind.get("text", ind.get("hashtag", None)) |
|
106 if text is None: |
|
107 return None |
|
108 hashtag = session.query(Hashtag).filter(Hashtag.text == text).first() |
|
109 if not hashtag: |
|
110 ind["text"] = text |
|
111 hashtag = Hashtag(**ind) |
|
112 session.add(hashtag) |
|
113 session.flush() |
|
114 entity_dict['hashtag'] = hashtag |
|
115 entity_dict['hashtag_id'] = hashtag.id |
|
116 entity = EntityHashtag(**entity_dict) |
|
117 return entity |
|
118 |
|
119 def process_user_mentions(): |
|
120 user_mention = get_user(ind, session) |
|
121 if user_mention is None: |
|
122 entity_dict['user'] = None |
|
123 entity_dict['user_id'] = None |
|
124 else: |
|
125 entity_dict['user'] = user_mention |
|
126 entity_dict['user_id'] = user_mention.id |
|
127 entity = EntityUser(**entity_dict) |
|
128 return entity |
|
129 |
|
130 def process_urls(): |
|
131 url = session.query(Url).filter(Url.url == ind["url"]).first() |
|
132 if url is None: |
|
133 url = Url(**ind) |
|
134 session.add(url) |
|
135 session.flush() |
|
136 entity_dict['url'] = url |
|
137 entity_dict['url_id'] = url.id |
|
138 entity = EntityUrl(**entity_dict) |
|
139 return entity |
|
140 |
|
141 #{'': lambda } |
|
142 entity = { |
|
143 'hashtags': process_hashtags, |
|
144 'user_mentions' : process_user_mentions, |
|
145 'urls' : process_urls |
|
146 }[ind_type]() |
|
147 |
|
148 logging.debug("Process_entity entity_dict: " + repr(entity_dict)) |
|
149 if entity: |
|
150 session.add(entity) |
|
151 |
|
152 |
|
153 |
|
154 def from_twitter_rest(ts, jsontxt, session): |
|
155 |
|
156 tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count() |
|
157 if tweet_nb > 0: |
|
158 return |
|
159 |
|
160 tweet_fields = { |
|
161 'created_at': ts["created_at"], |
|
162 'favorited': False, |
|
163 'id': ts["id"], |
|
164 'id_str': ts["id_str"], |
|
165 #'in_reply_to_screen_name': ts["to_user"], |
|
166 'in_reply_to_user_id': ts["to_user_id"], |
|
167 'in_reply_to_user_id_str': ts["to_user_id_str"], |
|
168 #'place': ts["place"], |
|
169 'source': ts["source"], |
|
170 'text': ts["text"], |
|
171 'truncated': False, |
|
172 'original_json' : jsontxt, |
|
173 } |
|
174 |
|
175 #user |
|
176 |
|
177 user_fields = { |
|
178 'id' : ts['from_user_id'], |
|
179 'id_str' : ts['from_user_id_str'], |
|
180 'lang' : ts['iso_language_code'], |
|
181 'profile_image_url' : ts["profile_image_url"], |
|
182 'screen_name' : ts["from_user"], |
|
183 } |
|
184 |
|
185 user = get_user(user_fields, session) |
|
186 if user is None: |
|
187 log.warning("USER not found " + repr(user_fields)) |
|
188 tweet_fields["user"] = None |
|
189 tweet_fields["user_id"] = None |
|
190 else: |
|
191 tweet_fields["user"] = user |
|
192 tweet_fields["user_id"] = user.id |
|
193 |
|
194 tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) |
|
195 tweet = Tweet(**tweet_fields) |
|
196 session.add(tweet) |
|
197 |
|
198 text = tweet.text |
|
199 |
|
200 extractor = twitter_text.Extractor(text) |
|
201 |
|
202 for ind in extractor.extract_hashtags_with_indices(): |
|
203 process_entity(ind, "hashtags", tweet, session) |
|
204 |
|
205 for ind in extractor.extract_mentioned_screen_names_with_indices(): |
|
206 process_entity(ind, "user_mentions", tweet, session) |
|
207 |
|
208 for ind in extractor.extract_urls_with_indices(): |
|
209 process_entity(ind, "urls", tweet, session) |
|
210 |
|
211 |
|
212 |
|
213 |
|
214 def from_twitter_stream(ts, jsontxt, session): |
|
215 |
|
216 tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count() |
|
217 if tweet_nb > 0: |
|
218 return |
|
219 |
|
220 ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"]) |
|
221 |
|
222 # get or create user |
|
223 user = get_user(ts["user"], session) |
|
224 if user is None: |
|
225 log.warning("USER not found " + repr(ts["user"])) |
|
226 ts_copy["user"] = None |
|
227 ts_copy["user_id"] = None |
|
228 else: |
|
229 ts_copy["user"] = user |
|
230 ts_copy["user_id"] = ts_copy["user"].id |
|
231 ts_copy["original_json"] = jsontxt |
|
232 |
|
233 tweet = Tweet(**ts_copy) |
|
234 session.add(tweet) |
|
235 session.flush() |
|
236 |
|
237 # get entities |
|
238 for ind_type, entity_list in ts["entities"].items(): |
|
239 for ind in entity_list: |
|
240 process_entity(ind, ind_type, tweet, session) |
|