annot-server/websockets.py
changeset 0 e1d4d7a8255a
child 23 16a1925df2df
equal deleted inserted replaced
-1:000000000000 0:e1d4d7a8255a
       
     1 
       
     2 #
       
     3 # See LICENCE for detail
       
     4 # Copyright (c) 2014 IRI
       
     5 #
       
     6 
       
     7 import json
       
     8 
       
     9 from autobahn.twisted.websocket import WebSocketServerFactory, \
       
    10                                        WebSocketServerProtocol
       
    11 from autobahn.websocket import http
       
    12 
       
    13 from baseserver import BaseProtocol
       
    14 import utils
       
    15 
       
    16 class BroadcastServerProtocol(WebSocketServerProtocol):
       
    17 
       
    18     def onConnect(self, request):
       
    19         if request.params:
       
    20             self.factory.registerFilter(self, request.params)
       
    21 
       
    22     def onOpen(self):
       
    23         self.factory.register(self)
       
    24 
       
    25     def onMessage(self, payload, isBinary):
       
    26         if not isBinary:
       
    27             msg = "{} from {}".format(payload.decode('utf8'), self.peer)
       
    28             self.factory.broadcast(msg)
       
    29 
       
    30     def connectionLost(self, reason):
       
    31         WebSocketServerProtocol.connectionLost(self, reason)
       
    32         self.factory.unregister(self)
       
    33 
       
    34 
       
    35 class BroadcastServerFactory(WebSocketServerFactory):
       
    36     """
       
    37     Simple broadcast server broadcasting any message it receives to all
       
    38     currently connected clients.
       
    39     """
       
    40 
       
    41     def __init__(self, url, debug = False, debugCodePaths = False):
       
    42         WebSocketServerFactory.__init__(self, url, debug = debug, debugCodePaths = debugCodePaths)
       
    43         self.clients = []
       
    44         self.filters = {}
       
    45         self.protocol = BroadcastServerProtocol
       
    46 
       
    47     def registerFilter(self, client, filter):
       
    48         print("registered filter {} for client {}".format(repr(filter),client.peer))
       
    49         self.filters[client] = filter
       
    50 
       
    51     def register(self, client):
       
    52         if not client in self.clients:
       
    53              print("registered client {}".format(client.peer))
       
    54              self.clients.append(client)
       
    55 
       
    56     def unregister(self, client):
       
    57         if client in self.clients:
       
    58             print("unregistered client {}".format(client.peer))
       
    59             self.clients.remove(client)
       
    60         if client in self.filters:
       
    61             self.filters.pop(client, None)
       
    62 
       
    63     def broadcast(self, msg, filter):
       
    64         print("broadcasting prepared message '{}' ..".format(msg))
       
    65         #preparedMsg = self.prepareMessage(msg)
       
    66         for c in self.clients:
       
    67             preparedMsg = self.prepareMessage(msg)
       
    68             if all([ (k in filter and filter[k] in v) for k,v in self.filters.get(c, {}).items()]):
       
    69                 c.sendPreparedMessage(preparedMsg)
       
    70                 print("prepared message sent to {}".format(c.peer))
       
    71 
       
    72 
       
    73 class AnnotationServerProtocol(WebSocketServerProtocol, BaseProtocol):
       
    74 
       
    75     def onConnect(self, request):
       
    76         event_ids = request.params.get('event', [])
       
    77         if not event_ids or not event_ids[0]:
       
    78             raise http.HttpException(400, "The event parameter is missing")
       
    79         self.event = event_ids[0]
       
    80         self._init_props(self.factory.ws_factory, self.factory.conn)
       
    81 
       
    82     #TODO: add error handling
       
    83     def onMessage(self, payload, isBinary):
       
    84         if isBinary:
       
    85             return
       
    86         msg = payload.decode('utf8')
       
    87         params_annot = json.loads(msg)
       
    88 
       
    89         params = {
       
    90             'event': self.event,
       
    91             'channel' : utils.ANNOTATION_CHANNEL,
       
    92             'content' : params_annot
       
    93         }
       
    94 
       
    95         def error_callback(failure):
       
    96             raise http.HttpException(500,"Error when processing message : %r" % failure)
       
    97 
       
    98         def annot_callback(res):
       
    99             if 'ts' in res:
       
   100                 res['ts'] = res['ts'].isoformat()+'Z'
       
   101             if 'uuid' in res:
       
   102                 res['uuid'] = str(res['uuid'])
       
   103             self.sendMessage(json.dumps(res))
       
   104 
       
   105 
       
   106         defer = self.process_annotation(params)
       
   107         defer.addErrback(error_callback)
       
   108         defer.addCallback(annot_callback)
       
   109 
       
   110 
       
   111 class AnotationServerFactory(WebSocketServerFactory):
       
   112 
       
   113     def __init__(self, url, ws_factory, conn, debug = False, debugCodePaths = False):
       
   114         WebSocketServerFactory.__init__(self, url, debug = debug, debugCodePaths = debugCodePaths)
       
   115         self.ws_factory = ws_factory
       
   116         self.conn = conn
       
   117         self.clients = {}
       
   118         self.protocol = AnnotationServerProtocol