|
1 |
|
2 # |
|
3 # See LICENCE for detail |
|
4 # Copyright (c) 2014 IRI |
|
5 # |
|
6 |
|
7 import json |
|
8 |
|
9 from autobahn.twisted.websocket import WebSocketServerFactory, \ |
|
10 WebSocketServerProtocol |
|
11 from autobahn.websocket import http |
|
12 |
|
13 from baseserver import BaseProtocol |
|
14 import utils |
|
15 |
|
class BroadcastServerProtocol(WebSocketServerProtocol):
    """
    WebSocket protocol that hands every text message it receives to its
    factory for broadcasting.
    """

    def onConnect(self, request):
        """
        Register the handshake parameters as this client's filter.

        Nothing is registered when ``request.params`` is empty.
        """
        if request.params:
            self.factory.registerFilter(self, request.params)

    def onOpen(self):
        """Add this connection to the factory's client list."""
        self.factory.register(self)

    def onMessage(self, payload, isBinary):
        """
        Broadcast a text message, suffixed with the sender's peer address.

        Binary messages are ignored.
        """
        if isBinary:
            return
        msg = "{} from {}".format(payload.decode('utf8'), self.peer)
        # The factory's broadcast() takes the message attributes used for
        # filtering as a required second argument. A relayed client message
        # carries none, so pass an empty mapping instead of omitting the
        # argument (which raises TypeError).
        self.factory.broadcast(msg, {})

    def connectionLost(self, reason):
        """Run the base-class teardown, then remove this client from the factory."""
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregister(self)
|
33 |
|
34 |
|
class BroadcastServerFactory(WebSocketServerFactory):
    """
    Simple broadcast server broadcasting any message it receives to all
    currently connected clients whose registered filter accepts the message.
    """

    def __init__(self, url, debug=False, debugCodePaths=False):
        """
        Create the factory with no registered clients or filters.

        ``url``, ``debug`` and ``debugCodePaths`` are forwarded unchanged to
        ``WebSocketServerFactory.__init__``.
        """
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []   # connected protocol instances
        self.filters = {}   # protocol instance -> filter given at connect time
        self.protocol = BroadcastServerProtocol

    def registerFilter(self, client, filter):
        """Remember ``filter`` for ``client``, replacing any previous one."""
        print("registered filter {} for client {}".format(repr(filter), client.peer))
        self.filters[client] = filter

    def register(self, client):
        """Add ``client`` to the broadcast list unless it is already present."""
        if client not in self.clients:
            print("registered client {}".format(client.peer))
            self.clients.append(client)

    def unregister(self, client):
        """Remove ``client`` from the broadcast list and drop its filter, if any."""
        if client in self.clients:
            print("unregistered client {}".format(client.peer))
            self.clients.remove(client)
        # pop() with a default is already a no-op for unknown clients.
        self.filters.pop(client, None)

    def broadcast(self, msg, filter=None):
        """
        Send ``msg`` to every registered client whose filter matches.

        ``filter`` maps attribute names to the values describing this
        message. A client receives the message when, for every key ``k`` of
        its registered filter, ``filter`` has ``k`` and ``filter[k]`` is
        contained in the client's value for ``k``. Clients without a
        registered filter receive every message.

        ``filter`` may be omitted (or ``None``): the message is then treated
        as having no attributes, so only clients without filter constraints
        receive it.
        """
        if filter is None:
            filter = {}
        print("broadcasting prepared message '{}' ..".format(msg))
        for c in self.clients:
            client_filter = self.filters.get(c, {})
            if all(k in filter and filter[k] in v for k, v in client_filter.items()):
                # The message is prepared once per recipient, as before, but
                # no longer for clients that are filtered out.
                c.sendPreparedMessage(self.prepareMessage(msg))
                print("prepared message sent to {}".format(c.peer))
|
71 |
|
72 |
|
class AnnotationServerProtocol(WebSocketServerProtocol, BaseProtocol):
    """
    WebSocket protocol that receives JSON annotations for a single event and
    answers each one with the JSON-encoded processing result.
    """

    def onConnect(self, request):
        """
        Read the mandatory ``event`` request parameter and initialise the
        protocol properties from the factory.

        Raises ``http.HttpException`` (400) when the parameter is absent or
        its first value is empty.
        """
        event_ids = request.params.get('event', [])
        if not (event_ids and event_ids[0]):
            raise http.HttpException(400, "The event parameter is missing")
        self.event = event_ids[0]
        self._init_props(self.factory.ws_factory, self.factory.conn)

    # TODO: add error handling
    def onMessage(self, payload, isBinary):
        """
        Decode a text message as JSON, submit it through
        ``process_annotation`` and send the result back to the client.

        Binary messages are ignored.
        """
        if isBinary:
            return

        def on_failure(failure):
            # NOTE(review): raising from an errback leaves the deferred in a
            # failed state, so on_annotation below is skipped; whether an
            # HttpException has any effect on an open socket is unconfirmed.
            raise http.HttpException(500,"Error when processing message : %r" % failure)

        def on_annotation(res):
            # Convert the 'ts' and 'uuid' entries, when present, to strings
            # before JSON-encoding the result.
            if 'ts' in res:
                res['ts'] = res['ts'].isoformat()+'Z'
            if 'uuid' in res:
                res['uuid'] = str(res['uuid'])
            self.sendMessage(json.dumps(res))

        annotation = json.loads(payload.decode('utf8'))
        request_params = {
            'event': self.event,
            'channel' : utils.ANNOTATION_CHANNEL,
            'content' : annotation,
        }

        # The errback is attached first, then the success callback.
        pending = self.process_annotation(request_params)
        pending.addErrback(on_failure)
        pending.addCallback(on_annotation)
|
109 |
|
110 |
|
class AnotationServerFactory(WebSocketServerFactory):
    """
    Factory building ``AnnotationServerProtocol`` connections and holding the
    ``ws_factory`` and ``conn`` objects those connections read at connect time.
    """

    def __init__(self, url, ws_factory, conn, debug = False, debugCodePaths = False):
        """
        Initialise the underlying WebSocket factory and store the shared
        collaborators.

        ``url``, ``debug`` and ``debugCodePaths`` are forwarded unchanged to
        ``WebSocketServerFactory.__init__``.
        """
        WebSocketServerFactory.__init__(
            self,
            url,
            debug=debug,
            debugCodePaths=debugCodePaths,
        )
        self.protocol = AnnotationServerProtocol
        self.clients = {}
        self.conn = conn
        self.ws_factory = ws_factory