1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @build DummyWebSocketServer 27 * @run testng/othervm 28 * WebSocketTest 29 */ 30 31 import org.testng.annotations.AfterTest; 32 import org.testng.annotations.DataProvider; 33 import org.testng.annotations.Test; 34 35 import java.io.IOException; 36 import java.net.http.WebSocket; 37 import java.nio.ByteBuffer; 38 import java.nio.charset.StandardCharsets; 39 import java.util.ArrayList; 40 import java.util.List; 41 import java.util.concurrent.CompletableFuture; 42 import java.util.concurrent.CompletionStage; 43 import java.util.concurrent.TimeUnit; 44 import java.util.concurrent.atomic.AtomicBoolean; 45 import java.util.function.Supplier; 46 import java.util.stream.Collectors; 47 48 import static java.net.http.HttpClient.Builder.NO_PROXY; 49 import static java.net.http.HttpClient.newBuilder; 50 import static java.net.http.WebSocket.NORMAL_CLOSURE; 51 import static org.testng.Assert.assertEquals; 52 import static org.testng.Assert.assertThrows; 53 54 public class WebSocketTest { 55 56 private static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class; 57 private static final Class<IllegalStateException> ISE = IllegalStateException.class; 58 private static final Class<IOException> IOE = IOException.class; 59 60 /* shortcut */ 61 private static void assertFails(Class<? extends Throwable> clazz, 62 CompletionStage<?> stage) { 63 Support.assertCompletesExceptionally(clazz, stage); 64 } 65 66 private DummyWebSocketServer server; 67 private WebSocket webSocket; 68 69 @AfterTest 70 public void cleanup() { 71 server.close(); 72 webSocket.abort(); 73 } 74 75 @Test 76 public void illegalArgument() throws IOException { 77 server = new DummyWebSocketServer(); 78 server.open(); 79 webSocket = newBuilder().proxy(NO_PROXY).build() 80 .newWebSocketBuilder() 81 .buildAsync(server.getURI(), new WebSocket.Listener() { }) 82 .join(); 83 84 assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(126))); 85 assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(127))); 86 assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(128))); 87 assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(129))); 88 assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(256))); 89 90 assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(126))); 91 assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(127))); 92 assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(128))); 93 assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(129))); 94 assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(256))); 95 96 assertFails(IOE, webSocket.sendText(Support.incompleteString(), true)); 97 assertFails(IOE, webSocket.sendText(Support.incompleteString(), false)); 98 assertFails(IOE, webSocket.sendText(Support.malformedString(), true)); 99 assertFails(IOE, webSocket.sendText(Support.malformedString(), false)); 100 101 assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(124))); 102 assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(125))); 103 assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(128))); 104 assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(256))); 105 assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(257))); 106 assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWith2NBytes((123 / 2) + 1))); 107 assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.malformedString())); 108 assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.incompleteString())); 109 110 assertFails(IAE, webSocket.sendClose(-2, "a reason")); 111 assertFails(IAE, webSocket.sendClose(-1, "a reason")); 112 assertFails(IAE, webSocket.sendClose(0, "a reason")); 113 assertFails(IAE, webSocket.sendClose(1, "a reason")); 114 assertFails(IAE, webSocket.sendClose(500, "a reason")); 115 assertFails(IAE, webSocket.sendClose(998, "a reason")); 116 assertFails(IAE, webSocket.sendClose(999, "a reason")); 117 assertFails(IAE, webSocket.sendClose(1002, "a reason")); 118 assertFails(IAE, webSocket.sendClose(1003, "a reason")); 119 assertFails(IAE, webSocket.sendClose(1006, "a reason")); 120 assertFails(IAE, webSocket.sendClose(1007, "a reason")); 121 assertFails(IAE, webSocket.sendClose(1009, "a reason")); 122 assertFails(IAE, webSocket.sendClose(1010, "a reason")); 123 assertFails(IAE, webSocket.sendClose(1012, "a reason")); 124 assertFails(IAE, webSocket.sendClose(1013, "a reason")); 125 assertFails(IAE, webSocket.sendClose(1015, "a reason")); 126 assertFails(IAE, webSocket.sendClose(5000, "a reason")); 127 assertFails(IAE, webSocket.sendClose(32768, "a reason")); 128 assertFails(IAE, webSocket.sendClose(65535, "a reason")); 129 assertFails(IAE, webSocket.sendClose(65536, "a reason")); 130 assertFails(IAE, webSocket.sendClose(Integer.MAX_VALUE, "a reason")); 131 assertFails(IAE, webSocket.sendClose(Integer.MIN_VALUE, "a reason")); 132 133 assertThrows(IAE, () -> webSocket.request(Integer.MIN_VALUE)); 134 assertThrows(IAE, () -> webSocket.request(Long.MIN_VALUE)); 135 assertThrows(IAE, () -> webSocket.request(-1)); 136 assertThrows(IAE, () -> webSocket.request(0)); 137 } 138 139 @Test 140 public void partialBinaryThenText() throws IOException { 141 server = new DummyWebSocketServer(); 142 server.open(); 143 webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder() 144 .buildAsync(server.getURI(), new WebSocket.Listener() { }) 145 .join(); 146 webSocket.sendBinary(ByteBuffer.allocate(16), false).join(); 147 assertFails(ISE, webSocket.sendText("text", false)); 148 assertFails(ISE, webSocket.sendText("text", true)); 149 // Pings & Pongs are fine 150 webSocket.sendPing(ByteBuffer.allocate(125)).join(); 151 webSocket.sendPong(ByteBuffer.allocate(125)).join(); 152 } 153 154 @Test 155 public void partialTextThenBinary() throws IOException { 156 server = new DummyWebSocketServer(); 157 server.open(); 158 webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder() 159 .buildAsync(server.getURI(), new WebSocket.Listener() { }) 160 .join(); 161 162 webSocket.sendText("text", false).join(); 163 assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), false)); 164 assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), true)); 165 // Pings & Pongs are fine 166 webSocket.sendPing(ByteBuffer.allocate(125)).join(); 167 webSocket.sendPong(ByteBuffer.allocate(125)).join(); 168 } 169 170 @Test 171 public void sendMethodsThrowIOE1() throws IOException { 172 server = new DummyWebSocketServer(); 173 server.open(); 174 webSocket = newBuilder().proxy(NO_PROXY).build() 175 .newWebSocketBuilder() 176 .buildAsync(server.getURI(), new WebSocket.Listener() { }) 177 .join(); 178 179 webSocket.sendClose(NORMAL_CLOSURE, "ok").join(); 180 181 assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok")); 182 183 assertFails(IOE, webSocket.sendText("", true)); 184 assertFails(IOE, webSocket.sendText("", false)); 185 assertFails(IOE, webSocket.sendText("abc", true)); 186 assertFails(IOE, webSocket.sendText("abc", false)); 187 assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true)); 188 assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false)); 189 assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true)); 190 assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false)); 191 192 assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125))); 193 assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124))); 194 assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1))); 195 assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0))); 196 197 assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125))); 198 assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124))); 199 assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1))); 200 assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0))); 201 } 202 203 @DataProvider(name = "sequence") 204 public Object[][] data1() { 205 int[] CLOSE = { 206 0x81, 0x00, // "" 207 0x82, 0x00, // [] 208 0x89, 0x00, // <PING> 209 0x8a, 0x00, // <PONG> 210 0x88, 0x00, // <CLOSE> 211 }; 212 int[] ERROR = { 213 0x81, 0x00, // "" 214 0x82, 0x00, // [] 215 0x89, 0x00, // <PING> 216 0x8a, 0x00, // <PONG> 217 0x8b, 0x00, // 0xB control frame (causes an error) 218 }; 219 return new Object[][]{ 220 {CLOSE, 1}, 221 {CLOSE, 3}, 222 {CLOSE, 4}, 223 {CLOSE, Long.MAX_VALUE}, 224 {ERROR, 1}, 225 {ERROR, 3}, 226 {ERROR, 4}, 227 {ERROR, Long.MAX_VALUE}, 228 }; 229 } 230 231 @Test(dataProvider = "sequence") 232 public void listenerSequentialOrder(int[] binary, long requestSize) 233 throws IOException 234 { 235 236 server = Support.serverWithCannedData(binary); 237 server.open(); 238 239 CompletableFuture<Void> violation = new CompletableFuture<>(); 240 241 MockListener listener = new MockListener(requestSize) { 242 243 final AtomicBoolean guard = new AtomicBoolean(); 244 245 private <T> T checkRunExclusively(Supplier<T> action) { 246 if (guard.getAndSet(true)) { 247 violation.completeExceptionally(new RuntimeException()); 248 } 249 try { 250 return action.get(); 251 } finally { 252 if (!guard.getAndSet(false)) { 253 violation.completeExceptionally(new RuntimeException()); 254 } 255 } 256 } 257 258 @Override 259 public void onOpen(WebSocket webSocket) { 260 checkRunExclusively(() -> { 261 super.onOpen(webSocket); 262 return null; 263 }); 264 } 265 266 @Override 267 public CompletionStage<?> onText(WebSocket webSocket, 268 CharSequence data, 269 boolean last) { 270 return checkRunExclusively( 271 () -> super.onText(webSocket, data, last)); 272 } 273 274 @Override 275 public CompletionStage<?> onBinary(WebSocket webSocket, 276 ByteBuffer data, 277 boolean last) { 278 return checkRunExclusively( 279 () -> super.onBinary(webSocket, data, last)); 280 } 281 282 @Override 283 public CompletionStage<?> onPing(WebSocket webSocket, 284 ByteBuffer message) { 285 return checkRunExclusively( 286 () -> super.onPing(webSocket, message)); 287 } 288 289 @Override 290 public CompletionStage<?> onPong(WebSocket webSocket, 291 ByteBuffer message) { 292 return checkRunExclusively( 293 () -> super.onPong(webSocket, message)); 294 } 295 296 @Override 297 public CompletionStage<?> onClose(WebSocket webSocket, 298 int statusCode, 299 String reason) { 300 return checkRunExclusively( 301 () -> super.onClose(webSocket, statusCode, reason)); 302 } 303 304 @Override 305 public void onError(WebSocket webSocket, Throwable error) { 306 checkRunExclusively(() -> { 307 super.onError(webSocket, error); 308 return null; 309 }); 310 } 311 }; 312 313 webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder() 314 .buildAsync(server.getURI(), listener) 315 .join(); 316 317 318 listener.invocations(); 319 violation.complete(null); // won't affect if completed exceptionally 320 violation.join(); 321 } 322 323 @Test 324 public void sendMethodsThrowIOE2() throws Exception { 325 server = Support.serverWithCannedData(0x88, 0x00); 326 server.open(); 327 CompletableFuture<Void> onCloseCalled = new CompletableFuture<>(); 328 CompletableFuture<Void> canClose = new CompletableFuture<>(); 329 330 WebSocket.Listener listener = new WebSocket.Listener() { 331 @Override 332 public CompletionStage<?> onClose(WebSocket webSocket, 333 int statusCode, 334 String reason) { 335 System.out.printf("onClose(%s, '%s')%n", statusCode, reason); 336 onCloseCalled.complete(null); 337 return canClose; 338 } 339 340 @Override 341 public void onError(WebSocket webSocket, Throwable error) { 342 System.out.println("onError(" + error + ")"); 343 onCloseCalled.completeExceptionally(error); 344 } 345 }; 346 347 webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder() 348 .buildAsync(server.getURI(), listener) 349 .join(); 350 351 onCloseCalled.join(); // Wait for onClose to be called 352 canClose.complete(null); // Signal to the WebSocket it can close the output 353 TimeUnit.SECONDS.sleep(5); // Give canClose some time to reach the WebSocket 354 355 assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok")); 356 357 assertFails(IOE, webSocket.sendText("", true)); 358 assertFails(IOE, webSocket.sendText("", false)); 359 assertFails(IOE, webSocket.sendText("abc", true)); 360 assertFails(IOE, webSocket.sendText("abc", false)); 361 assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true)); 362 assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false)); 363 assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true)); 364 assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false)); 365 366 assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125))); 367 assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124))); 368 assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1))); 369 assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0))); 370 371 assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125))); 372 assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124))); 373 assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1))); 374 assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0))); 375 } 376 377 @Test 378 public void simpleAggregatingBinaryMessages() throws IOException { 379 List<byte[]> expected = List.of("alpha", "beta", "gamma", "delta") 380 .stream() 381 .map(s -> s.getBytes(StandardCharsets.US_ASCII)) 382 .collect(Collectors.toList()); 383 int[] binary = new int[]{ 384 0x82, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // [alpha] 385 0x02, 0x02, 0x62, 0x65, // [be 386 0x80, 0x02, 0x74, 0x61, // ta] 387 0x02, 0x01, 0x67, // [g 388 0x00, 0x01, 0x61, // a 389 0x00, 0x00, // 390 0x00, 0x00, // 391 0x00, 0x01, 0x6d, // m 392 0x00, 0x01, 0x6d, // m 393 0x80, 0x01, 0x61, // a] 394 0x8a, 0x00, // <PONG> 395 0x02, 0x04, 0x64, 0x65, 0x6c, 0x74, // [delt 396 0x00, 0x01, 0x61, // a 397 0x80, 0x00, // ] 398 0x88, 0x00 // <CLOSE> 399 }; 400 CompletableFuture<List<byte[]>> actual = new CompletableFuture<>(); 401 402 server = Support.serverWithCannedData(binary); 403 server.open(); 404 405 WebSocket.Listener listener = new WebSocket.Listener() { 406 407 List<byte[]> collectedBytes = new ArrayList<>(); 408 ByteBuffer buffer = ByteBuffer.allocate(1024); 409 410 @Override 411 public CompletionStage<?> onBinary(WebSocket webSocket, 412 ByteBuffer message, 413 boolean last) { 414 System.out.printf("onBinary(%s, %s)%n", message, last); 415 webSocket.request(1); 416 417 append(message); 418 if (last) { 419 buffer.flip(); 420 byte[] bytes = new byte[buffer.remaining()]; 421 buffer.get(bytes); 422 buffer.clear(); 423 processWholeBinary(bytes); 424 } 425 return null; 426 } 427 428 private void append(ByteBuffer message) { 429 if (buffer.remaining() < message.remaining()) { 430 assert message.remaining() > 0; 431 int cap = (buffer.capacity() + message.remaining()) * 2; 432 ByteBuffer b = ByteBuffer.allocate(cap); 433 b.put(buffer.flip()); 434 buffer = b; 435 } 436 buffer.put(message); 437 } 438 439 private void processWholeBinary(byte[] bytes) { 440 String stringBytes = new String(bytes, StandardCharsets.UTF_8); 441 System.out.println("processWholeBinary: " + stringBytes); 442 collectedBytes.add(bytes); 443 } 444 445 @Override 446 public CompletionStage<?> onClose(WebSocket webSocket, 447 int statusCode, 448 String reason) { 449 actual.complete(collectedBytes); 450 return null; 451 } 452 453 @Override 454 public void onError(WebSocket webSocket, Throwable error) { 455 actual.completeExceptionally(error); 456 } 457 }; 458 459 webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder() 460 .buildAsync(server.getURI(), listener) 461 .join(); 462 463 List<byte[]> a = actual.join(); 464 assertEquals(a, expected); 465 } 466 467 @Test 468 public void simpleAggregatingTextMessages() throws IOException { 469 470 List<String> expected = List.of("alpha", "beta", "gamma", "delta"); 471 472 int[] binary = new int[]{ 473 0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha" 474 0x01, 0x02, 0x62, 0x65, // "be 475 0x80, 0x02, 0x74, 0x61, // ta" 476 0x01, 0x01, 0x67, // "g 477 0x00, 0x01, 0x61, // a 478 0x00, 0x00, // 479 0x00, 0x00, // 480 0x00, 0x01, 0x6d, // m 481 0x00, 0x01, 0x6d, // m 482 0x80, 0x01, 0x61, // a" 483 0x8a, 0x00, // <PONG> 484 0x01, 0x04, 0x64, 0x65, 0x6c, 0x74, // "delt 485 0x00, 0x01, 0x61, // a 486 0x80, 0x00, // " 487 0x88, 0x00 // <CLOSE> 488 }; 489 CompletableFuture<List<String>> actual = new CompletableFuture<>(); 490 491 server = Support.serverWithCannedData(binary); 492 server.open(); 493 494 WebSocket.Listener listener = new WebSocket.Listener() { 495 496 List<String> collectedStrings = new ArrayList<>(); 497 StringBuilder text = new StringBuilder(); 498 499 @Override 500 public CompletionStage<?> onText(WebSocket webSocket, 501 CharSequence message, 502 boolean last) { 503 System.out.printf("onText(%s, %s)%n", message, last); 504 webSocket.request(1); 505 text.append(message); 506 if (last) { 507 String str = text.toString(); 508 text.setLength(0); 509 processWholeText(str); 510 } 511 return null; 512 } 513 514 private void processWholeText(String string) { 515 System.out.println(string); 516 collectedStrings.add(string); 517 } 518 519 @Override 520 public CompletionStage<?> onClose(WebSocket webSocket, 521 int statusCode, 522 String reason) { 523 actual.complete(collectedStrings); 524 return null; 525 } 526 527 @Override 528 public void onError(WebSocket webSocket, Throwable error) { 529 actual.completeExceptionally(error); 530 } 531 }; 532 533 webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder() 534 .buildAsync(server.getURI(), listener) 535 .join(); 536 537 List<String> a = actual.join(); 538 assertEquals(a, expected); 539 } 540 541 /* 542 * Exercises the scenario where requests for more messages are made prior to 543 * completing the returned CompletionStage instances. 544 */ 545 @Test 546 public void aggregatingTextMessages() throws IOException { 547 548 List<String> expected = List.of("alpha", "beta", "gamma", "delta"); 549 550 int[] binary = new int[]{ 551 0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha" 552 0x01, 0x02, 0x62, 0x65, // "be 553 0x80, 0x02, 0x74, 0x61, // ta" 554 0x01, 0x01, 0x67, // "g 555 0x00, 0x01, 0x61, // a 556 0x00, 0x00, // 557 0x00, 0x00, // 558 0x00, 0x01, 0x6d, // m 559 0x00, 0x01, 0x6d, // m 560 0x80, 0x01, 0x61, // a" 561 0x8a, 0x00, // <PONG> 562 0x01, 0x04, 0x64, 0x65, 0x6c, 0x74, // "delt 563 0x00, 0x01, 0x61, // a 564 0x80, 0x00, // " 565 0x88, 0x00 // <CLOSE> 566 }; 567 CompletableFuture<List<String>> actual = new CompletableFuture<>(); 568 569 570 server = Support.serverWithCannedData(binary); 571 server.open(); 572 573 WebSocket.Listener listener = new WebSocket.Listener() { 574 575 List<CharSequence> parts = new ArrayList<>(); 576 /* 577 * A CompletableFuture which will complete once the current 578 * message has been fully assembled. Until then the listener 579 * returns this instance for every call. 580 */ 581 CompletableFuture<?> currentCf = new CompletableFuture<>(); 582 List<String> collected = new ArrayList<>(); 583 584 @Override 585 public CompletionStage<?> onText(WebSocket webSocket, 586 CharSequence message, 587 boolean last) { 588 parts.add(message); 589 if (!last) { 590 webSocket.request(1); 591 } else { 592 this.currentCf.thenRun(() -> webSocket.request(1)); 593 CompletableFuture<?> refCf = this.currentCf; 594 processWholeMessage(new ArrayList<>(parts), refCf); 595 currentCf = new CompletableFuture<>(); 596 parts.clear(); 597 return refCf; 598 } 599 return currentCf; 600 } 601 602 @Override 603 public CompletionStage<?> onClose(WebSocket webSocket, 604 int statusCode, 605 String reason) { 606 actual.complete(collected); 607 return null; 608 } 609 610 @Override 611 public void onError(WebSocket webSocket, Throwable error) { 612 actual.completeExceptionally(error); 613 } 614 615 public void processWholeMessage(List<CharSequence> data, 616 CompletableFuture<?> cf) { 617 StringBuilder b = new StringBuilder(); 618 data.forEach(b::append); 619 String s = b.toString(); 620 System.out.println(s); 621 cf.complete(null); 622 collected.add(s); 623 } 624 }; 625 626 webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder() 627 .buildAsync(server.getURI(), listener) 628 .join(); 629 630 List<String> a = actual.join(); 631 assertEquals(a, expected); 632 } 633 }