server/src/app/Console/Commands/IndexDocuments.php
author ymh <ymh.work@gmail.com>
Thu, 14 Jan 2016 18:35:23 +0100
changeset 25 4ce76c9e7729
parent 24 de47e8f66e8b
child 308 e032d686d88e
permissions -rw-r--r--
small corrections

<?php

namespace CorpusParole\Console\Commands;

use Illuminate\Console\Command;
use CorpusParole\Repositories\DocumentRepository;
use Es;

class IndexDocuments extends Command
{

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'corpus-parole:indexDocuments
                          {--limit=0 : index only the first n documents, 0 (default) means index everything }
                          {--no-bulk : index documents one by one instead of using ElasticSearch bulk indexing }
                          {--step-size=100 : number of documents to retrieve from repository at a time before indexing}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Index documents into ElasticSearch.';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct(DocumentRepository $documentRepository)
    {
        $this->documentRepository = $documentRepository;
        parent::__construct();
    }



    /**
     * Reset Elasticsearch index
     *
     * @return int (1 if sucess, 0 if error)
     */
    private function resetIndex()
    {
        $indexParams = [
            'index' => env('ELASTICSEARCH_INDEX')
        ];
        if(Es::indices()->exists($indexParams)){
            $response = Es::indices()->delete($indexParams);
            if($response['acknowledged']!=1){
                return 0;
            }
        }
        $indexParams['body'] = [
            'settings' => [
                'number_of_shards' => conf('elasticsearch.shards'),
                'number_of_replicas' => conf('elasticsearch.replicas'),
                'index.mapping.ignore_malformed' => True
            ],
            'mappings' => [
                'document' => [
                    'properties' => [
                        'title' => [
                            'type' => 'string',
                            'store' => True,
                            'fields' => [
                                'raw' => [
                                    'type' => 'string',
                                    'index' => 'not_analyzed'
                                ]
                            ]
                        ],
                        'date' => [
                            'type' => 'date',
                            'store' => True
                        ]
                    ]
                ]
            ]
        ];
        $response = Es::indices()->create($indexParams);
        if($response['acknowledged']!=1){
            return 0;
        }
        return 1;
    }

    /**
     * Index one document into Elasticsearch
     *
     * @return int (1 if sucess, 0 if error)
     */
    private function indexOne($doc)
    {
        $query_data = [
            'index' => conf('elasticsearch.index'),
            'type' => 'document',
            'id' => (string)$doc->getId(),
            'body' => [
                'title' => (string)$doc->getTitle(),
                'date' => (string)$doc->getModified()
            ]
        ];
        Es::index($query_data);
    }

    /**
     * Index multiple document into Elasticsearch
     *
     * @return int (1 if sucess, 0 if error)
     */
     private function indexBulk($docs)
     {
          $query_data = ['body' => []];
          foreach($docs as $doc){
              $query_data['body'][] = [
                  'index' => [
                      '_index' => conf('elasticsearch.index'),
                      '_type' => 'document',
                      '_id' => (string)$doc->getId()
                  ]
              ];
              $query_data['body'][] = [
                  'title' => (string)$doc->getTitle(),
                  'date' => (string)$doc->getModified()
              ];
          }
          Es::bulk($query_data);
     }
    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $this->info('Options:');
        $noBulk = $this->option('no-bulk');
        if ($noBulk)
        {
            $this->comment(' - Indexing without bulk insert');
        }
        else
        {
            $this->comment(' - Indexing using bulk insert');
        }
        $limit = $this->option('limit');
        if ($limit>0) {
            $this->comment(' - Indexing only the first '.$limit.' documents');
        }
        $stepSize = $this->option('step-size');
        $this->comment(' - Indexing with step size of '.$stepSize);

        $this->info('Resetting index...');
        $success = $this->resetIndex();
        if($success==1){
            $this->comment('Index reset!');
        }
        else{
            $this->error('Error resetting index ' . env('ELASTICSEARCH_INDEX'));
        }

        $this->info('Indexing documents...');

        if ($limit<=0) {
            $lastPage = $this->documentRepository->paginateAll($stepSize, 'page')->lastPage();
            $total = $this->documentRepository->getCount();
            $lastPageEntryCount = $stepSize+1;
        }
        else {
            $lastPage = min((int)($limit/$stepSize)+1, $this->documentRepository->paginateAll($stepSize, 'page')->lastPage());
            $total = $limit;
            $lastPageEntryCount = $limit % $stepSize;
        }

        if ($noBulk)
        {
            $progressBar = $this->output->createProgressBar($total);
        }
        else
        {
            $progressBar = $this->output->createProgressBar($lastPage);
        }
        $progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% - %message%');

        for ($page=1;$page<=$lastPage;$page++)
        {
            $docs = $this->documentRepository->paginateAll($stepSize, 'page', $page);
            if ($noBulk)
            {
                foreach ($docs as $i=>$doc){
                    if ($page==$lastPage && $i>=$lastPageEntryCount){
                        break;
                    }
                    $this->indexOne($doc);
                    $progressBar->advance();
                    $progressBar->setMessage($doc->getId());
                }
            }
            else
            {
                $this->indexBulk($docs);
                $progressBar->advance();
                $progressBar->setMessage('Page '.$page);
            }
        }
        $progressBar->finish();
        $this->info('Indexing completed');
    }
}