diff -r a56a807f5d8e -r 2cb514f10a72 server/src/app/Console/Commands/ImportCocoonRDF.php --- a/server/src/app/Console/Commands/ImportCocoonRDF.php Mon Feb 13 17:13:11 2017 +0100 +++ b/server/src/app/Console/Commands/ImportCocoonRDF.php Wed Feb 15 10:40:05 2017 +0100 @@ -33,7 +33,15 @@ * * @var string */ - protected $signature = 'corpus-parole:importRDF {--skip=0 : Number of record to skip} {--raw : Register raw}'; + protected $signature = "corpus-parole:importRDF + {--skip=0 : Number of record to skip} + {--no-raw : Do not record raw queries} + {--no-raw-clear : Do not clear raw repository} + {--clear : Clear repository} + {--force-import : Overwrite document from import event if the repo version is more recent} + {--keep-repo-doc : Keep the existing doc in repo (default is replace document)} + "; + //protected $signature = 'corpus-parole:importRDF {--skip=0 : Number of record to skip}'; /** * Create a new command instance. @@ -66,6 +74,125 @@ return $docTypes; } + /** + * Map a documents into graphes. + */ + public function mapDoc($doc, $docUri) { + $inputDocTypes = $this->getDocTypes($doc, $docUri); + + $docType = count($inputDocTypes)>0? $inputDocTypes[0]:null; + + if(is_null($docType) || !array_key_exists($docType,ImportCocoonRDF::MAPPER_CLASS_MAP)) { + $this->error("\nError processing $identifier ($docRdfUrl) : $docType unknown mapper"); + Log::error("Error processing $identifier ($docRdfUrl) : $docType unknown mapper"); + $this->documentCount['unknown'] += 1; + continue; + } + + $mapperClass = ImportCocoonRDF::MAPPER_CLASS_MAP[$docType]; + $mapper = new $mapperClass($doc, $docUri); + + try { + $mapper->mapGraph(); + } catch (\Exception $e) { + Log::error("Error processing $identifier ($docRdfUrl) : error mapping graph : $e"); + $this->documentCount['error'] += 1; + } + $this->documentCount['all'] += 1; + $this->documentCount[$docType] = isset($this->documentCount[$docType])?$this->documentCount[$docType]+1:1; + + return [$docType, $mapper->getOutputGraphes()]; + + } + + public function mergeDocs($docType, $outputGraphes) { + + foreach ($outputGraphes as $mappedGraphKey => $mappedGraph) { + + $mappedGraphUri = $mappedGraph->getUri(); + try { + $resDocs = $this->gs->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$mappedGraphUri> { ?s ?p ?o }}"); + } catch (\Exception $e) { + $this->error("\nError on graph query $mappedGraphUri : $e \n" . $e->getMessage() . "\n"); + Log::error("\nError on graph query $mappedGraphUri : $e \n" . $e->getMessage()); + exit; + } + + $mergedGraph = null; + $doDelete = true; + + if($resDocs->isEmpty()) { + $mergedGraph = $mappedGraph; + $doDelete = false; + } else { + $doDelete = true; + $mappedTypes = $this->getDocTypes($mappedGraph, $mappedGraphUri); + $presentTypes = $this->getDocTypes($resDocs, $mappedGraphUri); + + if($docType == "http://purl.org/dc/dcmitype/Collection" || in_array("http://purl.org/dc/dcmitype/Collection", $mappedTypes)) { + $merger = new \CorpusParole\Libraries\Mergers\CocoonCollectionRdfMerger(); + $baseGraph = $resDocs; + $sourceGraph = $mappedGraph; + } + elseif ($docType == "http://purl.org/dc/dcmitype/Text") { + $merger = new \CorpusParole\Libraries\Mergers\CocoonTextRdfMerger(); + $baseGraph = $resDocs; + $sourceGraph = $mappedGraph; + } + else { + $merger = new \CorpusParole\Libraries\Mergers\CocoonSoundRdfMerger(); + $baseGraph = $mappedGraph; + $sourceGraph = $resDocs; + } + $mergedGraph = $merger->mergeGraph($baseGraph, $sourceGraph, $mappedGraphUri); + if(\EasyRdf\Isomorphic::isomorphic($resDocs, $mergedGraph)) { + //graph are isomorphic no need to go farther for this graph + Log::info("Graph are isomorphic for $mappedGraphUri, skipping"); + continue; + } + } + + try { + if($doDelete) { + $this->gs->clear($mappedGraphUri); + } + $this->gs->insert($mergedGraph, $mappedGraphUri); + } + catch(\Exception $e) { + // just log not much we can do here... + $this->error("\nError on insert $mappedGraphUri : $e"); + Log::error("Error on insert $mappedGraphUri : $e"); + $code = $e->getCode(); + $message = $e->getMessage(); + if($e instanceof EasyRdf\Exception && stripos($message, 'timed out')>=0 && $insertTimeout<= ImportCocoonRDF::INSERT_TIMEOUT_RETRY) { + $this->info("\nThis is a timeout, we continue."); + Log::info("This is a timeout, we continue."); + $insertTimeouts++; + continue; + } + throw $e; + } + } + } + + function getModified($graph) { + // get first element of array + $providedCHORes = $graph->allOfType('http://www.europeana.eu/schemas/edm/ProvidedCHO'); + $providedCHO = reset($providedCHORes); + if($providedCHO === false) { + $date = new \DateTime(); + $date->setTimestamp(0); + return $date; + } + $modified = $providedCHO->getLiteral(""); + if(is_null($modified)) { + $date = new \DateTime(); + $date->setTimestamp(0); + return $date; + } + return \DateTime::createFromFormat(\DateTime::W3C, $modified->getValue()); + } + /** * Execute the console command. @@ -77,187 +204,275 @@ libxml_use_internal_errors(true); $skip = (int)$this->option('skip'); - $raw = $this->option('raw'); + $raw = !$this->option('no-raw'); + $rawClear = !$this->option('no-raw-clear'); + $clear = $this->option('clear'); + $forceImport = $this->option('force-import'); + $keepRepoDoc = $this->option('keep-repo-doc'); $this->comment("Skipping $skip records"); - $this->comment("Recording raw queries: ".($raw?'TRUE':'FALSE')); - - $gs = new \EasyRdf\Sparql\Client(Config::get('corpusparole.rdf4j_query_url'), Config::get('corpusparole.rdf4j_update_url')); - $gs_raw = new \EasyRdf\Sparql\Client(Config::get('corpusparole.rdf4j_query_url_raw'), Config::get('corpusparole.rdf4j_update_url_raw')); - - - $client = new Client(Config::get('corpusparole.cocoon_oaipmh_url')); - $endpoint = new Endpoint($client); - - $recs = $endpoint->listRecords('olac', null, null, 'LanguesDeFrance'); - - //TODO : treat timeout exceptions - $progressBar = $this->output->createProgressBar($recs->getTotalRecordsInCollection()); - $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%'); - - $insertTimeouts = 0; + $this->comment("Querying Cocoon: ".($raw?'TRUE':'FALSE')); + $this->comment("Clear raw repository: ".($rawClear?'TRUE':'FALSE')); + $this->comment("Clear repository: ".($clear?'TRUE':'FALSE')); + $this->comment("Keep existing document into repository: ".($keepRepoDoc?'TRUE':'FALSE')); + $this->comment("Overwrite more recent document:".($forceImport?'TRUE':'FALSE')); - $documentCounts = ['all' => 0, 'unknown' => 0, 'error' => 0, 'raw_duplicates' => 0]; - - foreach ($recs as $item) { - $item->registerXPathNamespace('oai', "http://www.openarchives.org/OAI/2.0/"); - $identifier = (string) $item->xpath('oai:header/oai:identifier')[0]; - - $docRdfUrl = Config::get('corpusparole.cocoon_rdf_base_uri').substr($identifier, strlen(Config::get('corpusparole.cocoon_doc_id_base'))); - $message = "$identifier : $docRdfUrl"; - if($recs->getNumRetrieved() <= $skip) { - $progressBar->setMessage("$message - Skipping"); - $progressBar->advance(); - continue; - } - $progressBar->setMessage($message); - $progressBar->advance(); - - $docUri = config('corpusparole.cocoon_doc_id_base_uri').substr($identifier, strlen(Config::get('corpusparole.cocoon_doc_id_base'))); + $this->gs = new \EasyRdf\Sparql\Client(Config::get('corpusparole.rdf4j_query_url'), Config::get('corpusparole.rdf4j_update_url')); + $this->gs_raw = new \EasyRdf\Sparql\Client(Config::get('corpusparole.rdf4j_query_url_raw'), Config::get('corpusparole.rdf4j_update_url_raw')); - $docLoaded = false; - $loadRetry = 0; - $doc = null; - while(!$docLoaded && $loadRetry < config('corpusparole.max_load_retry', 3)) { - $loadRetry++; - try { - $doc = new \EasyRdf\Graph($docRdfUrl); - $doc->load(); - $docLoaded = true; - } - //TODO: catch network exception - add error to database - catch(\Exception $e) { - $code = $e->getCode(); - $message = $e->getMessage(); - $this->info("\nError processing $identifier. code : $code, message: $message"); - Log::debug("Error processing $identifier. code : $code, message: $message"); - if($code == 400 || ($code == 0 && stripos($message, 'timed out')>=0) ) { - $this->info("\nTimeout error processing $identifier ($docRdfUrl) : $e, retrying"); - Log::warning("Timeout error processing $identifier ($docRdfUrl) : $e, retrying"); - continue; - } - else { - $this->error("\nError processing $identifier ($docRdfUrl) : $e"); - Log::error("Error processing $identifier ($docRdfUrl) : $e"); - break; - } - //$this->error(print_r($e->getTraceAsString(),true)); - } - } - if(!$docLoaded) { - $documentCounts['error'] += 1; - continue; - } + $this->documentCount = [ + 'all' => 0, + 'unknown' => 0, + 'error' => 0, + 'raw_duplicates' => 0, + 'modified' => 0, + 'replaced' => 0 + ]; - //insert raw - if($raw) { - $resDocsRaw = $gs_raw->query("ASK WHERE { GRAPH <$docUri> { ?s ?p ?o }}"); - if($resDocsRaw->getBoolean()) { - $gs_raw->clear($docUri); + if($raw) { + $client = new Client(Config::get('corpusparole.cocoon_oaipmh_url')); + $endpoint = new Endpoint($client); + + $recs = $endpoint->listRecords('olac', null, null, 'LanguesDeFrance'); - } - $gs_raw->insert($doc, $docUri); + $progressBar = $this->output->createProgressBar($recs->getTotalRecordsInCollection()); + $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%'); + + $insertTimeouts = 0; + + //Clear raw repository if asked + if($rawClear) { + $this->gs_raw->clear("all"); } - //map doc - $inputDocTypes = $this->getDocTypes($doc, $docUri); - - $docType = count($inputDocTypes)>0? $inputDocTypes[0]:null; + foreach ($recs as $item) { + $item->registerXPathNamespace('oai', "http://www.openarchives.org/OAI/2.0/"); + $identifier = (string) $item->xpath('oai:header/oai:identifier')[0]; - if(is_null($docType) || !array_key_exists($docType,ImportCocoonRDF::MAPPER_CLASS_MAP)) { - $this->error("\nError processing $identifier ($docRdfUrl) : $docType unknown mapper"); - Log::error("Error processing $identifier ($docRdfUrl) : $docType unknown mapper"); - $documentCounts['unknown'] += 1; - continue; - } + $docRdfUrl = Config::get('corpusparole.cocoon_rdf_base_uri').substr($identifier, strlen(Config::get('corpusparole.cocoon_doc_id_base'))); + $message = "$identifier : $docRdfUrl"; + if($recs->getNumRetrieved() <= $skip) { + $progressBar->setMessage("$message - Skipping"); + $progressBar->advance(); + continue; + } + $progressBar->setMessage($message); + $progressBar->advance(); - $mapperClass = ImportCocoonRDF::MAPPER_CLASS_MAP[$docType]; - $mapper = new $mapperClass($doc, $docUri); + $docUri = config('corpusparole.cocoon_doc_id_base_uri').substr($identifier, strlen(Config::get('corpusparole.cocoon_doc_id_base'))); - try { - $mapper->mapGraph(); - } catch (\Exception $e) { - Log::error("Error processing $identifier ($docRdfUrl) : error mapping graph : $e"); - $documentCounts['error'] += 1; - } - $documentCounts['all'] += 1; - $documentCounts[$docType] = isset($documentCounts[$docType])?$documentCounts[$docType]+1:1; - - $mappedGraphes = $mapper->getOutputGraphes(); - - foreach ($mapper->getOutputGraphes() as $mappedGraphKey => $mappedGraph) { - - $mappedGraphUri = $mappedGraph->getUri(); - try { - $resDocs = $gs->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$mappedGraphUri> { ?s ?p ?o }}"); - } catch (\Exception $e) { - $this->error("\nError on graph query $identifier ($mappedGraphUri) : $e \n" . $e->getBody() . "\n"); - Log::error("\nError on graph query $identifier ($mappedGraphUri) : $e \n" . $e->getBody()); - exit; + $docLoaded = false; + $loadRetry = 0; + $doc = null; + while(!$docLoaded && $loadRetry < config('corpusparole.max_load_retry', 3)) { + $loadRetry++; + try { + $doc = new \EasyRdf\Graph($docRdfUrl); + $doc->load(); + $docLoaded = true; + } + //TODO: catch network exception - add error to database + catch(\Exception $e) { + $code = $e->getCode(); + $message = $e->getMessage(); + $this->info("\nError processing $identifier. code : $code, message: $message"); + Log::debug("Error processing $identifier. code : $code, message: $message"); + if($code == 400 || ($code == 0 && stripos($message, 'timed out')>=0) ) { + $this->info("\nTimeout error processing $identifier ($docRdfUrl) : $e, retrying"); + Log::warning("Timeout error processing $identifier ($docRdfUrl) : $e, retrying"); + continue; + } + else { + $this->error("\nError processing $identifier ($docRdfUrl) : $e"); + Log::error("Error processing $identifier ($docRdfUrl) : $e"); + break; + } + //$this->error(print_r($e->getTraceAsString(),true)); + } + } + if(!$docLoaded) { + $this->documentCount['error'] += 1; + continue; } - $mergedGraph = null; - $doDelete = true; + $resDocsRaw = $this->gs_raw->query("ASK WHERE { GRAPH <$docUri> { ?s ?p ?o }}"); + if($resDocsRaw->getBoolean()) { + $this->gs_raw->clear($docUri); + $this->documentCount['raw_duplicates'] += 1; + } + $this->gs_raw->insert($doc, $docUri); + } + $progressBar->setMessage("finished raw import"); + $progressBar->finish(); + } + + // $collectionDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE { + // GRAPH ?uri { + // ?s ?p ?o. + // ?s . + // FILTER(?o IN (, )) + // } + // }"); + + if($clear) { + $this->gs->clear("all"); + } - if($resDocs->isEmpty()) { - $mergedGraph = $mappedGraph; - $doDelete = false; - } - else { - $doDelete = true; - $mappedTypes = $this->getDocTypes($mappedGraph, $mappedGraphUri); - $presentTypes = $this->getDocTypes($resDocs, $mappedGraphUri); + $collectionDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE { + GRAPH ?uri { + ?s . + } + }"); + + $collectionCount = count($collectionDocsUris); + $this->info("\nImporting $collectionCount Collections from raw repository"); + $progressBar = $this->output->createProgressBar($collectionCount); + $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%'); + + + foreach($collectionDocsUris as $docUriRes) { + $docUri = $docUriRes->uri->getUri(); + + $progressBar->setMessage("Importing collection $docUri."); + $progressBar->advance(); + + $doc = $this->gs_raw->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$docUri> { ?s ?p ?o. }}"); + + //map the doc + list($docType, $mappedGraphes) = $this->mapDoc($doc, $docUri); + + //merge the result docs + $this->mergeDocs($docType, $mappedGraphes); + + } + + $progressBar->setMessage("finished raw import for collections."); + $progressBar->finish(); - if($docType == "http://purl.org/dc/dcmitype/Collection" || in_array("http://purl.org/dc/dcmitype/Collection", $mappedTypes)) { - $merger = new \CorpusParole\Libraries\Mergers\CocoonCollectionRdfMerger(); - $baseGraph = $resDocs; - $sourceGraph = $mappedGraph; - } - elseif ($docType == "http://purl.org/dc/dcmitype/Text") { - $merger = new \CorpusParole\Libraries\Mergers\CocoonTextRdfMerger(); - $baseGraph = $resDocs; - $sourceGraph = $mappedGraph; - } - else { - $merger = new \CorpusParole\Libraries\Mergers\CocoonSoundRdfMerger(); - $baseGraph = $mappedGraph; - $sourceGraph = $resDocs; - } - $mergedGraph = $merger->mergeGraph($baseGraph, $sourceGraph, $mappedGraphUri); - if(\EasyRdf\Isomorphic::isomorphic($resDocs, $mergedGraph)) { - //graph are isomorphic no need to go farther for this graph - Log::info("Graph are isomorphic for $mappedGraphUri (from $identifier : $docRdfUrl), skipping"); - continue; - } + // list the existing documents + $providedCHODocsUris = []; + $providedCHODocsUrisRes = $this->gs->query("SELECT distinct ?uri WHERE { + GRAPH ?uri { + ?s . + } + }"); + + foreach($providedCHODocsUrisRes as $docUriRes) { + array_push($providedCHODocsUris, $docUriRes->uri->getUri()); + } + + $this->info("\n\nWe have ".count($providedCHODocsUris)." providedCHO in database.\n"); + + $soundDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE { + GRAPH ?uri { + ?s ?o. + FILTER(?o IN (, )) + } + }"); + + $soundCount = count($soundDocsUris); + $this->info("\nImporting $soundCount Sound (or Moving Image) from raw repository\n"); + $progressBar = $this->output->createProgressBar($soundCount); + $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%'); + + + foreach($soundDocsUris as $docUriRes) { + $docUri = $docUriRes->uri->getUri(); + + $progressBar->setMessage("Importing Sound (or Moving Image) $docUri."); + $progressBar->advance(); + + $doc = $this->gs_raw->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$docUri> { ?s ?p ?o. }}"); + + //map the doc + list($docType, $mappedGraphes) = $this->mapDoc($doc, $docUri); + $firstGraph = reset($mappedGraphes); // first graph is main graph + // remove it from list of existing graphes in repository + $firstGraphUri = $firstGraph->getUri(); + if(($key = array_search($firstGraphUri, $providedCHODocsUris)) !== false) { + unset($providedCHODocsUris[$key]); + } + //if asked, delete it from repository. check modified date + //merge the result docs + try { + $resDocs = $this->gs->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$firstGraphUri> { ?s ?p ?o }}"); + } catch (\Exception $e) { + $this->error("\nError on graph query $firstGraphUri : $e \n" . $e->getMessage() . "\n"); + Log::error("\nError on graph query $firstGraphUri : $e \n" . $e->getMessage()); + exit; + } + $doDelete = true; + if($resDocs->isEmpty()) { + $doDelete = false; + } else { + // get modified from repo + $dateRepo = $this->getModified($resDocs); + // get modified from import + $dateImport = $this->getModified($firstGraph); + + if($dateRepo > $dateImport) { + $this->documentCount['modified'] += 1; + $doDelete = $forceImport; + } else { + $doDelete = !$keepRepoDoc; } - try { - if($doDelete) { - $gs->clear($mappedGraphUri); - } - $gs->insert($mergedGraph, $mappedGraphUri); - } - catch(\Exception $e) { - // just log not much we can do here... - $this->error("\nError on insert $identifier ($docRdfUrl) : $e"); - Log::error("Error on insert $identifier ($docRdfUrl) : $e"); - $code = $e->getCode(); - $message = $e->getMessage(); - if($e instanceof EasyRdf\Exception && stripos($message, 'timed out')>=0 && $insertTimeout<= ImportCocoonRDF::INSERT_TIMEOUT_RETRY) { - $this->info("\nThis is a timeout, we continue."); - Log::info("This is a timeout, we continue."); - $insertTimeouts++; - continue; - } - throw $e; - } + } + + if($doDelete) { + $this->documentCount['replaced'] += 1; + $this->gs->clear($firstGraphUri); + } + + $this->mergeDocs($docType, $mappedGraphes); + } + + $progressBar->setMessage("finished raw import for sounds."); + $progressBar->finish(); + + + $textDocsUris = $this->gs_raw->query("SELECT distinct ?uri WHERE { + GRAPH ?uri { + ?s . + } + }"); + + $textCount = count($textDocsUris); + $this->info("\n\nImporting $textCount text from raw repository\n"); + $progressBar = $this->output->createProgressBar($textCount); + $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%'); + + + foreach($textDocsUris as $docUriRes) { + $docUri = $docUriRes->uri->getUri(); + + $progressBar->setMessage("Importing Text $docUri."); + $progressBar->advance(); + + $doc = $this->gs_raw->query("CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <$docUri> { ?s ?p ?o. }}"); + + //map the doc + list($docType, $mappedGraphes) = $this->mapDoc($doc, $docUri); + + //merge the result docs + $this->mergeDocs($docType, $mappedGraphes); + + } + + $progressBar->setMessage("finished raw import for text."); + $progressBar->finish(); + + + // delete left overs from previous repository + $this->info("\n\nThere is ".count($providedCHODocsUris)." documents left-over.\n"); + if(count($providedCHODocsUris) > 0 && $delete_old) { + foreach($providedCHODocsUris as $graphUri) { + $this->gs->clear($graphUri); } } - $progressBar->setMessage("finished"); - $progressBar->finish(); - $this->info("\nDocument count info: "); - foreach ($documentCounts as $docType => $docCount) { + $this->info("\n\nDocument count info: "); + foreach ($this->documentCount as $docType => $docCount) { if($docType == 'error' && $docCount > 0) { $this->error("$docType => $docCount"); } else {