script/utils/export_twitter_alchemy.py
author ymh <ymh.work@gmail.com>
Tue, 12 Jan 2021 13:47:11 +0100
changeset 1538 4e5ee8e79e7f
parent 1523 53f1b28188f0
permissions -rw-r--r--
add command to export zoom chat files as annotation

#!/usr/bin/env python
# coding=utf-8

import argparse
import bisect
import datetime
import json
import os.path
import re
import sys
import uuid  # @UnresolvedImport

import requests
from lxml import etree
from sqlalchemy import BigInteger, Column, Table, bindparam, event
from sqlalchemy.sql import func, select

from dateutil.parser import parse as parse_date_raw
from dateutil.tz import tzutc
from iri_tweet.models import Tweet, User, setup_database
from iri_tweet.utils import (get_filter_query, get_logger, set_logging,
                             set_logging_options)

#class TweetExclude(object):
#    def __init__(self, id):
#        self.id = id
#
#    def __repr__(self):
#        return "<TweetExclude(id=%d)>" % (self.id)

# REST API paths, relative to the platform base URL (--base-url option).
# The content path is used to fetch a content's duration; the project path is
# used to read and write back a project's JSON representation.
LDT_CONTENT_REST_API_PATH = "api/ldt/1.0/contents/"
LDT_PROJECT_REST_API_PATH = "api/ldt/1.0/projects/"

def parse_date(datestr):
    """
    Parse a date string and return a timezone-aware datetime.

    The string is parsed with dateutil; a result carrying no timezone
    information is tagged as UTC, any other result is returned unchanged.
    """
    parsed = parse_date_raw(datestr)
    if parsed.tzinfo is not None:
        return parsed
    return parsed.replace(tzinfo=tzutc())


def re_fn(expr, item):
    """
    Case-insensitive regular expression predicate.

    The main script registers this function as the ``regexp`` user function of
    the database connection, so it is called once per candidate row.

    :param expr: the regular expression pattern.
    :param item: the value to search; ``None`` (an SQL NULL) never matches.
    :return: True when ``expr`` is found anywhere in ``item``, False otherwise.
    """
    if item is None:
        # A NULL column value would make the search raise TypeError and abort
        # the whole query; treat it as "no match" instead.
        return False
    matched = re.search(expr, item, re.I) is not None
    if matched:
        get_logger().debug("re_fn : " + repr(expr) + "~" + repr(item)) #@UndefinedVariable
    return matched

def parse_polemics_1(tw, extended_mode):
    """
    Parse the polemic markers (protocol version 1) found in a tweet.

    Markers: ``++`` -> OK, ``--`` -> KO, ``??`` -> Q, ``==`` -> REF.
    In extended mode, any "?" in the text also yields Q and any entity of type
    "entity_url" also yields REF.

    :param tw: object exposing ``text`` and ``entity_list`` (the latter is only
        read in extended mode).
    :param extended_mode: enable the extra Q / REF detection.
    :return: a list of distinct polemic codes, in order of first detection, or
        None if no polemic is found.
    """
    marker_codes = {
        '++' : 'OK',
        '--' : 'KO',
        '??' : 'Q',
        '==' : 'REF'}

    # A dict is used as an insertion-ordered set of codes.
    polemics = {}
    for m in re.finditer(r"(\+\+|\-\-|\?\?|\=\=)", tw.text):
        code = marker_codes[m.group(1)]
        polemics[code] = code

    if extended_mode:
        if "?" in tw.text:
            polemics["Q"] = "Q"
        if any(entity.type == "entity_url" for entity in tw.entity_list):
            polemics["REF"] = "REF"

    # Return a real list (not a dict view) as documented.
    return list(polemics) if polemics else None

def parse_polemics_2(tw, extended_mode):
    """
    Parse the polemic markers (protocol version 2) found in a tweet.

    Markers: ``++`` -> OK, ``!!`` -> KO, ``??`` -> Q, ``==`` -> REF.
    In extended mode, any "?" in the text also yields Q and any entity of type
    "entity_url" also yields REF.

    :param tw: object exposing ``text`` and ``entity_list`` (the latter is only
        read in extended mode).
    :param extended_mode: enable the extra Q / REF detection.
    :return: a list of distinct polemic codes, in order of first detection, or
        None if no polemic is found.
    """
    marker_codes = {
        '++' : 'OK',
        '!!' : 'KO',
        '??' : 'Q',
        '==' : 'REF'}

    # A dict is used as an insertion-ordered set of codes.
    polemics = {}
    for m in re.finditer(r"(\+\+|\!\!|\?\?|\=\=)", tw.text):
        code = marker_codes[m.group(1)]
        polemics[code] = code

    if extended_mode:
        if "?" in tw.text:
            polemics["Q"] = "Q"
        if any(entity.type == "entity_url" for entity in tw.entity_list):
            polemics["REF"] = "REF"

    # Return a real list (not a dict view) as documented.
    return list(polemics) if polemics else None

def parse_polemics_3(tw, extended_mode):
    """
    Parse the polemic markers (protocol version 3) found in a tweet.

    Markers: ``++`` -> OK, ``??`` -> KO, ``**`` -> REF, ``==`` -> Q.
    In extended mode, any entity of type "entity_url" also yields REF (unlike
    the other protocol versions, a single "?" adds nothing).

    :param tw: object exposing ``text`` and ``entity_list`` (the latter is only
        read in extended mode).
    :param extended_mode: enable the extra REF detection.
    :return: a list of distinct polemic codes, in order of first detection, or
        None if no polemic is found.
    """
    marker_codes = {
        '++' : 'OK',
        '??' : 'KO',
        '**' : 'REF',
        '==' : 'Q'}

    # A dict is used as an insertion-ordered set of codes.
    polemics = {}
    for m in re.finditer(r"(\+\+|\?\?|\*\*|\=\=)", tw.text):
        code = marker_codes[m.group(1)]
        polemics[code] = code

    if extended_mode:
        if any(entity.type == "entity_url" for entity in tw.entity_list):
            polemics["REF"] = "REF"

    # Return a real list (not a dict view) as documented.
    return list(polemics) if polemics else None


# Dispatch table: annotation protocol version (as given on the command line,
# i.e. a string) -> parser extracting the polemic codes for that version.
protocol_version_map = {
    str(version): polemics_parser
    for version, polemics_parser in enumerate(
        (parse_polemics_1, parse_polemics_2, parse_polemics_3), start=1)
}

def get_options():
    """
    Build the command line parser and parse the command line.

    The options are declared in a table and registered in that order; the
    logging options are then added by ``set_logging_options``.

    :return: a ``(parsed arguments, parser)`` tuple.
    """
    arg_parser = argparse.ArgumentParser(description="All date should be given using iso8601 format. If no timezone is used, the date is considered as UTC")

    # (option flags, add_argument keyword arguments), in display order.
    option_specs = [
        (("-f", "--file"),
         {"dest": "filename", "metavar": "FILE", "default": "project.ldt",
          "help": "write export to file"}),
        (("-d", "--database"),
         {"dest": "database", "metavar": "DATABASE",
          "help": "Input database"}),
        (("-a", "--annotation-protocol"),
         {"dest": "protocol_version", "metavar": "PROTOCOL_VERSION", "default": "2",
          "help": "annotation protocol version"}),
        (("-s", "--start-date"),
         {"dest": "start_date", "metavar": "START_DATE", "default": None,
          "help": "start date"}),
        (("-e", "--end-date"),
         {"dest": "end_date", "metavar": "END_DATE", "default": None,
          "help": "end date"}),
        (("-I", "--content-file"),
         {"dest": "content_file", "metavar": "CONTENT_FILE",
          "help": "Content file"}),
        (("-c", "--content"),
         {"dest": "content", "metavar": "CONTENT",
          "help": "Content url"}),
        (("-V", "--video-url"),
         {"dest": "video", "metavar": "VIDEO",
          "help": "video url"}),
        (("-i", "--content-id"),
         {"dest": "content_id", "metavar": "CONTENT_ID",
          "help": "Content id"}),
        (("-x", "--exclude"),
         {"dest": "exclude", "metavar": "EXCLUDE",
          "help": "file containing the id to exclude"}),
        (("-C", "--color"),
         {"dest": "color", "metavar": "COLOR", "default": "16763904",
          "help": "Color code"}),
        (("-H", "--hashtag"),
         {"dest": "hashtag", "metavar": "HASHTAG", "default": [], "action": "append",
          "help": "Hashtag"}),
        (("-D", "--duration"),
         {"dest": "duration", "metavar": "DURATION", "default": None, "type": int,
          "help": "Duration"}),
        (("-n", "--name"),
         {"dest": "name", "metavar": "NAME", "default": "Tweets",
          "help": "Cutting name"}),
        (("-R", "--replace"),
         {"dest": "replace", "action": "store_true", "default": False,
          "help": "Replace tweet ensemble"}),
        (("-m", "--merge"),
         {"dest": "merge", "action": "store_true", "default": False,
          "help": "merge tweet ensemble, choose the first ensemble"}),
        (("-L", "--list-conf"),
         {"dest": "listconf", "metavar": "LIST_CONF", "default": None,
          "help": "list of file to process"}),
        (("-E", "--extended"),
         {"dest": "extended_mode", "action": "store_true", "default": False,
          "help": "Trigger polemic extended mode"}),
        (("-b", "--base-url"),
         {"dest": "base_url", "metavar": "BASE_URL",
          "default": "http://ldt.iri.centrepompidou.fr/ldtplatform/",
          "help": "base URL of the platform"}),
        (("-p", "--project"),
         {"dest": "project_id", "metavar": "PROJECT_ID", "default": None,
          "help": "Project id"}),
        (("-P", "--post-param"),
         {"dest": "post_param", "metavar": "POST_PARAM", "default": None,
          "help": "Post param"}),
        (("--user-whitelist",),
         {"dest": "user_whitelist", "metavar": "USER_WHITELIST", "default": None,
          "action": "store",
          "help": "A list of user screen name"}),
        (("--cut",),
         {"dest": "cuts", "metavar": "CUT", "default": [], "action": "append",
          "help": "A cut with the forma <ts in ms>::<duration>"}),
    ]
    for flags, settings in option_specs:
        arg_parser.add_argument(*flags, **settings)

    set_logging_options(arg_parser)

    parsed_options = arg_parser.parse_args()
    return (parsed_options, arg_parser)


def find_delta(deltas, ts):
    """
    Return the last entry of ``deltas`` whose timestamp is at or before ``ts``.

    :param deltas: list of ``(timestamp, shift)`` tuples sorted in ascending
        order. The main script uses a shift of -1 to mark the start of a cut
        and a positive shift for the cumulated cut duration after it.
    :param ts: the timestamp to look up, in the same unit as the entries.
    :return: the matching ``(timestamp, shift)`` tuple, or ``(0, 0)`` when no
        entry is at or before ``ts``.
    """
    # Probe with +inf as second member so that every entry stamped exactly
    # ``ts`` sorts before the probe whatever its shift, while entries stamped
    # ``ts + 1`` (including the -1 cut markers) stay after it.
    i = bisect.bisect_right(deltas, (ts, float("inf")))
    if i:
        return deltas[i-1]
    return (0,0)


def parse_duration(s):
    """
    Convert a duration string to an integer.

    A string that is a plain integer is returned as that integer, unchanged.
    Otherwise the string must look like ``hours:minutes`` or
    ``hours:minutes:seconds`` (fields after the third are ignored) and the
    result is the corresponding number of milliseconds.

    :raises ValueError: if the string is neither an integer nor a
        colon-separated list of at least two integers.
    """
    try:
        return int(s)
    except ValueError:
        pass

    fields = s.split(":")
    if len(fields) < 2:
        raise ValueError("Bad duration format")

    hours = int(fields[0])
    minutes = int(fields[1])
    seconds = int(fields[2]) if len(fields) > 2 else 0
    span = datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
    return int(round(span.total_seconds() * 1000))


if __name__ == "__main__" :
    # Script entry point.
    # For each set of parameters (one from the command line, or one per <file>
    # node of the --list-conf document) the script:
    #   1. loads the target document (project JSON fetched over http, local xml
    #      file, or a new document built from scratch),
    #   2. locates or creates the "tweet_" ensemble / decoupage / elements nodes,
    #   3. queries the tweets and adds one <element> per tweet,
    #   4. writes the document back (http PUT or local file).

    (options, parser) = get_options()

    set_logging(options)

    get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable

    # deltas is a sorted list of (timestamp in ms, shift) pairs built from the
    # --cut options. A shift of -1 marks the start of a cut (tweets falling
    # there are skipped); the entry at the end of a cut carries the cumulated
    # duration of all cuts so far, which is subtracted from later timestamps.
    deltas = [(0,0)]
    total_delta = 0
    if options.cuts:
        cuts_raw = sorted([tuple([parse_duration(s) for s in c.split("::")]) for c in options.cuts])
        for c, d in cuts_raw:
            deltas.append((c+total_delta, -1))
            total_delta += d
            deltas.append((c+total_delta, total_delta))

    if len(sys.argv) == 1 or options.database is None:
        parser.print_help()
        sys.exit(1)

    # A bare path (no "scheme://" prefix) is treated as a sqlite database file.
    conn_str = options.database.strip()
    if not re.match(r"^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + conn_str

    engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)


    conn = None
    try :
        conn = engine.connect()
        # Register re_fn as the "regexp" function of the underlying connection
        # each time a transaction begins.
        @event.listens_for(conn, "begin")
        def do_begin(conn):
            conn.connection.create_function('regexp', 2, re_fn)
        session = None
        try :
            session = Session(bind=conn)
            # Temporary table holding the ids of the tweets to exclude.
            tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY'])
            #mapper(TweetExclude, tweet_exclude_table)
            metadata.create_all(bind=conn, tables=[tweet_exclude_table])

            # Each line of the exclude file is "<field>=<value>" (equality) or
            # "<field>~<value>" (regexp). The field "id" is inserted directly;
            # any other field is resolved to tweet ids through a query. A
            # "user__" prefix targets the User table instead of Tweet.
            if options.exclude and os.path.exists(options.exclude):
                # The file is only read: open it read-only so that a
                # write-protected file is accepted.
                with open(options.exclude, 'r') as f:
                    tei = tweet_exclude_table.insert() # pylint: disable=E1120
                    ex_regexp = re.compile(r"(?P<field>\w+)(?P<op>[~=])(?P<value>.+)", re.I)
                    for line in f:
                        res = ex_regexp.match(line.strip())
                        if res:
                            if res.group('field') == "id":
                                conn.execute(tei.values(id=res.group('value')))
                            else:
                                exclude_query = session.query(Tweet)
                                filter_obj = Tweet
                                filter_field = res.group('field')
                                if filter_field.startswith("user__"):
                                    exclude_query = exclude_query.outerjoin(User, Tweet.user_id==User.id)
                                    filter_obj = User
                                    filter_field = filter_field[len("user__"):]

                                if res.group('op') == "=":
                                    exclude_query = exclude_query.filter(getattr(filter_obj, filter_field) == res.group('value'))
                                else:
                                    exclude_query = exclude_query.filter(getattr(filter_obj, filter_field).op('regexp')(res.group('value')))

                                # Only insert ids that are not already in the
                                # exclude table (id is its primary key).
                                test_query = select([func.count()]).where(tweet_exclude_table.c.id==bindparam('t_id'))
                                for t in exclude_query.all():
                                    get_logger().debug("t : " + repr(t))
                                    if conn.execute(test_query, t_id=t.id).fetchone()[0] == 0:
                                        conn.execute(tei.values(id=t.id))

            if options.listconf:

                # One parameter set per /twitter_export/file node.
                parameters = []
                confdoc = etree.parse(options.listconf)
                for node in confdoc.xpath("/twitter_export/file"):
                    params = {}
                    for snode in node:
                        if snode.tag == "path":
                            params['content_file'] = snode.text
                            params['content_file_write'] = snode.text
                        elif snode.tag == "project_id":
                            params['content_file'] = options.base_url + LDT_PROJECT_REST_API_PATH + snode.text + "/?format=json"
                            params['content_file_write'] = options.base_url + LDT_PROJECT_REST_API_PATH + snode.text + "/?format=json"
                            params['project_id'] = snode.text
                        elif snode.tag == "start_date":
                            params['start_date'] = snode.text
                        elif snode.tag == "end_date":
                            params['end_date'] = snode.text
                        elif snode.tag == "duration":
                            params['duration'] = int(snode.text)
                        elif snode.tag == "hashtags":
                            params['hashtags'] = [snode.text]
                    # Hashtags given on the command line take precedence.
                    if options.hashtag or 'hashtags' not in params :
                        params['hashtags'] = options.hashtag
                    parameters.append(params)
            else:
                if options.project_id:
                    content_file = options.base_url + LDT_PROJECT_REST_API_PATH + options.project_id + "/?format=json"
                else:
                    content_file = options.content_file
                parameters = [{
                    'start_date': options.start_date,
                    'end_date' : options.end_date,
                    'duration' : options.duration,
                    'content_file' : content_file,
                    'content_file_write' : content_file,
                    'hashtags' : options.hashtag,
                    'project_id' : options.project_id
                }]

            # Extra query parameters sent with every http request (read and
            # write), given as a JSON object on the command line.
            post_param = {}
            if options.post_param:
                post_param = json.loads(options.post_param)

            # The whitelist does not depend on the parameter set: read it once,
            # read-only, instead of once per iteration.
            user_whitelist = None
            if options.user_whitelist:
                with open(options.user_whitelist, 'r') as f:
                    user_whitelist = list(set([s.strip() for s in f]))

            for params in parameters:

                get_logger().debug("PARAMETERS " + repr(params)) #@UndefinedVariable

                start_date_str = params.get("start_date",None)
                end_date_str = params.get("end_date", None)
                duration = params.get("duration", None)
                content_file = params.get("content_file", None)
                content_file_write = params.get("content_file_write", None)
                hashtags = list(set(params.get('hashtags', [])))

                start_date = None
                if start_date_str:
                    start_date = parse_date(start_date_str)

                root = None
                ensemble_parent = None
                # Must be reset for every parameter set: it is only assigned
                # below when an existing document is analysed, and is read
                # unconditionally further down.
                display_content_node = None

                #to do : analyse situation ldt or iri ? filename set or not ?

                if content_file and content_file.find("http") == 0:

                    get_logger().debug("url : " + content_file) #@UndefinedVariable

                    r = requests.get(content_file, params=post_param)
                    get_logger().debug("url response " + repr(r) + " content " + repr(r.text)) #@UndefinedVariable
                    project = r.json()
                    # Strip the xml declaration, if any, before parsing.
                    text_match = re.match(r"\<\?\s*xml.*?\?\>(.*)", project['ldt'], re.I|re.S)
                    root = etree.fromstring(text_match.group(1) if text_match else project['ldt'])

                elif content_file and os.path.exists(content_file):

                    doc = etree.parse(content_file)
                    root = doc.getroot()

                content_id = None

                if root is None:

                    # No existing document: build a minimal one from the
                    # command line options.
                    root = etree.Element("iri")

                    project = etree.SubElement(root, "project", {"abstract":"Polemics Tweets","title":"Polemic Tweets", "user":"IRI Web", "id":str(uuid.uuid4())})

                    medias = etree.SubElement(root, "medias")
                    media = etree.SubElement(medias, "media", {"pict":"", "src":options.content, "video":options.video, "id":options.content_id, "extra":""})

                    annotations = etree.SubElement(root, "annotations")
                    content = etree.SubElement(annotations, "content", {"id":options.content_id})
                    ensemble_parent = content

                    content_id = options.content_id


                if ensemble_parent is None:
                    # Existing document: a "project" child means an ldt
                    # project, a "head" child means an iri content file.
                    file_type = None
                    for node in root:
                        if node.tag == "project":
                            file_type = "ldt"
                            break
                        elif node.tag == "head":
                            file_type = "iri"
                            break

                    if file_type == "ldt":
                        media_nodes = root.xpath("//media")
                        if len(media_nodes) > 0:
                            media = media_nodes[0]
                        annotations_node = root.find("annotations")
                        if annotations_node is None:
                            annotations_node = etree.SubElement(root, "annotations")
                        content_node = annotations_node.find("content")
                        if content_node is None:
                            # NOTE(review): media is not (re)assigned here when
                            # the document has no <media> node - confirm that
                            # this case cannot happen.
                            content_node = etree.SubElement(annotations_node,"content", id=media.get("id"))
                        ensemble_parent = content_node
                        content_id = content_node.get("id")
                        display_nodes = root.xpath("//displays/display/content[@id='%s']" % content_id)
                        if len(display_nodes) == 0:
                            get_logger().info("No display node found. Will not update display")
                            display_content_node = None
                        else:
                            display_content_node = display_nodes[0]

                    elif file_type == "iri":
                        body_node = root.find("body")
                        if body_node is None:
                            body_node = etree.SubElement(root, "body")
                        ensembles_node = body_node.find("ensembles")
                        if ensembles_node is None:
                            ensembles_node = etree.SubElement(body_node, "ensembles")
                        ensemble_parent = ensembles_node
                        content_id = root.xpath("head/meta[@name='id']/@content")[0]
                        display_content_node = None


                if ensemble_parent is None:
                    get_logger().error("Can not process file") #@UndefinedVariable
                    # This is a failure: report it through the exit status.
                    sys.exit(1)

                if options.replace:
                    # Iterate over snapshots: removing a node while iterating
                    # over its live siblings can skip some of them.
                    for ens in list(ensemble_parent.iterchildren(tag="ensemble")):
                        ens_id = ens.get("id","")
                        if ens_id.startswith("tweet_"):
                            ensemble_parent.remove(ens)
                            # remove in display nodes
                            if display_content_node is not None:
                                for cut_display in list(display_content_node.iterchildren()):
                                    if cut_display.get('idens','') == ens_id:
                                        display_content_node.remove(cut_display)

                ensemble = None
                elements = None

                if options.merge:
                    # Reuse the first existing tweet ensemble, if any.
                    for ens in ensemble_parent.findall("ensemble"):
                        if ens.get('id',"").startswith("tweet_"):
                            ensemble = ens
                            break
                    if ensemble is not None:
                        elements = ensemble.find(".//elements")
                        decoupage = ensemble.find("decoupage")

                if ensemble is None or elements is None:
                    ensemble = etree.SubElement(ensemble_parent, "ensemble", {"id":"tweet_" + str(uuid.uuid4()), "title":"Ensemble Twitter", "author":"IRI Web", "abstract":"Ensemble Twitter"})
                    decoupage = etree.SubElement(ensemble, "decoupage", {"id": str(uuid.uuid4()), "author": "IRI Web"})

                    etree.SubElement(decoupage, "title").text = options.name
                    etree.SubElement(decoupage, "abstract").text = options.name

                    elements = etree.SubElement(decoupage, "elements")

                ensemble_id = ensemble.get('id', '')
                decoupage_id = decoupage.get('id', '') if decoupage is not None else None

                # End date: explicit, else start date + duration (seconds),
                # else start date + content duration fetched from the platform
                # (the value returned by the API is divided by 1000 to get
                # seconds).
                end_date = None
                if end_date_str:
                    end_date = parse_date(end_date_str)
                elif start_date and duration:
                    end_date = start_date + datetime.timedelta(seconds=duration)
                elif start_date and options.base_url:
                    # get duration from api
                    content_url = options.base_url + LDT_CONTENT_REST_API_PATH + content_id + "/?format=json"
                    r = requests.get(content_url)
                    duration = int(r.json()['duration'])
                    get_logger().debug("get duration " + content_url) #@UndefinedVariable
                    get_logger().debug("get duration " + repr(duration)) #@UndefinedVariable

                    end_date = start_date + datetime.timedelta(seconds=int(duration/1000))

                # Extend the queried period by the total duration of the cuts.
                if end_date and deltas:
                    end_date = end_date + datetime.timedelta(milliseconds=deltas[-1][1])
                query = get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)

                query_res = query.all()


                for tw in query_res:
                    tweet_ts_dt = tw.created_at
                    if tweet_ts_dt.tzinfo is None:
                        tweet_ts_dt = tweet_ts_dt.replace(tzinfo=tzutc())
                    # Without a start date, the first tweet is the time origin.
                    if start_date is None:
                        start_date = tweet_ts_dt
                    tweet_ts_rel = tweet_ts_dt-start_date
                    tweet_ts_rel_milli = int(round(tweet_ts_rel.total_seconds() * 1000))
                    if deltas:
                        d = find_delta(deltas, tweet_ts_rel_milli)
                        if d[1] < 0:
                            # The tweet falls inside a cut: skip it.
                            continue
                        else :
                            tweet_ts_rel_milli -= d[1]

                    username = None
                    profile_url = ""
                    if tw.user is not None:
                        username = tw.user.screen_name
                        profile_url = tw.user.profile_image_url_https if tw.user.profile_image_url_https is not None else ""
                    if not username:
                        username = "anon."

                    element = etree.SubElement(elements, "element" , {"id": "%s-%s" % (uuid.uuid4(),tw.id), "color":options.color, "author":username, "date":tweet_ts_dt.strftime("%Y/%m/%d"), "begin": str(tweet_ts_rel_milli), "dur":"0", "src":profile_url})
                    etree.SubElement(element, "title").text = username + ": " + tw.text
                    etree.SubElement(element, "abstract").text = tw.text

                    tags_node = etree.SubElement(element, "tags")

                    for entity in tw.entity_list:
                        if entity.type == 'entity_hashtag':
                            etree.SubElement(tags_node,"tag").text = entity.hashtag.text

                    meta_element = etree.SubElement(element, 'meta')

                    etree.SubElement(meta_element, "polemic_version").text = options.protocol_version
                    # Unknown protocol versions fall back to version 2.
                    parse_polemics = protocol_version_map.get(options.protocol_version, parse_polemics_2)
                    polemics_list = parse_polemics(tw, options.extended_mode)
                    if polemics_list:
                        polemics_element = etree.Element('polemics')
                        for pol in polemics_list:
                            etree.SubElement(polemics_element, 'polemic').text = pol
                        meta_element.append(polemics_element)

                    etree.SubElement(meta_element, "source", attrib={"url":"http://dev.twitter.com", "mimetype":"application/json"}).text = etree.CDATA(tw.tweet_source.original_json)

                # sort by tc in
                if options.merge :
                    # New elements were appended after the existing ones:
                    # reorder all of them by their "begin" timecode.
                    elements[:] = sorted(elements,key=lambda n: int(n.get('begin')))

                #add to display node
                if display_content_node is not None:
                    display_dec = None
                    for dec in display_content_node.iterchildren(tag="decoupage"):
                        if dec.get('idens','') == ensemble_id and dec.get('id', '') == decoupage_id:
                            display_dec = dec
                            break
                    if display_dec is None and ensemble_id and decoupage_id:
                        etree.SubElement(display_content_node, "decoupage", attrib={'idens': ensemble_id, 'id': decoupage_id, 'tagsSelect':''})

                output_data = etree.tostring(root, encoding="utf-8", method="xml", pretty_print=False, xml_declaration=True).decode('utf-8')

                if content_file_write and content_file_write.find("http") == 0:

                    # Send the project fetched above back with the updated ldt.
                    project["ldt"] = output_data
                    project['owner'] = project['owner'].replace('%7E','~')
                    project['contents'] = [c_url.replace('%7E','~') for c_url in project['contents']]

                    get_logger().debug("write http " + content_file_write) #@UndefinedVariable
                    get_logger().debug("write http " + repr(post_param)) #@UndefinedVariable
                    get_logger().debug("write http " + repr(project)) #@UndefinedVariable
                    r = requests.put(content_file_write, data=json.dumps(project), headers={'content-type':'application/json'}, params=post_param)
                    get_logger().debug("write http " + repr(r) + " content " + r.text) #@UndefinedVariable
                    if r.status_code != requests.codes.ok:  # pylint: disable=E1101
                        r.raise_for_status()
                else:
                    # Overwrite the source file when it exists, else write to
                    # the file given by the --file option.
                    if content_file_write and os.path.exists(content_file_write):
                        dest_file_name = content_file_write
                    else:
                        dest_file_name = options.filename

                    get_logger().debug("WRITE : " + dest_file_name) #@UndefinedVariable
                    # Write bytes so that the file content always matches the
                    # utf-8 encoding announced in the xml declaration,
                    # whatever the locale; the with block closes the file even
                    # if the write fails.
                    with open(dest_file_name, "wb") as output:
                        output.write(output_data.encode("utf-8"))

        finally:
            if session:
                session.close()
    finally:
        if conn:
            conn.close()