3 import codecs |
3 import codecs |
4 import json |
4 import json |
5 import logging |
5 import logging |
6 import re |
6 import re |
7 import sys |
7 import sys |
|
8 import twitter |
8 |
9 |
9 from iri_tweet.models import Tweet, TweetLog, TweetSource, setup_database |
10 from iri_tweet.models import Tweet, TweetLog, TweetSource, setup_database |
10 from iri_tweet.processor import TwitterProcessorStatus |
11 from iri_tweet.processor import TwitterProcessorStatus |
11 from iri_tweet.utils import get_oauth_token, show_progress |
12 from iri_tweet.utils import get_oauth_token, show_progress |
12 |
13 |
13 logger = logging.getLogger(__name__) |
14 logger = logging.getLogger(__name__) |
14 |
15 |
15 def get_option(): |
16 def get_option(): |
16 |
17 |
17 parser = argparse.ArgumentParser(description='Merge tweets databases') |
18 parser = argparse.ArgumentParser(description='Merge tweets databases') |
18 |
19 |
19 parser.add_argument("-l", "--log", dest="logfile", |
20 parser.add_argument("-l", "--log", dest="logfile", |
20 help="log to file", metavar="LOG", default="stderr") |
21 help="log to file", metavar="LOG", default="stderr") |
21 parser.add_argument("-v", dest="verbose", action="count", |
22 parser.add_argument("-v", dest="verbose", action="count", |
29 parser.add_argument("--query-user", dest="query_user", action="store_true", |
30 parser.add_argument("--query-user", dest="query_user", action="store_true", |
30 help="Query twitter for user information", default=False) |
31 help="Query twitter for user information", default=False) |
31 parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
32 parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", |
32 help="Token file name") |
33 help="Token file name") |
33 |
34 |
34 |
35 |
35 parser.add_argument("source", action="store", nargs=1, type=str, metavar="SOURCE") |
36 parser.add_argument("source", action="store", nargs=1, type=str, metavar="SOURCE") |
36 parser.add_argument("target", action="store", nargs=1, type=str, metavar="TARGET") |
37 parser.add_argument("target", action="store", nargs=1, type=str, metavar="TARGET") |
37 |
38 |
38 |
39 |
39 return parser.parse_args() |
40 return parser.parse_args() |
40 |
41 |
def normalize_conn_str(conn_str):
    """Return *conn_str* as a database URL, defaulting to a SQLite file path.

    Surrounding whitespace is stripped first. A value that already has the
    form ``scheme://rest`` is returned as is; anything else is treated as a
    file path and prefixed with ``sqlite:///``.
    """
    conn_str = conn_str.strip()
    if re.match(r"^\w+://.+", conn_str):
        return conn_str
    return 'sqlite:///' + conn_str


if __name__ == "__main__":
    # Copy every tweet of the SOURCE database that is not yet present in the
    # TARGET database (matched on Tweet.id), re-processing its original JSON
    # into the target session.

    writer = None
    options = get_option()

    # Twitter credentials are only built when --query-user was requested.
    twitter_auth = None
    if options.query_user:
        access_token_key, access_token_secret = get_oauth_token(
            consumer_key=options.consumer_key,
            consumer_secret=options.consumer_secret,
            token_file_path=options.token_filename)
        twitter_auth = twitter.OAuth(
            access_token_key, access_token_secret,
            options.consumer_key, options.consumer_secret)

    # open source
    src_conn_str = normalize_conn_str(options.source[0])
    tgt_conn_str = normalize_conn_str(options.target[0])

    # argparse "count" options are None when the flag is never given and no
    # explicit default is set, so guard the subtraction against None.
    echo = ((options.verbose or 0) - (options.quiet or 0)) > 0

    engine_src, metadata_src, Session_src = setup_database(
        src_conn_str, echo=echo, create_all=False)
    engine_tgt, metadata_tgt, Session_tgt = setup_database(
        tgt_conn_str, echo=echo, create_all=False)

    # Pre-bind both sessions so the finally clause can test them even when
    # creating one of them fails.
    session_src = session_tgt = None

    try:
        session_src = Session_src()
        session_tgt = Session_tgt()

        count_tw = session_src.query(Tweet).count()

        if count_tw == 0:
            print("No tweet to process : exit")
            # SystemExit still runs the finally clause below.
            sys.exit()

        query_src = session_src.query(Tweet).join(TweetSource).yield_per(100)
        added = 0

        for i, tweet in enumerate(query_src):

            tweet_count = session_tgt.query(Tweet).filter(Tweet.id == tweet.id).count()

            progress_text = u"Process: "
            if tweet_count == 0:
                # Tweet id unknown in the target: rebuild it from its stored JSON.
                added += 1
                progress_text = u"Adding : "
                tweet_source = tweet.tweet_source.original_json

                tweet_obj = json.loads(tweet_source)
                if 'text' not in tweet_obj:
                    # JSON without a 'text' key is only logged as NOT_TWEET.
                    tweet_log = TweetLog(
                        tweet_source_id=tweet.tweet_source.id,
                        status=TweetLog.TWEET_STATUS['NOT_TWEET'])
                    session_tgt.add(tweet_log)
                else:
                    tp = TwitterProcessorStatus(
                        None, tweet_source, None, session_tgt,
                        twitter_auth=twitter_auth,
                        user_query_twitter=options.query_user,
                        logger=logger)
                    tp.process()

                session_tgt.flush()

            ptext = progress_text + tweet.text
            writer = show_progress(i+1, count_tw, ptext.replace("\n", ""), 70, writer)

        # Single commit once every source tweet has been visited.
        session_tgt.commit()
        print(u"%d new tweet added" % (added,))

    finally:
        if session_tgt is not None:
            session_tgt.close()
        if session_src is not None:
            session_src.close()