server/src/app/Repositories/RdfDocumentRepository.php
author ymh <ymh.work@gmail.com>
Sat, 10 Jun 2017 08:33:03 +0200
changeset 532 1190ea937f2d
parent 506 8a5bb4b48b85
permissions -rw-r--r--
make things work after node 8, npm 5 migration. Migrate to lodash 4

<?php

namespace CorpusParole\Repositories;

use Config;
use Log;
use CorpusParole\Models\DocumentResult;
use CorpusParole\Models\Document;
use CorpusParole\Libraries\CorpusParoleException;
use CorpusParole\Libraries\Utils;
use CorpusParole\Libraries\Sparql\SparqlClient;
use CorpusParole\Libraries\Filters\CorpusFilterManager;


use CorpusParole\Services\LexvoResolverInterface;

use EasyRdf\Graph;

use Illuminate\Pagination\LengthAwarePaginator;
use Illuminate\Pagination\Paginator;

use Es;

/**
 * Implements the DocumentRepository using EasyRdf.
 * TODO: split the transaction management (add, delete, transaction) out into an external class, probably by extending the SPARQL client.
 */
class RdfDocumentRepository implements DocumentRepository {

    /**
     * SPARQL SELECT template listing the documents (edm:ProvidedCHO) with their named graph.
     *
     * Each row carries ?uri (the graph), ?doc, ?title, the optional ?issued, ?created and
     * ?modified values, and the languages and publishers concatenated with ", " as ?lang and
     * ?publishers.
     * The single %s placeholder is filled with sprintf() (see queryDocs()) with an extra
     * clause such as a FILTER on ?uri, or with an empty string.
     * ORDER BY / OFFSET / LIMIT clauses are appended after the GROUP BY by the caller.
     */
    const BASE_DOC_QUERY
        = "SELECT".
        "    ?uri".
        "    ?doc".
        "    ?title".
        "    ?issued".
        "    ?created".
        "    ?modified".
        "    (group_concat(distinct ?language;separator=\", \") as ?lang) ".
        "    (group_concat(distinct ?publisher;separator=\", \") as ?publishers) ".
        "  WHERE {".
        "    GRAPH ?uri { ?doc a <http://www.europeana.eu/schemas/edm/ProvidedCHO>.".
        "      ?doc <http://purl.org/dc/elements/1.1/title> ?title.".
        "      OPTIONAL {?doc <http://purl.org/dc/elements/1.1/language> ?language.} ".
        "      OPTIONAL {?doc <http://purl.org/dc/terms/issued> ?issued.} ".
        "      OPTIONAL {?doc <http://purl.org/dc/terms/created> ?created.} ".
        "      OPTIONAL {?doc <http://purl.org/dc/terms/modified> ?modified.} ".
        "      OPTIONAL {?doc <http://purl.org/dc/elements/1.1/publisher> ?publisher.}".
        "    }. ".
        "    %s".
        "  } ".
        "  GROUP BY ?uri ?doc ?title ?issued ?created ?modified ";

    /**
     * Complementary SPARQL SELECT templates, run by queryDocs() after the base query.
     *
     * Every template returns ?uri and ?doc plus one extra column, and has a single %s
     * placeholder filled with sprintf() (a FILTER on ?uri in queryDocs()):
     *  - first query: ?extent, a sample of the dcterms:extent values of the web resource
     *    referenced through edm:isShownBy;
     *  - second query: ?transcript_url, a sample of the URIs of the web resources whose
     *    dc:format is "application/xml" or "application/pdf".
     */
    const ADDITIONAL_DOC_QUERIES = [
        "SELECT".
        "    ?uri".
        "    ?doc".
        "    (sample(distinct ?ext) as ?extent) ".
        "WHERE {".
        "    GRAPH ?uri {".
        "        ?s a <http://www.europeana.eu/schemas/edm/WebResource>. ".
        "        ?uri <http://www.europeana.eu/schemas/edm/isShownBy> ?s. ".
        "        ?uri <http://www.europeana.eu/schemas/edm/aggregatedCHO> ?doc. ".
        "        OPTIONAL {?s <http://purl.org/dc/terms/extent> ?ext.}".
        "    }. ".
        "    %s".
        "} ".
        "GROUP BY ?uri ?doc",

        "SELECT".
        "    ?uri".
        "    ?doc".
        "    (sample(distinct str(?s)) as ?transcript_url) ".
        "WHERE {".
        "    GRAPH ?uri {".
        "        ?s a <http://www.europeana.eu/schemas/edm/WebResource>. ".
        "        ?uri <http://www.europeana.eu/schemas/edm/aggregatedCHO> ?doc. ".
        "        OPTIONAL {?s <http://purl.org/dc/elements/1.1/format> ?f.} ".
        "    }. ".
        "    FILTER(str(?f) IN ( \"application/xml\", \"application/pdf\" )). ".
        "    %s".
        "} ".
        "GROUP BY ?uri ?doc"
    ];

    /** @var SparqlClient client used to query and update the triple store */
    private $sparqlClient;
    /** @var LexvoResolverInterface resolver used by resolveLexvo() to get the names of language ids */
    private $lexvoResolver;

    /**
     * Store the collaborators of the repository.
     *
     * @param SparqlClient $sparqlClient client giving access to the triple store
     * @param LexvoResolverInterface $lexvoResolver service resolving language ids to names
     */
    public function __construct(SparqlClient $sparqlClient, LexvoResolverInterface $lexvoResolver) {
        $this->lexvoResolver = $lexvoResolver;
        $this->sparqlClient = $sparqlClient;
    }

    /**
     * Give access to the SPARQL client received by the constructor.
     *
     * @return SparqlClient
     */
    public function getSparqlClient() {
        return $this->sparqlClient;
    }

    /**
     * Build a graph from one row of a document SPARQL query.
     *
     * The graph is named after the row ?uri and always declares the aggregation, its
     * aggregated CHO and their types. Every other column is added only when the row has it.
     *
     * @param object $doc a result row; it must expose at least a `uri` member
     * @return Graph|null the graph, or null when the row is empty or has no `uri`
     */
    private function getResGraph($doc) {

        $row = (array)$doc;
        if(empty($row) || !array_key_exists('uri', $row)) {
            return null;
        }

        $graph = new Graph($doc->uri->getUri());
        $graph->add($doc->uri, "rdf:type", $graph->resource("http://www.openarchives.org/ore/terms/Aggregation"));
        $graph->add($doc->uri, "http://www.europeana.eu/schemas/edm/aggregatedCHO", $doc->doc);
        $graph->add($doc->doc, "rdf:type", $graph->resource("http://www.europeana.eu/schemas/edm/ProvidedCHO"));

        if(isset($doc->title)) {
            $graph->add($doc->doc, "http://purl.org/dc/elements/1.1/title", $doc->title);
        }

        if(isset($doc->lang)) {
            // ?lang is a ", " separated list: values looking like URLs become resources, the others literals
            foreach(explode(", ", $doc->lang) as $rawLanguage) {
                $language = trim($rawLanguage);
                $adder = filter_var($language, FILTER_VALIDATE_URL) ? 'addResource' : 'addLiteral';
                $graph->$adder($doc->doc, "http://purl.org/dc/elements/1.1/language", $language);
            }
        }

        // row member => predicate, in the order the values are added to the graph
        $optionalValues = [
            'issued' => "http://purl.org/dc/terms/issued",
            'modified' => "http://purl.org/dc/terms/modified",
            'created' => "http://purl.org/dc/terms/created",
            'publishers' => "http://purl.org/dc/elements/1.1/publisher",
            'extent' => "http://purl.org/dc/terms/extent"
        ];
        foreach($optionalValues as $member => $predicate) {
            if(isset($doc->$member)) {
                $graph->add($doc->doc, $predicate, $doc->$member);
            }
        }

        if(isset($doc->transcript_url)) {
            $graph->add($doc->doc, config('corpusparole.corpus_ontology_url').'transcript', $doc->transcript_url);
        }

        return $graph;
    }

    /**
     * Run a document search on ElasticSearch.
     *
     * @param array|null $filters filters given to CorpusFilterManager::buildESFilters(); null means no filter
     * @param int|null $offset index of the first hit ("from"); null is sent as 0
     * @param int|null $limit maximum number of hits ("size"); an empty value is sent as 0
     * @param array|string|null $sort sort definition; a string is wrapped in an array, an empty value means "_doc"
     * @return array ['total' => total reported by ElasticSearch, 'documents' => list of the hits "_id"]
     */
    private function queryES($filters=null, $offset=null, $limit=null, $sort=null) {

        if(empty($sort)) {
            $sort = ["_doc"];
        } elseif (is_string($sort)) {
            $sort = [$sort];
        }
        if(is_null($filters)) {
            $filters = [];
        }

        $filterManager = new CorpusFilterManager();
        $qFilterParts = $filterManager->buildESFilters($filters);

        $query = [
            'index' => config('corpusparole.elasticsearch_index'),
            'body' => [
                "size" => empty($limit)?0:$limit,
                // never send a JSON null as "from": start from the first hit instead
                "from" => is_null($offset)?0:$offset,
                "sort" => $sort
            ]
        ];

        if(count($qFilterParts)>0) {
            $query['body']['query'] = ['constant_score' => [
                'filter' => [
                    'bool' => [
                        'must' => $qFilterParts
                    ]
                ]
            ] ];
        }

        Log::debug("QUERY :");
        Log::debug(json_encode($query, JSON_PRETTY_PRINT));
        Log::debug("FILTERS :");
        Log::debug(json_encode($filters, JSON_PRETTY_PRINT));

        $esRes = Es::search($query);

        return ['total' => $esRes['hits']['total'], 'documents' => array_map(function($r) {
            return $r['_id'];
        }, $esRes['hits']['hits'])];

    }

    /**
     * Query docs.
     *
     * If $filters is empty or null and $sort is '_graph', the document list is fetched from
     * the triple store, ordered by title. Otherwise the list of document ids is fetched from
     * ElasticSearch (always sorted on "title.raw" ascending: $sort only selects the branch)
     * and the triple store is queried for these documents only.
     *
     * @param array|null $filters filters given to queryES()
     * @param int|null $offset index of the first document
     * @param int|null $limit maximum number of documents
     * @param string|null $sort '_graph' to read the list from the triple store
     * @return array ['meta' => ['total' => int], 'documents' => DocumentResult[]]
     */
    private function queryDocs($filters=null, $offset=null, $limit=null, $sort=null) {

        $resDocs = [];

        $sortClauseStr = "";
        $limitsClausesStr = "";
        $filterUris = "";

        if(empty($filters) && $sort === "_graph") {
            $limitsClauses = [];
            // cast so that only integers can reach the query text
            if(!is_null($offset)) {
                $limitsClauses[] = "OFFSET ".intval($offset);
            }
            if(!is_null($limit)) {
                $limitsClauses[] = "LIMIT ".intval($limit);
            }
            if(!empty($limitsClauses)) {
                $limitsClausesStr = "\n" . join(" ", $limitsClauses);
            }
            $sortClauseStr = "\n ORDER BY ASC(?title)";
            $total = $this->getCount();
        } else {
            $esRes = $this->queryES($filters, $offset, $limit, ["title.raw" => 'asc']);
            // WARNING: we count on the fact that php keeps keys order.
            // Each uri is registered with a null placeholder to keep the ElasticSearch order.
            $total = intval($esRes['total']);
            foreach($esRes['documents'] as $esDocId) {
                $uri = config('corpusparole.corpus_doc_id_base_uri_prefix').$esDocId;
                $resDocs[$uri] = null;
            }
            if(count($resDocs) > 0) {
                $filterUris = "FILTER(?uri in (<".join(">, <" , array_keys($resDocs)).">)) ";
            } else {
                return ['meta' => [ 'total'=> $total ], 'documents' => []];
            }

        }


        $sparqlQuery = sprintf(self::BASE_DOC_QUERY.$sortClauseStr.$limitsClausesStr, $filterUris);

        Log::debug("SPARQL QUERY :");
        Log::debug($sparqlQuery);

        $docs = $this->sparqlClient->query($sparqlQuery);
        foreach($docs as $doc) {
            $graph = $this->getResGraph($doc);
            if(is_null($graph)) {
                Log::debug("NULL GRAPH - odd");
                continue;
            }
            $uri = $doc->uri->getUri();
            $resDocs[$uri] = $graph;
        }

        if(count($resDocs) == 0) {
            return ['meta' => [ 'total'=> $total ], 'documents' => []];
        }

        if(empty($filterUris)) {
            $filterUris = "FILTER(?uri in (<".join(">, <" , array_keys($resDocs)).">)) ";
        }

        foreach(self::ADDITIONAL_DOC_QUERIES as $query) {
            $docs = $this->sparqlClient->query(sprintf($query, $filterUris));
            foreach($docs as $doc) {
                $graph = $this->getResGraph($doc);
                if(is_null($graph)) {
                    continue;
                }

                $uri = $doc->uri->getUri();
                // isset() and not array_key_exists(): a placeholder still set to null has no graph to merge with
                if(isset($resDocs[$uri])) {
                    $resDocs[$uri] = Utils::mergeGraphs($resDocs[$uri], $graph);
                } else {
                    $resDocs[$uri] = $graph;
                }
            }
        }

        // Ids returned by ElasticSearch but without any graph in the triple store are still
        // null here: skip them instead of calling getUri() on null.
        $documentsResults = [];
        foreach($resDocs as $graph) {
            if(is_null($graph)) {
                continue;
            }
            $documentsResults[] = new DocumentResult($graph->getUri(), $graph);
        }

        return ['meta' => [ 'total'=> $total ], 'documents' => $documentsResults];
    }

    /**
     * List the documents found by an unfiltered, unpaginated '_graph' query.
     *
     * @return array the 'documents' entry of the queryDocs() result
     */
    public function all() {
        $result = $this->queryDocs(null, null, null, "_graph");
        return $result['documents'];
    }

    /**
     * Load one document from its named graph.
     *
     * @param string $id document id, with or without the configured 'corpusparole.corpus_id_scheme' prefix
     * @param boolean $short when true a DocumentResult is built instead of a Document
     * @return Document|DocumentResult|null null when the graph is empty or when the id cannot give a valid graph URI
     */
    public function get($id, $short=false) {

        $idScheme = config('corpusparole.corpus_id_scheme');
        if(strpos($id, $idScheme) === 0) {
            $id = substr($id, strlen($idScheme));
        }
        $docUri = Config::get('corpusparole.corpus_doc_id_base_uri').$id;

        // The URI is written between < and > in the query text. Reject any character the
        // SPARQL IRIREF production forbids (controls, space, <>"{}|^`\), so that an id can
        // neither break the query nor inject clauses into it. Such a URI names no graph.
        if(preg_match('/[\x00-\x20<>"{}|^`\\\\]/', $docUri)) {
            return null;
        }

        // We want the CBD (Concise Bounded Description, cf. http://www.w3.org/Submission/CBD/)
        // WARNING: This seems to work in rdf4j for our dataset.
        $doc = $this->sparqlClient->query(
            "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$docUri> { ?s ?p ?o } }"
        );
        if($doc->isEmpty()) {
            return null;
        }

        // clone the graph to force the URI
        $DocumentKlass = $short?DocumentResult::class:Document::class;
        return new $DocumentKlass($docUri, new Graph($docUri, $doc->toRdfPhp()));

    }

    /**
     * Save a document by applying each of its deltas to the triple store.
     *
     * For every delta: run the delete-where, delete the removed graph, add the new graph.
     *
     * @param Document $doc the document whose delta list is applied
     * @return boolean the result of the commit when this call started the transaction, false otherwise
     * @throws CorpusParoleException if one of the operations could not be performed
     */
    public function save(Document $doc) {

        $ownsTransaction = $this->sparqlClient->startTransaction();

        try {
            foreach($doc->getDeltaList() as $change) {
                $this->sparqlClient->deleteWhere($change->getDeleteWhere(), $change->getUri());
                $this->sparqlClient->delete($change->getDeletedGraph());
                $this->sparqlClient->add($change->getAddedGraph());
            }

            if(!$ownsTransaction) {
                return false;
            }

            // the flag is cleared before committing: a failing commit is not rolled back below
            $ownsTransaction = false;
            return $this->sparqlClient->commit();
        }
        catch(CorpusParoleException $error) {
            if($ownsTransaction) {
                $this->sparqlClient->rollback();
            }
            throw $error;
        }
    }

    /**
     * Count the documents.
     *
     * Without filters the named graphs holding an edm:ProvidedCHO are counted in the triple
     * store; with filters the total is read from ElasticSearch.
     *
     * @param array|null $filters filters given to queryES(); empty or null to count every document
     * @return int
     */
    public function getCount($filters=null) {

        if(empty($filters)) {
            $res = $this->sparqlClient->query("SELECT (COUNT (DISTINCT ?g) as ?count) WHERE { GRAPH ?g { ?s a <http://www.europeana.eu/schemas/edm/ProvidedCHO> } }");
            assert(!is_null($res) && count($res)==1);
            return intval($res[0]->count->getValue());
        }

        // queryES() returns ['total' => ..., 'documents' => ...], not the raw ElasticSearch
        // response: the total is a top level key.
        $esRes = $this->queryES($filters, 0, 0);
        return intval($esRes['total']);

    }

    //SELECT ?g WHERE { GRAPH ?g { ?s ?p ?o } }

    /**
     * Paginate all documents as a paginator.
     *
     * @param  int  $perPage
     * @param  string  $pageName
     * @param  int|null  $page page to return; when null, paginate() resolves the current page from $pageName
     * @return \Illuminate\Contracts\Pagination\LengthAwarePaginator
     */
    public function paginateAll($perPage = 15, $pageName = 'page', $page = null)
    {
        // forward the requested page: it used to be dropped and replaced by null
        return $this->paginate(null, $perPage, $pageName, $page);
    }

    /**
     * Paginate the filtered documents as a paginator.
     *
     * @param  array $filters
     * @param  int  $perPage
     * @param  string  $pageName
     * @param  int|null  $page page to return; when null it is resolved with Paginator::resolveCurrentPage()
     * @param  string|null  $sort passed as is to queryDocs()
     * @return \Illuminate\Contracts\Pagination\LengthAwarePaginator
     */
    public function paginate($filters = null, $perPage = 15, $pageName = 'page', $page = null, $sort=null) {

        assert(is_numeric($perPage));

        $currentPage = is_null($page) ? Paginator::resolveCurrentPage($pageName) : $page;

        assert(is_null($currentPage) || is_numeric($currentPage));

        // pages are 1-based; never ask for a negative offset
        $firstIndex = ($currentPage - 1) * $perPage;
        $found = $this->queryDocs($filters, max(0, $firstIndex), $perPage, $sort);

        $options = [
            'path' => Paginator::resolveCurrentPath(),
            'pageName' => $pageName,
        ];

        return new LengthAwarePaginator($found['documents'], $found['meta']['total'], $perPage, $currentPage, $options);

    }


    /**
     * Resolve the language names of all the documents of a list.
     * The languages of the whole list are gathered first, so that the lexvo resolver is
     * called only once.
     *
     * @param array $docList the documents to resolve; documents without language values or
     *                       already resolved are left untouched
     * @return array the same list
     */
    public function resolveLexvo(Array $docList) {

        // gather the languages needing resolving, used as keys to drop duplicates
        $pending = [];
        foreach ($docList as $document) {
            if(empty($document->getLanguagesValue()) || !is_null($document->getLanguagesResolved())) {
                continue;
            }
            foreach($document->getLanguagesValue() as $languageId) {
                $pending[$languageId] = true;
            }
        }

        // single call to the LexvoResolver
        $names = $this->lexvoResolver->getNames(array_keys($pending));

        foreach ($docList as $document) {
            if(empty($document->getLanguagesValue()) || !is_null($document->getLanguagesResolved())) {
                continue;
            }
            $resolved = [];
            foreach($document->getLanguagesValue() as $languageId) {
                $resolved[] = $names[$languageId];
            }
            $document->setLanguageResolved($resolved);
        }

        return $docList;
    }

    /**
     * Add the country info to a document.
     * The document is modified: its country code is set from the 'geonames_country' field
     * of its ElasticSearch source, when that field exists. An error is logged when the
     * document is not found in the index.
     *
     * @param $doc the document instance
     * @return nothing
     */
    public function addCountryInfo($doc) {
        $esRes = Es::get([
            'index' => config('corpusparole.elasticsearch_index'),
            'type' => 'document',
            'id' => $doc->getId()
        ]);

        if(empty($esRes) || !$esRes['found']) {
            Log::error("RdfDocumentRepository::addCountryInfo : Document ".$doc->getId()." not found in index.");
            return;
        }

        $indexed = $esRes["_source"];
        if(array_key_exists('geonames_country', $indexed)) {
            $doc->setCountryCode($indexed['geonames_country']);
        }
    }

}