<?php
namespace CorpusParole\Console\Commands;
use Config;
use Log;
use Illuminate\Console\Command;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Phpoaipmh\Client;
use Phpoaipmh\Endpoint;
use Phpoaipmh\HttpAdapter\GuzzleAdapter;
use CorpusParole\Libraries\Sparql\GuzzleSparqlClient;
class ImportCocoonRDF extends Command {

    /** Maximum number of insert timeouts skipped by mergeDocs() before the error is rethrown. */
    const INSERT_TIMEOUT_RETRY = 5;

    /** DCMI type URI => mapper class used by mapDoc() to build the output graphs. */
    const MAPPER_CLASS_MAP = [
        "http://purl.org/dc/dcmitype/Sound" => '\CorpusParole\Libraries\Mappers\CocoonSoundRdfMapper',
        "http://purl.org/dc/dcmitype/MovingImage" => '\CorpusParole\Libraries\Mappers\CocoonSoundRdfMapper',
        "http://purl.org/dc/dcmitype/Text" => '\CorpusParole\Libraries\Mappers\CocoonTextRdfMapper',
        "http://purl.org/dc/dcmitype/Collection" => '\CorpusParole\Libraries\Mappers\CocoonCollectionRdfMapper'
    ];

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Import Rdf from Cocoon.';

    /**
     * The name and signature of the console command.
     *
     * --delete-old is opt-in: without it, documents left over in the repository are only counted.
     *
     * @var string
     */
    protected $signature = "corpus-parole:importRDF
{--skip=0 : Number of record to skip}
{--no-raw : Do not record raw queries}
{--no-raw-clear : Do not clear raw repository}
{--clear : Clear repository}
{--force-import : Overwrite document from import event if the repo version is more recent}
{--keep-repo-doc : Keep the existing doc in repo (default is replace document)}
{--delete-old : Delete documents left over in the repository that were not re-imported}
";

    /** HTTP client resolved from the container as 'Guzzle'; shared by the OAI-PMH and SPARQL clients. */
    protected $httpClient;

    /** SPARQL client for the main repository (corpusparole.rdf4j_query_url / rdf4j_update_url). */
    protected $gs;

    /** SPARQL client for the raw repository holding the documents fetched from Cocoon. */
    protected $gs_raw;

    /** Import statistics; reset by handle() and printed at the end of the run. */
    protected $documentCount = [
        'all' => 0,
        'unknown' => 0,
        'error' => 0,
        'raw_duplicates' => 0,
        'modified' => 0,
        'replaced' => 0
    ];

    /** Number of insert timeouts skipped so far by mergeDocs(). */
    protected $insertTimeouts = 0;

    /**
     * Create a new command instance.
     */
    public function __construct() {
        parent::__construct();
    }

    /**
     * Get the list of DCMI types for a document.
     *
     * Collects the dc11:type resources of $docUri that live in the dcmitype namespace.
     * When none is found but the graph contains an edm:Collection, the document is
     * reported as a DCMI Collection.
     *
     * @param \EasyRdf\Graph $doc    graph holding the document
     * @param string         $docUri URI of the document resource
     * @return string[] DCMI type URIs (possibly empty)
     */
    private function getDocTypes($doc, $docUri) {
        $res = $doc->resource($docUri);
        $docTypes = [];
        foreach ($res->all("dc11:type", "resource") as $resType) {
            $type = $resType->getUri();
            if (0 === strpos($type, 'http://purl.org/dc/dcmitype/')) {
                $docTypes[] = $type;
            }
        }
        // if the doc type list is empty, check that we have a collection
        if (empty($docTypes) && !empty($doc->allOfType('edm:Collection'))) {
            $docTypes[] = "http://purl.org/dc/dcmitype/Collection";
        }
        return $docTypes;
    }

    /**
     * Map a document into output graphs, using the mapper registered for its first DCMI type.
     *
     * Updates the statistics: 'unknown' when no mapper matches, 'error' when mapping
     * throws, otherwise 'all' and the per-type counter.
     *
     * @param \EasyRdf\Graph $doc    the raw document graph
     * @param string         $docUri URI of the document resource in $doc
     * @return array [$docType, $outputGraphes]; on failure $docType is 'unknown' or 'error'
     *               and $outputGraphes is null
     */
    public function mapDoc($doc, $docUri) {
        $inputDocTypes = $this->getDocTypes($doc, $docUri);
        $docType = count($inputDocTypes) > 0 ? $inputDocTypes[0] : null;

        if (is_null($docType) || !array_key_exists($docType, self::MAPPER_CLASS_MAP)) {
            $this->error("\nError processing $docUri : $docType unknown mapper");
            Log::error("Error processing $docUri : $docType unknown mapper");
            $this->documentCount['unknown'] += 1;
            return ['unknown', null];
        }

        $mapperClass = self::MAPPER_CLASS_MAP[$docType];
        $mapper = new $mapperClass($doc, $docUri);
        try {
            $mapper->mapGraph();
        } catch (\Exception $e) {
            Log::error("Error processing $docUri : error mapping graph : $e");
            $this->documentCount['error'] += 1;
            return ['error', null];
        }

        $this->documentCount['all'] += 1;
        $this->documentCount[$docType] = isset($this->documentCount[$docType]) ? $this->documentCount[$docType] + 1 : 1;
        return [$docType, $mapper->getOutputGraphes()];
    }

    /**
     * Merge mapped graphs into the main repository.
     *
     * For each graph: if the repository has no graph with that URI, the mapped graph is
     * inserted as is. Otherwise it is merged with the repository content using the merger
     * matching $docType. The repository graph is cleared and replaced only when the merge
     * result is not isomorphic to what is already stored.
     *
     * Up to INSERT_TIMEOUT_RETRY EasyRdf "timed out" insert errors are logged and skipped;
     * any other insert error is rethrown.
     *
     * @param string                $docType       DCMI type URI returned by mapDoc()
     * @param \EasyRdf\Graph[]|null $outputGraphes graphs returned by mapDoc(); null/empty is a no-op
     * @throws \Exception when an insert fails for a reason other than a tolerated timeout
     */
    public function mergeDocs($docType, $outputGraphes) {
        if (empty($outputGraphes)) {
            return;
        }
        foreach ($outputGraphes as $mappedGraph) {
            $mappedGraphUri = $mappedGraph->getUri();
            $resDocs = $this->queryRepositoryGraph($mappedGraphUri);

            if ($resDocs->isEmpty()) {
                $mergedGraph = $mappedGraph;
                $doDelete = false;
            } else {
                $doDelete = true;
                $mappedTypes = $this->getDocTypes($mappedGraph, $mappedGraphUri);
                if ($docType === "http://purl.org/dc/dcmitype/Collection" || in_array("http://purl.org/dc/dcmitype/Collection", $mappedTypes, true)) {
                    $merger = new \CorpusParole\Libraries\Mergers\CocoonCollectionRdfMerger();
                    $baseGraph = $resDocs;
                    $sourceGraph = $mappedGraph;
                } elseif ($docType === "http://purl.org/dc/dcmitype/Text") {
                    $merger = new \CorpusParole\Libraries\Mergers\CocoonTextRdfMerger();
                    $baseGraph = $resDocs;
                    $sourceGraph = $mappedGraph;
                } else {
                    // for sounds the freshly mapped graph is the base of the merge
                    $merger = new \CorpusParole\Libraries\Mergers\CocoonSoundRdfMerger();
                    $baseGraph = $mappedGraph;
                    $sourceGraph = $resDocs;
                }
                $mergedGraph = $merger->mergeGraph($baseGraph, $sourceGraph, $mappedGraphUri);
                if (\EasyRdf\Isomorphic::isomorphic($resDocs, $mergedGraph)) {
                    // graph are isomorphic no need to go farther for this graph
                    Log::info("Graph are isomorphic for $mappedGraphUri, skipping");
                    continue;
                }
            }

            try {
                if ($doDelete) {
                    $this->gs->clear($mappedGraphUri);
                }
                $this->gs->insert($mergedGraph, $mappedGraphUri);
            } catch (\Exception $e) {
                // just log not much we can do here...
                $this->error("\nError on insert $mappedGraphUri : $e");
                Log::error("Error on insert $mappedGraphUri : $e");
                // fully qualified: an unqualified name would resolve inside this namespace
                $isTimeout = $e instanceof \EasyRdf\Exception && stripos($e->getMessage(), 'timed out') !== false;
                if ($isTimeout && $this->insertTimeouts < self::INSERT_TIMEOUT_RETRY) {
                    $this->info("\nThis is a timeout, we continue.");
                    Log::info("This is a timeout, we continue.");
                    $this->insertTimeouts++;
                    continue;
                }
                throw $e;
            }
        }
    }

    /**
     * Get the dcterms:modified date of the first edm:ProvidedCHO of a graph.
     *
     * Falls back to the Unix epoch when there is no ProvidedCHO, no modified literal, or
     * a value that cannot be parsed, so such documents always compare as oldest.
     *
     * @param \EasyRdf\Graph $graph
     * @return \DateTimeInterface
     */
    public function getModified($graph) {
        // get first element of array
        $providedCHORes = $graph->allOfType('http://www.europeana.eu/schemas/edm/ProvidedCHO');
        $providedCHO = reset($providedCHORes);
        if ($providedCHO === false) {
            return $this->epochDate();
        }
        $modified = $providedCHO->getLiteral("<http://purl.org/dc/terms/modified>");
        if (is_null($modified)) {
            return $this->epochDate();
        }

        $value = $modified->getValue();
        // typed date literals may already be converted to a date object by EasyRdf
        if ($value instanceof \DateTimeInterface) {
            return $value;
        }
        $value = trim((string) $value);
        if ($value === '') {
            return $this->epochDate();
        }
        $date = \DateTime::createFromFormat(\DateTime::W3C, $value);
        if ($date !== false) {
            return $date;
        }
        try {
            // non-W3C lexical forms (e.g. a plain date)
            return new \DateTime($value);
        } catch (\Exception $e) {
            Log::warning("Unparsable modified date '$value' : $e");
            return $this->epochDate();
        }
    }

    /**
     * Execute the console command.
     *
     * Optionally re-imports the raw repository from the Cocoon OAI-PMH endpoint, then maps
     * and merges collections, sounds/moving images and texts from the raw repository into
     * the main repository.
     *
     * @return mixed
     */
    public function handle() {
        libxml_use_internal_errors(true);

        $skip = (int)$this->option('skip');
        $raw = !$this->option('no-raw');
        $rawClear = !$this->option('no-raw-clear');
        $clear = $this->option('clear');
        $forceImport = $this->option('force-import');
        $keepRepoDoc = $this->option('keep-repo-doc');
        $deleteOld = $this->option('delete-old');

        $this->comment("Skipping $skip records");
        $this->comment("Querying Cocoon: ".($raw?'TRUE':'FALSE'));
        $this->comment("Clear raw repository: ".($rawClear?'TRUE':'FALSE'));
        $this->comment("Clear repository: ".($clear?'TRUE':'FALSE'));
        $this->comment("Keep existing document into repository: ".($keepRepoDoc?'TRUE':'FALSE'));
        $this->comment("Overwrite more recent document:".($forceImport?'TRUE':'FALSE'));
        $this->comment("Delete left-over documents: ".($deleteOld?'TRUE':'FALSE'));

        $this->httpClient = app()->make('Guzzle');
        $this->gs = new GuzzleSparqlClient($this->httpClient, Config::get('corpusparole.rdf4j_query_url'), Config::get('corpusparole.rdf4j_update_url'));
        $this->gs_raw = new GuzzleSparqlClient($this->httpClient, Config::get('corpusparole.rdf4j_query_url_raw'), Config::get('corpusparole.rdf4j_update_url_raw'));

        $this->documentCount = [
            'all' => 0,
            'unknown' => 0,
            'error' => 0,
            'raw_duplicates' => 0,
            'modified' => 0,
            'replaced' => 0
        ];
        $this->insertTimeouts = 0;

        if ($raw) {
            $this->importRawRepository($skip, $rawClear);
        }

        if ($clear) {
            $this->gs->clear("all");
        }

        $this->importCollections();

        // list the existing documents
        $providedCHODocsUris = $this->listProvidedCHODocsUris();
        $this->info("\n\nWe have ".count($providedCHODocsUris)." providedCHO in database.\n");

        $providedCHODocsUris = $this->importSounds($providedCHODocsUris, $forceImport, $keepRepoDoc);

        $this->importTexts();

        // delete left overs from previous repository
        $this->info("\n\nThere is ".count($providedCHODocsUris)." documents left-over.\n");
        if ($deleteOld) {
            foreach ($providedCHODocsUris as $graphUri) {
                $this->gs->clear($graphUri);
            }
        }

        $this->printDocumentCount();
    }

    /**
     * Harvest the 'LanguesDeFrance' set from the Cocoon OAI-PMH endpoint and store each
     * document's RDF into the raw repository, replacing any previous raw copy.
     *
     * @param int  $skip     number of leading records to skip
     * @param bool $rawClear whether to clear the whole raw repository first
     */
    private function importRawRepository($skip, $rawClear) {
        $client = new Client(Config::get('corpusparole.cocoon_oaipmh_url'), new GuzzleAdapter($this->httpClient));
        $endpoint = new Endpoint($client);
        $recs = $endpoint->listRecords('olac', null, null, 'LanguesDeFrance');
        $progressBar = $this->newProgressBar($recs->getTotalRecordsInCollection());

        //Clear raw repository if asked
        if ($rawClear) {
            $this->gs_raw->clear("all");
        }

        $docIdBase = Config::get('corpusparole.cocoon_doc_id_base');
        $rdfBaseUri = Config::get('corpusparole.cocoon_rdf_base_uri');
        $docIdBaseUri = Config::get('corpusparole.cocoon_doc_id_base_uri');

        foreach ($recs as $item) {
            $item->registerXPathNamespace('oai', "http://www.openarchives.org/OAI/2.0/");
            $identifier = (string) $item->xpath('oai:header/oai:identifier')[0];
            // the OAI identifier minus its base prefix is shared by the RDF URL and the doc URI
            $localId = substr($identifier, strlen($docIdBase));
            $docRdfUrl = $rdfBaseUri.$localId;
            $message = "$identifier : $docRdfUrl";

            if ($recs->getNumRetrieved() <= $skip) {
                $progressBar->setMessage("$message - Skipping");
                $progressBar->advance();
                continue;
            }
            $progressBar->setMessage($message);
            $progressBar->advance();

            $docUri = $docIdBaseUri.$localId;
            $doc = $this->fetchRawDocument($identifier, $docRdfUrl);
            if (is_null($doc)) {
                $this->documentCount['error'] += 1;
                continue;
            }

            $resDocsRaw = $this->gs_raw->query("ASK WHERE { GRAPH <$docUri> { ?s ?p ?o }}");
            if ($resDocsRaw->getBoolean()) {
                $this->gs_raw->clear($docUri);
                $this->documentCount['raw_duplicates'] += 1;
            }
            $this->gs_raw->insert($doc, $docUri);
        }
        $progressBar->setMessage("finished raw import");
        $progressBar->finish();
    }

    /**
     * Download and parse the RDF of one Cocoon document.
     *
     * Connection errors and HTTP 400 responses are retried up to
     * corpusparole.max_load_retry times (default 3); any other error gives up immediately.
     *
     * @param string $identifier OAI identifier (for messages)
     * @param string $docRdfUrl  URL of the document RDF
     * @return \EasyRdf\Graph|null the parsed graph, or null when loading failed
     */
    private function fetchRawDocument($identifier, $docRdfUrl) {
        $maxRetry = config('corpusparole.max_load_retry', 3);
        for ($attempt = 0; $attempt < $maxRetry; $attempt++) {
            try {
                $resp = $this->httpClient->get($docRdfUrl);
                $contentType = $resp->getHeader('Content-Type');
                $format = null;
                if (is_array($contentType) && count($contentType) > 0) {
                    list($format, ) = \EasyRdf\Utils::parseMimeType($contentType[0]);
                }
                return new \EasyRdf\Graph($docRdfUrl, $resp->getBody(), $format);
            }
            //TODO: catch network exception - add error to database
            catch (\GuzzleHttp\Exception\ConnectException $e) {
                $this->info("\nTimeout error processing $identifier ($docRdfUrl) : $e, retrying");
                Log::warning("Timeout error processing $identifier ($docRdfUrl) : $e, retrying");
            }
            catch (\GuzzleHttp\Exception\ClientException $e) {
                if ($e->getResponse()->getStatusCode() != 400) {
                    $this->error("\nError processing $identifier ($docRdfUrl) : $e");
                    Log::error("Error processing $identifier ($docRdfUrl) : $e");
                    return null;
                }
                $this->info("\nTimeout error processing $identifier ($docRdfUrl) : $e, retrying");
                Log::warning("Timeout error processing $identifier ($docRdfUrl) : $e, retrying");
            }
            catch (\Exception $e) {
                $this->error("\nError processing $identifier ($docRdfUrl) : $e");
                Log::error("Error processing $identifier ($docRdfUrl) : $e");
                return null;
            }
        }
        Log::error("Error processing $identifier ($docRdfUrl) : giving up after $maxRetry attempts");
        return null;
    }

    /**
     * Map and merge every DCMI Collection document of the raw repository.
     */
    private function importCollections() {
        $collectionDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE {
GRAPH ?uri {
?s <http://purl.org/dc/elements/1.1/type> <http://purl.org/dc/dcmitype/Collection>.
}
}");
        $collectionCount = count($collectionDocsUris);
        $this->info("\nImporting $collectionCount Collections from raw repository");
        $progressBar = $this->newProgressBar($collectionCount);
        foreach ($collectionDocsUris as $docUriRes) {
            $docUri = $docUriRes->uri->getUri();
            $progressBar->setMessage("Importing collection $docUri.");
            $progressBar->advance();
            list($docType, $mappedGraphes) = $this->mapRawDoc($docUri);
            if ($docType === 'unknown' || $docType === 'error') {
                // The error has been traced in mapDoc
                continue;
            }
            $this->mergeDocs($docType, $mappedGraphes);
        }
        $progressBar->setMessage("finished raw import for collections.");
        $progressBar->finish();
    }

    /**
     * List the URIs of the graphs of the main repository that contain an edm:ProvidedCHO.
     *
     * @return string[]
     */
    private function listProvidedCHODocsUris() {
        $providedCHODocsUris = [];
        $providedCHODocsUrisRes = $this->gs->query("SELECT distinct ?uri WHERE {
GRAPH ?uri {
?s <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.europeana.eu/schemas/edm/ProvidedCHO>.
}
}");
        foreach ($providedCHODocsUrisRes as $docUriRes) {
            $providedCHODocsUris[] = $docUriRes->uri->getUri();
        }
        return $providedCHODocsUris;
    }

    /**
     * Map and merge every Sound / MovingImage document of the raw repository.
     *
     * The main graph of each imported document is removed from $providedCHODocsUris.
     * When the repository already holds that graph, it is cleared before merging unless
     * the repository copy is more recent (then only with --force-import) or
     * --keep-repo-doc is set.
     *
     * @param string[] $providedCHODocsUris graph URIs present in the repository before import
     * @param bool     $forceImport
     * @param bool     $keepRepoDoc
     * @return string[] the graph URIs that were not re-imported (left-overs)
     */
    private function importSounds(array $providedCHODocsUris, $forceImport, $keepRepoDoc) {
        $soundDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE {
GRAPH ?uri {
?s <http://purl.org/dc/elements/1.1/type> ?o.
FILTER(?o IN (<http://purl.org/dc/dcmitype/Sound>, <http://purl.org/dc/dcmitype/MovingImage>))
}
}");
        $soundCount = count($soundDocsUris);
        $this->info("\nImporting $soundCount Sound (or Moving Image) from raw repository\n");
        $progressBar = $this->newProgressBar($soundCount);
        foreach ($soundDocsUris as $docUriRes) {
            $docUri = $docUriRes->uri->getUri();
            $progressBar->setMessage("Importing Sound (or Moving Image) $docUri.");
            $progressBar->advance();

            list($docType, $mappedGraphes) = $this->mapRawDoc($docUri);
            if ($docType === 'unknown' || $docType === 'error' || empty($mappedGraphes)) {
                // mapping failed (traced in mapDoc) or produced nothing to import
                continue;
            }

            // first graph is main graph: remove it from list of existing graphes in repository
            $firstGraph = reset($mappedGraphes);
            $firstGraphUri = $firstGraph->getUri();
            $key = array_search($firstGraphUri, $providedCHODocsUris, true);
            if ($key !== false) {
                unset($providedCHODocsUris[$key]);
            }

            // if asked, delete it from repository. check modified date
            $resDocs = $this->queryRepositoryGraph($firstGraphUri);
            $doDelete = false;
            if (!$resDocs->isEmpty()) {
                $dateRepo = $this->getModified($resDocs);
                $dateImport = $this->getModified($firstGraph);
                if ($dateRepo > $dateImport) {
                    $this->documentCount['modified'] += 1;
                    $doDelete = $forceImport;
                } else {
                    $doDelete = !$keepRepoDoc;
                }
            }
            if ($doDelete) {
                $this->documentCount['replaced'] += 1;
                $this->gs->clear($firstGraphUri);
            }

            //merge the result docs
            $this->mergeDocs($docType, $mappedGraphes);
        }
        $progressBar->setMessage("finished raw import for sounds.");
        $progressBar->finish();
        return $providedCHODocsUris;
    }

    /**
     * Map and merge every DCMI Text document of the raw repository.
     */
    private function importTexts() {
        $textDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE {
GRAPH ?uri {
?s <http://purl.org/dc/elements/1.1/type> <http://purl.org/dc/dcmitype/Text>.
}
}");
        $textCount = count($textDocsUris);
        $this->info("\n\nImporting $textCount text from raw repository\n");
        $progressBar = $this->newProgressBar($textCount);
        foreach ($textDocsUris as $docUriRes) {
            $docUri = $docUriRes->uri->getUri();
            $progressBar->setMessage("Importing Text $docUri.");
            $progressBar->advance();
            list($docType, $mappedGraphes) = $this->mapRawDoc($docUri);
            if ($docType === 'unknown' || $docType === 'error') {
                // The error has been traced in mapDoc
                continue;
            }
            $this->mergeDocs($docType, $mappedGraphes);
        }
        $progressBar->setMessage("finished raw import for text.");
        $progressBar->finish();
    }

    /**
     * Load a document graph from the raw repository and map it (see mapDoc()).
     *
     * @param string $docUri URI of the raw graph / document
     * @return array [$docType, $outputGraphes] as returned by mapDoc()
     */
    private function mapRawDoc($docUri) {
        $doc = $this->gs_raw->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$docUri> { ?s ?p ?o. }}");
        return $this->mapDoc($doc, $docUri);
    }

    /**
     * Fetch the content of a named graph from the main repository.
     *
     * Terminates the command with exit status 1 when the query fails.
     *
     * @param string $graphUri
     * @return \EasyRdf\Graph
     */
    private function queryRepositoryGraph($graphUri) {
        try {
            return $this->gs->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$graphUri> { ?s ?p ?o }}");
        } catch (\Exception $e) {
            $this->error("\nError on graph query $graphUri : $e \n" . $e->getMessage() . "\n");
            Log::error("\nError on graph query $graphUri : $e \n" . $e->getMessage());
            exit(1);
        }
    }

    /**
     * Create a console progress bar with the format shared by all import steps.
     *
     * @param int $max number of steps
     */
    private function newProgressBar($max) {
        $progressBar = $this->output->createProgressBar($max);
        $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%');
        return $progressBar;
    }

    /**
     * Print the import statistics; a non-zero error count is printed as an error.
     */
    private function printDocumentCount() {
        $this->info("\n\nDocument count info: ");
        foreach ($this->documentCount as $docType => $docCount) {
            if ($docType === 'error' && $docCount > 0) {
                $this->error("$docType => $docCount");
            } else {
                $this->info("$docType => $docCount");
            }
        }
    }

    /**
     * The Unix epoch, used as the "unknown" modification date.
     *
     * @return \DateTime
     */
    private function epochDate() {
        $date = new \DateTime();
        $date->setTimestamp(0);
        return $date;
    }
}