--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/annot-server/websockets.py Wed Oct 08 15:14:58 2014 +0200
@@ -0,0 +1,118 @@
+
+#
+# See LICENCE for detail
+# Copyright (c) 2014 IRI
+#
+
+import json
+
+from autobahn.twisted.websocket import WebSocketServerFactory, \
+ WebSocketServerProtocol
+from autobahn.websocket import http
+
+from baseserver import BaseProtocol
+import utils
+
+class BroadcastServerProtocol(WebSocketServerProtocol):
+
+ def onConnect(self, request):
+ if request.params:
+ self.factory.registerFilter(self, request.params)
+
+ def onOpen(self):
+ self.factory.register(self)
+
+ def onMessage(self, payload, isBinary):
+ if not isBinary:
+ msg = "{} from {}".format(payload.decode('utf8'), self.peer)
+ self.factory.broadcast(msg)
+
+ def connectionLost(self, reason):
+ WebSocketServerProtocol.connectionLost(self, reason)
+ self.factory.unregister(self)
+
+
+class BroadcastServerFactory(WebSocketServerFactory):
+ """
+ Simple broadcast server broadcasting any message it receives to all
+ currently connected clients.
+ """
+
+ def __init__(self, url, debug = False, debugCodePaths = False):
+ WebSocketServerFactory.__init__(self, url, debug = debug, debugCodePaths = debugCodePaths)
+ self.clients = []
+ self.filters = {}
+ self.protocol = BroadcastServerProtocol
+
+ def registerFilter(self, client, filter):
+ print("registered filter {} for client {}".format(repr(filter),client.peer))
+ self.filters[client] = filter
+
+ def register(self, client):
+ if not client in self.clients:
+ print("registered client {}".format(client.peer))
+ self.clients.append(client)
+
+ def unregister(self, client):
+ if client in self.clients:
+ print("unregistered client {}".format(client.peer))
+ self.clients.remove(client)
+ if client in self.filters:
+ self.filters.pop(client, None)
+
+ def broadcast(self, msg, filter):
+ print("broadcasting prepared message '{}' ..".format(msg))
+ #preparedMsg = self.prepareMessage(msg)
+ for c in self.clients:
+ preparedMsg = self.prepareMessage(msg)
+ if all([ (k in filter and filter[k] in v) for k,v in self.filters.get(c, {}).items()]):
+ c.sendPreparedMessage(preparedMsg)
+ print("prepared message sent to {}".format(c.peer))
+
+
+class AnnotationServerProtocol(WebSocketServerProtocol, BaseProtocol):
+
+ def onConnect(self, request):
+ event_ids = request.params.get('event', [])
+ if not event_ids or not event_ids[0]:
+ raise http.HttpException(400, "The event parameter is missing")
+ self.event = event_ids[0]
+ self._init_props(self.factory.ws_factory, self.factory.conn)
+
+ #TODO: add error handling
+ def onMessage(self, payload, isBinary):
+ if isBinary:
+ return
+ msg = payload.decode('utf8')
+ params_annot = json.loads(msg)
+
+ params = {
+ 'event': self.event,
+ 'channel' : utils.ANNOTATION_CHANNEL,
+ 'content' : params_annot
+ }
+
+ def error_callback(failure):
+ raise http.HttpException(500,"Error when processing message : %r" % failure)
+
+ def annot_callback(res):
+ if 'ts' in res:
+ res['ts'] = res['ts'].isoformat()+'Z'
+ if 'uuid' in res:
+ res['uuid'] = str(res['uuid'])
+ self.sendMessage(json.dumps(res))
+
+
+ defer = self.process_annotation(params)
+ defer.addErrback(error_callback)
+ defer.addCallback(annot_callback)
+
+
+class AnotationServerFactory(WebSocketServerFactory):
+
+ def __init__(self, url, ws_factory, conn, debug = False, debugCodePaths = False):
+ WebSocketServerFactory.__init__(self, url, debug = debug, debugCodePaths = debugCodePaths)
+ self.ws_factory = ws_factory
+ self.conn = conn
+ self.clients = {}
+ self.protocol = AnnotationServerProtocol