src/ldt/ldt/indexation/backends/elasticsearch_backend.py
author ymh <ymh.work@gmail.com>
Tue, 22 Oct 2024 09:57:18 +0200
changeset 1516 9cfcfbac1a43
parent 1477 1de00f96a047
permissions -rw-r--r--
Added tag V01.65.08 for changeset c08d6aa5a51d

# -*- coding: utf-8 -*-
'''
Created on Jul 30, 2012

@author: ymh
'''
import logging
import haystack
from haystack.backends import BaseEngine, elasticsearch_backend
from haystack.exceptions import MissingDependency
from haystack.utils import get_identifier
#from ldt.ldt_utils.models import Segment
import collections
try:
    import requests
except ImportError:
    raise MissingDependency("The 'elasticsearch' backend requires the installation of 'requests'.")
try:
    import elasticsearch
    try:
        # let's try this, for elasticsearch > 1.7.0
        from elasticsearch.helpers import bulk
    except ImportError:
        # let's try this, for elasticsearch <= 1.7.0
        from elasticsearch.helpers import bulk_index as bulk
    from elasticsearch.exceptions import NotFoundError
except ImportError:
    raise MissingDependency("The 'elasticsearch' backend requires the installation of 'elasticsearch'. Please refer to the documentation.")

logger = logging.getLogger(__name__)

class ElasticsearchSearchBackend(elasticsearch_backend.ElasticsearchSearchBackend):
    """
    LDT customization of the haystack Elasticsearch backend.

    Overrides:
      - ``setup``: builds the index mapping without the "boost" option,
        which Elasticsearch no longer supports.
      - ``build_search_kwargs``: adds highlighting configuration for the
        ``tags``/``title``/``abstract`` fields.
      - ``remove``: adds bulk deletion when given an iterable of objects.
    """

    # TODO: setup() is overridden only to drop "boost", which is no longer
    # supported by Elasticsearch. This override can be removed with
    # haystack >= 2.4.2.
    def setup(self):
        """
        Defers loading until needed.

        Fetches and caches the existing index mapping, rebuilds the schema
        from the unified index, and (re)creates the index/mapping if the
        computed mapping differs from the cached one.
        """
        # Get the existing mapping & cache it. We'll compare it
        # during the ``update`` & if it doesn't match, we'll put the new
        # mapping.
        try:
            self.existing_mapping = self.conn.indices.get_mapping(index=self.index_name)
        except NotFoundError:
            # Index does not exist yet; it will be created below.
            pass
        except Exception:
            if not self.silently_fail:
                raise

        unified_index = haystack.connections[self.connection_alias].get_unified_index()
        self.content_field_name, field_mapping = self.build_schema(unified_index.all_searchfields())
        # NOTE: unlike the stock haystack backend, no "boost" entry is added
        # to the mapping here.
        current_mapping = {
            'modelresult': {
                'date_detection': False,
                'properties': field_mapping,
            }
        }
        logger.debug("Current Mapping %r", current_mapping)

        if current_mapping != self.existing_mapping:
            try:
                # Make sure the index is there first. ignore=400 tolerates
                # "index already exists" responses.
                self.conn.indices.create(index=self.index_name, body=self.DEFAULT_SETTINGS, ignore=400)
                self.conn.indices.put_mapping(index=self.index_name, doc_type='modelresult', body=current_mapping)
                self.existing_mapping = current_mapping
            except Exception:
                if not self.silently_fail:
                    raise

        self.setup_complete = True

    def build_search_kwargs(self, query_string, sort_by=None, start_offset=0, end_offset=None,
                        fields='', highlight=False, facets=None,
                        date_facets=None, query_facets=None,
                        narrow_queries=None, spelling_query=None,
                        within=None, dwithin=None, distance_point=None,
                        models=None, limit_to_registered_models=None,
                        result_class=None):
        """
        Delegates to the parent backend, then replaces the ``highlight``
        clause with an LDT-specific configuration: ``<span>`` markers,
        whole-field fragments, and a fixed set of highlighted fields.
        """
        kwargs = super(ElasticsearchSearchBackend, self).build_search_kwargs(query_string, sort_by=sort_by, start_offset=start_offset, end_offset=end_offset,
                        fields=fields, highlight=highlight, facets=facets,
                        date_facets=date_facets, query_facets=query_facets,
                        narrow_queries=narrow_queries, spelling_query=spelling_query,
                        within=within, dwithin=dwithin, distance_point=distance_point,
                        models=models, limit_to_registered_models=limit_to_registered_models,
                        result_class=result_class)

        #TODO : try to make list of field dynamic
        #TODO : How to handle multiple
        if highlight:
            fields_def = { }

            # Only configure highlight fields for unrestricted searches.
            if models is None or len(models) == 0 :#or Segment in models:
                fields_def['tags'] = {}
                fields_def['title'] = {}
                fields_def['abstract'] = {}

            kwargs['highlight'] = {
                'pre_tags' : ["<span class='highlight'>"],
                'post_tags' : ["</span>"],
                # 0 => highlight the whole field instead of fragments.
                "number_of_fragments" : 0,
                'fields': fields_def
            }

        return kwargs

    @staticmethod
    def _bulk_delete_failed(item):
        """Return True if a single bulk-delete response item is a failure.

        A deletion is considered successful when the document was found and
        deleted (status 200) or was already absent (not found, status 404).
        """
        return not (item and
                    (u'delete' in item) and
                    ((item[u'delete'].get(u'found', False) and item[u'delete'].get(u'status', 0) == 200) or
                     ((not item[u'delete'].get(u'found', True)) and item['delete'].get('status', 0) == 404)))

    def remove(self, obj_or_string, commit=True):
        """
        Remove one object/identifier or an iterable of objects from the index.

        Iterables (excluding strings) are deleted in a single bulk request;
        anything else falls through to the parent implementation.

        Raises elasticsearch.TransportError when the bulk deletion reports a
        failure for any document (unless the failure is "already absent").
        """
        if not self.setup_complete:
            try:
                self.setup()
            except elasticsearch.TransportError as e:
                if not self.silently_fail:
                    raise

                # BUGFIX: the original referenced an undefined ``doc_id``
                # here, raising NameError instead of logging the failure.
                self.log.error("Failed to remove document '%s' from Elasticsearch: %s", obj_or_string, e,
                               exc_info=True)
                return

        if isinstance(obj_or_string, collections.Iterable) and not isinstance(obj_or_string, basestring):
            ids = [get_identifier(elt) for elt in obj_or_string]
            if not ids:
                # Nothing to delete.
                return
            actions = [ {
                '_op_type': 'delete',
                '_index': self.index_name,
                '_type': 'modelresult',
                '_id': pk,} for pk in ids ]

            # delete_by_query was replaced by an explicit bulk of delete
            # actions; raise_on_error=False so we can inspect each result.
            del_res = bulk(self.conn, actions, stats_only=False, raise_on_error=False)
            if ( del_res and
                    len(del_res) > 1 and
                    del_res[1] and
                    any([self._bulk_delete_failed(r) for r in del_res[1]])):
                # BUGFIX: the original passed "%r" and del_res as separate
                # constructor args, so the placeholder was never interpolated.
                raise elasticsearch.TransportError("Problems when bulk removing %r" % (del_res,))
        else:
            return super(ElasticsearchSearchBackend, self).remove(obj_or_string, commit=commit)


class ElasticsearchSearchEngine(BaseEngine):
    """Haystack engine wiring the LDT-customized Elasticsearch backend to
    the stock haystack Elasticsearch query class."""
    query = elasticsearch_backend.ElasticsearchSearchQuery
    backend = ElasticsearchSearchBackend