Corrections for elasticsearch (server) >= 2.2 and elasticsearch (client lib) >= 2.3 (+ version bump)
--- a/src/ldt/ldt/__init__.py Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/__init__.py Tue Mar 15 12:32:52 2016 +0100
@@ -1,6 +1,6 @@
__all__ = ["VERSION", "get_version", "__version__", "default_app_config"]
-VERSION = (1, 60, 2, "final", 0)
+VERSION = (1, 61, 0, "final", 0)
def get_version():
--- a/src/ldt/ldt/indexation/backends/elasticsearch_backend.py Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/indexation/backends/elasticsearch_backend.py Tue Mar 15 12:32:52 2016 +0100
@@ -4,6 +4,8 @@
@author: ymh
'''
+import logging
+import haystack
from haystack.backends import BaseEngine, elasticsearch_backend
from haystack.exceptions import MissingDependency
from haystack.utils import get_identifier
@@ -14,13 +16,63 @@
except ImportError:
raise MissingDependency("The 'elasticsearch' backend requires the installation of 'requests'.")
try:
- import pyelasticsearch
+ import elasticsearch
+ try:
+        # elasticsearch client > 1.7.0 exposes the helper as bulk
+ from elasticsearch.helpers import bulk
+ except ImportError:
+        # elasticsearch client <= 1.7.0 names it bulk_index
+ from elasticsearch.helpers import bulk_index as bulk
+ from elasticsearch.exceptions import NotFoundError
except ImportError:
- raise MissingDependency("The 'elasticsearch' backend requires the installation of 'pyelasticsearch'. Please refer to the documentation.")
+ raise MissingDependency("The 'elasticsearch' backend requires the installation of 'elasticsearch'. Please refer to the documentation.")
+
+logger = logging.getLogger(__name__)
+
+class ElasticsearchSearchBackend(elasticsearch_backend.ElasticsearchSearchBackend):
+    #TODO: setup() overridden to remove "boost", which is no longer supported. This override can be removed with haystack >= 2.4.2.
+ def setup(self):
+ """
+ Defers loading until needed.
+ """
+ # Get the existing mapping & cache it. We'll compare it
+ # during the ``update`` & if it doesn't match, we'll put the new
+ # mapping.
+ try:
+ self.existing_mapping = self.conn.indices.get_mapping(index=self.index_name)
+ except NotFoundError:
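+            # a NotFoundError just means the index does not exist yet, so there is no mapping to compare against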
+ pass
+ except Exception:
+ if not self.silently_fail:
+ raise
-class ElasticsearchSearchBackend(elasticsearch_backend.ElasticsearchSearchBackend):
+ unified_index = haystack.connections[self.connection_alias].get_unified_index()
+ self.content_field_name, field_mapping = self.build_schema(unified_index.all_searchfields())
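+        # same mapping haystack's stock setup() builds, minus the _boost field that elasticsearch 2.x rejects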
+ current_mapping = {
+ 'modelresult': {
+ 'date_detection': False,
+ 'properties': field_mapping,
+ }
+ }
+ logger.debug("Current Mapping %r", current_mapping)
+
+ if current_mapping != self.existing_mapping:
+ try:
+ # Make sure the index is there first.
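+                # ignore=400 keeps create() from failing when the index already exists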
+ self.conn.indices.create(index=self.index_name, body=self.DEFAULT_SETTINGS, ignore=400)
+ self.conn.indices.put_mapping(index=self.index_name, doc_type='modelresult', body=current_mapping)
+ self.existing_mapping = current_mapping
+ except Exception:
+ if not self.silently_fail:
+ raise
+
+ self.setup_complete = True
+
def build_search_kwargs(self, query_string, sort_by=None, start_offset=0, end_offset=None,
fields='', highlight=False, facets=None,
@@ -29,7 +81,7 @@
within=None, dwithin=None, distance_point=None,
models=None, limit_to_registered_models=None,
result_class=None):
-
+
kwargs = super(ElasticsearchSearchBackend, self).build_search_kwargs(query_string, sort_by=sort_by, start_offset=start_offset, end_offset=end_offset,
fields=fields, highlight=highlight, facets=facets,
date_facets=date_facets, query_facets=query_facets,
@@ -37,47 +89,65 @@
within=within, dwithin=dwithin, distance_point=distance_point,
models=models, limit_to_registered_models=limit_to_registered_models,
result_class=result_class)
-
+
#TODO : try to make list of field dynamic
- #TODO : How to handle multiple
+ #TODO : How to handle multiple
if highlight:
fields_def = { }
-
+
if models is None or len(models) == 0 :#or Segment in models:
fields_def['tags'] = {}
fields_def['title'] = {}
fields_def['abstract'] = {}
-
+
kwargs['highlight'] = {
'pre_tags' : ["<span class='highlight'>"],
'post_tags' : ["</span>"],
"number_of_fragments" : 0,
- 'fields': fields_def
+ 'fields': fields_def
}
-
+
return kwargs
-
-
+
+
def remove(self, obj_or_string, commit=True):
-
+
if not self.setup_complete:
try:
self.setup()
- except (requests.RequestException, pyelasticsearch.ElasticHttpError), e:
+ except elasticsearch.TransportError as e:
if not self.silently_fail:
raise
- self.log.error("Failed to remove document '%s' from Elasticsearch: %s", repr(obj_or_string), e)
+                self.log.error("Failed to remove document '%s' from Elasticsearch: %s", repr(obj_or_string), e,
+                               exc_info=True)
return
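+        # a non-string iterable means a batch of objects/identifiers to delete in one bulk call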
if isinstance(obj_or_string, collections.Iterable) and not isinstance(obj_or_string, basestring):
ids = [get_identifier(elt) for elt in obj_or_string]
if not ids:
return
- q = {"query": {'ids' : {'values' : ids}}}
- self.conn.delete_by_query(self.index_name, 'modelresult', q)
- else:
- return super(ElasticsearchSearchBackend, self).remove(obj_or_string, commit=commit)
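+            # build one explicit delete action per document identifier for the bulk helper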
+            actions = [{'_op_type': 'delete',
+                        '_index': self.index_name,
+                        '_type': 'modelresult',
+                        '_id': doc_id} for doc_id in ids]
+
+            # delete_by_query was removed from the core API in elasticsearch 2.x,
+            # so the deletion now goes through explicit bulk delete actions instead
+ del_res = bulk(self.conn, actions, stats_only=False, raise_on_error=False)
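+            # with stats_only=False and raise_on_error=False, bulk() returns a
+            # (success_count, errors) tuple; check every reported error below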
+            if del_res and len(del_res) > 1 and del_res[1]:
+                for r in del_res[1]:
+                    item = r.get(u'delete', {}) if r else {}
+                    deleted = item.get(u'found', False) and item.get(u'status', 0) == 200
+                    already_absent = not item.get(u'found', True) and item.get(u'status', 0) == 404
+                    if not (deleted or already_absent):
+                        raise elasticsearch.TransportError("Problems when bulk removing %r" % (del_res,))
+ else:
+ return super(ElasticsearchSearchBackend, self).remove(obj_or_string, commit=commit)
class ElasticsearchSearchEngine(BaseEngine):
--- a/src/ldt/ldt/indexation/search_indexes.py Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/indexation/search_indexes.py Tue Mar 15 12:32:52 2016 +0100
@@ -25,22 +25,22 @@
author = indexes.CharField(model_attr='author', stored=True, null=True)
start_ts = indexes.IntegerField(model_attr='start_ts', indexed=False, stored=True)
date = indexes.CharField(model_attr='date', stored=True)
-
+
def get_model(self):
return Segment
-
+
def prepare_tags(self, obj):
if hasattr(obj, 'tag_list'):
if obj.tag_list is not None:
obj.tags = None # To avoid a second and useless db request
return ",".join(obj.tag_list)
return ",".join([tag.name for tag in obj.tags.all()])
-
+
def index_queryset(self, using=None):
"Used when the entire index for model is updated."
return self.get_model().objects.prefetch_related("tags")
-
-
+
+
class AnnotationIndex(indexes.SearchIndex, indexes.Indexable):
text = indexes.CharField(document=True, use_template=True)
tags = indexes.CharField(model_attr='tags', indexed=True, stored=False)
@@ -51,10 +51,10 @@
def get_model(self):
return Annotation
-
+
def prepare_tags(self, obj):
return ",".join([tag.name for tag in obj.tags.all()])
-
+
def index_queryset(self, using=None):
"Used when the entire index for model is updated."
return self.get_model().objects.prefetch_related("tags")
@@ -65,13 +65,13 @@
tags = indexes.CharField(model_attr='get_tags', indexed=True, stored=False, null=True)
title = indexes.CharField(model_attr='title', indexed=True, stored=True)
abstract = indexes.CharField(model_attr='description', indexed=True, stored=False, null=True)
-
+
def get_model(self):
return Content
-
+
def prepare_tags(self, obj):
return ",".join([tag.name for tag in obj.tags.all()])
-
+
def index_queryset(self, using=None):
"Used when the entire index for model is updated."
return self.get_model().objects.prefetch_related("tags")
--- a/src/ldt/ldt/ldt_utils/contentindexer.py Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/ldt_utils/contentindexer.py Tue Mar 15 12:32:52 2016 +0100
@@ -212,7 +212,11 @@
content = obj
url = content.iri_url()
- _, file_content = request_with_auth(url)
+ res, file_content = request_with_auth(url)
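+        # request_with_auth is expected to return an httplib2-style (response, content) pair whose 'status' entry is a string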
+        if res['status'] != "200":
+            logger.debug("Problem when getting %s, skipping it: %r", url, res)
+            return
doc = lxml.etree.parse(StringIO(file_content)) #@UndefinedVariable
object_delete(Segment, iri_id=content.iri_id, project_id='')