server/src/app/Console/Commands/ImportCocoonRDF.php
author ymh <ymh.work@gmail.com>
Wed, 13 Sep 2017 22:19:48 +0200
changeset 544 ad58d7627f70
parent 531 48f5380c26d0
permissions -rw-r--r--
use same http client in ImportCocoonRDF and define version 0.22

<?php

namespace CorpusParole\Console\Commands;

use Config;
use Log;
use Illuminate\Console\Command;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Phpoaipmh\Client;
use Phpoaipmh\Endpoint;
use Phpoaipmh\HttpAdapter\GuzzleAdapter;
use CorpusParole\Libraries\Sparql\GuzzleSparqlClient;

/**
 * Artisan command importing Cocoon documents into the corpus repositories.
 *
 * The import runs in two phases:
 *  1. unless --no-raw is given, harvest the record identifiers through OAI-PMH
 *     (metadata prefix 'olac', set 'LanguesDeFrance'), download each document's RDF
 *     and store it unchanged in the "raw" repository, one named graph per document URI;
 *  2. read the collections, then the sounds / moving images, then the texts back from
 *     the raw repository, map them with the Cocoon*RdfMapper classes and merge the
 *     mapped graphes into the main repository.
 */
class ImportCocoonRDF extends Command {

    /**
     * Number of insert timeouts tolerated during one run by mergeDocs()
     * before an insert timeout is rethrown.
     */
    const INSERT_TIMEOUT_RETRY = 5;

    /**
     * Mapper class to use for each dcmi type of document.
     */
    const MAPPER_CLASS_MAP = [
        "http://purl.org/dc/dcmitype/Sound" => '\CorpusParole\Libraries\Mappers\CocoonSoundRdfMapper',
        "http://purl.org/dc/dcmitype/MovingImage" => '\CorpusParole\Libraries\Mappers\CocoonSoundRdfMapper',
        "http://purl.org/dc/dcmitype/Text" => '\CorpusParole\Libraries\Mappers\CocoonTextRdfMapper',
        "http://purl.org/dc/dcmitype/Collection" => '\CorpusParole\Libraries\Mappers\CocoonCollectionRdfMapper'
    ];

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Import Rdf from Cocoon.';

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = "corpus-parole:importRDF
        {--skip=0 : Number of record to skip}
        {--no-raw : Do not record raw queries}
        {--no-raw-clear : Do not clear raw repository}
        {--clear : Clear repository}
        {--force-import : Overwrite document from import event if the repo version is more recent}
        {--keep-repo-doc : Keep the existing doc in repo (default is replace document)}
        {--delete-old : Delete the documents left over in the repository that were not part of this import}
    ";

    /**
     * HTTP client resolved from the container as 'Guzzle', shared by the OAI-PMH
     * harvester, the RDF downloads and both SPARQL clients.
     */
    private $httpClient;

    /** @var GuzzleSparqlClient SPARQL client on the main repository. */
    private $gs;

    /** @var GuzzleSparqlClient SPARQL client on the raw repository (documents as downloaded from Cocoon). */
    private $gs_raw;

    /** @var array Counters reported at the end of handle(), keyed by status name or dcmi type URI. */
    private $documentCount;

    /** @var int Number of insert timeouts already tolerated by mergeDocs() during this run. */
    private $insertTimeouts = 0;

    /**
     * Create a new command instance.
     */
    public function __construct() {
        parent::__construct();
    }

    /**
     * Get the list of dcmi types of a document.
     *
     * Collects the dc11:type resources of $docUri whose URI is in the dcmitype namespace.
     * When none is found but the graph contains an edm:Collection, the Collection type is returned.
     *
     * @param \EasyRdf\Graph $doc     the document graph
     * @param string         $docUri  URI of the document resource in $doc
     * @return string[] the dcmi type URIs, possibly empty
     */
    private function getDocTypes($doc, $docUri) {

        $res = $doc->resource($docUri);
        $docTypes = [];
        foreach ($res->all("dc11:type","resource") as $resType) {
            $type = $resType->getUri();
            if(0 === strpos($type, 'http://purl.org/dc/dcmitype/')) {
                $docTypes[] = $type;
            }
        }

        // if the doc type list is empty, check that we have a collection
        if(empty($docTypes) && !empty($doc->allOfType('edm:Collection'))) {
            $docTypes[] = "http://purl.org/dc/dcmitype/Collection";
        }
        return $docTypes;
    }

    /**
     * Map a document into graphes.
     *
     * The mapper is chosen from MAPPER_CLASS_MAP using the first dcmi type of the document.
     * The 'unknown', 'error', 'all' and per-type document counters are updated.
     *
     * @param \EasyRdf\Graph $doc     the document graph (as read from the raw repository)
     * @param string         $docUri  the document URI
     * @return array [docType, outputGraphes]. docType is 'unknown' (no mapper for the type)
     *               or 'error' (the mapper threw), both with null graphes; otherwise it is
     *               the dcmi type URI, and the graphes are the mapper's output graphes.
     */
    public function mapDoc($doc, $docUri) {
        $inputDocTypes = $this->getDocTypes($doc, $docUri);

        $docType = count($inputDocTypes) > 0 ? $inputDocTypes[0] : null;

        if(is_null($docType) || !array_key_exists($docType, self::MAPPER_CLASS_MAP)) {
            $this->error("\nError processing $docUri : $docType unknown mapper");
            Log::error("Error processing $docUri : $docType unknown mapper");
            $this->documentCount['unknown'] += 1;
            return ['unknown', null];
        }

        $mapperClass = self::MAPPER_CLASS_MAP[$docType];
        $mapper = new $mapperClass($doc, $docUri);

        try {
            $mapper->mapGraph();
        } catch (\Exception $e) {
            Log::error("Error processing $docUri : error mapping graph : $e");
            $this->documentCount['error'] += 1;
            return ['error', null];
        }
        $this->documentCount['all'] += 1;
        $this->documentCount[$docType] = isset($this->documentCount[$docType]) ? $this->documentCount[$docType] + 1 : 1;

        return [$docType, $mapper->getOutputGraphes()];
    }

    /**
     * Merge mapped graphes into the main repository.
     *
     * For each mapped graph:
     *  - if its named graph is absent from the repository, the mapped graph is inserted as is;
     *  - otherwise it is merged with the repository version. If the merge result is
     *    isomorphic to the repository version, nothing is written. Otherwise the named
     *    graph is cleared and the merge result is inserted.
     * Insert timeouts are logged and skipped, up to INSERT_TIMEOUT_RETRY per run.
     * Other insert errors are rethrown.
     *
     * @param string              $docType        dcmi type URI returned by mapDoc()
     * @param \EasyRdf\Graph[]|null $outputGraphes graphes returned by mapDoc(); null/empty is a no-op
     */
    public function mergeDocs($docType, $outputGraphes) {

        if(empty($outputGraphes)) {
            return;
        }

        foreach ($outputGraphes as $mappedGraph) {

            $mappedGraphUri = $mappedGraph->getUri();
            $resDocs = $this->queryRepoGraph($mappedGraphUri);

            if($resDocs->isEmpty()) {
                $mergedGraph = $mappedGraph;
                $doDelete = false;
            } else {
                $mergedGraph = $this->mergeWithRepoGraph($docType, $mappedGraph, $resDocs, $mappedGraphUri);
                if(\EasyRdf\Isomorphic::isomorphic($resDocs, $mergedGraph)) {
                    //graph are isomorphic no need to go farther for this graph
                    Log::info("Graph are isomorphic for $mappedGraphUri, skipping");
                    continue;
                }
                $doDelete = true;
            }

            try {
                if($doDelete) {
                    $this->gs->clear($mappedGraphUri);
                }
                $this->gs->insert($mergedGraph, $mappedGraphUri);
            }
            catch(\Exception $e) {
                // just log not much we can do here...
                $this->error("\nError on insert $mappedGraphUri : $e");
                Log::error("Error on insert $mappedGraphUri : $e");
                if($this->isTolerableInsertTimeout($e)) {
                    $this->info("\nThis is a timeout, we continue.");
                    Log::info("This is a timeout, we continue.");
                    $this->insertTimeouts++;
                    continue;
                }
                throw $e;
            }
        }
    }

    /**
     * Merge a mapped graph with its current repository version using the merger for its type.
     *
     * Collections and texts pass the repository graph as the base graph and the mapped graph
     * as the source graph. Every other type (sounds / moving images) passes them the other
     * way round.
     *
     * @return \EasyRdf\Graph the merged graph
     */
    private function mergeWithRepoGraph($docType, $mappedGraph, $repoGraph, $graphUri) {
        $mappedTypes = $this->getDocTypes($mappedGraph, $graphUri);

        if($docType === "http://purl.org/dc/dcmitype/Collection" || in_array("http://purl.org/dc/dcmitype/Collection", $mappedTypes, true)) {
            $merger = new \CorpusParole\Libraries\Mergers\CocoonCollectionRdfMerger();
            return $merger->mergeGraph($repoGraph, $mappedGraph, $graphUri);
        }
        if($docType === "http://purl.org/dc/dcmitype/Text") {
            $merger = new \CorpusParole\Libraries\Mergers\CocoonTextRdfMerger();
            return $merger->mergeGraph($repoGraph, $mappedGraph, $graphUri);
        }
        $merger = new \CorpusParole\Libraries\Mergers\CocoonSoundRdfMerger();
        return $merger->mergeGraph($mappedGraph, $repoGraph, $graphUri);
    }

    /**
     * Tell whether an insert error is a timeout that can be skipped.
     *
     * The message must mention "timed out", the exception must come from EasyRdf or Guzzle,
     * and fewer than INSERT_TIMEOUT_RETRY timeouts may have been tolerated so far.
     */
    private function isTolerableInsertTimeout(\Exception $e) {
        $knownExceptionType = $e instanceof \EasyRdf\Exception
            || $e instanceof \GuzzleHttp\Exception\TransferException;
        return $knownExceptionType
            && stripos($e->getMessage(), 'timed out') !== false
            && $this->insertTimeouts < self::INSERT_TIMEOUT_RETRY;
    }

    /**
     * Fetch a named graph from the main repository.
     *
     * A failing query is reported and aborts the command with a non-zero exit status.
     *
     * @return \EasyRdf\Graph the graph content (empty when the named graph does not exist)
     */
    private function queryRepoGraph($graphUri) {
        try {
            return $this->gs->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$graphUri> { ?s ?p ?o }}");
        } catch (\Exception $e) {
            $this->error("\nError on graph query $graphUri : $e \n" . $e->getMessage() . "\n");
            Log::error("\nError on graph query $graphUri : $e \n" . $e->getMessage());
            exit(1);
        }
    }

    /**
     * Get the dcterms:modified date of the first edm:ProvidedCHO of a graph.
     *
     * The date is parsed as a W3C timestamp, then with PHP's generic date parser.
     * The Unix epoch is returned when there is no ProvidedCHO, no modified literal,
     * or an unparsable value, so the result is always comparable.
     *
     * @param \EasyRdf\Graph $graph
     * @return \DateTimeInterface
     */
    public function getModified($graph) {
        $providedCHORes = $graph->allOfType('http://www.europeana.eu/schemas/edm/ProvidedCHO');
        // first element of the array, false when there is none
        $providedCHO = reset($providedCHORes);
        if($providedCHO === false) {
            return $this->epochDate();
        }
        $modified = $providedCHO->getLiteral("<http://purl.org/dc/terms/modified>");
        if(is_null($modified)) {
            return $this->epochDate();
        }
        $value = $modified->getValue();
        if($value instanceof \DateTimeInterface) {
            // typed literals (e.g. xsd:dateTime) may already be decoded to a DateTime by EasyRdf
            return $value;
        }
        $lexical = (string)$value;
        $date = \DateTime::createFromFormat(\DateTime::W3C, $lexical);
        if($date === false && trim($lexical) !== '') {
            // not a full W3C timestamp (e.g. a plain date): try the generic parser
            $date = date_create($lexical);
        }
        if($date === false) {
            Log::warning("Unparsable modified date '$lexical', using epoch");
            return $this->epochDate();
        }
        return $date;
    }

    /**
     * A DateTime set to Unix timestamp 0, used when no modified date is available.
     */
    private function epochDate() {
        $date = new \DateTime();
        $date->setTimestamp(0);
        return $date;
    }

    /**
     * Create a progress bar using the command's common display format.
     */
    private function newProgressBar($max) {
        $progressBar = $this->output->createProgressBar($max);
        $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%');
        return $progressBar;
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle() {

        // NOTE(review): presumably keeps libxml warnings from the OAI-PMH XML parsing out of the output
        libxml_use_internal_errors(true);

        $skip = (int)$this->option('skip');
        $raw = !$this->option('no-raw');
        $rawClear = !$this->option('no-raw-clear');
        $clear = $this->option('clear');
        $forceImport = $this->option('force-import');
        $keepRepoDoc = $this->option('keep-repo-doc');
        $deleteOld = $this->option('delete-old');

        $this->comment("Skipping $skip records");
        $this->comment("Querying Cocoon: ".($raw?'TRUE':'FALSE'));
        $this->comment("Clear raw repository: ".($rawClear?'TRUE':'FALSE'));
        $this->comment("Clear repository: ".($clear?'TRUE':'FALSE'));
        $this->comment("Keep existing document into repository: ".($keepRepoDoc?'TRUE':'FALSE'));
        $this->comment("Overwrite more recent document:".($forceImport?'TRUE':'FALSE'));
        $this->comment("Delete left-over documents: ".($deleteOld?'TRUE':'FALSE'));

        $this->httpClient = app()->make('Guzzle');
        $this->gs = new GuzzleSparqlClient($this->httpClient, Config::get('corpusparole.rdf4j_query_url'), Config::get('corpusparole.rdf4j_update_url'));
        $this->gs_raw = new GuzzleSparqlClient($this->httpClient, Config::get('corpusparole.rdf4j_query_url_raw'), Config::get('corpusparole.rdf4j_update_url_raw'));

        $this->documentCount = [
            'all' => 0,
            'unknown' => 0,
            'error' => 0,
            'raw_duplicates' => 0,
            'modified' => 0,
            'replaced' => 0
        ];
        $this->insertTimeouts = 0;

        if($raw) {
            $this->importRawRecords($skip, $rawClear);
        }

        if($clear) {
            $this->gs->clear("all");
        }

        $this->importCollections();

        // list the existing documents, to find the left-overs once the sounds are imported
        $providedCHODocsUris = $this->listProvidedCHODocsUris();
        $this->info("\n\nWe have ".count($providedCHODocsUris)." providedCHO in database.\n");

        $providedCHODocsUris = $this->importSounds($providedCHODocsUris, $forceImport, $keepRepoDoc);

        $this->importTexts();

        $this->deleteLeftOvers($providedCHODocsUris, $deleteOld);

        $this->reportDocumentCount();
    }

    /**
     * Harvest the Cocoon records through OAI-PMH and store each document's RDF in the raw repository.
     *
     * @param int  $skip      number of harvested records to skip
     * @param bool $rawClear  clear the whole raw repository before importing
     */
    private function importRawRecords($skip, $rawClear) {
        $client = new Client(Config::get('corpusparole.cocoon_oaipmh_url'), new GuzzleAdapter($this->httpClient));
        $endpoint = new Endpoint($client);

        $recs = $endpoint->listRecords('olac', null, null, 'LanguesDeFrance');

        $progressBar = $this->newProgressBar($recs->getTotalRecordsInCollection());

        //Clear raw repository if asked
        if($rawClear) {
            $this->gs_raw->clear("all");
        }

        foreach ($recs as $item) {
            $item->registerXPathNamespace('oai', "http://www.openarchives.org/OAI/2.0/");
            $identifier = (string) $item->xpath('oai:header/oai:identifier')[0];

            // part of the OAI identifier after the configured id base, shared by the RDF url and the doc URI
            $docIdSuffix = substr($identifier, strlen(Config::get('corpusparole.cocoon_doc_id_base')));
            $docRdfUrl = Config::get('corpusparole.cocoon_rdf_base_uri').$docIdSuffix;
            $message = "$identifier : $docRdfUrl";
            if($recs->getNumRetrieved() <= $skip) {
                $progressBar->setMessage("$message - Skipping");
                $progressBar->advance();
                continue;
            }
            $progressBar->setMessage($message);
            $progressBar->advance();

            $docUri = config('corpusparole.cocoon_doc_id_base_uri').$docIdSuffix;

            $doc = $this->loadRemoteDoc($identifier, $docRdfUrl);
            if(is_null($doc)) {
                $this->documentCount['error'] += 1;
                continue;
            }

            // replace any previous raw version of the document
            $resDocsRaw = $this->gs_raw->query("ASK WHERE { GRAPH <$docUri> { ?s ?p ?o }}");
            if($resDocsRaw->getBoolean()) {
                $this->gs_raw->clear($docUri);
                $this->documentCount['raw_duplicates'] += 1;
            }
            $this->gs_raw->insert($doc, $docUri);
        }
        $progressBar->setMessage("finished raw import");
        $progressBar->finish();
    }

    /**
     * Download and parse the RDF of one document.
     *
     * Connection errors and HTTP 400 responses are retried, up to the
     * 'corpusparole.max_load_retry' config value (3 attempts by default).
     * Any other error is reported and aborts the download.
     *
     * @return \EasyRdf\Graph|null the parsed graph, null when it could not be loaded
     */
    private function loadRemoteDoc($identifier, $docRdfUrl) {
        $maxAttempts = config('corpusparole.max_load_retry', 3);
        for($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
            try {
                $resp = $this->httpClient->get($docRdfUrl);
                $contentType = $resp->getHeader('Content-Type');
                $format = null;
                if(is_array($contentType) && count($contentType) > 0) {
                    list($format, ) = \EasyRdf\Utils::parseMimeType($contentType[0]);
                }
                return new \EasyRdf\Graph($docRdfUrl, $resp->getBody(), $format);
            }
            //TODO: catch network exception - add error to database
            catch(\GuzzleHttp\Exception\ConnectException $e) {
                $this->info("\nTimeout error processing $identifier ($docRdfUrl) : $e, retrying");
                Log::warning("Timeout error processing $identifier ($docRdfUrl) : $e, retrying");
            }
            catch(\GuzzleHttp\Exception\ClientException $e) {
                if($e->getResponse()->getStatusCode() != 400) {
                    $this->error("\nError processing $identifier ($docRdfUrl) : $e");
                    Log::error("Error processing $identifier ($docRdfUrl) : $e");
                    return null;
                }
                $this->info("\nTimeout error processing $identifier ($docRdfUrl) : $e, retrying");
                Log::warning("Timeout error processing $identifier ($docRdfUrl) : $e, retrying");
            }
            catch(\Exception $e) {
                $this->error("\nError processing $identifier ($docRdfUrl) : $e");
                Log::error("Error processing $identifier ($docRdfUrl) : $e");
                return null;
            }
        }
        return null;
    }

    /**
     * Fetch a document graph from the raw repository.
     */
    private function queryRawGraph($docUri) {
        return $this->gs_raw->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$docUri> { ?s ?p ?o. }}");
    }

    /**
     * Map one raw-repository document and merge it into the main repository.
     *
     * Documents that cannot be mapped are skipped; mapDoc() has already reported and counted them.
     */
    private function importRawDoc($docUri) {
        list($docType, $mappedGraphes) = $this->mapDoc($this->queryRawGraph($docUri), $docUri);

        if($docType === 'unknown' || $docType === 'error') {
            return;
        }

        $this->mergeDocs($docType, $mappedGraphes);
    }

    /**
     * Import the collection documents of the raw repository.
     */
    private function importCollections() {
        $collectionDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE {
            GRAPH ?uri {
                ?s <http://purl.org/dc/elements/1.1/type> <http://purl.org/dc/dcmitype/Collection>.
            }
        }");

        $collectionCount = count($collectionDocsUris);
        $this->info("\nImporting $collectionCount Collections from raw repository");
        $progressBar = $this->newProgressBar($collectionCount);

        foreach($collectionDocsUris as $docUriRes) {
            $docUri = $docUriRes->uri->getUri();

            $progressBar->setMessage("Importing collection $docUri.");
            $progressBar->advance();

            $this->importRawDoc($docUri);
        }

        $progressBar->setMessage("finished raw import for collections.");
        $progressBar->finish();
    }

    /**
     * List the URIs of the named graphs of the main repository containing an edm:ProvidedCHO.
     *
     * @return string[]
     */
    private function listProvidedCHODocsUris() {
        $providedCHODocsUris = [];
        $providedCHODocsUrisRes = $this->gs->query("SELECT distinct ?uri WHERE {
            GRAPH ?uri {
                ?s <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.europeana.eu/schemas/edm/ProvidedCHO>.
            }
        }");

        foreach($providedCHODocsUrisRes as $docUriRes) {
            $providedCHODocsUris[] = $docUriRes->uri->getUri();
        }
        return $providedCHODocsUris;
    }

    /**
     * Import the sound and moving-image documents of the raw repository.
     *
     * Before merging, the repository version of a document's main (first) graph is cleared:
     *  - if the repository version is more recent, only when $forceImport is set;
     *  - otherwise, unless $keepRepoDoc is set.
     *
     * NOTE: a document that fails to map keeps its repository graph in the returned
     * left-over list, because its graph URI cannot be known without mapping.
     *
     * @param string[] $providedCHODocsUris  URIs of the documents currently in the repository
     * @return string[] the URIs of $providedCHODocsUris not matched by an imported document
     */
    private function importSounds($providedCHODocsUris, $forceImport, $keepRepoDoc) {
        $soundDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE {
            GRAPH ?uri {
                ?s <http://purl.org/dc/elements/1.1/type> ?o.
                FILTER(?o IN (<http://purl.org/dc/dcmitype/Sound>, <http://purl.org/dc/dcmitype/MovingImage>))
            }
        }");

        $soundCount = count($soundDocsUris);
        $this->info("\nImporting $soundCount Sound (or Moving Image) from raw repository\n");
        $progressBar = $this->newProgressBar($soundCount);

        foreach($soundDocsUris as $docUriRes) {
            $docUri = $docUriRes->uri->getUri();

            $progressBar->setMessage("Importing Sound (or Moving Image) $docUri.");
            $progressBar->advance();

            list($docType, $mappedGraphes) = $this->mapDoc($this->queryRawGraph($docUri), $docUri);
            if($docType === 'unknown' || $docType === 'error' || empty($mappedGraphes)) {
                // The error has been traced in mapDoc
                continue;
            }

            $firstGraph = reset($mappedGraphes); // first graph is main graph
            $firstGraphUri = $firstGraph->getUri();

            // remove it from list of existing graphes in repository
            $key = array_search($firstGraphUri, $providedCHODocsUris, true);
            if($key !== false) {
                unset($providedCHODocsUris[$key]);
            }

            if($this->shouldReplaceRepoDoc($firstGraph, $forceImport, $keepRepoDoc)) {
                $this->documentCount['replaced'] += 1;
                $this->gs->clear($firstGraphUri);
            }

            $this->mergeDocs($docType, $mappedGraphes);
        }

        $progressBar->setMessage("finished raw import for sounds.");
        $progressBar->finish();

        return $providedCHODocsUris;
    }

    /**
     * Decide whether the repository version of a document's main graph must be cleared before merging.
     *
     * When the repository version is more recent than the imported one, the 'modified'
     * counter is incremented.
     *
     * @return bool false when the graph is not in the repository
     */
    private function shouldReplaceRepoDoc($firstGraph, $forceImport, $keepRepoDoc) {
        $resDocs = $this->queryRepoGraph($firstGraph->getUri());
        if($resDocs->isEmpty()) {
            return false;
        }

        $dateRepo = $this->getModified($resDocs);
        $dateImport = $this->getModified($firstGraph);

        if($dateRepo > $dateImport) {
            $this->documentCount['modified'] += 1;
            return (bool)$forceImport;
        }
        return !$keepRepoDoc;
    }

    /**
     * Import the text documents of the raw repository.
     */
    private function importTexts() {
        $textDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE {
            GRAPH ?uri {
                ?s <http://purl.org/dc/elements/1.1/type> <http://purl.org/dc/dcmitype/Text>.
            }
        }");

        $textCount = count($textDocsUris);
        $this->info("\n\nImporting $textCount text from raw repository\n");
        $progressBar = $this->newProgressBar($textCount);

        foreach($textDocsUris as $docUriRes) {
            $docUri = $docUriRes->uri->getUri();

            $progressBar->setMessage("Importing Text $docUri.");
            $progressBar->advance();

            $this->importRawDoc($docUri);
        }

        $progressBar->setMessage("finished raw import for text.");
        $progressBar->finish();
    }

    /**
     * Report the left-over documents and, when $deleteOld is set, clear their graphs from the repository.
     *
     * @param string[] $providedCHODocsUris  graph URIs not matched by the import
     * @param bool     $deleteOld
     */
    private function deleteLeftOvers($providedCHODocsUris, $deleteOld) {
        $this->info("\n\nThere is ".count($providedCHODocsUris)." documents left-over.\n");
        if(!$deleteOld) {
            return;
        }
        foreach($providedCHODocsUris as $graphUri) {
            $this->gs->clear($graphUri);
        }
    }

    /**
     * Print the document counters; a non-zero error count is printed as an error.
     */
    private function reportDocumentCount() {
        $this->info("\n\nDocument count info: ");
        foreach ($this->documentCount as $docType => $docCount) {
            if($docType == 'error' && $docCount > 0) {
                $this->error("$docType => $docCount");
            } else {
                $this->info("$docType => $docCount");
            }
        }
    }
}