<?php
namespace CorpusParole\Console\Commands;
use Illuminate\Console\Command;
use EasyRdf\Resource;
use EasyRdf\Literal;
use EasyRdf\Graph;
use Carbon\Carbon;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\TransferException;
use GuzzleHttp\Psr7;
use CorpusParole\Libraries\Utils;
use CorpusParole\Repositories\DocumentRepository;
use CorpusParole\Libraries\CocoonUtils;
use CorpusParole\Models\GeonamesHierarchy;
use CorpusParole\Services\BnfResolverInterface;
use CorpusParole\Services\LexvoResolverInterface;
use Es;
use Log;
use Cache;
class IndexDocuments extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'corpus-parole:indexDocuments
        {--limit=0 : index only the first n documents, 0 (default) means index everything }
        {--no-bulk : index documents one by one instead of using ElasticSearch bulk indexing }
        {--step-size=100 : number of documents to retrieve from repository at a time before indexing}
        {--reset-geo-cache : reset geo cache before indexing}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Index documents into ElasticSearch.';

    /** @var DocumentRepository source of the documents to index */
    protected $documentRepository;

    /** @var Client HTTP client used for geonames / dbpedia / RDF requests */
    protected $httpClient;

    /** @var BnfResolverInterface resolves BnF subject URIs to labels */
    protected $bnfResolver;

    /** @var LexvoResolverInterface resolves lexvo subject URIs to names */
    protected $lexvoResolver;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct(
        DocumentRepository $documentRepository,
        Client $httpClient,
        BnfResolverInterface $bnfResolver,
        LexvoResolverInterface $lexvoResolver)
    {
        $this->documentRepository = $documentRepository;
        $this->bnfResolver = $bnfResolver;
        $this->lexvoResolver = $lexvoResolver;
        $this->httpClient = $httpClient;
        parent::__construct();
    }

    /**
     * Reset the Elasticsearch index.
     *
     * Deletes the configured index if it exists, then recreates it with the
     * "document" mapping used by this command.
     *
     * @return int 1 if success, 0 if error
     */
    private function resetIndex()
    {
        $indexParams = [
            'index' => config('elasticsearch.index')
        ];
        if(Es::indices()->exists($indexParams)){
            $response = Es::indices()->delete($indexParams);
            if($response['acknowledged']!=1){
                return 0;
            }
        }
        // Note: removed the "'store' => True" parameters on fields and use _source on record instead
        $indexParams['body'] = [
            'settings' => [
                'number_of_shards' => config('elasticsearch.shards'),
                'number_of_replicas' => config('elasticsearch.replicas'),
                'index.mapping.ignore_malformed' => True,
                'index.requests.cache.enable' => True
            ],
            'mappings' => [
                'document' => [
                    'properties' => [
                        'title' => [
                            'type' => 'string',
                            'fields' => [
                                'raw' => [
                                    'type' => 'string',
                                    'index' => 'not_analyzed'
                                ]
                            ]
                        ],
                        'date' => [ 'type' => 'date', 'index' => 'not_analyzed'],
                        'geonames_hierarchy' => [ 'type' => 'string', 'index' => 'not_analyzed'],
                        'location' => [ 'type' => 'geo_point'],
                        'creation_date' => ['type' => 'date', 'index' => 'not_analyzed'],
                        'language' => ['type' => 'string', 'index' => 'not_analyzed'],
                        'discourse_types' => ['type' => 'string', 'index' => 'not_analyzed'],
                        'creation_years' => [
                            'type' => 'nested',
                            'properties' => [
                                'year' => [ 'type' => 'short', 'index' => 'not_analyzed'],
                                'weight' => [ 'type' => 'float', 'index' => 'not_analyzed'],
                            ]
                        ],
                        'subject' => [
                            'type' => 'nested',
                            'properties' => [
                                'label' => [ 'type' => 'string', 'index' => 'not_analyzed'],
                                'code' => [ 'type' => 'string', 'index' => 'not_analyzed'],
                                'label_code' => [ 'type' => 'string', 'index' => 'not_analyzed']
                            ]
                        ]
                    ]
                ]
            ]
        ];
        $response = Es::indices()->create($indexParams);
        if($response['acknowledged']!=1){
            return 0;
        }
        return 1;
    }

    /**
     * Get the geonames ids of the "interesting" ancestors of a geonames location.
     *
     * The hierarchy is read from the GeonamesHierarchy table. On a miss it is
     * fetched from the geonames hierarchy web service and stored there.
     * Responses that fail or lack a "geonames" list are logged and NOT stored,
     * so a transient error does not poison the cache.
     *
     * @param string|int $geonamesid geonames id of the location
     * @return array list of geonames ids (continent, country-level and ADM1 ancestors)
     */
    private function getGeonamesHierarchyArray($geonamesid) {
        // Geonames feature codes kept: continent, political entities (countries,
        // dependencies, ...) and first-order administrative divisions.
        $keptFcodes = ['CONT', 'PCLI', 'PCL', 'PCLD', 'PCLF', 'PCLH', 'PCLIX', 'PCLIS', 'ADM1'];

        $hcache = GeonamesHierarchy::where('geonamesid', $geonamesid)->first();
        if(is_null($hcache)) {
            // TODO: add delay to respect geonames 2k request/hour
            try {
                $apiBody = $this->httpClient->get(
                    config('corpusparole.geonames_hierarchy_webservice_url'),
                    [
                        'query' => [
                            'geonameId' => $geonamesid,
                            'username' => config('corpusparole.geonames_username'),
                        ],
                        // "accept" is not a Guzzle request option; the header must go through "headers".
                        'headers' => ['Accept' => 'application/json'],
                    ]
                )->getBody();
            } catch (TransferException $e) {
                // Best effort: one unreachable geonames lookup must not abort the whole indexing run.
                Log::error("getGeonamesHierarchyArray : Error retrieving hierarchy for $geonamesid : ".$e->getMessage());
                return [];
            }
            $hjson = json_decode((string)$apiBody, true);
            if(!is_array($hjson) || !isset($hjson['geonames']) || !is_array($hjson['geonames'])) {
                // e.g. a geonames error payload (quota exceeded, bad id): do not cache it.
                Log::warning("getGeonamesHierarchyArray : Unexpected hierarchy response for $geonamesid");
                return [];
            }
            $hcache = new GeonamesHierarchy();
            $hcache->geonamesid = $geonamesid;
            // NOTE(review): assumes the model casts "hierarchy" to/from JSON (it is read back as an array below).
            $hcache->hierarchy = $hjson;
            $hcache->save();
        }

        $hierarchy = $hcache->hierarchy;
        if(empty($hierarchy['geonames'])) {
            return [];
        }
        $res = [];
        foreach($hierarchy['geonames'] as $hierarchyElem) {
            if(isset($hierarchyElem['fcode']) && in_array($hierarchyElem['fcode'], $keptFcodes, true)) {
                $res[] = $hierarchyElem['geonameId'];
            }
        }
        return $res;
    }

    /**
     * Get geonames hierarchy data for a document.
     *
     * Aggregates, without duplicates, the hierarchy ids of every geonames
     * location attached to the document's geo info.
     *
     * @return array list of geonames ids (a sequential list, so it is JSON-encoded as an array)
     */
    private function getGeonamesHierarchy($doc) {
        $geoRes = $doc->getGeoInfo();
        if(is_null($geoRes)) {
            return [];
        }
        // aggregate hierarchy list from geonames results
        $res = [];
        foreach($geoRes->getGeonamesLocs() as $gurl) {
            $geonamesId = CocoonUtils::getGeonamesidFromUrl($gurl);
            if(is_null($geonamesId)) {
                continue;
            }
            $hierarchyIds = $this->getGeonamesHierarchyArray($geonamesId);
            // array_unique keeps the original keys; reindex so Elasticsearch receives a JSON list, not an object.
            $res = array_values(array_unique(array_merge($res, $hierarchyIds)));
        }
        return $res;
    }

    /**
     * Get subjects as [ 'label' => label, 'code' => code, 'label_code' => "label|type|code" ] arrays.
     *
     * Subjects are recognized as:
     *  - BnF resources (type "bnf"), labelled through the BnF resolver,
     *  - lexvo resources (type "lxv"), labelled through the lexvo resolver,
     *  - literals typed with an OLAC datatype (type "olac"),
     *  - other literals (type "txt").
     * When a resolver has no label for a URI, the URI itself is used as label.
     */
    private function getSubjects($doc) {
        $sres = array_reduce($doc->getSubjects(), function($res, $s) {
            $mBnf = [];
            $mLexvo = [];
            if($s instanceof Resource && preg_match(config('corpusparole.bnf_ark_url_regexp'), $s->getUri(), $mBnf) === 1) {
                array_push($res, [
                    'uri' => $mBnf[0],
                    'code' => $mBnf[1],
                    'type' => 'bnf'
                ]);
            } elseif($s instanceof Resource && preg_match(config('corpusparole.lexvo_url_regexp'), $s->getUri(), $mLexvo) === 1) {
                array_push($res, [
                    'uri' => $mLexvo[0],
                    'code' => $mLexvo[1],
                    'type' => 'lxv'
                ]);
            } elseif($s instanceof Literal && strpos($s->getDatatypeUri(), config('corpusparole.olac_base_url')) === 0 ) {
                array_push($res, [
                    'uri' => $s->getValue(),
                    'code' => $s->getValue(),
                    'type' => 'olac'
                ]);
            } elseif($s instanceof Literal) {
                array_push($res, [
                    'uri' => $s->getValue(),
                    'code' => $s->getValue(),
                    'type' => 'txt'
                ]);
            }
            return $res;
        }, []);

        $labelsBnf = $this->bnfResolver->getLabels($this->getSubjectUrisOfType($sres, 'bnf'));
        $labelsLexvo = $this->lexvoResolver->getNames($this->getSubjectUrisOfType($sres, 'lxv'));

        return array_map(function($so) use ($labelsBnf, $labelsLexvo) {
            $label = $so['uri'];
            // Fall back to the URI when the resolver returned no label for it
            // (an unchecked lookup would raise an undefined-index error).
            if($so['type'] === 'bnf' && isset($labelsBnf[$label])) {
                $label = $labelsBnf[$label];
            } elseif ($so['type'] === 'lxv' && isset($labelsLexvo[$label])) {
                $label = $labelsLexvo[$label];
            }
            return [ 'label' => $label, 'code' => $so['code'], 'label_code' => $label."|".$so['type']."|".$so['code'] ];
        }, $sres);
    }

    /**
     * Collect the distinct URIs of the subjects of a given type.
     *
     * @param array $subjects subjects as built in getSubjects()
     * @param string $type subject type ("bnf", "lxv", ...)
     * @return array sequential list of distinct URIs
     */
    private function getSubjectUrisOfType($subjects, $type) {
        $uris = [];
        foreach($subjects as $so) {
            if($so['type'] === $type) {
                $uris[] = $so['uri'];
            }
        }
        return array_values(array_unique($uris));
    }

    /**
     * Tell whether a coordinate value is absent.
     *
     * Unlike empty(), a numeric zero (equator / prime meridian) is a valid coordinate.
     */
    private function isMissingCoordinate($value) {
        return is_null($value) || $value === false || (is_string($value) && trim($value) === '');
    }

    /**
     * Read the WGS84 lat/long literals of a location from an RDF graph.
     *
     * @return array|null [lat, long] or null if either value is missing
     */
    private function graphResolvCoordinate($loc, $graph) {
        $latLit = $graph->getLiteral($loc, "<http://www.w3.org/2003/01/geo/wgs84_pos#lat>");
        if(is_null($latLit) || $this->isMissingCoordinate($latLit->getValue())) {
            return null;
        }
        $lat = $latLit->getValue();
        $longLit = $graph->getLiteral($loc, "<http://www.w3.org/2003/01/geo/wgs84_pos#long>");
        if(is_null($longLit) || $this->isMissingCoordinate($longLit->getValue())) {
            return null;
        }
        $long = $longLit->getValue();
        return [ $lat, $long ];
    }

    /**
     * Fetch a URL and parse its body as an RDF graph.
     *
     * HTTP and RDF parsing errors are reported on the console and in the log.
     *
     * @param string $url URL to fetch (also used as graph URI)
     * @param string $type EasyRdf format name (e.g. "rdfxml")
     * @return Graph|null the parsed graph, or null on error
     */
    private function loadGraph($url, $type) {
        try {
            $r = $this->httpClient->get($url);
        } catch (TransferException $e) {
            $this->error("loadGraph : Error Loading $url");
            Log::error("loadGraph : Error Loading $url");
            // Only request-carrying exceptions (RequestException and friends) expose the request / response.
            if (method_exists($e, 'getRequest')) {
                Log::error("loadGraph : Error request " . Psr7\str($e->getRequest()));
            }
            if (method_exists($e, 'hasResponse') && $e->hasResponse()) {
                $this->error("loadGraph : Error response " . Psr7\str($e->getResponse()));
                Log::error("loadGraph : Error response " . Psr7\str($e->getResponse()));
            }
            return null;
        }
        $message = (string)$r->getBody();
        try {
            $graph = new Graph($url, $message, $type);
            return $graph;
        } catch (\EasyRdf\Exception $e) {
            // Fully qualified: an unqualified "EasyRdf\Exception" would resolve inside this
            // file's namespace and never match anything.
            $this->error("loadGraph : Error parsing $url");
            Log::error("loadGraph : Error parsing $url");
            if($e instanceof \EasyRdf\Parser\Exception) {
                Log::error("loadGraph : Error exception line ".$e->getParserLine().", column: ".$e->getParserColumn());
            }
            $this->error("loadGraph : Error exception message ".$e->getMessage());
            Log::error("loadGraph : Error exception message ".$e->getMessage());
            Log::error("loadGraph : Error content $message");
            return null;
        }
    }

    /**
     * Resolve the coordinates of a geonames location (URL ending with "/").
     *
     * Results, including failures (stored as false), are cached for 20 minutes.
     *
     * @return array|null [lat, long] or null
     */
    private function geonamesResolveCoordinates($loc) {
        $coords = cache("corpus.geonames.coord.$loc");
        if(is_null($coords)) {
            $graph = $this->loadGraph("{$loc}about.rdf", 'rdfxml');
            $coords = is_null($graph)?null:$this->graphResolvCoordinate($loc, $graph);
            cache(["corpus.geonames.coord.$loc" => is_null($coords)?false:$coords], Carbon::now()->addMinutes(20));
        }
        return ($coords===false)?null:$coords;
    }

    /**
     * Resolve the coordinates of a dbpedia resource ("<host>/data/<name>" URL).
     *
     * Results, including failures (stored as false), are cached for 20 minutes.
     *
     * @return array|null [lat, long] or null
     */
    private function dbpediaResolveCoordinates($loc) {
        $coords = cache("corpus.dbpedia.coord.$loc");
        if(is_null($coords)) {
            $graph = $this->loadGraph("$loc.rdf", 'rdfxml');
            $coords = is_null($graph)?null:$this->graphResolvCoordinate($loc, $graph);
            cache(["corpus.dbpedia.coord.$loc"=> is_null($coords)?false:$coords], Carbon::now()->addMinutes(20));
        }
        return ($coords===false)?null:$coords;
    }

    /**
     * Get the document location as [lat, long] floats.
     *
     * Referenced geonames / dbpedia locations are tried in order. The first one
     * that resolves wins. Otherwise the lat/long values of the geo info are used.
     *
     * @return array|null [lat, long] or null when no coordinates are available
     */
    private function getLocation($doc) {
        $geoRes = $doc->getGeoInfo();
        if(is_null($geoRes)) {
            return null;
        }
        $locUrls = [];
        foreach($geoRes->getRefLocs() as $loc) {
            if(preg_match(config('corpusparole.geonames_url_regexp'), $loc, $m) === 1) {
                if(!array_key_exists('geonames', $locUrls)) {
                    $locUrls['geonames'] = [];
                }
                array_push($locUrls['geonames'], "http://sws.geonames.org/$m[1]/");
            } elseif(preg_match(config('corpusparole.dbpedia_url_regexp'), $loc, $md) === 1) {
                if(!array_key_exists('dbpedia', $locUrls)) {
                    $locUrls['dbpedia'] = [];
                }
                array_push($locUrls['dbpedia'], "http://$md[1]/data/$md[4]");
            }
        }
        $coordinates = null;
        foreach($locUrls as $locType => $locList) {
            // dispatches to geonamesResolveCoordinates() / dbpediaResolveCoordinates()
            $resolver = $locType . 'ResolveCoordinates';
            foreach($locList as $locationUrl) {
                $coordinates = $this->$resolver($locationUrl);
                if(!is_null($coordinates)) {
                    // leave both loops: a later location type must not overwrite a resolved location
                    break 2;
                }
            }
        }
        if(is_null($coordinates)) {
            $coordinates = [$geoRes->getLatitudeValue(), $geoRes->getLongitudeValue()];
        }
        if($this->isMissingCoordinate($coordinates[0]) || $this->isMissingCoordinate($coordinates[1])) {
            return null;
        }
        return [floatval($coordinates[0]), floatval($coordinates[1])];
    }

    /**
     * Get the creation date(s) of a document as W3C-formatted strings.
     *
     * @return array|string|null a list of yearly dates for a dcterms:Period, a single
     *                           date for a dcterms:W3CDTF value, null otherwise
     */
    private function getCreationDate($doc) {
        $created = $doc->getCreated();
        if(is_null($created)) {
            return null;
        }
        $dateType = $created->getDatatypeUri();
        $res = null;
        if($dateType === "http://purl.org/dc/terms/Period") {
            $res = $this->processPeriod($created->getValue());
        }
        elseif($dateType === "http://purl.org/dc/terms/W3CDTF") {
            $res = $this->processDate($created->getValue());
        }
        return $res;
    }

    /**
     * Parse a date string. A bare 4-digit year is read as January 1st of that year.
     *
     * @return \DateTime|null the parsed date, or null (with a warning) if unparsable
     */
    private function extractDate($dateStr) {
        if(preg_match("/^\\d{4}$/", $dateStr) === 1) {
            $dateStr = "$dateStr-1-1";
        }
        $date = date_create($dateStr);
        if($date === false ) {
            Log::warning("IndexDocuments:extractDate bad format for date $dateStr");
            return null;
        }
        return $date;
    }

    /**
     * Expand a dcterms:Period string ("start=...; end=...") into one date per year.
     *
     * @param string $periodStr the period value
     * @param bool $asDate return DateTime objects instead of W3C strings
     * @return array|null January 1st of every year from start to end (inclusive),
     *                    or null if start/end are missing, unparsable or inverted
     */
    private function processPeriod($periodStr, $asDate=false) {
        $start = null;
        $end = null;
        foreach(explode(";", $periodStr) as $elem) {
            $elem = trim($elem);
            if(strpos($elem, 'start=') === 0) {
                $startDate = $this->extractDate(trim(substr($elem, 6)));
                if(is_null($startDate)) {
                    return null;
                }
                $start = intval($startDate->format("Y"));
            } elseif(strpos($elem, 'end=') === 0) {
                $endDate = $this->extractDate(trim(substr($elem, 4)));
                if(is_null($endDate)) {
                    return null;
                }
                $end = intval($endDate->format("Y"));
            }
        }
        if(is_null($start) || is_null($end) || $start>$end ) {
            Log::warning("Bad format for $periodStr");
            return null;
        }
        return array_map(function($y) use ($asDate){
            // "!" resets the unparsed fields to the epoch (Jan 1st, 00:00:00); a plain "Y"
            // would take month, day and time from the current date (non-deterministic,
            // and Feb 29th would roll over on non-leap years).
            $date = \DateTime::createFromFormat("!Y", "$y");
            if($asDate) {
                return $date;
            }
            return $date->format(\DateTime::W3C);
        }, range($start, $end));
    }

    /**
     * Parse a single date.
     *
     * @param string $dateStr the date value
     * @param bool $asDate return a DateTime object instead of a W3C string
     * @return \DateTime|string|null the parsed date, or null if unparsable
     */
    private function processDate($dateStr, $asDate=false) {
        $date = $this->extractDate($dateStr);
        if(is_null($date)) {
            return null;
        }
        if($asDate) {
            return $date;
        }
        return $date->format(\DateTime::W3C);
    }

    /**
     * Get the creation years of a document, each weighted by 1/number of years.
     *
     * @return array list of [ 'year' => int, 'weight' => float ]
     */
    private function getCreationYears($doc) {
        $created = $doc->getCreated();
        if(is_null($created)) {
            return [];
        }
        $dateType = $created->getDatatypeUri();
        $dates = null;
        if($dateType === "http://purl.org/dc/terms/Period") {
            $dates = $this->processPeriod($created->getValue(), true);
        }
        elseif($dateType === "http://purl.org/dc/terms/W3CDTF") {
            $dates = $this->processDate($created->getValue(), true);
            if(!is_null($dates)) {
                $dates = [ $dates, ];
            }
        }
        if(empty($dates)) {
            return [];
        }
        $count = count($dates);
        return array_map(function($d) use ($count) {
            return [
                'year' => intval($d->format("Y")),
                'weight' => 1/$count
            ];
        }, $dates);
    }

    /**
     * Get the discourse types of a document as strings.
     *
     * Resources give their URI. Literals give "<datatype>#<value>", or the bare
     * value when they have no datatype. Empty values are dropped.
     */
    private function getDiscourseTypes($doc) {
        return array_reduce($doc->getDiscourseTypes(), function($res, $d) {
            $val = null;
            if($d instanceof Resource) {
                $val = $d->getUri();
            } elseif($d instanceof Literal) {
                $datatype = $d->getDatatypeURI();
                $val = (!empty($datatype)?"$datatype#":"").$d->getValue();
            }
            if(!empty($val)) {
                array_push($res,$val);
            }
            return $res;
        }, []);
    }

    /**
     * Build the Elasticsearch body of a document (fields match the resetIndex() mapping).
     */
    private function getDocBody($doc) {
        return [
            'title' => (string)$doc->getTitle(),
            'date' => (string)$doc->getModified(),
            'location' => $this->getLocation($doc),
            'creation_date' => $this->getCreationDate($doc),
            'creation_years' => $this->getCreationYears($doc),
            'language' => $doc->getLanguagesValue(),
            'discourse_types' => $this->getDiscourseTypes($doc),
            'geonames_hierarchy' => $this->getGeonamesHierarchy($doc),
            'subject' => $this->getSubjects($doc),
        ];
    }

    /**
     * Index one document into Elasticsearch.
     *
     * Errors are reported by the Elasticsearch client as exceptions.
     *
     * @return int 1 once the document has been sent
     */
    private function indexOne($docId, $docBody)
    {
        $query_data = [
            'index' => config('elasticsearch.index'),
            'type' => 'document',
            'id' => $docId,
            'body' => $docBody
        ];
        Es::index($query_data);
        return 1;
    }

    /**
     * Index multiple documents into Elasticsearch with a single bulk request.
     *
     * Per-document failures reported in the bulk response are logged.
     *
     * @param array $docBodies document bodies keyed by document id
     * @return int 1 if success (or nothing to index), 0 if at least one document failed
     */
    private function indexBulk($docBodies)
    {
        if(empty($docBodies)) {
            // An empty bulk body is rejected by Elasticsearch.
            return 1;
        }
        $query_data = ['body' => []];
        foreach($docBodies as $docId => $docBody){
            $query_data['body'][] = [
                'index' => [
                    '_index' => config('elasticsearch.index'),
                    '_type' => 'document',
                    '_id' => $docId
                ]
            ];
            $query_data['body'][] = $docBody;
        }
        $response = Es::bulk($query_data);
        if(empty($response['errors'])) {
            return 1;
        }
        $items = isset($response['items']) ? $response['items'] : [];
        foreach($items as $item) {
            if(isset($item['index']['error'])) {
                Log::error("indexBulk : Error indexing document ".$item['index']['_id']." : ".json_encode($item['index']['error']));
            }
        }
        return 0;
    }

    /**
     * Execute the console command.
     *
     * @return int|null 1 when the index cannot be reset, null (success) otherwise
     */
    public function handle()
    {
        $this->info('Options:');
        $noBulk = $this->option('no-bulk');
        if ($noBulk) {
            $this->comment(' - Indexing without bulk insert');
        } else {
            $this->comment(' - Indexing using bulk insert');
        }
        $limit = (int)$this->option('limit');
        if ($limit>0) {
            $this->comment(' - Indexing only the first '.$limit.' documents');
        }
        // paginate() needs a strictly positive page size
        $stepSize = max(1, (int)$this->option('step-size'));
        $this->comment(' - Indexing with step size of '.$stepSize);
        if($this->option('reset-geo-cache')) {
            // delete all rows in GeonamesHierarchy
            GeonamesHierarchy::getQuery()->delete();
            $this->comment('Geonames cache reset!');
        }
        $this->info('Resetting index...');
        if($this->resetIndex()==1){
            $this->comment('Index reset!');
        } else {
            // Indexing into a missing or half-reset index would produce a wrong mapping.
            $this->error('Error resetting index ' . config('elasticsearch.index'));
            return 1;
        }
        $this->info('Indexing documents...');
        $total = $this->documentRepository->getCount();
        if($limit>0) {
            $total = min($limit, $total);
        }
        $progressBar = $this->output->createProgressBar($total);
        $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%');
        $page = 0;
        $lastPage = PHP_INT_MAX;
        $docIds = [];
        $failedBatches = 0;
        $limitReached = false;
        while(!$limitReached && $page++<$lastPage) {
            $docsPaginator = $this->documentRepository->paginate(null, $stepSize, config('corpusparole.pagination_page_param'), $page, "_graph");
            $lastPage = $docsPaginator->lastPage();
            $docsBodies = [];
            foreach($docsPaginator as $docResult) {
                if($limit>0 && count($docIds)>=$limit) {
                    $limitReached = true;
                    break;
                }
                $docId = (string)$docResult->getId();
                $progressBar->setMessage($docId);
                $progressBar->advance();
                $doc = $this->documentRepository->get($docId);
                $docBody = $this->getDocBody($doc);
                if($noBulk) {
                    $this->indexOne($docId, $docBody);
                } else {
                    $docsBodies[$docId] = $docBody;
                }
                $docIds[] = $docId;
            }
            if(!$noBulk && $this->indexBulk($docsBodies)!=1) {
                $failedBatches++;
            }
            if($limit>0 && count($docIds)>=$limit) {
                $limitReached = true;
            }
        }
        $progressBar->finish();
        $this->info("\nIndexing completed for " . count(array_unique($docIds))." documents (of ".count($docIds).").");
        if($failedBatches>0) {
            $this->error("$failedBatches bulk request(s) reported indexing errors, see log for details.");
        }
    }
}