diff -r 4a3899b6a7ed -r 766af1228b05 server/src/app/Console/Commands/IndexDocuments.php --- a/server/src/app/Console/Commands/IndexDocuments.php Sun Oct 16 22:23:31 2016 +0530 +++ b/server/src/app/Console/Commands/IndexDocuments.php Sun Oct 16 23:19:57 2016 +0530 @@ -2,12 +2,28 @@ namespace CorpusParole\Console\Commands; + + use Illuminate\Console\Command; +use EasyRdf\Resource; +use EasyRdf\Literal; +use EasyRdf\Graph; + +use Carbon\Carbon; + use GuzzleHttp\Client; +use GuzzleHttp\Exception\TransferException; +use GuzzleHttp\Psr7; + +use CorpusParole\Libraries\Utils; use CorpusParole\Repositories\DocumentRepository; use CorpusParole\Libraries\CocoonUtils; use CorpusParole\Models\GeonamesHierarchy; +use CorpusParole\Services\BnfResolverInterface; +use CorpusParole\Services\LexvoResolverInterface; use Es; +use Log; +use Cache; class IndexDocuments extends Command { @@ -20,7 +36,8 @@ protected $signature = 'corpus-parole:indexDocuments {--limit=0 : index only the first n documents, 0 (default) means index everything } {--no-bulk : index documents one by one instead of using ElasticSearch bulk indexing } - {--step-size=100 : number of documents to retrieve from repository at a time before indexing}'; + {--step-size=100 : number of documents to retrieve from repository at a time before indexing} + {--reset-geo-cache : reset geo cache befr indexing}'; /** * The console command description. @@ -34,9 +51,15 @@ * * @return void */ - public function __construct(DocumentRepository $documentRepository, Client $httpClient) + public function __construct( + DocumentRepository $documentRepository, + Client $httpClient, + BnfResolverInterface $bnfResolver, + LexvoResolverInterface $lexvoResolver) { $this->documentRepository = $documentRepository; + $this->bnfResolver = $bnfResolver; + $this->lexvoResolver = $lexvoResolver; $this->httpClient = $httpClient; parent::__construct(); } @@ -65,7 +88,8 @@ 'settings' => [ 'number_of_shards' => config('elasticsearch.shards'), 'number_of_replicas' => config('elasticsearch.replicas'), - 'index.mapping.ignore_malformed' => True + 'index.mapping.ignore_malformed' => True, + 'index.requests.cache.enable' => True ], 'mappings' => [ 'document' => [ @@ -79,10 +103,20 @@ ] ] ], - 'date' => [ 'type' => 'date' ], - 'geonames_hyerarchy' => [ 'type' => 'string' ], - 'location' => [ 'type' => 'geo_point' ] - // TODO: add location information + 'date' => [ 'type' => 'date', 'index' => 'not_analyzed'], + 'geonames_hyerarchy' => [ 'type' => 'string', 'index' => 'not_analyzed'], + 'location' => [ 'type' => 'geo_point'], + 'creation_date' => ['type' => 'date', 'index' => 'not_analyzed'], + 'language' => ['type' => 'string', 'index' => 'not_analyzed'], + 'discourse_types' => ['type' => 'string', 'index' => 'not_analyzed'], + 'subject' => [ + 'type' => 'nested', + 'properties' => [ + 'label' => [ 'type' => 'string', 'index' => 'not_analyzed'], + 'code' => [ 'type' => 'string', 'index' => 'not_analyzed'], + 'label_code' => [ 'type' => 'string', 'index' => 'not_analyzed'] + ] + ] ] ] ] @@ -96,7 +130,7 @@ private function getGeonamesHierarchyArray($geonamesid) { - // TODO: Manage this cache !!! + $hcache = GeonamesHierarchy::where('geonamesid', $geonamesid)->first(); if(is_null($hcache)) { @@ -112,7 +146,7 @@ ] )->getBody(); $hjson = json_decode($apiBody); - $hcache = new GeonamesHierarchy; + $hcache = new GeonamesHierarchy(); $hcache->geonamesid = $geonamesid; $hcache->hierarchy = $hjson; $hcache->save(); @@ -124,7 +158,6 @@ array_push($res, $hierarchyElem['geonameId']); } } - return $res; } @@ -153,22 +186,311 @@ } /** + * get subjects as { 'label': label, 'code': code } objects + * Takes only into account the bnf subjects + */ + private function getSubjects($doc) { + + $sres = array_reduce($doc->getSubjects(), function($res, $s) { + $mBnf = []; + $mLexvo = []; + + if($s instanceof Resource && preg_match(config('corpusparole.bnf_ark_url_regexp'), $s->getUri(), $mBnf) === 1) { + + array_push($res, [ + 'uri' => $mBnf[0], + 'code' => $mBnf[1], + 'type' => 'bnf' + ]); + } elseif($s instanceof Resource && preg_match(config('corpusparole.lexvo_url_regexp'), $s->getUri(), $mLexvo) === 1) { + array_push($res, [ + 'uri' => $mLexvo[0], + 'code' => $mLexvo[1], + 'type' => 'lxv' + ]); + } elseif($s instanceof Literal && strpos($s->getDatatypeUri(), config('corpusparole.olac_base_url')) === 0 ) { + array_push($res, [ + 'uri' => $s->getValue(), + 'code' => $s->getValue(), + 'type' => 'olac' + ]); + } elseif($s instanceof Literal) { + array_push($res, [ + 'uri' => $s->getValue(), + 'code' => $s->getValue(), + 'type' => 'txt' + ]); + } + return $res; + }, []); + + $labelsBnf = $this->bnfResolver->getLabels( + array_unique(array_reduce( + $sres, + function($r, $so) { + if($so['type'] === 'bnf') { + array_push($r, $so['uri']); + } + return $r; + },[] + )) + ); + $labelsLexvo = $this->lexvoResolver->getNames( + array_unique(array_reduce( + $sres, + function($r, $so) { + if($so['type'] === 'lxv') { + array_push($r, $so['uri']); + } + return $r; + },[] + )) + ); + + return array_map(function($so) use ($labelsBnf, $labelsLexvo) { + $label = $so['uri']; + if($so['type'] === 'bnf') { + $label = $labelsBnf[$label]; + } elseif ($so['type'] === 'lxv') { + $label = $labelsLexvo[$label]; + } + return [ 'label' => $label, 'code' => $so['code'], 'label_code' => $label."|".$so['type']."|".$so['code'] ]; }, $sres + ); + } + + private function graphResolvCoordinate($loc, $graph) { + $latLit = $graph->getLiteral($loc, ""); + if(is_null($latLit) || empty($latLit->getValue())) { + return null; + } + $lat = $latLit->getValue(); + + $longLit = $graph->getLiteral($loc, ""); + if(is_null($longLit) || empty($longLit->getValue())) { + return null; + } + $long = $longLit->getValue(); + + return [ $lat, $long ]; + } + + private function loadGraph($url, $type) { + try { + $r = $this->httpClient->get($url); + } catch (TransferException $e) { + $this->error("loadGraph : Error Loading $url"); + Log::error("loadGraph : Error Loading $url"); + Log::error("loadGraph : Error request " . Psr7\str($e->getRequest())); + if ($e->hasResponse()) { + $this->error("loadGraph : Error response " . Psr7\str($e->getResponse())); + Log::error("loadGraph : Error response " . Psr7\str($e->getResponse())); + } + return null; + } + try { + $message = (string)$r->getBody(); + $graph = new Graph($url, $message, $type); + return $graph; + } catch (EasyRdf\Exception $e) { + $this->error("loadGraph : Error parsing $url"); + Log::error("loadGraph : Error parsing $url"); + if($e instanceof EasyRdf\Parser\Exception) { + Log::error("loadGraph : Error exception line ".$e->getLine().", column: ".$e->getColumn()); + } + $this->error("loadGraph : Error exception message ".$e->getMessage()); + Log::error("loadGraph : Error exception message ".$e->getMessage()); + Log::error("loadGraph : Error content $message"); + return null; + } + + } + + private function geonamesResolveCoordinates($loc) { + $coords = cache("corpus.geonames.coord.$loc"); + if(is_null($coords)) { + $graph = $this->loadGraph("{$loc}about.rdf", 'rdfxml'); + $coords = is_null($graph)?null:$this->graphResolvCoordinate($loc, $graph); + cache(["corpus.geonames.coord.$loc" => is_null($coords)?false:$coords], Carbon::now()->addMinutes(20)); + } + return ($coords===false)?null:$coords; + } + + private function dbpediaResolveCoordinates($loc) { + $coords = cache("corpus.dbpedia.coord.$loc"); + if(is_null($coords)) { + $graph = $this->loadGraph("$loc.rdf", 'rdfxml'); + $coords = is_null($graph)?null:$this->graphResolvCoordinate($loc, $graph); + cache(["corpus.dbpedia.coord.$loc"=> is_null($coords)?false:$coords], Carbon::now()->addMinutes(20)); + } + return ($coords===false)?null:$coords; + } + + private function getLocation($doc) { + + $geoRes = $doc->getGeoInfo(); + + if(is_null($geoRes)) { + return null; + } + + $locUrls = []; + foreach($geoRes->getRefLocs() as $loc) { + if(preg_match(config('corpusparole.geonames_url_regexp'), $loc, $m) === 1) { + + if(!array_key_exists('geonames', $locUrls)) { + $locUrls['geonames'] = []; + } + array_push($locUrls['geonames'], "http://sws.geonames.org/$m[1]/"); + + } elseif(preg_match(config('corpusparole.dbpedia_url_regexp'), $loc, $md) === 1) { + if(!array_key_exists('dbpedia', $locUrls)) { + $locUrls['dbpedia'] = []; + } + //$this->line("DBPEDIA MATCH $loc ".print_r($md,true)); + array_push($locUrls['dbpedia'], "http://$md[1]/data/$md[4]"); + } + } + + $coordinates = null; + foreach($locUrls as $locType => $locList) { + foreach($locList as $locationUrl) { + $coordinates = call_user_func([$this, "${locType}ResolveCoordinates"], $locationUrl); + if(!is_null($coordinates)) { + break; + } + } + } + + if(is_null($coordinates)) { + $coordinates = [$geoRes->getLatitudeValue(), $geoRes->getLongitudeValue()]; + } + + if(empty($coordinates[0]) || empty($coordinates[1])) { + return null; + } else { + return [floatval($coordinates[0]), floatval($coordinates[1])]; + } + + } + + private function getCreationDate($doc) { + + $created = $doc->getCreated(); + if(is_null($created)) { + return null; + } + $dateType = $created->getDatatypeUri(); + $res = null; + + if($dateType === "http://purl.org/dc/terms/Period") { + $res = $this->processPeriod($created->getValue()); + } + elseif($dateType === "http://purl.org/dc/terms/W3CDTF") { + $res = $this->processDate($created->getValue()); + } + + return $res; + + } + + private function extractDate($dateStr) { + if(preg_match("/^\\d{4}$/", $dateStr) === 1) { + $dateStr = "$dateStr-1-1"; + } + $date = date_create($dateStr); + if($date === false ) { + Log::warning("DateStatsController:extractYear bad format for date $dateStr"); + return null; + } + return $date; + } + + private function processPeriod($periodStr) { + $start = null; + $end = null; + foreach(explode(";", $periodStr) as $elem) { + $elem = trim($elem); + if(strpos($elem, 'start=') === 0) { + $startDate = $this->extractDate(trim(substr($elem, 6))); + if(is_null($startDate)) { + return null; + } + $start = intval($startDate->format("Y")); + if($start === false) { + return null; + } + } elseif(strpos($elem, 'end=') === 0) { + $endDate = $this->extractDate(trim(substr($elem, 4))); + if(is_null($endDate)) { + return null; + } + $end = intval($endDate->format("Y")); + if($end === false) { + return null; + } + } + } + + if(is_null($start) || is_null($end) || $start>$end ) { + Log::warning("Bad format for $periodStr"); + return null; + } + + return array_map(function($y) { + return \DateTime::createFromFormat("Y", "$y")->format(\DateTime::W3C); + }, range($start, $end)); + } + + private function processDate($dateStr) { + $date = $this->extractDate($dateStr); + if(is_null($date)) { + return null; + } else { + return $date->format(\DateTime::W3C); + } + } + + private function getDiscourseTypes($doc) { + return array_reduce($doc->getDiscourseTypes(), function($res, $d) { + $val = null; + if($d instanceof Resource) { + $val = $d->getUri(); + } elseif($d instanceof Literal) { + $datatype = $d->getDatatypeURI(); + $val = (!empty($datatype)?"$datatype#":"").$d->getValue(); + } + if(!empty($val)) { + array_push($res,$val); + } + return $res; + }, []); + } + + private function getDocBody($doc) { + return [ + 'title' => (string)$doc->getTitle(), + 'date' => (string)$doc->getModified(), + 'location' => $this->getLocation($doc), + 'creation_date' => $this->getCreationDate($doc), + 'language' => $doc->getLanguagesValue(), + 'discourse_types' => $this->getDiscourseTypes($doc), + 'geonames_hierarchy' => $this->getGeonamesHierarchy($doc), + 'subject' => $this->getSubjects($doc), + ]; + } + + /** * Index one document into Elasticsearch * * @return int (1 if sucess, 0 if error) */ - private function indexOne($resultDoc) + private function indexOne($docId, $docBody) { - $doc = $this->documentRepository->get($resultDoc->getId()); $query_data = [ 'index' => config('elasticsearch.index'), 'type' => 'document', - 'id' => (string)$doc->getId(), - 'body' => [ - 'title' => (string)$doc->getTitle(), - 'date' => (string)$doc->getModified(), - 'geonames_hierarchy' => $this->getGeonamesHierarchy($doc) - ] + 'id' => $docId, + 'body' => $docBody ]; Es::index($query_data); } @@ -178,21 +500,18 @@ * * @return int (1 if sucess, 0 if error) */ - private function indexBulk($docs) + private function indexBulk($docBodies) { $query_data = ['body' => []]; - foreach($docs as $doc){ + foreach($docBodies as $docId => $docBody){ $query_data['body'][] = [ 'index' => [ '_index' => config('elasticsearch.index'), '_type' => 'document', - '_id' => (string)$doc->getId() + '_id' => $docId ] ]; - $query_data['body'][] = [ - 'title' => (string)$doc->getTitle(), - 'date' => (string)$doc->getModified() - ]; + $query_data['body'][] = $docBody; } Es::bulk($query_data); } @@ -220,6 +539,12 @@ $stepSize = $this->option('step-size'); $this->comment(' - Indexing with step size of '.$stepSize); + if($this->option('reset-geo-cache', false)) { + // delete all rows in GeonamesHierarchy + GeonamesHierarchy::getQuery()->delete(); + $this->comment('Geonames cache reset!'); + } + $this->info('Resetting index...'); $success = $this->resetIndex(); if($success==1){ @@ -231,49 +556,43 @@ $this->info('Indexing documents...'); - if ($limit<=0) { - $lastPage = $this->documentRepository->paginateAll($stepSize, 'page')->lastPage(); - $total = $this->documentRepository->getCount(); - $lastPageEntryCount = $stepSize+1; - } - else { - $lastPage = min((int)($limit/$stepSize)+1, $this->documentRepository->paginateAll($stepSize, 'page')->lastPage()); - $total = $limit; - $lastPageEntryCount = $limit % $stepSize; + $limit = (int)$limit; + $total = $this->documentRepository->getCount(); + + if($limit>0) { + $total = min($limit, $total); } - if ($noBulk) - { - $progressBar = $this->output->createProgressBar($total); - } - else - { - $progressBar = $this->output->createProgressBar($lastPage); - } + $progressBar = $this->output->createProgressBar($total); $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%'); - for ($page=1;$page<=$lastPage;$page++) - { - $docs = $this->documentRepository->paginateAll($stepSize, 'page', $page); - if ($noBulk) - { - foreach ($docs as $i=>$doc){ - if ($page==$lastPage && $i>=$lastPageEntryCount){ - break; - } - $this->indexOne($doc); - $progressBar->advance(); - $progressBar->setMessage($doc->getId()); + $page = 0; + $lastPage = PHP_INT_MAX; + $docIds = []; + + while($page++<$lastPage) { + $docsPaginator = $this->documentRepository->paginate(null, $stepSize, config('corpusparole.pagination_page_param'), $page, "_graph"); + $lastPage = $docsPaginator->lastPage(); + $docsBodies = []; + foreach($docsPaginator as $docResult) { + $docId = (string)$docResult->getId(); + $progressBar->setMessage($docId); + $progressBar->advance(); + $doc = $this->documentRepository->get($docId); + $docBody = $this->getDocBody($doc); + if($noBulk) { + $this->indexOne($docId, $docBody); + } else { + $docsBodies[$docId] = $docBody; } + $docIds[] = $docId; } - else - { - $this->indexBulk($docs); - $progressBar->advance(); - $progressBar->setMessage('Page '.$page); + if(!$noBulk) { + $this->indexBulk($docsBodies); } } $progressBar->finish(); - $this->info('Indexing completed'); + $this->info("\nIndexing completed for " . count(array_unique($docIds))." documents (of ".count($docIds).")."); + } }