Start on routing and queues, now need 1 worker per queue
authordurandn
Fri, 10 Apr 2015 13:07:18 +0200
changeset 86 1c84b37adaf4
parent 85 bbe4e172afb3
child 87 e7afb5bd5a85
Start on routing and queues, now need 1 worker per queue
src/catedit/config.py.tmpl
src/catedit/resources.py
src/catedit/utils.py
--- a/src/catedit/config.py.tmpl	Fri Apr 10 13:00:48 2015 +0200
+++ b/src/catedit/config.py.tmpl	Fri Apr 10 13:07:18 2015 +0200
@@ -5,6 +5,7 @@
 
 from rdflib import RDF, RDFS, Literal
 from rdflib.namespace import SKOS
+from kombu import Queue
 
 class AppConfig(object):
 
@@ -13,10 +14,6 @@
     HOST = "0.0.0.0"
     DEBUG = True
 	
-    # Celery settings
-    
-    CELERY_BROKER_URL = 'sqla+sqlite:////Users/durandn/IRIProjects/catedit/src/catedit/celerydb.sqlite'
-	
     # WTForms settings
 
     SECRET_KEY = 'totally-secret-key'
@@ -100,3 +97,11 @@
 			"rdflib_class": "http://www.w3.org/2004/02/skos/core#related"
 		}
     }
+    
+    # Celery settings
+    
+    CELERY_BROKER_URL = 'sqla+sqlite:////Users/durandn/IRIProjects/catedit/src/catedit/celerydb.sqlite'
+    if PERSISTENCE_CONFIG["METHOD"] == "PersistenceToGithub":
+        CELERY_QUEUES = tuple(
+            Queue("repo_"+repository, routing_key="task_for_"+repository) for repository in PERSISTENCE_CONFIG["REPOSITORY_LIST"]
+        )
--- a/src/catedit/resources.py	Fri Apr 10 13:00:48 2015 +0200
+++ b/src/catedit/resources.py	Fri Apr 10 13:07:18 2015 +0200
@@ -98,17 +98,22 @@
         args = cat_parser.parse_args()
         if cat_id is None:
             if cat_manager_instance.persistence.session_compliant is True:
-                task=submit_changes.delay(
-                        deleted_categories=session.get(
+                task=submit_changes.apply_async(
+                    kwargs={
+                        "deleted_categories" : session.get(
                             "deleted_categories", {}
                         ).get(repository, {}),
-                        modified_categories=session.get(
+                        "modified_categories" : session.get(
                             "modified_categories", {}
                         ).get(repository, {}),
-                        message=args["commit_message"],
-                        repository=repository,
-                        token=session["user_code"]
-                    )
+                        "message" : args["commit_message"],
+                        "repository" : repository,
+                        "token" : session["user_code"]
+                    },
+                    queue="repo_"+repository,
+                    routing_key="task_for_"+repository
+                )
+                            
                 session["tasks"][repository].append(task.id)
                 session["deleted_categories"][repository] = {}
                 session["modified_categories"][repository] = {}
--- a/src/catedit/utils.py	Fri Apr 10 13:00:48 2015 +0200
+++ b/src/catedit/utils.py	Fri Apr 10 13:07:18 2015 +0200
@@ -100,7 +100,6 @@
     return compare_result
 
 
-
 def make_differences_list(first_category_list, second_category_list, repository):
     """
         Compares 2 category lists and generates a dict that lists addition,