|
1 import email.utils |
|
2 import logging |
|
3 from models import * |
|
4 import datetime |
|
5 import twitter |
|
6 import twitter_text |
|
7 |
|
8 |
|
def parse_date(date_str):
    """Parse an RFC 2822-style date string into a naive UTC ``datetime``.

    Twitter's ``created_at`` layout (e.g. ``"Wed Jun 08 14:56:09 +0000 2011"``)
    is accepted by ``email.utils.parsedate_tz``.

    :param date_str: the date string to parse.
    :returns: a naive ``datetime.datetime`` shifted to UTC using the string's
        offset; when the string carries no offset, the wall-clock time is
        returned unchanged.
    :raises ValueError: if ``date_str`` cannot be parsed.
    """
    ts = email.utils.parsedate_tz(date_str)
    if ts is None:
        raise ValueError("unparseable date: " + repr(date_str))
    # Only the first six fields are (year, month, day, hour, minute, second);
    # index 6 is the weekday and must not be passed on as microseconds.
    parsed = datetime.datetime(*ts[0:6])
    # ts[9] is the offset from UTC in seconds (None when the string has none).
    offset = ts[9]
    if offset:
        parsed -= datetime.timedelta(seconds=offset)
    return parsed


|
# Value converters applied by adapt_fields(), keyed by data source ("stream"
# or "rest") and then by record kind ("tweet" / "user"). Fields missing from a
# mapping are copied through unchanged. "original_json" has no converter here:
# the ingest functions assign the raw JSON text to it directly.
# adapt_date / adapt_json are not defined in this view; presumably they come
# from `from models import *` — TODO confirm.
fields_adapter = dict(
    stream=dict(
        tweet=dict(
            created_at=adapt_date,
            coordinates=adapt_json,
            place=adapt_json,
            geo=adapt_json,
        ),
        user=dict(
            created_at=adapt_date,
        ),
    ),
    rest=dict(
        tweet=dict(
            place=adapt_json,
            geo=adapt_json,
            created_at=adapt_date,
        ),
    ),
)


|
def adapt_fields(fields_dict, adapter_mapping):
    """Return a converted copy of *fields_dict*; the input is left untouched.

    Every key is passed through ``str()`` so the result can be splatted as
    keyword arguments (callers do ``User(**...)`` / ``Tweet(**...)``). A value
    whose key maps to a non-``None`` callable in *adapter_mapping* is replaced
    by that callable's result; all other values are copied as-is.
    """
    adapted = {}
    for key, value in fields_dict.items():
        converter = adapter_mapping[key] if key in adapter_mapping else None
        adapted[str(key)] = value if converter is None else converter(value)
    return adapted


|
def get_user(user_dict, session):
    """Return the ``User`` described by *user_dict*, creating it if necessary.

    The user is looked up by ``"id"`` when present, otherwise by
    ``"screen_name"`` (falling back to ``"name"``). If no row exists and
    *user_dict* has no ``"created_at"``, the full profile is requested from
    the Twitter API (``users.show``). If that call fails, the error is logged
    and the given dict is used as-is. A new ``User`` is then built from the
    adapted fields, added to *session* and flushed.

    :param user_dict: dict of user fields (e.g. a streamed tweet's ``"user"``
        object, or a mention entity passed by process_entity).
    :param session: database session used for lookups and inserts.
    :returns: the existing or newly created ``User``, or ``None`` when the
        dict identifies no user or no ``"id"`` is available to create one.
    """
    logging.debug("Get user : " + repr(user_dict))

    user_id = user_dict.get("id", None)
    user_name = user_dict.get("screen_name", user_dict.get("name", None))

    if user_id is None and user_name is None:
        return None

    if user_id:
        user = session.query(User).filter(User.id == user_id).first()
    else:
        user = session.query(User).filter(User.screen_name == user_name).first()

    if user is not None:
        return user

    if user_dict.get("created_at", None) is None:
        # Partial description: try to fetch the complete profile from Twitter.
        t = twitter.Twitter(auth=twitter.OAuth(
            ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET))
        try:
            if user_id:
                user_dict = t.users.show(user_id=user_id)
            else:
                user_dict = t.users.show(screen_name=user_name)
        except Exception as e:
            # Best effort: carry on with the partial data we were given.
            logging.info("get_user : TWITTER ERROR : " + repr(e))
            logging.info("get_user : TWITTER ERROR : " + str(e))
        else:
            # A screen-name lookup can miss a user already stored under the id
            # Twitter just returned (e.g. stored with another screen name);
            # reuse that row instead of inserting a duplicate id.
            fetched_id = user_dict.get("id", None)
            if fetched_id is not None and fetched_id != user_id:
                user = session.query(User).filter(User.id == fetched_id).first()
                if user is not None:
                    return user

    user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
    if "id" not in user_dict:
        return None

    user = User(**user_dict)
    session.add(user)
    session.flush()
    return user


|
def process_entity(ind, ind_type, tweet, session):
    """Create and stage (``session.add``) the entity row linking *tweet* to *ind*.

    :param ind: entity description dict. It must contain ``"indices"``
        (start and end offsets). Other keys depend on *ind_type*: ``"text"``
        or ``"hashtag"`` for hashtags, user fields for mentions, and ``"url"``
        for URLs.
    :param ind_type: ``"hashtags"``, ``"user_mentions"`` or ``"urls"``. Any
        other type is logged and skipped instead of raising ``KeyError``,
        because from_twitter_stream forwards every key of a tweet's
        ``"entities"`` dict.
    :param tweet: the ``Tweet`` the entity belongs to; its ``id`` is read.
    :param session: database session used to look up / create rows.
    :returns: ``None``.
    """
    logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))

    def process_hashtags():
        # The tag may be stored under "text" or under "hashtag".
        text = ind.get("text", ind.get("hashtag", None))
        if text is None:
            return None
        hashtag = session.query(Hashtag).filter(Hashtag.text == text).first()
        if not hashtag:
            # Build from a copy so the caller's entity dict is not mutated.
            hashtag = Hashtag(**dict(ind, text=text))
            session.add(hashtag)
            # Flush so hashtag.id is populated before it is read below
            # (presumably a database-generated key — TODO confirm).
            session.flush()
        entity_dict['hashtag'] = hashtag
        entity_dict['hashtag_id'] = hashtag.id
        return EntityHashtag(**entity_dict)

    def process_user_mentions():
        user_mention = get_user(ind, session)
        if user_mention is None:
            entity_dict['user'] = None
            entity_dict['user_id'] = None
        else:
            entity_dict['user'] = user_mention
            entity_dict['user_id'] = user_mention.id
        return EntityUser(**entity_dict)

    def process_urls():
        url = session.query(Url).filter(Url.url == ind["url"]).first()
        if url is None:
            url = Url(**ind)
            session.add(url)
            session.flush()
        entity_dict['url'] = url
        entity_dict['url_id'] = url.id
        return EntityUrl(**entity_dict)

    handler = {
        'hashtags': process_hashtags,
        'user_mentions': process_user_mentions,
        'urls': process_urls,
    }.get(ind_type)
    if handler is None:
        logging.info("Process_entity : unsupported entity type " + repr(ind_type))
        return

    # Fields shared by every entity row; each handler adds its own link.
    entity_dict = {
        "indice_start": ind["indices"][0],
        "indice_end": ind["indices"][1],
        "tweet_id": tweet.id,
        "tweet": tweet,
    }

    entity = handler()

    logging.debug("Process_entity entity_dict: " + repr(entity_dict))
    # A hashtag without text yields no entity.
    if entity:
        session.add(entity)


|
def from_twitter_rest(ts, jsontxt, session):
    """Store one tweet dict from the Twitter REST/search API.

    Does nothing if a tweet with the same id is already stored. Otherwise:
    gets or creates the author, adds the ``Tweet`` row, then re-extracts
    hashtags, mentions and URLs from the text with ``twitter_text`` and
    stores them as entities.

    :param ts: decoded tweet dict. Fields such as ``from_user``,
        ``from_user_id`` and ``iso_language_code`` are read directly; the
        reply fields ``to_user_id`` / ``to_user_id_str`` are optional.
    :param jsontxt: raw JSON text, stored as ``original_json``.
    :param session: database session rows are added to.
    """
    tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
    if tweet_nb > 0:
        return

    tweet_fields = {
        'created_at': ts["created_at"],
        'favorited': False,
        'id': ts["id"],
        'id_str': ts["id_str"],
        # Reply fields are only meaningful for replies; tolerate their absence.
        'in_reply_to_user_id': ts.get("to_user_id"),
        'in_reply_to_user_id_str': ts.get("to_user_id_str"),
        'source': ts["source"],
        'text': ts["text"],
        'truncated': False,
        'original_json': jsontxt,
    }

    user_fields = {
        'id': ts['from_user_id'],
        'id_str': ts['from_user_id_str'],
        'lang': ts['iso_language_code'],
        'profile_image_url': ts["profile_image_url"],
        'screen_name': ts["from_user"],
    }

    user = get_user(user_fields, session)
    if user is None:
        # `log` is not defined in this module; use the logging module directly.
        logging.warning("USER not found " + repr(user_fields))
        tweet_fields["user"] = None
        tweet_fields["user_id"] = None
    else:
        tweet_fields["user"] = user
        tweet_fields["user_id"] = user.id

    tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
    tweet = Tweet(**tweet_fields)
    session.add(tweet)
    # Flush now, matching from_twitter_stream, before entity rows are built.
    session.flush()

    extractor = twitter_text.Extractor(tweet.text)

    for ind in extractor.extract_hashtags_with_indices():
        process_entity(ind, "hashtags", tweet, session)

    for ind in extractor.extract_mentioned_screen_names_with_indices():
        process_entity(ind, "user_mentions", tweet, session)

    for ind in extractor.extract_urls_with_indices():
        process_entity(ind, "urls", tweet, session)


|
def from_twitter_stream(ts, jsontxt, session):
    """Store one tweet dict received from the Twitter streaming API.

    Does nothing if a tweet with the same id is already stored. Otherwise:
    adapts the tweet fields, gets or creates the author from ``ts["user"]``,
    adds and flushes the ``Tweet`` row, then stores every entry of
    ``ts["entities"]`` via process_entity.

    :param ts: decoded tweet dict (needs ``"id"`` and ``"user"``;
        ``"entities"`` is optional).
    :param jsontxt: raw JSON text, stored as ``original_json``.
    :param session: database session rows are added to.
    """
    tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
    if tweet_nb > 0:
        return

    ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"])

    # get or create user
    user = get_user(ts["user"], session)
    if user is None:
        # `log` is not defined in this module; use the logging module directly.
        logging.warning("USER not found " + repr(ts["user"]))
        ts_copy["user"] = None
        ts_copy["user_id"] = None
    else:
        ts_copy["user"] = user
        ts_copy["user_id"] = user.id
    ts_copy["original_json"] = jsontxt

    tweet = Tweet(**ts_copy)
    session.add(tweet)
    session.flush()

    # A tweet without an "entities" dict simply has no entities to store;
    # don't fail after the tweet row has already been flushed.
    for ind_type, entity_list in ts.get("entities", {}).items():
        for ind in entity_list:
            process_entity(ind, ind_type, tweet, session)