# HG changeset patch
# User ymh
# Date 1458041572 -3600
# Node ID 1de00f96a0478f5802ef037f75f4d59d88009d1c
# Parent  211aef9186f5c770d78887587cbe9f0888b59249
Corrections for elasticsearch (server) >= 2.2 and elasticsearch (client lib) >= 2.3 (+version)

diff -r 211aef9186f5 -r 1de00f96a047 src/ldt/ldt/__init__.py
--- a/src/ldt/ldt/__init__.py	Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/__init__.py	Tue Mar 15 12:32:52 2016 +0100
@@ -1,6 +1,6 @@
 __all__ = ["VERSION", "get_version", "__version__", "default_app_config"]
 
-VERSION = (1, 60, 2, "final", 0)
+VERSION = (1, 61, 0, "final", 0)
 
 
 def get_version():
diff -r 211aef9186f5 -r 1de00f96a047 src/ldt/ldt/indexation/backends/elasticsearch_backend.py
--- a/src/ldt/ldt/indexation/backends/elasticsearch_backend.py	Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/indexation/backends/elasticsearch_backend.py	Tue Mar 15 12:32:52 2016 +0100
@@ -4,6 +4,8 @@
 
 @author: ymh
 '''
+import logging
+import haystack
 from haystack.backends import BaseEngine, elasticsearch_backend
 from haystack.exceptions import MissingDependency
 from haystack.utils import get_identifier
@@ -14,13 +16,60 @@
 except ImportError:
     raise MissingDependency("The 'elasticsearch' backend requires the installation of 'requests'.")
 try:
-    import pyelasticsearch
+    import elasticsearch
+    try:
+        # for elasticsearch (client lib) > 1.7.0
+        from elasticsearch.helpers import bulk
+    except ImportError:
+        # for elasticsearch (client lib) <= 1.7.0
+        from elasticsearch.helpers import bulk_index as bulk
+    from elasticsearch.exceptions import NotFoundError
 except ImportError:
-    raise MissingDependency("The 'elasticsearch' backend requires the installation of 'pyelasticsearch'. Please refer to the documentation.")
+    raise MissingDependency("The 'elasticsearch' backend requires the installation of 'elasticsearch'. Please refer to the documentation.")
+
+logger = logging.getLogger(__name__)
+
+
+class ElasticsearchSearchBackend(elasticsearch_backend.ElasticsearchSearchBackend):
+
+    # TODO: setup() is overridden to remove "boost", which is no longer
+    # supported. This override can be dropped with haystack >= 2.4.2.
+    def setup(self):
+        """
+        Defers loading until needed.
+        """
+        # Get the existing mapping & cache it. We'll compare it
+        # during the ``update`` & if it doesn't match, we'll put the new
+        # mapping.
+        try:
+            self.existing_mapping = self.conn.indices.get_mapping(index=self.index_name)
+        except NotFoundError:
+            pass
+        except Exception:
+            if not self.silently_fail:
+                raise
 
-class ElasticsearchSearchBackend(elasticsearch_backend.ElasticsearchSearchBackend):
+        unified_index = haystack.connections[self.connection_alias].get_unified_index()
+        self.content_field_name, field_mapping = self.build_schema(unified_index.all_searchfields())
+        current_mapping = {
+            'modelresult': {
+                'date_detection': False,
+                'properties': field_mapping,
+            }
+        }
+        logger.debug("Current mapping %r", current_mapping)
+
+        if current_mapping != self.existing_mapping:
+            try:
+                # Make sure the index is there first.
+                self.conn.indices.create(index=self.index_name, body=self.DEFAULT_SETTINGS, ignore=400)
+                self.conn.indices.put_mapping(index=self.index_name, doc_type='modelresult', body=current_mapping)
+                self.existing_mapping = current_mapping
+            except Exception:
+                if not self.silently_fail:
+                    raise
+
+        self.setup_complete = True
+
     def build_search_kwargs(self, query_string, sort_by=None,
                             start_offset=0, end_offset=None,
                             fields='', highlight=False, facets=None,
@@ -29,7 +78,7 @@
                             within=None, dwithin=None, distance_point=None,
                             models=None, limit_to_registered_models=None,
                             result_class=None):
-        
+
         kwargs = super(ElasticsearchSearchBackend, self).build_search_kwargs(query_string, sort_by=sort_by,
             start_offset=start_offset, end_offset=end_offset, fields=fields, highlight=highlight, facets=facets,
             date_facets=date_facets, query_facets=query_facets,
@@ -37,47 +86,60 @@
             within=within, dwithin=dwithin, distance_point=distance_point,
             models=models, limit_to_registered_models=limit_to_registered_models,
             result_class=result_class)
-        
+
         #TODO : try to make the list of fields dynamic
         #TODO : how to handle multiple models
         if highlight:
             fields_def = { }
-            
+
             if models is None or len(models) == 0: # or Segment in models:
                 fields_def['tags'] = {}
                 fields_def['title'] = {}
                 fields_def['abstract'] = {}
-            
+
             kwargs['highlight'] = {
                 'pre_tags' : [""],
                 'post_tags' : [""],
                 "number_of_fragments" : 0,
-                'fields': fields_def 
+                'fields': fields_def
             }
-        
+
         return kwargs
-    
-    
+
+
     def remove(self, obj_or_string, commit=True):
-        
+
         if not self.setup_complete:
             try:
                 self.setup()
-            except (requests.RequestException, pyelasticsearch.ElasticHttpError), e:
+            except elasticsearch.TransportError as e:
                 if not self.silently_fail:
                     raise
-                self.log.error("Failed to remove document '%s' from Elasticsearch: %s", repr(obj_or_string), e)
+                self.log.error("Failed to remove document '%s' from Elasticsearch: %s", repr(obj_or_string), e,
+                               exc_info=True)
                 return
 
         if isinstance(obj_or_string, collections.Iterable) and not isinstance(obj_or_string, basestring):
             ids = [get_identifier(elt) for elt in obj_or_string]
             if not ids:
                 return
-            q = {"query": {'ids' : {'values' : ids}}}
-            self.conn.delete_by_query(self.index_name, 'modelresult', q)
-        else:
-            return super(ElasticsearchSearchBackend, self).remove(obj_or_string, commit=commit)
+            actions = [{
+                '_op_type': 'delete',
+                '_index': self.index_name,
+                '_type': 'modelresult',
+                '_id': id,
+            } for id in ids]
+
+            # q = {"query": {'ids' : {'values' : ids}}}
+            # self.conn.delete_by_query(self.index_name, 'modelresult', q)
+            del_res = bulk(self.conn, actions, stats_only=False, raise_on_error=False)
+            if (del_res and
+                    len(del_res) > 1 and
+                    del_res[1] and
+                    any([not (r and (u'delete' in r) and
+                              ((r[u'delete'].get(u'found', False) and r[u'delete'].get(u'status', 0) == 200) or
+                               ((not r[u'delete'].get(u'found', True)) and r[u'delete'].get(u'status', 0) == 404)))
+                         for r in del_res[1]])):
+                raise elasticsearch.TransportError("Problems when bulk removing: %r" % (del_res,))
+        else:
+            return super(ElasticsearchSearchBackend, self).remove(obj_or_string, commit=commit)
 
 
 class ElasticsearchSearchEngine(BaseEngine):
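
Note on the backend change above: Elasticsearch 2.x dropped the core delete-by-query API, so remove() now feeds individual delete actions to elasticsearch.helpers.bulk instead of calling delete_by_query. Below is a minimal, self-contained sketch of that pattern against the elasticsearch-py 2.x client, outside haystack; the host, index name, and document ids are illustrative assumptions, and only the 'modelresult' doc type comes from the patch.

```python
# Sketch of the bulk-delete pattern used in remove() above (elasticsearch-py 2.x).
# Host, index name and ids are assumptions; 'modelresult' mirrors the patch.
from elasticsearch import Elasticsearch, TransportError
from elasticsearch.helpers import bulk

conn = Elasticsearch(["http://localhost:9200"])  # assumed local node
doc_ids = ["ldt_utils.segment.1", "ldt_utils.segment.2"]  # hypothetical haystack identifiers

actions = [{
    '_op_type': 'delete',   # replaces the delete_by_query call removed in ES 2.x
    '_index': 'haystack',   # assumed index name
    '_type': 'modelresult',
    '_id': doc_id,
} for doc_id in doc_ids]

# raise_on_error=False collects per-document results instead of raising,
# so documents that are already gone (HTTP 404) can be tolerated.
success_count, errors = bulk(conn, actions, stats_only=False, raise_on_error=False)
for item in errors:
    status = item.get('delete', {}).get('status', 0)
    if status not in (200, 404):  # 404 means the document was already absent
        raise TransportError(status, "bulk delete failed", item)
```

The same tolerance for 404 is what the long any(...) condition in the patch encodes: a delete that reports found/200 or not-found/404 is a success; anything else fails the whole batch.
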
diff -r 211aef9186f5 -r 1de00f96a047 src/ldt/ldt/indexation/search_indexes.py
--- a/src/ldt/ldt/indexation/search_indexes.py	Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/indexation/search_indexes.py	Tue Mar 15 12:32:52 2016 +0100
@@ -25,22 +25,22 @@
     author = indexes.CharField(model_attr='author', stored=True, null=True)
     start_ts = indexes.IntegerField(model_attr='start_ts', indexed=False, stored=True)
     date = indexes.CharField(model_attr='date', stored=True)
-    
+
     def get_model(self):
         return Segment
-    
+
     def prepare_tags(self, obj):
         if hasattr(obj, 'tag_list'):
             if obj.tag_list is not None:
                 obj.tags = None # to avoid a second and useless db request
                 return ",".join(obj.tag_list)
         return ",".join([tag.name for tag in obj.tags.all()])
-    
+
     def index_queryset(self, using=None):
         "Used when the entire index for model is updated."
         return self.get_model().objects.prefetch_related("tags")
-    
-    
+
+
 class AnnotationIndex(indexes.SearchIndex, indexes.Indexable):
     text = indexes.CharField(document=True, use_template=True)
     tags = indexes.CharField(model_attr='tags', indexed=True, stored=False)
@@ -51,10 +51,10 @@
 
     def get_model(self):
         return Annotation
-    
+
     def prepare_tags(self, obj):
         return ",".join([tag.name for tag in obj.tags.all()])
-    
+
     def index_queryset(self, using=None):
         "Used when the entire index for model is updated."
         return self.get_model().objects.prefetch_related("tags")
@@ -65,13 +65,13 @@
     tags = indexes.CharField(model_attr='get_tags', indexed=True, stored=False, null=True)
     title = indexes.CharField(model_attr='title', indexed=True, stored=True)
     abstract = indexes.CharField(model_attr='description', indexed=True, stored=False, null=True)
-    
+
     def get_model(self):
         return Content
-    
+
     def prepare_tags(self, obj):
         return ",".join([tag.name for tag in obj.tags.all()])
-    
+
     def index_queryset(self, using=None):
         "Used when the entire index for model is updated."
         return self.get_model().objects.prefetch_related("tags")
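
The search_indexes.py hunks above are trailing-whitespace cleanup, but the surrounding context shows the one non-obvious detail in SegmentIndex: prepare_tags() uses a precomputed tag_list when the indexer supplies one, and only falls back to the tags relation (prefetched by index_queryset()) otherwise. A hypothetical sketch of how a bulk indexer would feed that shortcut; the model import path and helper name are assumptions, not code from the patch.

```python
# Hypothetical helper showing how SegmentIndex.prepare_tags() is meant to be
# fed during bulk indexing: attach tag_list up front so no per-segment tag
# query is issued. The model import path is an assumption.
from ldt.ldt_utils.models import Segment  # path assumed

def segments_with_tag_lists():
    """Yield segments annotated with tag_list, using a single prefetch."""
    for segment in Segment.objects.prefetch_related("tags"):
        # tags.all() hits the prefetch cache, not the database
        segment.tag_list = [tag.name for tag in segment.tags.all()]
        yield segment
```
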
diff -r 211aef9186f5 -r 1de00f96a047 src/ldt/ldt/ldt_utils/contentindexer.py
--- a/src/ldt/ldt/ldt_utils/contentindexer.py	Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/ldt_utils/contentindexer.py	Tue Mar 15 12:32:52 2016 +0100
@@ -212,7 +212,10 @@
         content = obj
         url = content.iri_url()
 
-        _, file_content = request_with_auth(url)
+        res, file_content = request_with_auth(url)
+        if res['status'] != "200":
+            logger.debug("Problem when getting %s, passing: %r", url, res)
+            return
         doc = lxml.etree.parse(StringIO(file_content)) #@UndefinedVariable
 
         object_delete(Segment, iri_id=content.iri_id, project_id='')
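
The contentindexer.py change stops blindly parsing whatever request_with_auth() returns: a non-200 response is now logged and skipped instead of being handed to lxml, where an error page would raise a parse failure far from the real cause. A standalone sketch of the same guard follows; request_with_auth() is project-specific, so httplib2 (whose response objects expose the status code as the string res['status'], matching the check in the patch) stands in for it, and fetch_and_parse is a made-up name.

```python
# Standalone sketch of the guarded fetch-then-parse flow added above.
# httplib2.Http().request() stands in for the project's request_with_auth().
import logging
from StringIO import StringIO  # the patched codebase is Python 2

import httplib2
import lxml.etree

logger = logging.getLogger(__name__)

def fetch_and_parse(url):
    res, file_content = httplib2.Http().request(url)
    if res['status'] != "200":  # httplib2 reports the status code as a string
        logger.debug("Problem when getting %s, passing: %r", url, res)
        return None
    return lxml.etree.parse(StringIO(file_content))
```
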