< prev index next >
1 /*
2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.net.http;
26
27 import java.net.http.WebSocket.MessagePart;
28 import java.net.http.WebSocketFrame.Opcode;
29 import java.nio.ByteBuffer;
30 import java.nio.CharBuffer;
31 import java.nio.charset.CharacterCodingException;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import static java.lang.String.format;
35 import static java.lang.System.Logger.Level.TRACE;
36 import static java.net.http.Utils.dump;
37 import static java.net.http.Utils.logger;
38 import static java.net.http.WebSocket.CloseCode.NOT_CONSISTENT;
39 import static java.net.http.WebSocket.CloseCode.of;
40 import static java.util.Objects.requireNonNull;
41
42 //
43 // Translates a sequence of WebSocket frame structure events (invocations of
44 // WebSocketFrame.Listener) into a sequence of incoming messages (invocations of
45 // WSReceivedMessages).
46 //
47 // A stateful object. The data that has been consumed, but not yet translated is
48 // accumulated until it's sufficient to form a message.
49 //
50 final class WSTranslator implements WebSocketFrame.Listener {
51
52 private final AtomicInteger invocationOrder = new AtomicInteger();
53
54 private final WSReceivedMessages output;
55 private final CharsetToolkit.Decoder decoder = new CharsetToolkit.Decoder();
56 private boolean fullyRead;
57 private boolean fin;
58 private Opcode opcode, originatingOpcode;
59 private MessagePart part = MessagePart.WHOLE;
60 private long payloadLen;
61 private Shared<ByteBuffer> binaryData;
62
63 WSTranslator(WSReceivedMessages output) {
64 this.output = requireNonNull(output, "listener");
65 }
66
67 @Override
68 public void fin(boolean value) {
69 assert invocationOrder.compareAndSet(0, 1) : dump(invocationOrder, value);
70 if (logger.isLoggable(TRACE)) {
71 // Checked for being loggable because of autoboxing of 'value'
72 logger.log(TRACE, "Reading fin: {0}", value);
73 }
74 fullyRead = false;
75 fin = value;
76 }
77
78 @Override
79 public void rsv1(boolean value) {
80 assert invocationOrder.compareAndSet(1, 2) : dump(invocationOrder, value);
81 if (logger.isLoggable(TRACE)) {
82 logger.log(TRACE, "Reading rsv1: {0}", value);
83 }
84 if (value) {
85 throw new WebSocketProtocolException("5.2.", "rsv1 bit is set unexpectedly");
86 }
87 }
88
89 @Override
90 public void rsv2(boolean value) {
91 assert invocationOrder.compareAndSet(2, 3) : dump(invocationOrder, value);
92 if (logger.isLoggable(TRACE)) {
93 logger.log(TRACE, "Reading rsv2: {0}", value);
94 }
95 if (value) {
96 throw new WebSocketProtocolException("5.2.", "rsv2 bit is set unexpectedly");
97 }
98 }
99
100 @Override
101 public void rsv3(boolean value) {
102 assert invocationOrder.compareAndSet(3, 4) : dump(invocationOrder, value);
103 if (logger.isLoggable(TRACE)) {
104 logger.log(TRACE, "Reading rsv3: {0}", value);
105 }
106 if (value) {
107 throw new WebSocketProtocolException("5.2.", "rsv3 bit is set unexpectedly");
108 }
109 }
110
111 @Override
112 public void opcode(Opcode v) {
113 assert invocationOrder.compareAndSet(4, 5) : dump(invocationOrder, v);
114 logger.log(TRACE, "Reading opcode: {0}", v);
115 if (v == Opcode.PING || v == Opcode.PONG || v == Opcode.CLOSE) {
116 if (!fin) {
117 throw new WebSocketProtocolException("5.5.", "A fragmented control frame " + v);
118 }
119 opcode = v;
120 } else if (v == Opcode.TEXT || v == Opcode.BINARY) {
121 if (originatingOpcode != null) {
122 throw new WebSocketProtocolException
123 ("5.4.", format("An unexpected frame %s (fin=%s)", v, fin));
124 }
125 opcode = v;
126 if (!fin) {
127 originatingOpcode = v;
128 }
129 } else if (v == Opcode.CONTINUATION) {
130 if (originatingOpcode == null) {
131 throw new WebSocketProtocolException
132 ("5.4.", format("An unexpected frame %s (fin=%s)", v, fin));
133 }
134 opcode = v;
135 } else {
136 throw new WebSocketProtocolException("5.2.", "An unknown opcode " + v);
137 }
138 }
139
140 @Override
141 public void mask(boolean value) {
142 assert invocationOrder.compareAndSet(5, 6) : dump(invocationOrder, value);
143 if (logger.isLoggable(TRACE)) {
144 logger.log(TRACE, "Reading mask: {0}", value);
145 }
146 if (value) {
147 throw new WebSocketProtocolException
148 ("5.1.", "Received a masked frame from the server");
149 }
150 }
151
152 @Override
153 public void payloadLen(long value) {
154 assert invocationOrder.compareAndSet(6, 7) : dump(invocationOrder, value);
155 if (logger.isLoggable(TRACE)) {
156 logger.log(TRACE, "Reading payloadLen: {0}", value);
157 }
158 if (opcode.isControl()) {
159 if (value > 125) {
160 throw new WebSocketProtocolException
161 ("5.5.", format("A control frame %s has a payload length of %s",
162 opcode, value));
163 }
164 assert Opcode.CLOSE.isControl();
165 if (opcode == Opcode.CLOSE && value == 1) {
166 throw new WebSocketProtocolException
167 ("5.5.1.", "A Close frame's status code is only 1 byte long");
168 }
169 }
170 payloadLen = value;
171 }
172
173 @Override
174 public void maskingKey(int value) {
175 assert false : dump(invocationOrder, value);
176 }
177
178 @Override
179 public void payloadData(Shared<ByteBuffer> data, boolean isLast) {
180 assert invocationOrder.compareAndSet(7, isLast ? 8 : 7)
181 : dump(invocationOrder, data, isLast);
182 if (logger.isLoggable(TRACE)) {
183 logger.log(TRACE, "Reading payloadData: data={0}, isLast={1}", data, isLast);
184 }
185 if (opcode.isControl()) {
186 if (binaryData != null) {
187 binaryData.put(data);
188 data.dispose();
189 } else if (!isLast) {
190 // The first chunk of the message
191 int remaining = data.remaining();
192 // It shouldn't be 125, otherwise the next chunk will be of size
193 // 0, which is not what Reader promises to deliver (eager
194 // reading)
195 assert remaining < 125 : dump(remaining);
196 Shared<ByteBuffer> b = Shared.wrap(ByteBuffer.allocate(125)).put(data);
197 data.dispose();
198 binaryData = b; // Will be disposed by the user
199 } else {
200 // The only chunk; will be disposed by the user
201 binaryData = data.position(data.limit()); // FIXME: remove this hack
202 }
203 } else {
204 part = determinePart(isLast);
205 boolean text = opcode == Opcode.TEXT || originatingOpcode == Opcode.TEXT;
206 if (!text) {
207 output.onBinary(part, data);
208 } else {
209 // TODO: use pool for CharBuffers WITH composite CharSequence,
210 // since there's no guarantee of 1:1 mapping in UTF-8, and over
211 // time sizes of current buffers in use might diverge
212 CharBuffer throwAway = CharBuffer.allocate(data.remaining());
213 try {
214 decoder.decode(data.buffer(), throwAway, part.isLast());
215 } catch (CharacterCodingException e) {
216 throw new WebSocketProtocolException
217 ("5.6.", "Invalid UTF-8 sequence in frame " + opcode, NOT_CONSISTENT, e);
218 }
219 // FIXME: cases 6.2.4, 6.2.3 (Autobahn Testsuite)
220 // accumulate bytes and characters until there's at least one
221 // more char to deliver
222 data.rewind();
223 throwAway.flip();
224 if (!throwAway.hasRemaining()) {
225 // TODO: corner case
226 }
227 Shared<CharBuffer> textData = Shared.wrap(throwAway);
228 output.onText(part, new DisposableText(textData, data));
229 }
230 }
231 }
232
233 @Override
234 public void endFrame() { // TODO: remove
235 assert invocationOrder.compareAndSet(8, 0) : dump(invocationOrder);
236 if (opcode.isControl()) {
237 binaryData.flip();
238 }
239 switch (opcode) {
240 case CLOSE:
241 WebSocket.CloseCode cc;
242 String reason;
243 if (payloadLen == 0) {
244 cc = null;
245 reason = "";
246 } else {
247 ByteBuffer b = binaryData.buffer();
248 int len = b.remaining();
249 assert 2 <= len && len <= 125 : dump(len, payloadLen);
250 try {
251 cc = of(b.getChar());
252 reason = CharsetToolkit.decode(b).toString();
253 } catch (IllegalArgumentException e) {
254 throw new WebSocketProtocolException
255 ("5.5.1", "Incorrect status code", e);
256 } catch (CharacterCodingException e) {
257 throw new WebSocketProtocolException
258 ("5.5.1", "Close reason is a malformed UTF-8 sequence", e);
259 }
260 }
261 binaryData.dispose(); // Manual dispose
262 output.onClose(cc, reason);
263 break;
264 case PING:
265 output.onPing(binaryData);
266 binaryData = null;
267 break;
268 case PONG:
269 output.onPong(binaryData);
270 binaryData = null;
271 break;
272 default:
273 assert opcode == Opcode.TEXT || opcode == Opcode.BINARY
274 || opcode == Opcode.CONTINUATION : dump(opcode);
275 if (fin) {
276 // It is always the last chunk:
277 // either TEXT(FIN=TRUE)/BINARY(FIN=TRUE) or CONT(FIN=TRUE)
278 originatingOpcode = null;
279 decoder.reset();
280 }
281 break;
282 }
283 payloadLen = 0;
284 opcode = null;
285 fullyRead = true;
286 }
287
288 private MessagePart determinePart(boolean isLast) {
289 boolean lastChunk = fin && isLast;
290 switch (part) {
291 case LAST:
292 case WHOLE:
293 return lastChunk ? MessagePart.WHOLE : MessagePart.FIRST;
294 case FIRST:
295 case PART:
296 return lastChunk ? MessagePart.LAST : MessagePart.PART;
297 default:
298 throw new InternalError(String.valueOf(part));
299 }
300 }
301 }
< prev index next >