--- a/server/src/app/Console/Commands/IndexDocuments.php Sun Oct 16 22:23:31 2016 +0530
+++ b/server/src/app/Console/Commands/IndexDocuments.php Sun Oct 16 23:19:57 2016 +0530
@@ -2,12 +2,28 @@
namespace CorpusParole\Console\Commands;
+
+
use Illuminate\Console\Command;
+use EasyRdf\Resource;
+use EasyRdf\Literal;
+use EasyRdf\Graph;
+
+use Carbon\Carbon;
+
use GuzzleHttp\Client;
+use GuzzleHttp\Exception\TransferException;
+use GuzzleHttp\Psr7;
+
+use CorpusParole\Libraries\Utils;
use CorpusParole\Repositories\DocumentRepository;
use CorpusParole\Libraries\CocoonUtils;
use CorpusParole\Models\GeonamesHierarchy;
+use CorpusParole\Services\BnfResolverInterface;
+use CorpusParole\Services\LexvoResolverInterface;
use Es;
+use Log;
+use Cache;
class IndexDocuments extends Command
{
@@ -20,7 +36,8 @@
protected $signature = 'corpus-parole:indexDocuments
{--limit=0 : index only the first n documents, 0 (default) means index everything }
{--no-bulk : index documents one by one instead of using ElasticSearch bulk indexing }
- {--step-size=100 : number of documents to retrieve from repository at a time before indexing}';
+ {--step-size=100 : number of documents to retrieve from repository at a time before indexing}
+ {--reset-geo-cache : reset geo cache before indexing}';
/**
* The console command description.
@@ -34,9 +51,15 @@
*
* @return void
*/
- public function __construct(DocumentRepository $documentRepository, Client $httpClient)
+ public function __construct(
+ DocumentRepository $documentRepository,
+ Client $httpClient, // HTTP client used by loadGraph() to fetch remote RDF documents
+ BnfResolverInterface $bnfResolver, // getSubjects() calls getLabels() on it for the 'bnf' subject uris
+ LexvoResolverInterface $lexvoResolver) // getSubjects() calls getNames() on it for the 'lxv' subject uris
{
$this->documentRepository = $documentRepository;
+ $this->bnfResolver = $bnfResolver;
+ $this->lexvoResolver = $lexvoResolver;
$this->httpClient = $httpClient;
parent::__construct();
}
@@ -65,7 +88,8 @@
'settings' => [
'number_of_shards' => config('elasticsearch.shards'),
'number_of_replicas' => config('elasticsearch.replicas'),
- 'index.mapping.ignore_malformed' => True
+ 'index.mapping.ignore_malformed' => True,
+ 'index.requests.cache.enable' => True
],
'mappings' => [
'document' => [
@@ -79,10 +103,20 @@
]
]
],
- 'date' => [ 'type' => 'date' ],
- 'geonames_hyerarchy' => [ 'type' => 'string' ],
- 'location' => [ 'type' => 'geo_point' ]
- // TODO: add location information
+ 'date' => [ 'type' => 'date', 'index' => 'not_analyzed'],
+ 'geonames_hyerarchy' => [ 'type' => 'string', 'index' => 'not_analyzed'],
+ 'location' => [ 'type' => 'geo_point'],
+ 'creation_date' => ['type' => 'date', 'index' => 'not_analyzed'],
+ 'language' => ['type' => 'string', 'index' => 'not_analyzed'],
+ 'discourse_types' => ['type' => 'string', 'index' => 'not_analyzed'],
+ 'subject' => [
+ 'type' => 'nested',
+ 'properties' => [
+ 'label' => [ 'type' => 'string', 'index' => 'not_analyzed'],
+ 'code' => [ 'type' => 'string', 'index' => 'not_analyzed'],
+ 'label_code' => [ 'type' => 'string', 'index' => 'not_analyzed']
+ ]
+ ]
]
]
]
@@ -96,7 +130,7 @@
private function getGeonamesHierarchyArray($geonamesid) {
- // TODO: Manage this cache !!!
+
$hcache = GeonamesHierarchy::where('geonamesid', $geonamesid)->first();
if(is_null($hcache)) {
@@ -112,7 +146,7 @@
]
)->getBody();
$hjson = json_decode($apiBody);
- $hcache = new GeonamesHierarchy;
+ $hcache = new GeonamesHierarchy();
$hcache->geonamesid = $geonamesid;
$hcache->hierarchy = $hjson;
$hcache->save();
@@ -124,7 +158,6 @@
array_push($res, $hierarchyElem['geonameId']);
}
}
-
return $res;
}
@@ -153,22 +186,311 @@
}
/**
+     * Build the subject entries indexed for a document.
+     *
+     * Each subject returned by $doc->getSubjects() is classified as:
+     *  - 'bnf'  : a Resource whose uri matches config('corpusparole.bnf_ark_url_regexp'),
+     *  - 'lxv'  : a Resource whose uri matches config('corpusparole.lexvo_url_regexp'),
+     *  - 'olac' : a Literal whose datatype uri starts with config('corpusparole.olac_base_url'),
+     *  - 'txt'  : any other Literal.
+     * Anything else is left out. Labels of 'bnf' and 'lxv' entries come from the injected
+     * resolvers; 'olac' and 'txt' entries use the literal value as their label.
+     *
+     * @return array one [ 'label', 'code', 'label_code' ] map per kept subject,
+     *               where label_code is "label|type|code"
+     */
+    private function getSubjects($doc) {
+
+        // step 1: classify every subject and keep its uri / code / type
+        $classify = function($entries, $subject) {
+            if($subject instanceof Resource) {
+                $groups = [];
+                if(preg_match(config('corpusparole.bnf_ark_url_regexp'), $subject->getUri(), $groups) === 1) {
+                    $kind = 'bnf';
+                } elseif(preg_match(config('corpusparole.lexvo_url_regexp'), $subject->getUri(), $groups) === 1) {
+                    $kind = 'lxv';
+                } else {
+                    return $entries;
+                }
+                $entries[] = [ 'uri' => $groups[0], 'code' => $groups[1], 'type' => $kind ];
+            } elseif($subject instanceof Literal) {
+                $isOlac = strpos($subject->getDatatypeUri(), config('corpusparole.olac_base_url')) === 0;
+                $entries[] = [
+                    'uri' => $subject->getValue(),
+                    'code' => $subject->getValue(),
+                    'type' => $isOlac ? 'olac' : 'txt'
+                ];
+            }
+            return $entries;
+        };
+
+        $sres = array_reduce($doc->getSubjects(), $classify, []);
+
+        // step 2: gather the uris that need a label lookup, one list per resolver
+        $pending = [ 'bnf' => [], 'lxv' => [] ];
+        foreach($sres as $entry) {
+            if(array_key_exists($entry['type'], $pending)) {
+                $pending[$entry['type']][] = $entry['uri'];
+            }
+        }
+        $labelsBnf = $this->bnfResolver->getLabels(array_unique($pending['bnf']));
+        $labelsLexvo = $this->lexvoResolver->getNames(array_unique($pending['lxv']));
+
+        // step 3: attach the label and the composite label_code to every entry
+        $withLabel = function($entry) use ($labelsBnf, $labelsLexvo) {
+            switch($entry['type']) {
+                case 'bnf':
+                    $label = $labelsBnf[$entry['uri']];
+                    break;
+                case 'lxv':
+                    $label = $labelsLexvo[$entry['uri']];
+                    break;
+                default:
+                    $label = $entry['uri'];
+            }
+            return [
+                'label' => $label,
+                'code' => $entry['code'],
+                'label_code' => $label."|".$entry['type']."|".$entry['code']
+            ];
+        };
+
+        return array_map($withLabel, $sres);
+    }
+
+    /**
+     * Read the WGS84 lat / long literals of $loc from $graph; returns [ lat, long ], or null when
+     * one of them is missing or has no value. A numeric zero is a valid coordinate and is kept.
+     */
+    private function graphResolvCoordinate($loc, $graph) {
+        $coords = [];
+        foreach(['lat', 'long'] as $axis) {
+            $lit = $graph->getLiteral($loc, "<http://www.w3.org/2003/01/geo/wgs84_pos#{$axis}>");
+            $value = is_null($lit) ? null : $lit->getValue();
+            // empty() alone would also reject 0 and "0", hence the is_numeric() escape
+            if(empty($value) && !is_numeric($value)) { return null; }
+            $coords[] = $value;
+        }
+        return $coords;
+    }
+
+    /**
+     * Download $url and parse the response body as an RDF graph in the $type serialisation.
+     * Returns the Graph, or null (after reporting on console and log) if the transfer or parsing fails.
+     */
+    private function loadGraph($url, $type) {
+        $report = function($msg) { $this->error($msg); Log::error($msg); };
+        try {
+            $r = $this->httpClient->get($url);
+        } catch (TransferException $e) {
+            $report("loadGraph : Error Loading $url");
+            Log::error("loadGraph : Error request " . Psr7\str($e->getRequest()));
+            if ($e->hasResponse()) {
+                $report("loadGraph : Error response " . Psr7\str($e->getResponse()));
+            }
+            return null;
+        }
+        try {
+            $message = (string)$r->getBody();
+            return new Graph($url, $message, $type);
+        } catch (\EasyRdf\Exception $e) { // fully qualified: a relative name would resolve inside this file's namespace
+            $report("loadGraph : Error parsing $url");
+            if($e instanceof \EasyRdf\Parser\Exception) {
+                // position in the parsed document; getLine() would be the PHP source line of the throw
+                Log::error("loadGraph : Error exception line ".$e->getParserLine().", column: ".$e->getParserColumn());
+            }
+            $report("loadGraph : Error exception message ".$e->getMessage());
+            Log::error("loadGraph : Error content $message");
+            return null;
+        }
+    }
+
+    /** Resolve a geonames url to [ lat, long ] (or null); the outcome is cached 20 minutes, false standing for "not found". */
+    private function geonamesResolveCoordinates($loc) {
+        $known = cache("corpus.geonames.coord.$loc");
+        if(!is_null($known)) { return ($known === false) ? null : $known; }
+        $rdf = $this->loadGraph("{$loc}about.rdf", 'rdfxml');
+        $known = is_null($rdf) ? null : $this->graphResolvCoordinate($loc, $rdf);
+        cache(["corpus.geonames.coord.$loc" => is_null($known) ? false : $known], Carbon::now()->addMinutes(20));
+        return ($known === false) ? null : $known;
+    }
+
+    /** Resolve a dbpedia data url to [ lat, long ] (or null); the outcome is cached 20 minutes, false standing for "not found". */
+    private function dbpediaResolveCoordinates($loc) {
+        $known = cache("corpus.dbpedia.coord.$loc");
+        if(!is_null($known)) { return ($known === false) ? null : $known; }
+        $rdf = $this->loadGraph("$loc.rdf", 'rdfxml');
+        $known = is_null($rdf) ? null : $this->graphResolvCoordinate($loc, $rdf);
+        cache(["corpus.dbpedia.coord.$loc"=> is_null($known) ? false : $known], Carbon::now()->addMinutes(20));
+        return ($known === false) ? null : $known;
+    }
+
+    /**
+     * Compute the [ latitude, longitude ] pair indexed for a document.
+     *
+     * The reference locations of the document's geo info are grouped by source (geonames or
+     * dbpedia, in order of first appearance) and tried one by one; the first one that resolves
+     * wins. When none resolves, the latitude / longitude carried by the geo info are used.
+     *
+     * @return array|null [ float lat, float long ], or null without geo info or usable values
+     */
+    private function getLocation($doc) {
+
+        $geoRes = $doc->getGeoInfo();
+        if(is_null($geoRes)) {
+            return null;
+        }
+
+        $locUrls = [];
+        foreach($geoRes->getRefLocs() as $loc) {
+            if(preg_match(config('corpusparole.geonames_url_regexp'), $loc, $m) === 1) {
+                $locUrls['geonames'][] = "http://sws.geonames.org/$m[1]/";
+            } elseif(preg_match(config('corpusparole.dbpedia_url_regexp'), $loc, $md) === 1) {
+                $locUrls['dbpedia'][] = "http://$md[1]/data/$md[4]";
+            }
+        }
+
+        $coordinates = null;
+        foreach($locUrls as $locType => $locList) {
+            foreach($locList as $locationUrl) {
+                $coordinates = call_user_func([$this, "{$locType}ResolveCoordinates"], $locationUrl);
+                if(!is_null($coordinates)) {
+                    // leave both loops: a later source must not overwrite (or null out) this result
+                    break 2;
+                }
+            }
+        }
+
+        if(is_null($coordinates)) {
+            $coordinates = [$geoRes->getLatitudeValue(), $geoRes->getLongitudeValue()];
+        }
+
+        // empty() alone would also discard a legitimate 0 (equator / prime meridian)
+        foreach([0, 1] as $axis) {
+            $value = isset($coordinates[$axis]) ? $coordinates[$axis] : null;
+            if(empty($value) && !is_numeric($value)) { return null; }
+        }
+        return [floatval($coordinates[0]), floatval($coordinates[1])];
+    }
+
+    /**
+     * Extract the creation date(s) of a document from its "created" literal.
+     * A dcterms:Period value is handed to processPeriod(), a dcterms:W3CDTF value
+     * to processDate(); any other datatype (or no "created" value) yields null.
+     */
+    private function getCreationDate($doc) {
+        $created = $doc->getCreated();
+        if(is_null($created)) {
+            return null;
+        }
+        $datatype = $created->getDatatypeUri();
+        if($datatype === "http://purl.org/dc/terms/Period") {
+            return $this->processPeriod($created->getValue());
+        }
+        if($datatype === "http://purl.org/dc/terms/W3CDTF") {
+            return $this->processDate($created->getValue());
+        }
+        return null;
+    }
+
+    /** Parse a date string into a DateTime (a bare 4-digit year means January 1st); null when empty or unparsable. */
+    private function extractDate($dateStr) {
+        $dateStr = trim($dateStr);
+        if(preg_match("/^\\d{4}$/", $dateStr) === 1) { $dateStr = "$dateStr-1-1"; }
+        $date = ($dateStr === '') ? false : date_create($dateStr); // date_create('') would silently mean "now"
+        if($date === false) {
+            Log::warning("IndexDocuments:extractDate bad format for date $dateStr");
+            return null;
+        }
+        return $date;
+    }
+
+    /**
+     * Expand a dcterms:Period value ("start=...; end=...") into one date per covered year.
+     *
+     * Only the year of each bound is used. Every year from start to end (inclusive) is returned
+     * as a W3C formatted date pinned to January 1st, 00:00:00, so that the indexed values do
+     * not depend on the moment the command runs.
+     *
+     * @return array|null list of W3C date strings; null when a bound cannot be parsed, or (with
+     *                    a log warning) when a bound is missing or start is after end
+     */
+    private function processPeriod($periodStr) {
+        $bounds = ['start' => null, 'end' => null];
+        foreach(explode(";", $periodStr) as $elem) {
+            $elem = trim($elem);
+            foreach(array_keys($bounds) as $name) {
+                if(strpos($elem, "$name=") !== 0) {
+                    continue;
+                }
+                $date = $this->extractDate(trim(substr($elem, strlen($name) + 1)));
+                if(is_null($date)) {
+                    return null;
+                }
+                $bounds[$name] = intval($date->format("Y"));
+            }
+        }
+        if(is_null($bounds['start']) || is_null($bounds['end']) || $bounds['start'] > $bounds['end']) {
+            Log::warning("Bad format for $periodStr");
+            return null;
+        }
+
+        return array_map(function($y) {
+            // the leading "!" resets month, day and time, which would otherwise default to "now"
+            return \DateTime::createFromFormat("!Y", "$y")->format(\DateTime::W3C);
+        }, range($bounds['start'], $bounds['end']));
+    }
+
+    /** Format a date string as a W3C date; null when extractDate() cannot parse it. */
+    private function processDate($dateStr) {
+        $parsed = $this->extractDate($dateStr);
+        if(is_null($parsed)) {
+            return null;
+        }
+        return $parsed->format(\DateTime::W3C);
+    }
+
+    // List the discourse types of a document as strings: the uri of a Resource, or the value of
+    // a Literal prefixed with "<datatype>#" when it has a datatype. Empty values are left out.
+    private function getDiscourseTypes($doc) {
+        return array_reduce($doc->getDiscourseTypes(), function($acc, $dt) {
+            $label = null;
+            if($dt instanceof Literal) {
+                $prefix = $dt->getDatatypeURI();
+                $label = (empty($prefix) ? "" : "$prefix#") . $dt->getValue();
+            } elseif($dt instanceof Resource) {
+                $label = $dt->getUri();
+            }
+            if(!empty($label)) { $acc[] = $label; }
+            return $acc;
+        }, []);
+    }
+
+    /** Assemble the Elasticsearch body of a document; the values are computed in the order the keys are listed. */
+    private function getDocBody($doc) {
+        $body = ['title' => (string)$doc->getTitle()];
+        $body['date'] = (string)$doc->getModified();
+        $body['location'] = $this->getLocation($doc);
+        $body['creation_date'] = $this->getCreationDate($doc);
+        $body['language'] = $doc->getLanguagesValue();
+        $body['discourse_types'] = $this->getDiscourseTypes($doc);
+        $body['geonames_hierarchy'] = $this->getGeonamesHierarchy($doc); // NOTE(review): the index mapping spells this key 'geonames_hyerarchy'
+        $body['subject'] = $this->getSubjects($doc);
+        return $body;
+    }
+
+ /**
* Index one document into Elasticsearch
*
* @return int (1 if sucess, 0 if error)
*/
- private function indexOne($resultDoc)
+ private function indexOne($docId, $docBody) // $docId: document id; $docBody: body array as built by getDocBody(). NOTE(review): nothing is returned, despite the @return above
{
- $doc = $this->documentRepository->get($resultDoc->getId());
$query_data = [
'index' => config('elasticsearch.index'),
'type' => 'document',
- 'id' => (string)$doc->getId(),
- 'body' => [
- 'title' => (string)$doc->getTitle(),
- 'date' => (string)$doc->getModified(),
- 'geonames_hierarchy' => $this->getGeonamesHierarchy($doc)
- ]
+ 'id' => $docId, // used as given; the visible caller casts it to string beforehand
+ 'body' => $docBody // passed through unchanged
];
Es::index($query_data);
}
@@ -178,21 +500,18 @@
*
* @return int (1 if sucess, 0 if error)
*/
- private function indexBulk($docs)
+ private function indexBulk($docBodies) // $docBodies: map of document id => document body. NOTE(review): no value is returned here
{
$query_data = ['body' => []];
- foreach($docs as $doc){
+ foreach($docBodies as $docId => $docBody){ // each document appends two entries: the index action, then its body
$query_data['body'][] = [
'index' => [
'_index' => config('elasticsearch.index'),
'_type' => 'document',
- '_id' => (string)$doc->getId()
+ '_id' => $docId // the array key doubles as the Elasticsearch document id
]
];
- $query_data['body'][] = [
- 'title' => (string)$doc->getTitle(),
- 'date' => (string)$doc->getModified()
- ];
+ $query_data['body'][] = $docBody; // body passed through unchanged
}
Es::bulk($query_data);
}
@@ -220,6 +539,12 @@
$stepSize = $this->option('step-size');
$this->comment(' - Indexing with step size of '.$stepSize);
+ if($this->option('reset-geo-cache', false)) {
+ // delete all rows in GeonamesHierarchy
+ GeonamesHierarchy::getQuery()->delete();
+ $this->comment('Geonames cache reset!');
+ }
+
$this->info('Resetting index...');
$success = $this->resetIndex();
if($success==1){
@@ -231,49 +556,43 @@
$this->info('Indexing documents...');
- if ($limit<=0) {
- $lastPage = $this->documentRepository->paginateAll($stepSize, 'page')->lastPage();
- $total = $this->documentRepository->getCount();
- $lastPageEntryCount = $stepSize+1;
- }
- else {
- $lastPage = min((int)($limit/$stepSize)+1, $this->documentRepository->paginateAll($stepSize, 'page')->lastPage());
- $total = $limit;
- $lastPageEntryCount = $limit % $stepSize;
+ $limit = (int)$limit;
+ $total = $this->documentRepository->getCount();
+
+ if($limit>0) {
+ $total = min($limit, $total);
}
- if ($noBulk)
- {
- $progressBar = $this->output->createProgressBar($total);
- }
- else
- {
- $progressBar = $this->output->createProgressBar($lastPage);
- }
+ $progressBar = $this->output->createProgressBar($total);
$progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%');
- for ($page=1;$page<=$lastPage;$page++)
- {
- $docs = $this->documentRepository->paginateAll($stepSize, 'page', $page);
- if ($noBulk)
- {
- foreach ($docs as $i=>$doc){
- if ($page==$lastPage && $i>=$lastPageEntryCount){
- break;
- }
- $this->indexOne($doc);
- $progressBar->advance();
- $progressBar->setMessage($doc->getId());
+ $page = 0;
+ $lastPage = PHP_INT_MAX;
+ $docIds = [];
+
+ while($page++<$lastPage) {
+ $docsPaginator = $this->documentRepository->paginate(null, $stepSize, config('corpusparole.pagination_page_param'), $page, "_graph");
+ $lastPage = $docsPaginator->lastPage();
+ $docsBodies = [];
+ foreach($docsPaginator as $docResult) {
+ $docId = (string)$docResult->getId();
+ $progressBar->setMessage($docId);
+ $progressBar->advance();
+ $doc = $this->documentRepository->get($docId);
+ $docBody = $this->getDocBody($doc);
+ if($noBulk) {
+ $this->indexOne($docId, $docBody);
+ } else {
+ $docsBodies[$docId] = $docBody;
}
+ $docIds[] = $docId;
}
- else
- {
- $this->indexBulk($docs);
- $progressBar->advance();
- $progressBar->setMessage('Page '.$page);
+ if(!$noBulk) {
+ $this->indexBulk($docsBodies);
}
}
$progressBar->finish();
- $this->info('Indexing completed');
+ $this->info("\nIndexing completed for " . count(array_unique($docIds))." documents (of ".count($docIds).").");
+
}
}