server/src/app/Console/Commands/IndexDocuments.php
author ymh <ymh.work@gmail.com>
Wed, 09 Nov 2016 15:05:41 +0100
changeset 406 cf0f23803a53
parent 375 145561ff51ff
child 407 2dba812c7ef2
permissions -rw-r--r--
upgrade elasticsearch to 5.0, upgrade ember

<?php

namespace CorpusParole\Console\Commands;



use Illuminate\Console\Command;
use EasyRdf\Resource;
use EasyRdf\Literal;
use EasyRdf\Graph;

use Carbon\Carbon;

use GuzzleHttp\Client;
use GuzzleHttp\Exception\TransferException;
use GuzzleHttp\Psr7;

use CorpusParole\Libraries\Utils;
use CorpusParole\Repositories\DocumentRepository;
use CorpusParole\Libraries\CocoonUtils;
use CorpusParole\Models\GeonamesHierarchy;
use CorpusParole\Services\BnfResolverInterface;
use CorpusParole\Services\LexvoResolverInterface;
use Es;
use Log;
use Cache;

class IndexDocuments extends Command
{

    /**
     * The name and signature of the console command.
     *
     * Declares the command name and its four options (--limit, --no-bulk,
     * --step-size, --reset-geo-cache); the text after each colon is the help
     * message displayed for the option.
     *
     * @var string
     */
    protected $signature = 'corpus-parole:indexDocuments
                          {--limit=0 : index only the first n documents, 0 (default) means index everything }
                          {--no-bulk : index documents one by one instead of using ElasticSearch bulk indexing }
                          {--step-size=100 : number of documents to retrieve from repository at a time before indexing}
                          {--reset-geo-cache : reset geo cache before indexing}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Index documents into ElasticSearch.';

    /**
     * Repository used to count, paginate and load the documents to index.
     *
     * @var DocumentRepository
     */
    protected $documentRepository;

    /**
     * HTTP client used to call the GeoNames hierarchy web service and to
     * download the RDF descriptions of locations.
     *
     * @var Client
     */
    protected $httpClient;

    /**
     * Resolver called with the BnF subject URIs to obtain their labels.
     *
     * @var BnfResolverInterface
     */
    protected $bnfResolver;

    /**
     * Resolver called with the Lexvo subject URIs to obtain their names.
     *
     * @var LexvoResolverInterface
     */
    protected $lexvoResolver;

    /**
     * Create a new command instance.
     *
     * The collaborators are stored on declared properties (instead of being
     * created dynamically on assignment) before the parent constructor runs.
     *
     * @param  DocumentRepository     $documentRepository source of the documents to index
     * @param  Client                 $httpClient         HTTP client for the remote look-ups
     * @param  BnfResolverInterface   $bnfResolver        label resolver for BnF subjects
     * @param  LexvoResolverInterface $lexvoResolver      name resolver for Lexvo subjects
     * @return void
     */
    public function __construct(
        DocumentRepository $documentRepository,
        Client $httpClient,
        BnfResolverInterface $bnfResolver,
        LexvoResolverInterface $lexvoResolver)
    {
        $this->documentRepository = $documentRepository;
        $this->bnfResolver = $bnfResolver;
        $this->lexvoResolver = $lexvoResolver;
        $this->httpClient = $httpClient;
        parent::__construct();
    }



    /**
     * Drop the configured Elasticsearch index when it exists, then create it
     * again with the settings and the "document" mapping used by this command.
     *
     * @return int 1 when the index has been created, 0 when Elasticsearch did
     *             not acknowledge the deletion or the creation
     */
    private function resetIndex()
    {
        $params = ['index' => config('elasticsearch.index')];

        if (Es::indices()->exists($params)) {
            $deletion = Es::indices()->delete($params);
            if ($deletion['acknowledged'] != 1) {
                return 0;
            }
        }

        // Note: the fields no longer carry "'store' => true"; the _source of the
        // record is used instead.

        // Definition shared by every string field that is indexed verbatim.
        $rawString = ['type' => 'string', 'index' => 'not_analyzed'];

        $properties = [
            'title' => [
                'type' => 'string',
                'fields' => ['raw' => $rawString],
            ],
            'date' => ['type' => 'date', 'index' => 'not_analyzed'],
            'geonames_hierarchy' => $rawString,
            'location' => ['type' => 'geo_point'],
            'creation_date' => ['type' => 'date', 'index' => 'not_analyzed'],
            'language' => $rawString,
            'discourse_types' => $rawString,
            'creation_years' => [
                'type' => 'nested',
                'properties' => [
                    'year' => ['type' => 'short', 'index' => 'not_analyzed'],
                    'weight' => ['type' => 'float', 'index' => 'not_analyzed'],
                ],
            ],
            'subject' => [
                'type' => 'nested',
                'properties' => [
                    'label' => $rawString,
                    'code' => $rawString,
                    'label_code' => $rawString,
                ],
            ],
        ];

        $params['body'] = [
            'settings' => [
                'number_of_shards' => config('elasticsearch.shards'),
                'number_of_replicas' => config('elasticsearch.replicas'),
                'index.mapping.ignore_malformed' => true,
                'index.requests.cache.enable' => true,
            ],
            'mappings' => [
                'document' => ['properties' => $properties],
            ],
        ];

        $creation = Es::indices()->create($params);

        return ($creation['acknowledged'] != 1) ? 0 : 1;
    }


    /**
     * Return the ids of the retained ancestors of a GeoNames feature.
     *
     * The hierarchy is read from the GeonamesHierarchy table when a row exists
     * for the id. Otherwise it is requested from the GeoNames hierarchy web
     * service and stored for later calls. Only the hierarchy elements whose
     * feature code belongs to a fixed list (CONT, the PCL* codes and ADM1) are
     * kept.
     *
     * A transport error, or a payload without a "geonames" list (for instance
     * an error message returned by the service), is logged and yields an empty
     * list; such a payload is never stored, so the look-up is retried on the
     * next run.
     *
     * @param  string|int $geonamesid id of the GeoNames feature
     * @return array list of the "geonameId" values of the retained elements
     */
    private function getGeonamesHierarchyArray($geonamesid) {

        $hcache = GeonamesHierarchy::where('geonamesid', $geonamesid)->first();
        if(is_null($hcache)) {

            // TODO: add delay to respect geonames 2k request/hour

            try {
                $apiBody = (string)$this->httpClient->get(
                    config('corpusparole.geonames_hierarchy_webservice_url'),
                    [ 'query' =>
                        [ 'geonameId' => $geonamesid,
                          'username' => config('corpusparole.geonames_username') ],
                      // NOTE(review): 'accept' does not look like a Guzzle request
                      // option (headers normally go under 'headers') - TODO: check this
                      'accept' => 'application/json'
                    ]
                )->getBody();
            } catch (TransferException $e) {
                $this->error("getGeonamesHierarchyArray : Error loading hierarchy for $geonamesid");
                Log::error("getGeonamesHierarchyArray : Error loading hierarchy for $geonamesid : ".$e->getMessage());
                return [];
            }

            // Decode to nested arrays: this is the shape the code below reads.
            $hjson = json_decode($apiBody, true);
            if(!is_array($hjson) || !isset($hjson['geonames']) || !is_array($hjson['geonames'])) {
                // Not a hierarchy: do not cache it, otherwise the bad answer
                // would be served forever for this id.
                $this->error("getGeonamesHierarchyArray : Unexpected response for $geonamesid");
                Log::error("getGeonamesHierarchyArray : Unexpected response for $geonamesid : $apiBody");
                return [];
            }

            $hcache = new GeonamesHierarchy();
            $hcache->geonamesid = $geonamesid;
            $hcache->hierarchy = $hjson;
            $hcache->save();
        }

        $hierarchy = $hcache->hierarchy;
        if(!isset($hierarchy['geonames'])) {
            // Row stored earlier from a payload that was not a hierarchy.
            return [];
        }

        $keptCodes = ['CONT','PCLI', 'PCL','PCLD', 'PCLF', 'PCLH', 'PCLIX', 'PCLIS', 'ADM1'];
        $res = [];
        foreach($hierarchy['geonames'] as $hierarchyElem) {
            if(!isset($hierarchyElem['fcode'], $hierarchyElem['geonameId'])) {
                continue;
            }
            if(in_array($hierarchyElem['fcode'], $keptCodes, true)) {
                $res[] = $hierarchyElem['geonameId'];
            }
        }
        return $res;

    }

    /**
     * Get the geonames hierarchy data of a document.
     *
     * The hierarchy ids of every GeoNames location of the document are merged
     * and de-duplicated. The result is re-indexed from 0: array_unique() keeps
     * the original keys, and an array with gaps in its keys is serialised to
     * JSON as an object instead of a list.
     *
     * @param  mixed $doc document exposing getGeoInfo()
     * @return array list of geonames ids (empty when the document has no geo info)
     */
    private function getGeonamesHierarchy($doc) {
        $geoRes = $doc->getGeoInfo();
        if(is_null($geoRes)) {
            return [];
        }
        // aggregate the hierarchy lists of all the geonames locations
        $res = [];
        foreach($geoRes->getGeonamesLocs() as $gurl) {
            $geonamesId = CocoonUtils::getGeonamesidFromUrl($gurl);
            if(is_null($geonamesId)) {
                continue;
            }
            $res = array_merge($res, $this->getGeonamesHierarchyArray($geonamesId));
        }
        return array_values(array_unique($res));

    }

    /**
     * Get the subjects of a document as
     * { 'label': label, 'code': code, 'label_code': "label|type|code" } entries.
     *
     * Four kinds of subjects are recognised:
     *  - 'bnf'  : resources whose URI matches the configured BnF ark regexp,
     *  - 'lxv'  : resources whose URI matches the configured Lexvo regexp,
     *  - 'olac' : literals whose datatype URI starts with the configured OLAC base URL,
     *  - 'txt'  : any other literal.
     * Other resources are ignored.
     *
     * The label of a 'bnf' or 'lxv' subject is taken from the matching
     * resolver. When the resolver result has no entry for the URI, the URI
     * itself is used as the label instead of failing on an undefined index.
     * For literals the label is the literal value.
     *
     * @param  mixed $doc document exposing getSubjects()
     * @return array list of ['label' => ..., 'code' => ..., 'label_code' => ...]
     */
    private function getSubjects($doc) {

        $bnfRegexp = config('corpusparole.bnf_ark_url_regexp');
        $lexvoRegexp = config('corpusparole.lexvo_url_regexp');
        $olacBaseUrl = config('corpusparole.olac_base_url');

        $sres = array_reduce($doc->getSubjects(), function($res, $s) use ($bnfRegexp, $lexvoRegexp, $olacBaseUrl) {
            $m = [];

            if($s instanceof Resource) {
                if(preg_match($bnfRegexp, $s->getUri(), $m) === 1) {
                    $res[] = [ 'uri' => $m[0], 'code' => $m[1], 'type' => 'bnf' ];
                } elseif(preg_match($lexvoRegexp, $s->getUri(), $m) === 1) {
                    $res[] = [ 'uri' => $m[0], 'code' => $m[1], 'type' => 'lxv' ];
                }
            } elseif($s instanceof Literal) {
                $isOlac = strpos((string)$s->getDatatypeUri(), $olacBaseUrl) === 0;
                $res[] = [
                    'uri' => $s->getValue(),
                    'code' => $s->getValue(),
                    'type' => $isOlac ? 'olac' : 'txt'
                ];
            }
            return $res;
        }, []);

        // Distinct URIs of the subjects of a given type, as handed to the resolvers.
        $urisOfType = function($type) use ($sres) {
            $uris = [];
            foreach($sres as $so) {
                if($so['type'] === $type) {
                    $uris[] = $so['uri'];
                }
            }
            return array_unique($uris);
        };

        $labelsBnf = $this->bnfResolver->getLabels($urisOfType('bnf'));
        $labelsLexvo = $this->lexvoResolver->getNames($urisOfType('lxv'));

        return array_map(function($so) use ($labelsBnf, $labelsLexvo) {
            $uri = $so['uri'];
            $label = $uri;

            $labels = null;
            if($so['type'] === 'bnf') {
                $labels = $labelsBnf;
            } elseif($so['type'] === 'lxv') {
                $labels = $labelsLexvo;
            }

            if(!is_null($labels)) {
                // array_key_exists keeps an explicit null label returned by the
                // resolver; only a missing entry falls back to the URI.
                $known = is_array($labels) ? array_key_exists($uri, $labels) : isset($labels[$uri]);
                if($known) {
                    $label = $labels[$uri];
                }
            }

            return [
                'label' => $label,
                'code' => $so['code'],
                'label_code' => $label."|".$so['type']."|".$so['code']
            ];
        }, $sres);
    }

    /**
     * Read the WGS84 latitude and longitude of a resource from an RDF graph.
     *
     * @param  string $loc   URI of the resource whose coordinates are wanted
     * @param  mixed  $graph graph exposing getLiteral()
     * @return array|null [latitude, longitude] as stored in the graph, or null
     *                    as soon as one of the two literals is missing or has
     *                    an empty value (empty() semantics)
     */
    private function graphResolvCoordinate($loc, $graph) {
        $properties = [
            "<http://www.w3.org/2003/01/geo/wgs84_pos#lat>",
            "<http://www.w3.org/2003/01/geo/wgs84_pos#long>",
        ];

        $pair = [];
        foreach($properties as $property) {
            $literal = $graph->getLiteral($loc, $property);
            if(is_null($literal) || empty($literal->getValue())) {
                return null;
            }
            $pair[] = $literal->getValue();
        }

        return $pair;
    }

    /**
     * Download an RDF document and parse it into an EasyRdf graph.
     *
     * Download and parsing failures are reported on the console and in the
     * log, and null is returned so that the caller can carry on without
     * the graph.
     *
     * @param  string $url  URL of the RDF document; also used as the graph URI
     * @param  string $type format handed to the Graph constructor (e.g. 'rdfxml')
     * @return Graph|null the parsed graph, or null when loading or parsing failed
     */
    private function loadGraph($url, $type) {
        try {
            $r = $this->httpClient->get($url);
        } catch (TransferException $e) {
            $this->error("loadGraph : Error Loading $url");
            Log::error("loadGraph : Error Loading $url");
            // Only request exceptions carry the request and the optional response.
            if ($e instanceof \GuzzleHttp\Exception\RequestException) {
                Log::error("loadGraph : Error request " . Psr7\str($e->getRequest()));
                if ($e->hasResponse()) {
                    $this->error("loadGraph : Error response " . Psr7\str($e->getResponse()));
                    Log::error("loadGraph : Error response " . Psr7\str($e->getResponse()));
                }
            } else {
                Log::error("loadGraph : Error exception message ".$e->getMessage());
            }
            return null;
        }

        $message = (string)$r->getBody();
        try {
            return new Graph($url, $message, $type);
        } catch (\EasyRdf\Exception $e) {
            // The exception classes are fully qualified: a relative name would be
            // resolved inside the current namespace and never match.
            $this->error("loadGraph : Error parsing $url");
            Log::error("loadGraph : Error parsing $url");
            if($e instanceof \EasyRdf\Parser\Exception) {
                // getLine() is the PHP source line of the throw; the position in
                // the parsed document is given by the parser-specific getters.
                Log::error("loadGraph : Error exception line ".$e->getParserLine().", column: ".$e->getParserColumn());
            }
            $this->error("loadGraph : Error exception message ".$e->getMessage());
            Log::error("loadGraph : Error exception message ".$e->getMessage());
            Log::error("loadGraph : Error content $message");
            return null;
        }

    }

    /**
     * Resolve the coordinates of a GeoNames location URL, using the cache.
     *
     * The outcome is cached for 20 minutes under a key derived from the URL;
     * a failed resolution is cached as false so that it is not retried while
     * the entry lives.
     *
     * @param  string $loc location URL; "about.rdf" is appended to fetch its RDF
     * @return array|null [latitude, longitude], or null when unresolved
     */
    private function geonamesResolveCoordinates($loc) {
        $cacheKey = "corpus.geonames.coord.$loc";

        $cached = cache($cacheKey);
        if(!is_null($cached)) {
            // false is the marker stored for "no coordinates"
            return ($cached === false) ? null : $cached;
        }

        $resolved = null;
        $graph = $this->loadGraph("{$loc}about.rdf", 'rdfxml');
        if(!is_null($graph)) {
            $resolved = $this->graphResolvCoordinate($loc, $graph);
        }

        $stored = is_null($resolved) ? false : $resolved;
        cache([$cacheKey => $stored], Carbon::now()->addMinutes(20));

        return $resolved;
    }

    /**
     * Resolve the coordinates of a DBpedia data URL, using the cache.
     *
     * The outcome is cached for 20 minutes under a key derived from the URL;
     * a failed resolution is cached as false so that it is not retried while
     * the entry lives.
     *
     * @param  string $loc data URL; ".rdf" is appended to fetch its RDF
     * @return array|null [latitude, longitude], or null when unresolved
     */
    private function dbpediaResolveCoordinates($loc) {
        $cacheKey = "corpus.dbpedia.coord.$loc";

        $cached = cache($cacheKey);
        if(!is_null($cached)) {
            // false is the marker stored for "no coordinates"
            return ($cached === false) ? null : $cached;
        }

        $resolved = null;
        $graph = $this->loadGraph("$loc.rdf", 'rdfxml');
        if(!is_null($graph)) {
            $resolved = $this->graphResolvCoordinate($loc, $graph);
        }

        $stored = is_null($resolved) ? false : $resolved;
        cache([$cacheKey => $stored], Carbon::now()->addMinutes(20));

        return $resolved;
    }

    /**
     * Compute the [latitude, longitude] of a document.
     *
     * The reference locations of the document are turned into GeoNames and
     * DBpedia URLs, which are tried in turn until one yields coordinates; the
     * search stops at the first success, so a later failing source can no
     * longer overwrite coordinates that were already found. When no URL can be
     * resolved, the latitude/longitude values carried by the geo info are used.
     *
     * @param  mixed $doc document exposing getGeoInfo()
     * @return array|null [float latitude, float longitude], or null when the
     *                    document has no geo info or no usable coordinates
     */
    private function getLocation($doc) {

        $geoRes = $doc->getGeoInfo();

        if(is_null($geoRes)) {
            return null;
        }

        // Candidate URLs grouped by source; the group name selects the
        // "<name>ResolveCoordinates" method below.
        $locUrls = [];
        foreach($geoRes->getRefLocs() as $loc) {
            $m = [];
            if(preg_match(config('corpusparole.geonames_url_regexp'), $loc, $m) === 1) {
                $locUrls['geonames'][] = "http://sws.geonames.org/{$m[1]}/";
            } elseif(preg_match(config('corpusparole.dbpedia_url_regexp'), $loc, $m) === 1) {
                $locUrls['dbpedia'][] = "http://{$m[1]}/data/{$m[4]}";
            }
        }

        $coordinates = null;
        foreach($locUrls as $locType => $locList) {
            $resolver = "{$locType}ResolveCoordinates";
            foreach($locList as $locationUrl) {
                $coordinates = $this->$resolver($locationUrl);
                if(!is_null($coordinates)) {
                    // leave both loops: the first resolved location wins
                    break 2;
                }
            }
        }

        if(is_null($coordinates)) {
            $coordinates = [$geoRes->getLatitudeValue(), $geoRes->getLongitudeValue()];
        }

        // NOTE(review): empty() also rejects a coordinate that is exactly 0;
        // kept as is, confirm whether such points must be indexed.
        if(empty($coordinates[0]) || empty($coordinates[1])) {
            return null;
        }

        return [floatval($coordinates[0]), floatval($coordinates[1])];

    }

    /**
     * Compute the creation date(s) of a document from its "created" literal.
     *
     * @param  mixed $doc document exposing getCreated()
     * @return mixed the result of processPeriod() for a dcterms:Period value,
     *               of processDate() for a dcterms:W3CDTF value, and null when
     *               there is no "created" value or its datatype is neither
     */
    private function getCreationDate($doc) {

        $created = $doc->getCreated();
        if(is_null($created)) {
            return null;
        }

        $dateType = $created->getDatatypeUri();

        if($dateType === "http://purl.org/dc/terms/Period") {
            return $this->processPeriod($created->getValue());
        }
        if($dateType === "http://purl.org/dc/terms/W3CDTF") {
            return $this->processDate($created->getValue());
        }

        return null;

    }

    /**
     * Parse a date string.
     *
     * A bare four-digit year is read as January 1st of that year (handed as
     * such to date_create(), it would not be taken for a year). An empty
     * string is rejected: date_create('') would silently return the current
     * date.
     *
     * @param  string $dateStr the date to parse; surrounding blanks are ignored
     * @return \DateTime|null the parsed date, or null (with a warning in the
     *                        log) when the string is empty or cannot be parsed
     */
    private function extractDate($dateStr) {
        $dateStr = trim((string)$dateStr);
        if($dateStr === '') {
            Log::warning("IndexDocuments:extractDate empty date");
            return null;
        }
        if(preg_match("/^\\d{4}$/", $dateStr) === 1) {
            $dateStr = "$dateStr-1-1";
        }
        $date = date_create($dateStr);
        if($date === false ) {
            Log::warning("IndexDocuments:extractDate bad format for date $dateStr");
            return null;
        }
        return $date;
    }

    /**
     * Expand a period ("start=...; end=...") into one date per year.
     *
     * The string is split on ";" and the elements beginning with "start=" and
     * "end=" give the bounds; only the year of each bound is used. Every year
     * of the range is represented by January 1st at 00:00:00, so the result
     * does not depend on the moment the command is run.
     *
     * @param  string $periodStr the period to expand
     * @param  bool   $asDate    true to get \DateTime objects, false (default)
     *                           to get strings in W3C format
     * @return array|null one entry per year from start to end (inclusive), or
     *                    null when a bound is missing or cannot be parsed, or
     *                    when start is after end
     */
    private function processPeriod($periodStr, $asDate=false) {
        $start = null;
        $end = null;
        foreach(explode(";", $periodStr) as $elem) {
            $elem = trim($elem);
            if(strpos($elem, 'start=') === 0) {
                $startDate = $this->extractDate(trim(substr($elem, 6)));
                if(is_null($startDate)) {
                    return null;
                }
                $start = intval($startDate->format("Y"));
            } elseif(strpos($elem, 'end=') === 0) {
                $endDate = $this->extractDate(trim(substr($elem, 4)));
                if(is_null($endDate)) {
                    return null;
                }
                $end = intval($endDate->format("Y"));
            }
        }

        if(is_null($start) || is_null($end) || $start>$end ) {
            Log::warning("Bad format for $periodStr");
            return null;
        }

        return array_map(function($y) use ($asDate){
            // Set every field explicitly: parsing the year alone would fill
            // month, day and time from the current clock.
            $date = new \DateTime();
            $date->setDate($y, 1, 1);
            $date->setTime(0, 0, 0);
            if($asDate) {
                return $date;
            }
            return $date->format(\DateTime::W3C);
        }, range($start, $end));
    }

    /**
     * Parse a single date string with extractDate().
     *
     * @param  string $dateStr the date to parse
     * @param  bool   $asDate  true to get the date object, false (default) to
     *                         get it formatted as a W3C string
     * @return mixed the date in the requested form, or null when it cannot be parsed
     */
    private function processDate($dateStr, $asDate=false) {
        $parsed = $this->extractDate($dateStr);

        // A failed parse (null) and the "object wanted" case both return as is.
        if(is_null($parsed) || $asDate) {
            return $parsed;
        }

        return $parsed->format(\DateTime::W3C);
    }

    /**
     * List the creation years of a document with their weight.
     *
     * A dcterms:Period value contributes every year returned by
     * processPeriod(), a dcterms:W3CDTF value contributes its single year.
     * Each year gets the weight 1/n, n being the number of years.
     *
     * @param  mixed $doc document exposing getCreated()
     * @return array list of ['year' => int, 'weight' => number]; empty when
     *               there is no usable "created" value
     */
    private function getCreationYears($doc) {
        $created = $doc->getCreated();
        if(is_null($created)) {
            return [];
        }

        $dateType = $created->getDatatypeUri();
        $dates = null;
        if($dateType === "http://purl.org/dc/terms/Period") {
            $dates = $this->processPeriod($created->getValue(), true);
        } elseif($dateType === "http://purl.org/dc/terms/W3CDTF") {
            $single = $this->processDate($created->getValue(), true);
            $dates = is_null($single) ? null : [ $single ];
        }

        if(is_null($dates)) {
            return [];
        }

        $count = count($dates);
        $years = [];
        foreach($dates as $key => $d) {
            $years[$key] = [
                'year' => intval($d->format("Y")),
                'weight' => 1/$count
            ];
        }
        return $years;
    }

    /**
     * List the discourse types of a document as strings.
     *
     * A resource is represented by its URI. A literal is represented by its
     * value, prefixed with "<datatype>#" when it has a datatype. Any other
     * kind of item, and empty results, are skipped.
     *
     * @param  mixed $doc document exposing getDiscourseTypes()
     * @return array list of strings
     */
    private function getDiscourseTypes($doc) {
        $collect = function($types, $item) {
            if($item instanceof Resource) {
                $text = $item->getUri();
            } elseif($item instanceof Literal) {
                $datatype = $item->getDatatypeURI();
                $prefix = empty($datatype) ? "" : "$datatype#";
                $text = $prefix.$item->getValue();
            } else {
                return $types;
            }

            if(empty($text)) {
                return $types;
            }

            $types[] = $text;
            return $types;
        };

        return array_reduce($doc->getDiscourseTypes(), $collect, []);
    }

    /**
     * Build the body indexed in Elasticsearch for a document.
     *
     * The fields are computed in the order of the keys below.
     *
     * @param  mixed $doc the document to describe
     * @return array field name => value
     */
    private function getDocBody($doc) {
        $body = [];

        $body['title'] = (string)$doc->getTitle();
        $body['date'] = (string)$doc->getModified();
        $body['location'] = $this->getLocation($doc);
        $body['creation_date'] = $this->getCreationDate($doc);
        $body['creation_years'] = $this->getCreationYears($doc);
        $body['language'] = $doc->getLanguagesValue();
        $body['discourse_types'] = $this->getDiscourseTypes($doc);
        $body['geonames_hierarchy'] = $this->getGeonamesHierarchy($doc);
        $body['subject'] = $this->getSubjects($doc);

        return $body;
    }

    /**
     * Index one document into Elasticsearch.
     *
     * The document is sent to the configured index with the type "document".
     * The response of Elasticsearch is not inspected and nothing is returned.
     *
     * @param  string $docId   id of the document
     * @param  array  $docBody body of the document, as built by getDocBody()
     * @return void
     */
    private function indexOne($docId, $docBody)
    {
        Es::index([
            'index' => config('elasticsearch.index'),
            'type' => 'document',
            'id' => $docId,
            'body' => $docBody,
        ]);
    }

    /**
     * Index multiple documents into Elasticsearch with one bulk request.
     *
     * Nothing is sent for an empty batch (a bulk request without any action
     * is rejected). The bulk response is inspected: when it reports errors,
     * every failed item is logged, since a bulk call does not fail as a whole
     * when only some of its items are rejected.
     *
     * @param  array $docBodies document bodies indexed by document id
     * @return int (1 if success, 0 if at least one document was rejected)
     */
    private function indexBulk($docBodies)
    {
        if(empty($docBodies)) {
            return 1;
        }

        $indexName = config('elasticsearch.index');
        $query_data = ['body' => []];
        foreach($docBodies as $docId => $docBody){
            // each document takes two entries: the action line, then the body
            $query_data['body'][] = [
                'index' => [
                    '_index' => $indexName,
                    '_type' => 'document',
                    '_id' => $docId
                ]
            ];
            $query_data['body'][] = $docBody;
        }

        $response = Es::bulk($query_data);
        if(empty($response['errors'])) {
            return 1;
        }

        $items = isset($response['items']) ? $response['items'] : [];
        foreach($items as $item) {
            // an item is keyed by its action name ("index"); take its result
            $result = is_array($item) ? reset($item) : null;
            if(is_array($result) && isset($result['error'])) {
                $failedId = isset($result['_id']) ? $result['_id'] : '?';
                Log::error("indexBulk : Error indexing document $failedId : ".json_encode($result['error']));
            }
        }
        $this->error("indexBulk : some documents could not be indexed, see the log for details");

        return 0;
    }
    /**
     * Execute the console command.
     *
     * Steps: print the options, optionally empty the GeoNames hierarchy
     * cache, reset the Elasticsearch index, then walk through the repository
     * page by page and index every document, either one by one (--no-bulk) or
     * with one bulk request per page.
     *
     * When --limit is greater than 0 the walk stops as soon as that many
     * documents have been indexed. The command gives up (exit code 1) when
     * the step size is not a positive integer or when the index cannot be
     * reset, so that documents are never sent to an index that lacks the
     * expected mapping.
     *
     * @return mixed 1 on failure, nothing on success
     */
    public function handle()
    {
        $noBulk = $this->option('no-bulk');
        $limit = (int)$this->option('limit');
        $stepSize = (int)$this->option('step-size');

        if($stepSize < 1) {
            $this->error('The step size must be a positive integer');
            return 1;
        }

        $this->info('Options:');
        if ($noBulk)
        {
            $this->comment(' - Indexing without bulk insert');
        }
        else
        {
            $this->comment(' - Indexing using bulk insert');
        }
        if ($limit>0) {
            $this->comment(' - Indexing only the first '.$limit.' documents');
        }
        $this->comment(' - Indexing with step size of '.$stepSize);

        if($this->option('reset-geo-cache')) {
            // delete all rows in GeonamesHierarchy
            GeonamesHierarchy::getQuery()->delete();
            $this->comment('Geonames cache reset!');
        }

        $this->info('Resetting index...');
        if($this->resetIndex() != 1) {
            $this->error('Error resetting index ' . config('elasticsearch.index'));
            return 1;
        }
        $this->comment('Index reset!');

        $this->info('Indexing documents...');

        $total = $this->documentRepository->getCount();
        if($limit>0) {
            $total = min($limit, $total);
        }

        $progressBar = $this->output->createProgressBar($total);
        $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%');

        $page = 0;
        $lastPage = PHP_INT_MAX; // replaced by the real value once the first page is loaded
        $docIds = [];
        $limitReached = false;

        while(!$limitReached && $page++<$lastPage) {
            $docsPaginator = $this->documentRepository->paginate(null, $stepSize, config('corpusparole.pagination_page_param'), $page, "_graph");
            $lastPage = $docsPaginator->lastPage();
            $docsBodies = [];
            foreach($docsPaginator as $docResult) {
                $docId = (string)$docResult->getId();
                $progressBar->setMessage($docId);
                $progressBar->advance();
                $doc = $this->documentRepository->get($docId);
                $docBody = $this->getDocBody($doc);
                if($noBulk) {
                    $this->indexOne($docId, $docBody);
                } else {
                    $docsBodies[$docId] = $docBody;
                }
                $docIds[] = $docId;

                if($limit>0 && count($docIds)>=$limit) {
                    // the documents gathered so far are still flushed below
                    $limitReached = true;
                    break;
                }
            }
            if(!$noBulk && !empty($docsBodies)) {
                $this->indexBulk($docsBodies);
            }
        }
        $progressBar->finish();
        $this->info("\nIndexing completed for " . count(array_unique($docIds))." documents (of ".count($docIds).").");

    }
}