1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test
  26  * @build DummyWebSocketServer
  27  * @run testng/othervm
  28  *       WebSocketTest
  29  */
  30 
  31 import org.testng.annotations.AfterTest;
  32 import org.testng.annotations.DataProvider;
  33 import org.testng.annotations.Test;
  34 
  35 import java.io.IOException;
  36 import java.net.http.WebSocket;
  37 import java.nio.ByteBuffer;
  38 import java.nio.charset.StandardCharsets;
  39 import java.util.ArrayList;
  40 import java.util.List;
  41 import java.util.concurrent.CompletableFuture;
  42 import java.util.concurrent.CompletionStage;
  43 import java.util.concurrent.TimeUnit;
  44 import java.util.concurrent.atomic.AtomicBoolean;
  45 import java.util.function.Supplier;
  46 import java.util.stream.Collectors;
  47 
  48 import static java.net.http.HttpClient.Builder.NO_PROXY;
  49 import static java.net.http.HttpClient.newBuilder;
  50 import static java.net.http.WebSocket.NORMAL_CLOSURE;
  51 import static org.testng.Assert.assertEquals;
  52 import static org.testng.Assert.assertThrows;
  53 
  54 public class WebSocketTest {
  55 
  56     private static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
  57     private static final Class<IllegalStateException> ISE = IllegalStateException.class;
  58     private static final Class<IOException> IOE = IOException.class;
  59 
  60     /* shortcut */
  61     private static void assertFails(Class<? extends Throwable> clazz,
  62                                     CompletionStage<?> stage) {
  63         Support.assertCompletesExceptionally(clazz, stage);
  64     }
  65 
  66     private DummyWebSocketServer server;
  67     private WebSocket webSocket;
  68 
  69     @AfterTest
  70     public void cleanup() {
  71         server.close();
  72         webSocket.abort();
  73     }
  74 
  75     @Test
  76     public void illegalArgument() throws IOException {
  77         server = new DummyWebSocketServer();
  78         server.open();
  79         webSocket = newBuilder().proxy(NO_PROXY).build()
  80                 .newWebSocketBuilder()
  81                 .buildAsync(server.getURI(), new WebSocket.Listener() { })
  82                 .join();
  83 
  84         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(126)));
  85         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(127)));
  86         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(128)));
  87         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(129)));
  88         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(256)));
  89 
  90         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(126)));
  91         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(127)));
  92         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(128)));
  93         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(129)));
  94         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(256)));
  95 
  96         assertFails(IOE, webSocket.sendText(Support.incompleteString(), true));
  97         assertFails(IOE, webSocket.sendText(Support.incompleteString(), false));
  98         assertFails(IOE, webSocket.sendText(Support.malformedString(), true));
  99         assertFails(IOE, webSocket.sendText(Support.malformedString(), false));
 100 
 101         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(124)));
 102         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(125)));
 103         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(128)));
 104         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(256)));
 105         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(257)));
 106         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWith2NBytes((123 / 2) + 1)));
 107         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.malformedString()));
 108         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.incompleteString()));
 109 
 110         assertFails(IAE, webSocket.sendClose(-2, "a reason"));
 111         assertFails(IAE, webSocket.sendClose(-1, "a reason"));
 112         assertFails(IAE, webSocket.sendClose(0, "a reason"));
 113         assertFails(IAE, webSocket.sendClose(1, "a reason"));
 114         assertFails(IAE, webSocket.sendClose(500, "a reason"));
 115         assertFails(IAE, webSocket.sendClose(998, "a reason"));
 116         assertFails(IAE, webSocket.sendClose(999, "a reason"));
 117         assertFails(IAE, webSocket.sendClose(1002, "a reason"));
 118         assertFails(IAE, webSocket.sendClose(1003, "a reason"));
 119         assertFails(IAE, webSocket.sendClose(1006, "a reason"));
 120         assertFails(IAE, webSocket.sendClose(1007, "a reason"));
 121         assertFails(IAE, webSocket.sendClose(1009, "a reason"));
 122         assertFails(IAE, webSocket.sendClose(1010, "a reason"));
 123         assertFails(IAE, webSocket.sendClose(1012, "a reason"));
 124         assertFails(IAE, webSocket.sendClose(1013, "a reason"));
 125         assertFails(IAE, webSocket.sendClose(1015, "a reason"));
 126         assertFails(IAE, webSocket.sendClose(5000, "a reason"));
 127         assertFails(IAE, webSocket.sendClose(32768, "a reason"));
 128         assertFails(IAE, webSocket.sendClose(65535, "a reason"));
 129         assertFails(IAE, webSocket.sendClose(65536, "a reason"));
 130         assertFails(IAE, webSocket.sendClose(Integer.MAX_VALUE, "a reason"));
 131         assertFails(IAE, webSocket.sendClose(Integer.MIN_VALUE, "a reason"));
 132 
 133         assertThrows(IAE, () -> webSocket.request(Integer.MIN_VALUE));
 134         assertThrows(IAE, () -> webSocket.request(Long.MIN_VALUE));
 135         assertThrows(IAE, () -> webSocket.request(-1));
 136         assertThrows(IAE, () -> webSocket.request(0));
 137     }
 138 
 139     @Test
 140     public void partialBinaryThenText() throws IOException {
 141         server = new DummyWebSocketServer();
 142         server.open();
 143         webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
 144                 .buildAsync(server.getURI(), new WebSocket.Listener() { })
 145                 .join();
 146         webSocket.sendBinary(ByteBuffer.allocate(16), false).join();
 147         assertFails(ISE, webSocket.sendText("text", false));
 148         assertFails(ISE, webSocket.sendText("text", true));
 149         // Pings & Pongs are fine
 150         webSocket.sendPing(ByteBuffer.allocate(125)).join();
 151         webSocket.sendPong(ByteBuffer.allocate(125)).join();
 152     }
 153 
 154     @Test
 155     public void partialTextThenBinary() throws IOException {
 156         server = new DummyWebSocketServer();
 157         server.open();
 158         webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
 159                 .buildAsync(server.getURI(), new WebSocket.Listener() { })
 160                 .join();
 161 
 162         webSocket.sendText("text", false).join();
 163         assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), false));
 164         assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), true));
 165         // Pings & Pongs are fine
 166         webSocket.sendPing(ByteBuffer.allocate(125)).join();
 167         webSocket.sendPong(ByteBuffer.allocate(125)).join();
 168     }
 169 
 170     @Test
 171     public void sendMethodsThrowIOE1() throws IOException {
 172         server = new DummyWebSocketServer();
 173         server.open();
 174         webSocket = newBuilder().proxy(NO_PROXY).build()
 175                 .newWebSocketBuilder()
 176                 .buildAsync(server.getURI(), new WebSocket.Listener() { })
 177                 .join();
 178 
 179         webSocket.sendClose(NORMAL_CLOSURE, "ok").join();
 180 
 181         assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
 182 
 183         assertFails(IOE, webSocket.sendText("", true));
 184         assertFails(IOE, webSocket.sendText("", false));
 185         assertFails(IOE, webSocket.sendText("abc", true));
 186         assertFails(IOE, webSocket.sendText("abc", false));
 187         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true));
 188         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false));
 189         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true));
 190         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false));
 191 
 192         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125)));
 193         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124)));
 194         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1)));
 195         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0)));
 196 
 197         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125)));
 198         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124)));
 199         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1)));
 200         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0)));
 201     }
 202 
 203     @DataProvider(name = "sequence")
 204     public Object[][] data1() {
 205         int[] CLOSE = {
 206                 0x81, 0x00, // ""
 207                 0x82, 0x00, // []
 208                 0x89, 0x00, // <PING>
 209                 0x8a, 0x00, // <PONG>
 210                 0x88, 0x00, // <CLOSE>
 211         };
 212         int[] ERROR = {
 213                 0x81, 0x00, // ""
 214                 0x82, 0x00, // []
 215                 0x89, 0x00, // <PING>
 216                 0x8a, 0x00, // <PONG>
 217                 0x8b, 0x00, // 0xB control frame (causes an error)
 218         };
 219         return new Object[][]{
 220                 {CLOSE, 1},
 221                 {CLOSE, 3},
 222                 {CLOSE, 4},
 223                 {CLOSE, Long.MAX_VALUE},
 224                 {ERROR, 1},
 225                 {ERROR, 3},
 226                 {ERROR, 4},
 227                 {ERROR, Long.MAX_VALUE},
 228         };
 229     }
 230 
 231     @Test(dataProvider = "sequence")
 232     public void listenerSequentialOrder(int[] binary, long requestSize)
 233             throws IOException
 234     {
 235 
 236         server = Support.serverWithCannedData(binary);
 237         server.open();
 238 
 239         CompletableFuture<Void> violation = new CompletableFuture<>();
 240 
 241         MockListener listener = new MockListener(requestSize) {
 242 
 243             final AtomicBoolean guard = new AtomicBoolean();
 244 
 245             private <T> T checkRunExclusively(Supplier<T> action) {
 246                 if (guard.getAndSet(true)) {
 247                     violation.completeExceptionally(new RuntimeException());
 248                 }
 249                 try {
 250                     return action.get();
 251                 } finally {
 252                     if (!guard.getAndSet(false)) {
 253                         violation.completeExceptionally(new RuntimeException());
 254                     }
 255                 }
 256             }
 257 
 258             @Override
 259             public void onOpen(WebSocket webSocket) {
 260                 checkRunExclusively(() -> {
 261                     super.onOpen(webSocket);
 262                     return null;
 263                 });
 264             }
 265 
 266             @Override
 267             public CompletionStage<?> onText(WebSocket webSocket,
 268                                              CharSequence data,
 269                                              boolean last) {
 270                 return checkRunExclusively(
 271                         () -> super.onText(webSocket, data, last));
 272             }
 273 
 274             @Override
 275             public CompletionStage<?> onBinary(WebSocket webSocket,
 276                                                ByteBuffer data,
 277                                                boolean last) {
 278                 return checkRunExclusively(
 279                         () -> super.onBinary(webSocket, data, last));
 280             }
 281 
 282             @Override
 283             public CompletionStage<?> onPing(WebSocket webSocket,
 284                                              ByteBuffer message) {
 285                 return checkRunExclusively(
 286                         () -> super.onPing(webSocket, message));
 287             }
 288 
 289             @Override
 290             public CompletionStage<?> onPong(WebSocket webSocket,
 291                                              ByteBuffer message) {
 292                 return checkRunExclusively(
 293                         () -> super.onPong(webSocket, message));
 294             }
 295 
 296             @Override
 297             public CompletionStage<?> onClose(WebSocket webSocket,
 298                                               int statusCode,
 299                                               String reason) {
 300                 return checkRunExclusively(
 301                         () -> super.onClose(webSocket, statusCode, reason));
 302             }
 303 
 304             @Override
 305             public void onError(WebSocket webSocket, Throwable error) {
 306                 checkRunExclusively(() -> {
 307                     super.onError(webSocket, error);
 308                     return null;
 309                 });
 310             }
 311         };
 312 
 313         webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
 314                 .buildAsync(server.getURI(), listener)
 315                 .join();
 316 
 317 
 318         listener.invocations();
 319         violation.complete(null); // won't affect if completed exceptionally
 320         violation.join();
 321     }
 322 
 323     @Test
 324     public void sendMethodsThrowIOE2() throws Exception {
 325         server = Support.serverWithCannedData(0x88, 0x00);
 326         server.open();
 327         CompletableFuture<Void> onCloseCalled = new CompletableFuture<>();
 328         CompletableFuture<Void> canClose = new CompletableFuture<>();
 329 
 330         WebSocket.Listener listener = new WebSocket.Listener() {
 331             @Override
 332             public CompletionStage<?> onClose(WebSocket webSocket,
 333                                               int statusCode,
 334                                               String reason) {
 335                 System.out.printf("onClose(%s, '%s')%n", statusCode, reason);
 336                 onCloseCalled.complete(null);
 337                 return canClose;
 338             }
 339 
 340             @Override
 341             public void onError(WebSocket webSocket, Throwable error) {
 342                 System.out.println("onError(" + error + ")");
 343                 onCloseCalled.completeExceptionally(error);
 344             }
 345         };
 346 
 347         webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
 348                 .buildAsync(server.getURI(), listener)
 349                 .join();
 350 
 351         onCloseCalled.join();      // Wait for onClose to be called
 352         canClose.complete(null);   // Signal to the WebSocket it can close the output
 353         TimeUnit.SECONDS.sleep(5); // Give canClose some time to reach the WebSocket
 354 
 355         assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
 356 
 357         assertFails(IOE, webSocket.sendText("", true));
 358         assertFails(IOE, webSocket.sendText("", false));
 359         assertFails(IOE, webSocket.sendText("abc", true));
 360         assertFails(IOE, webSocket.sendText("abc", false));
 361         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true));
 362         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false));
 363         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true));
 364         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false));
 365 
 366         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125)));
 367         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124)));
 368         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1)));
 369         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0)));
 370 
 371         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125)));
 372         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124)));
 373         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1)));
 374         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0)));
 375     }
 376 
 377     @Test
 378     public void simpleAggregatingBinaryMessages() throws IOException {
 379         List<byte[]> expected = List.of("alpha", "beta", "gamma", "delta")
 380                 .stream()
 381                 .map(s -> s.getBytes(StandardCharsets.US_ASCII))
 382                 .collect(Collectors.toList());
 383         int[] binary = new int[]{
 384                 0x82, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // [alpha]
 385                 0x02, 0x02, 0x62, 0x65,                   // [be
 386                 0x80, 0x02, 0x74, 0x61,                   // ta]
 387                 0x02, 0x01, 0x67,                         // [g
 388                 0x00, 0x01, 0x61,                         // a
 389                 0x00, 0x00,                               //
 390                 0x00, 0x00,                               //
 391                 0x00, 0x01, 0x6d,                         // m
 392                 0x00, 0x01, 0x6d,                         // m
 393                 0x80, 0x01, 0x61,                         // a]
 394                 0x8a, 0x00,                               // <PONG>
 395                 0x02, 0x04, 0x64, 0x65, 0x6c, 0x74,       // [delt
 396                 0x00, 0x01, 0x61,                         // a
 397                 0x80, 0x00,                               // ]
 398                 0x88, 0x00                                // <CLOSE>
 399         };
 400         CompletableFuture<List<byte[]>> actual = new CompletableFuture<>();
 401 
 402         server = Support.serverWithCannedData(binary);
 403         server.open();
 404 
 405         WebSocket.Listener listener = new WebSocket.Listener() {
 406 
 407             List<byte[]> collectedBytes = new ArrayList<>();
 408             ByteBuffer buffer = ByteBuffer.allocate(1024);
 409 
 410             @Override
 411             public CompletionStage<?> onBinary(WebSocket webSocket,
 412                                                ByteBuffer message,
 413                                                boolean last) {
 414                 System.out.printf("onBinary(%s, %s)%n", message, last);
 415                 webSocket.request(1);
 416 
 417                 append(message);
 418                 if (last) {
 419                     buffer.flip();
 420                     byte[] bytes = new byte[buffer.remaining()];
 421                     buffer.get(bytes);
 422                     buffer.clear();
 423                     processWholeBinary(bytes);
 424                 }
 425                 return null;
 426             }
 427 
 428             private void append(ByteBuffer message) {
 429                 if (buffer.remaining() < message.remaining()) {
 430                     assert message.remaining() > 0;
 431                     int cap = (buffer.capacity() + message.remaining()) * 2;
 432                     ByteBuffer b = ByteBuffer.allocate(cap);
 433                     b.put(buffer.flip());
 434                     buffer = b;
 435                 }
 436                 buffer.put(message);
 437             }
 438 
 439             private void processWholeBinary(byte[] bytes) {
 440                 String stringBytes = new String(bytes, StandardCharsets.UTF_8);
 441                 System.out.println("processWholeBinary: " + stringBytes);
 442                 collectedBytes.add(bytes);
 443             }
 444 
 445             @Override
 446             public CompletionStage<?> onClose(WebSocket webSocket,
 447                                               int statusCode,
 448                                               String reason) {
 449                 actual.complete(collectedBytes);
 450                 return null;
 451             }
 452 
 453             @Override
 454             public void onError(WebSocket webSocket, Throwable error) {
 455                 actual.completeExceptionally(error);
 456             }
 457         };
 458 
 459         webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
 460                 .buildAsync(server.getURI(), listener)
 461                 .join();
 462 
 463         List<byte[]> a = actual.join();
 464         assertEquals(a, expected);
 465     }
 466 
 467     @Test
 468     public void simpleAggregatingTextMessages() throws IOException {
 469 
 470         List<String> expected = List.of("alpha", "beta", "gamma", "delta");
 471 
 472         int[] binary = new int[]{
 473                 0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha"
 474                 0x01, 0x02, 0x62, 0x65,                   // "be
 475                 0x80, 0x02, 0x74, 0x61,                   // ta"
 476                 0x01, 0x01, 0x67,                         // "g
 477                 0x00, 0x01, 0x61,                         // a
 478                 0x00, 0x00,                               //
 479                 0x00, 0x00,                               //
 480                 0x00, 0x01, 0x6d,                         // m
 481                 0x00, 0x01, 0x6d,                         // m
 482                 0x80, 0x01, 0x61,                         // a"
 483                 0x8a, 0x00,                               // <PONG>
 484                 0x01, 0x04, 0x64, 0x65, 0x6c, 0x74,       // "delt
 485                 0x00, 0x01, 0x61,                         // a
 486                 0x80, 0x00,                               // "
 487                 0x88, 0x00                                // <CLOSE>
 488         };
 489         CompletableFuture<List<String>> actual = new CompletableFuture<>();
 490 
 491         server = Support.serverWithCannedData(binary);
 492         server.open();
 493 
 494         WebSocket.Listener listener = new WebSocket.Listener() {
 495 
 496             List<String> collectedStrings = new ArrayList<>();
 497             StringBuilder text = new StringBuilder();
 498 
 499             @Override
 500             public CompletionStage<?> onText(WebSocket webSocket,
 501                                              CharSequence message,
 502                                              boolean last) {
 503                 System.out.printf("onText(%s, %s)%n", message, last);
 504                 webSocket.request(1);
 505                 text.append(message);
 506                 if (last) {
 507                     String str = text.toString();
 508                     text.setLength(0);
 509                     processWholeText(str);
 510                 }
 511                 return null;
 512             }
 513 
 514             private void processWholeText(String string) {
 515                 System.out.println(string);
 516                 collectedStrings.add(string);
 517             }
 518 
 519             @Override
 520             public CompletionStage<?> onClose(WebSocket webSocket,
 521                                               int statusCode,
 522                                               String reason) {
 523                 actual.complete(collectedStrings);
 524                 return null;
 525             }
 526 
 527             @Override
 528             public void onError(WebSocket webSocket, Throwable error) {
 529                 actual.completeExceptionally(error);
 530             }
 531         };
 532 
 533         webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
 534                 .buildAsync(server.getURI(), listener)
 535                 .join();
 536 
 537         List<String> a = actual.join();
 538         assertEquals(a, expected);
 539     }
 540 
 541     /*
 542      * Exercises the scenario where requests for more messages are made prior to
 543      * completing the returned CompletionStage instances.
 544      */
 545     @Test
 546     public void aggregatingTextMessages() throws IOException {
 547 
 548         List<String> expected = List.of("alpha", "beta", "gamma", "delta");
 549 
 550         int[] binary = new int[]{
 551                 0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha"
 552                 0x01, 0x02, 0x62, 0x65,                   // "be
 553                 0x80, 0x02, 0x74, 0x61,                   // ta"
 554                 0x01, 0x01, 0x67,                         // "g
 555                 0x00, 0x01, 0x61,                         // a
 556                 0x00, 0x00,                               //
 557                 0x00, 0x00,                               //
 558                 0x00, 0x01, 0x6d,                         // m
 559                 0x00, 0x01, 0x6d,                         // m
 560                 0x80, 0x01, 0x61,                         // a"
 561                 0x8a, 0x00,                               // <PONG>
 562                 0x01, 0x04, 0x64, 0x65, 0x6c, 0x74,       // "delt
 563                 0x00, 0x01, 0x61,                         // a
 564                 0x80, 0x00,                               // "
 565                 0x88, 0x00                                // <CLOSE>
 566         };
 567         CompletableFuture<List<String>> actual = new CompletableFuture<>();
 568 
 569 
 570         server = Support.serverWithCannedData(binary);
 571         server.open();
 572 
 573         WebSocket.Listener listener = new WebSocket.Listener() {
 574 
 575             List<CharSequence> parts = new ArrayList<>();
 576             /*
 577              * A CompletableFuture which will complete once the current
 578              * message has been fully assembled. Until then the listener
 579              * returns this instance for every call.
 580              */
 581             CompletableFuture<?> currentCf = new CompletableFuture<>();
 582             List<String> collected = new ArrayList<>();
 583 
 584             @Override
 585             public CompletionStage<?> onText(WebSocket webSocket,
 586                                              CharSequence message,
 587                                              boolean last) {
 588                 parts.add(message);
 589                 if (!last) {
 590                     webSocket.request(1);
 591                 } else {
 592                     this.currentCf.thenRun(() -> webSocket.request(1));
 593                     CompletableFuture<?> refCf = this.currentCf;
 594                     processWholeMessage(new ArrayList<>(parts), refCf);
 595                     currentCf = new CompletableFuture<>();
 596                     parts.clear();
 597                     return refCf;
 598                 }
 599                 return currentCf;
 600             }
 601 
 602             @Override
 603             public CompletionStage<?> onClose(WebSocket webSocket,
 604                                               int statusCode,
 605                                               String reason) {
 606                 actual.complete(collected);
 607                 return null;
 608             }
 609 
 610             @Override
 611             public void onError(WebSocket webSocket, Throwable error) {
 612                 actual.completeExceptionally(error);
 613             }
 614 
 615             public void processWholeMessage(List<CharSequence> data,
 616                                             CompletableFuture<?> cf) {
 617                 StringBuilder b = new StringBuilder();
 618                 data.forEach(b::append);
 619                 String s = b.toString();
 620                 System.out.println(s);
 621                 cf.complete(null);
 622                 collected.add(s);
 623             }
 624         };
 625 
 626         webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
 627                 .buildAsync(server.getURI(), listener)
 628                 .join();
 629 
 630         List<String> a = actual.join();
 631         assertEquals(a, expected);
 632     }
 633 }