server/src/app/Console/Commands/IndexDocuments.php
author ymh <ymh.work@gmail.com>
Sun, 02 Oct 2016 11:49:00 +0200
changeset 308 e032d686d88e
parent 25 4ce76c9e7729
child 320 0fce13da58af
permissions -rw-r--r--
add hierarchy info in document indexation + geostats api controllers + add some keys to geonames resolver

<?php

namespace CorpusParole\Console\Commands;

use Illuminate\Console\Command;
use GuzzleHttp\Client;
use CorpusParole\Repositories\DocumentRepository;
use CorpusParole\Libraries\CocoonUtils;
use CorpusParole\Models\GeonamesHierarchy;
use Es;

class IndexDocuments extends Command
{

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'corpus-parole:indexDocuments
                          {--limit=0 : index only the first n documents, 0 (default) means index everything }
                          {--no-bulk : index documents one by one instead of using ElasticSearch bulk indexing }
                          {--step-size=100 : number of documents to retrieve from repository at a time before indexing}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Index documents into ElasticSearch.';

    /**
     * Repository used to list the documents and to load each full document.
     *
     * @var DocumentRepository
     */
    protected $documentRepository;

    /**
     * HTTP client used to query the geonames hierarchy web service.
     *
     * @var Client
     */
    protected $httpClient;

    /**
     * Geonames feature codes kept when flattening a hierarchy
     * (continent, the political entity codes and first-order
     * administrative divisions).
     *
     * @var array
     */
    private static $hierarchyFeatureCodes = [
        'CONT', 'PCLI', 'PCL', 'PCLD', 'PCLF', 'PCLH', 'PCLIX', 'PCLIS', 'ADM1'
    ];

    /**
     * Create a new command instance.
     *
     * @param DocumentRepository $documentRepository source of the documents to index
     * @param Client $httpClient client used for the geonames web service calls
     * @return void
     */
    public function __construct(DocumentRepository $documentRepository, Client $httpClient)
    {
        $this->documentRepository = $documentRepository;
        $this->httpClient = $httpClient;
        parent::__construct();
    }

    /**
     * Reset the Elasticsearch index: delete it when it exists, then create
     * it again with the document mapping.
     *
     * @return int (1 if success, 0 if error)
     */
    private function resetIndex()
    {
        // Use the same configuration key as the indexing methods so that the
        // index being reset is always the index being filled.
        $indexParams = [
            'index' => config('elasticsearch.index')
        ];
        if (Es::indices()->exists($indexParams)) {
            $response = Es::indices()->delete($indexParams);
            if (empty($response['acknowledged'])) {
                return 0;
            }
        }
        // Note: removed the "'store' => True" parameters on fields and use _source on record instead

        $indexParams['body'] = [
            'settings' => [
                'number_of_shards' => config('elasticsearch.shards'),
                'number_of_replicas' => config('elasticsearch.replicas'),
                'index.mapping.ignore_malformed' => true
            ],
            'mappings' => [
                'document' => [
                    'properties' => [
                        'title' => [
                            'type' => 'string',
                            'fields' => [
                                'raw' => [
                                    'type' => 'string',
                                    'index' => 'not_analyzed'
                                ]
                            ]
                        ],
                        'date' => [ 'type' => 'date' ],
                        // Must match the field name written by buildDocumentBody().
                        'geonames_hierarchy' => [ 'type' => 'string' ],
                        'location' => [ 'type' => 'geo_point' ]
                        // TODO: add location information
                    ]
                ]
            ]
        ];
        $response = Es::indices()->create($indexParams);
        if (empty($response['acknowledged'])) {
            return 0;
        }
        return 1;
    }

    /**
     * Get the filtered list of geonames ids making up the hierarchy of a place.
     *
     * The hierarchy is read from the GeonamesHierarchy cache table; on a cache
     * miss it is fetched from the geonames hierarchy web service and stored.
     * Only the elements whose feature code is listed in
     * self::$hierarchyFeatureCodes are returned.
     *
     * HTTP level failures raised by the client are not caught here.
     *
     * @param mixed $geonamesid geonames id of the place
     * @return array list of geonames ids (empty when no usable hierarchy is available)
     */
    private function getGeonamesHierarchyArray($geonamesid)
    {
        // TODO: Manage this cache !!!
        $hcache = GeonamesHierarchy::where('geonamesid', $geonamesid)->first();
        if (is_null($hcache)) {

            // TODO: add delay to respect geonames 2k request/hour

            $apiBody = (string)$this->httpClient->get(
                config('corpusparole.geonames_hierarchy_webservice_url'),
                [
                    'query' => [
                        'geonameId' => $geonamesid,
                        'username' => config('corpusparole.geonames_username')
                    ],
                    // Request headers must go through the 'headers' option.
                    'headers' => [ 'Accept' => 'application/json' ]
                ]
            )->getBody();
            $hjson = json_decode($apiBody, true);

            // An invalid body or an error payload has no 'geonames' list:
            // do not cache it, otherwise the failure would become permanent.
            if (!is_array($hjson) || !isset($hjson['geonames']) || !is_array($hjson['geonames'])) {
                $this->error('Unexpected geonames hierarchy response for geonames id ' . $geonamesid);
                return [];
            }

            $hcache = new GeonamesHierarchy;
            $hcache->geonamesid = $geonamesid;
            $hcache->hierarchy = $hjson;
            $hcache->save();
        }

        $hierarchy = $hcache->hierarchy;
        if (!isset($hierarchy['geonames'])) {
            // Cached entry without a hierarchy list: nothing to extract.
            return [];
        }

        $res = [];
        foreach ($hierarchy['geonames'] as $hierarchyElem) {
            if (!isset($hierarchyElem['fcode']) || !isset($hierarchyElem['geonameId'])) {
                continue;
            }
            if (in_array($hierarchyElem['fcode'], self::$hierarchyFeatureCodes, true)) {
                $res[] = $hierarchyElem['geonameId'];
            }
        }

        return $res;
    }

    /**
     * Get the geonames hierarchy data of a document.
     *
     * Aggregates, without duplicates, the hierarchy ids of every geonames
     * location attached to the document.
     *
     * @param mixed $doc full document (must provide getGeoInfo())
     * @return array list of geonames ids
     */
    private function getGeonamesHierarchy($doc)
    {
        $geoRes = $doc->getGeoInfo();
        if (is_null($geoRes)) {
            return [];
        }
        // aggregate hierarchy list from geonames results
        $res = [];
        foreach ($geoRes->getGeonamesLocs() as $gurl) {
            $geonamesId = CocoonUtils::getGeonamesidFromUrl($gurl);
            if (is_null($geonamesId)) {
                continue;
            }
            $res = array_merge($res, $this->getGeonamesHierarchyArray($geonamesId));
        }
        // array_unique() keeps the original keys; re-index so the result is
        // always serialized as a JSON array and never as an object.
        return array_values(array_unique($res));
    }

    /**
     * Build the Elasticsearch body of a document.
     *
     * @param mixed $doc full document as returned by the repository get()
     * @return array fields to index
     */
    private function buildDocumentBody($doc)
    {
        return [
            'title' => (string)$doc->getTitle(),
            'date' => (string)$doc->getModified(),
            'geonames_hierarchy' => $this->getGeonamesHierarchy($doc)
        ];
    }

    /**
     * Index one document into Elasticsearch.
     *
     * @param mixed $resultDoc document entry coming from the repository pagination
     * @return void
     */
    private function indexOne($resultDoc)
    {
        // The full document is loaded again through the repository before indexing.
        $doc = $this->documentRepository->get($resultDoc->getId());
        Es::index([
            'index' => config('elasticsearch.index'),
            'type' => 'document',
            'id' => (string)$doc->getId(),
            'body' => $this->buildDocumentBody($doc)
        ]);
    }

    /**
     * Index multiple documents into Elasticsearch with a single bulk request.
     *
     * The same fields as in indexOne() are indexed. Nothing is sent when
     * there is no document.
     *
     * @param mixed $docs iterable of document entries coming from the repository pagination
     * @return void
     */
    private function indexBulk($docs)
    {
        $query_data = ['body' => []];
        foreach ($docs as $resultDoc) {
            $doc = $this->documentRepository->get($resultDoc->getId());
            $query_data['body'][] = [
                'index' => [
                    '_index' => config('elasticsearch.index'),
                    '_type' => 'document',
                    '_id' => (string)$resultDoc->getId()
                ]
            ];
            $query_data['body'][] = $this->buildDocumentBody($doc);
        }
        if (empty($query_data['body'])) {
            return;
        }
        Es::bulk($query_data);
    }

    /**
     * Execute the console command.
     *
     * Resets the index, then walks the repository page by page and indexes
     * the documents, either one by one or one bulk request per page.
     *
     * @return mixed null on normal completion, 1 when the options are invalid
     */
    public function handle()
    {
        $this->info('Options:');
        $noBulk = $this->option('no-bulk');
        if ($noBulk) {
            $this->comment(' - Indexing without bulk insert');
        } else {
            $this->comment(' - Indexing using bulk insert');
        }
        $limit = (int)$this->option('limit');
        if ($limit > 0) {
            $this->comment(' - Indexing only the first '.$limit.' documents');
        }
        $stepSize = (int)$this->option('step-size');
        if ($stepSize <= 0) {
            // A non positive step size makes the page computation meaningless.
            $this->error('The step size must be a positive integer');
            return 1;
        }
        $this->comment(' - Indexing with step size of '.$stepSize);

        $this->info('Resetting index...');
        $success = $this->resetIndex();
        if ($success == 1) {
            $this->comment('Index reset!');
        } else {
            // NOTE(review): indexing still goes on after a failed reset, as before.
            $this->error('Error resetting index ' . config('elasticsearch.index'));
        }

        $this->info('Indexing documents...');

        $repositoryLastPage = $this->documentRepository->paginateAll($stepSize, 'page')->lastPage();
        $count = $this->documentRepository->getCount();
        if ($limit <= 0) {
            $lastPage = $repositoryLastPage;
            $total = $count;
            $remaining = null; // null means "no limit"
        } else {
            // ceil() avoids fetching an extra page when the limit is a
            // multiple of the step size.
            $lastPage = min((int)ceil($limit / $stepSize), $repositoryLastPage);
            $total = min($limit, $count);
            $remaining = $limit;
        }

        if ($noBulk) {
            $progressBar = $this->output->createProgressBar($total);
        } else {
            $progressBar = $this->output->createProgressBar($lastPage);
        }
        $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%');

        for ($page = 1; $page <= $lastPage; $page++) {
            // Keep only the documents still allowed by the limit, so that both
            // indexing modes honour it in the same way.
            $pageDocs = [];
            foreach ($this->documentRepository->paginateAll($stepSize, 'page', $page) as $doc) {
                if (!is_null($remaining) && count($pageDocs) >= $remaining) {
                    break;
                }
                $pageDocs[] = $doc;
            }
            if (!is_null($remaining)) {
                $remaining -= count($pageDocs);
            }

            if ($noBulk) {
                foreach ($pageDocs as $doc) {
                    $this->indexOne($doc);
                    // Set the message before advancing so that the redraw
                    // shows the document that has just been indexed.
                    $progressBar->setMessage($doc->getId());
                    $progressBar->advance();
                }
            } else {
                $this->indexBulk($pageDocs);
                $progressBar->setMessage('Page '.$page);
                $progressBar->advance();
            }
        }
        $progressBar->finish();
        $this->info('Indexing completed');
    }
}