Corrections for elasticsearch (server) >= 2.2 and elasticsearch (client lib) >= 2.3 (+version) V01.61
author ymh <ymh.work@gmail.com>
Tue, 15 Mar 2016 12:32:52 +0100
changeset 1477 1de00f96a047
parent 1476 211aef9186f5
child 1478 26db87e1b013
Corrections for elasticsearch (server) >= 2.2 and elasticsearch (client lib) >= 2.3 (+version)
src/ldt/ldt/__init__.py
src/ldt/ldt/indexation/backends/elasticsearch_backend.py
src/ldt/ldt/indexation/search_indexes.py
src/ldt/ldt/ldt_utils/contentindexer.py
--- a/src/ldt/ldt/__init__.py	Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/__init__.py	Tue Mar 15 12:32:52 2016 +0100
@@ -1,6 +1,6 @@
 __all__ = ["VERSION", "get_version", "__version__", "default_app_config"]
 
-VERSION = (1, 60, 2, "final", 0)
+VERSION = (1, 61, 0, "final", 0)
 
 
 def get_version():
--- a/src/ldt/ldt/indexation/backends/elasticsearch_backend.py	Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/indexation/backends/elasticsearch_backend.py	Tue Mar 15 12:32:52 2016 +0100
@@ -4,6 +4,8 @@
 
 @author: ymh
 '''
+import logging
+import haystack
 from haystack.backends import BaseEngine, elasticsearch_backend
 from haystack.exceptions import MissingDependency
 from haystack.utils import get_identifier
@@ -14,13 +16,60 @@
 except ImportError:
     raise MissingDependency("The 'elasticsearch' backend requires the installation of 'requests'.")
 try:
-    import pyelasticsearch
+    import elasticsearch
+    try:
+        # let's try this, for elasticsearch > 1.7.0
+        from elasticsearch.helpers import bulk
+    except ImportError:
+        # let's try this, for elasticsearch <= 1.7.0
+        from elasticsearch.helpers import bulk_index as bulk
+    from elasticsearch.exceptions import NotFoundError
 except ImportError:
-    raise MissingDependency("The 'elasticsearch' backend requires the installation of 'pyelasticsearch'. Please refer to the documentation.")
+    raise MissingDependency("The 'elasticsearch' backend requires the installation of 'elasticsearch'. Please refer to the documentation.")
+
+logger = logging.getLogger(__name__)
+
+class ElasticsearchSearchBackend(elasticsearch_backend.ElasticsearchSearchBackend):
 
 
+    #TODO: setup() added to remove "boost", which is no longer supported. This override can be removed with haystack >= 2.4.2
+    def setup(self):
+        """
+        Defers loading until needed.
+        """
+        # Get the existing mapping & cache it. We'll compare it
+        # during the ``update`` & if it doesn't match, we'll put the new
+        # mapping.
+        try:
+            self.existing_mapping = self.conn.indices.get_mapping(index=self.index_name)
+        except NotFoundError:
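+            # a missing index simply means there is no mapping yet; setup
+            # creates it below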
+            pass
+        except Exception:
+            if not self.silently_fail:
+                raise
 
-class ElasticsearchSearchBackend(elasticsearch_backend.ElasticsearchSearchBackend):
+        unified_index = haystack.connections[self.connection_alias].get_unified_index()
+        self.content_field_name, field_mapping = self.build_schema(unified_index.all_searchfields())
+        current_mapping = {
+            'modelresult': {
+                'date_detection': False,
+                'properties': field_mapping,
+            }
+        }
+        logger.debug("Current Mapping %r", current_mapping)
+
+        if current_mapping != self.existing_mapping:
+            try:
+                # Make sure the index is there first.
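+                # (ignore=400 tolerates the "index already exists" error)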
+                self.conn.indices.create(index=self.index_name, body=self.DEFAULT_SETTINGS, ignore=400)
+                self.conn.indices.put_mapping(index=self.index_name, doc_type='modelresult', body=current_mapping)
+                self.existing_mapping = current_mapping
+            except Exception:
+                if not self.silently_fail:
+                    raise
+
+        self.setup_complete = True
+
 
     def build_search_kwargs(self, query_string, sort_by=None, start_offset=0, end_offset=None,
                         fields='', highlight=False, facets=None,
@@ -29,7 +78,7 @@
                         within=None, dwithin=None, distance_point=None,
                         models=None, limit_to_registered_models=None,
                         result_class=None):
-        
+
         kwargs = super(ElasticsearchSearchBackend, self).build_search_kwargs(query_string, sort_by=sort_by, start_offset=start_offset, end_offset=end_offset,
                         fields=fields, highlight=highlight, facets=facets,
                         date_facets=date_facets, query_facets=query_facets,
@@ -37,47 +86,60 @@
                         within=within, dwithin=dwithin, distance_point=distance_point,
                         models=models, limit_to_registered_models=limit_to_registered_models,
                         result_class=result_class)
-                
+
         #TODO : try to make list of field dynamic
-        #TODO : How to handle multiple 
+        #TODO : How to handle multiple
         if highlight:
             fields_def = { }
- 
+
             if models is None or len(models) == 0 :#or Segment in models:
                 fields_def['tags'] = {}
                 fields_def['title'] = {}
                 fields_def['abstract'] = {}
-            
+
             kwargs['highlight'] = {
                 'pre_tags' : ["<span class='highlight'>"],
                 'post_tags' : ["</span>"],
                 "number_of_fragments" : 0,
-                'fields': fields_def                
+                'fields': fields_def
             }
-        
+
         return kwargs
-    
-                
+
+
     def remove(self, obj_or_string, commit=True):
-        
+
         if not self.setup_complete:
             try:
                 self.setup()
-            except (requests.RequestException, pyelasticsearch.ElasticHttpError), e:
+            except elasticsearch.TransportError as e:
                 if not self.silently_fail:
                     raise
 
-                self.log.error("Failed to remove document '%s' from Elasticsearch: %s", repr(obj_or_string), e)
+                self.log.error("Failed to remove document '%s' from Elasticsearch: %s", doc_id, e,
+                               exc_info=True)
                 return
 
         if isinstance(obj_or_string, collections.Iterable) and not isinstance(obj_or_string, basestring):
             ids = [get_identifier(elt) for elt in obj_or_string]
             if not ids:
                 return
-            q = {"query": {'ids' : {'values' : ids}}}
-            self.conn.delete_by_query(self.index_name, 'modelresult', q)
-        else: 
-            return super(ElasticsearchSearchBackend, self).remove(obj_or_string, commit=commit) 
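+            # delete_by_query was removed from the Elasticsearch 2.x core API,
+            # so issue one bulk 'delete' action per document id instead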
+            actions = [ {
+                '_op_type': 'delete',
+                '_index': self.index_name,
+                '_type': 'modelresult',
+                '_id': id,} for id in ids ]
+
+            # q = {"query": {'ids' : {'values' : ids}}}
+            # self.conn.delete_by_query(self.index_name, 'modelresult', q)
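+            # bulk() returns (success_count, error_items); raise_on_error=False
+            # collects non-2xx items (e.g. deletes of already-missing documents)
+            # for inspection below instead of raising immediately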
+            del_res = bulk(self.conn, actions, stats_only=False, raise_on_error=False)
+            if ( del_res and
+                    len(del_res) > 1 and
+                    del_res[1] and
+                    any([ not (r and (u'delete' in r) and
+                               ((r[u'delete'].get(u'found', False) and r[u'delete'].get(u'status', 0) == 200) or
+                                ((not r[u'delete'].get(u'found', True)) and r[u'delete'].get(u'status', 0) == 404)))
+                          for r in del_res[1] ])):
+                raise elasticsearch.TransportError("Problems when bulk removing %r" % (del_res,))
+        else:
+            return super(ElasticsearchSearchBackend, self).remove(obj_or_string, commit=commit)
 
 
 class ElasticsearchSearchEngine(BaseEngine):
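
For reference, a minimal sketch of how the new bulk-delete branch is exercised; the connection options and the `segments` objects are hypothetical, not part of this changeset:

    # hypothetical usage sketch, assuming a reachable Elasticsearch 2.x node
    backend = ElasticsearchSearchBackend('default',
                                         URL='http://localhost:9200/',
                                         INDEX_NAME='ldt')
    backend.remove(segments)      # iterable -> one bulk 'delete' action per object
    backend.remove(segments[0])   # single object -> parent implementation
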
--- a/src/ldt/ldt/indexation/search_indexes.py	Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/indexation/search_indexes.py	Tue Mar 15 12:32:52 2016 +0100
@@ -25,22 +25,22 @@
     author = indexes.CharField(model_attr='author', stored=True, null=True)
     start_ts = indexes.IntegerField(model_attr='start_ts', indexed=False, stored=True)
     date = indexes.CharField(model_attr='date', stored=True)
-    
+
     def get_model(self):
         return Segment
-    
+
     def prepare_tags(self, obj):
         if hasattr(obj, 'tag_list'):
             if obj.tag_list is not None:
                 obj.tags = None # To avoid a second and useless db request
                 return ",".join(obj.tag_list)
         return ",".join([tag.name for tag in obj.tags.all()])
-    
+
     def index_queryset(self, using=None):
         "Used when the entire index for model is updated."
         return self.get_model().objects.prefetch_related("tags")
-    
-    
+
+
 class AnnotationIndex(indexes.SearchIndex, indexes.Indexable):
     text = indexes.CharField(document=True, use_template=True)
     tags = indexes.CharField(model_attr='tags', indexed=True, stored=False)
@@ -51,10 +51,10 @@
 
     def get_model(self):
         return Annotation
-    
+
     def prepare_tags(self, obj):
         return ",".join([tag.name for tag in obj.tags.all()])
-    
+
     def index_queryset(self, using=None):
         "Used when the entire index for model is updated."
         return self.get_model().objects.prefetch_related("tags")
@@ -65,13 +65,13 @@
     tags = indexes.CharField(model_attr='get_tags', indexed=True, stored=False, null=True)
     title = indexes.CharField(model_attr='title', indexed=True, stored=True)
     abstract = indexes.CharField(model_attr='description', indexed=True, stored=False, null=True)
-    
+
     def get_model(self):
         return Content
-    
+
     def prepare_tags(self, obj):
         return ",".join([tag.name for tag in obj.tags.all()])
-    
+
     def index_queryset(self, using=None):
         "Used when the entire index for model is updated."
         return self.get_model().objects.prefetch_related("tags")
--- a/src/ldt/ldt/ldt_utils/contentindexer.py	Thu Dec 31 23:38:47 2015 +0100
+++ b/src/ldt/ldt/ldt_utils/contentindexer.py	Tue Mar 15 12:32:52 2016 +0100
@@ -212,7 +212,10 @@
 
         content = obj
         url = content.iri_url()
-        _, file_content = request_with_auth(url)
+        res, file_content = request_with_auth(url)
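+        # NOTE: request_with_auth is assumed to return an httplib2-style
+        # response, where res['status'] holds the HTTP status code as a string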
+        if res['status'] != "200":
+            logger.debug("Problem when getting %s, passing: %r", url, res)
+            return
         doc = lxml.etree.parse(StringIO(file_content)) #@UndefinedVariable
 
         object_delete(Segment, iri_id=content.iri_id, project_id='')