1 /* 2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package jdk.incubator.http.internal.websocket; 24 25 import jdk.incubator.http.WebSocket.MessagePart; 26 import jdk.incubator.http.internal.websocket.Frame.Opcode; 27 import jdk.incubator.http.internal.websocket.TestSupport.F1; 28 import jdk.incubator.http.internal.websocket.TestSupport.F2; 29 import jdk.incubator.http.internal.websocket.TestSupport.InvocationChecker; 30 import jdk.incubator.http.internal.websocket.TestSupport.InvocationExpectation; 31 import jdk.incubator.http.internal.websocket.TestSupport.Mock; 32 33 import java.io.IOException; 34 import java.nio.ByteBuffer; 35 import java.nio.channels.ClosedChannelException; 36 import java.nio.channels.SelectionKey; 37 import java.util.Iterator; 38 import java.util.LinkedList; 39 import java.util.List; 40 import java.util.OptionalInt; 41 import java.util.concurrent.CompletableFuture; 42 import java.util.concurrent.CompletionStage; 43 import java.util.concurrent.Executor; 44 import java.util.concurrent.Executors; 45 import java.util.concurrent.TimeUnit; 46 import java.util.concurrent.atomic.AtomicBoolean; 47 import java.util.function.Supplier; 48 49 import static jdk.incubator.http.internal.websocket.Frame.MAX_HEADER_SIZE_BYTES; 50 51 final class MockChannel implements RawChannel, Mock { 52 53 /* Reads and writes must be able to be served concurrently, thus 2 threads */ // TODO: test this 54 private final Executor executor = Executors.newFixedThreadPool(2); 55 private final Object stateLock = new Object(); 56 private final Object readLock = new Object(); 57 private final Object writeLock = new Object(); 58 private volatile boolean closed; 59 private boolean isInputOpen = true; 60 private boolean isOutputOpen = true; 61 private final Frame.Reader reader = new Frame.Reader(); 62 private final MockFrameConsumer delegate; 63 private final Iterator<ReadRule> readScenario; 64 private ReadRule currentRule; 65 private final AtomicBoolean handedOver = new AtomicBoolean(); 66 67 private MockChannel(Iterable<ReadRule> scenario, 68 Iterable<InvocationExpectation> expectations) { 69 Iterator<ReadRule> iterator = scenario.iterator(); 70 if (!iterator.hasNext()) { 71 throw new RuntimeException(); 72 } 73 this.readScenario = iterator; 74 this.currentRule = iterator.next(); 75 this.delegate = new MockFrameConsumer(expectations); 76 } 77 78 @Override 79 public void registerEvent(RawEvent event) throws IOException { 80 int ops = event.interestOps(); 81 if ((ops & SelectionKey.OP_WRITE) != 0) { 82 synchronized (stateLock) { 83 checkOpen(); 84 executor.execute(event::handle); 85 } 86 } else if ((ops & SelectionKey.OP_READ) != 0) { 87 CompletionStage<?> cs; 88 synchronized (readLock) { 89 cs = currentRule().whenReady(); 90 synchronized (stateLock) { 91 checkOpen(); 92 cs.thenRun(() -> executor.execute(event::handle)); 93 } 94 } 95 } else { 96 throw new RuntimeException("Unexpected registration: " + ops); 97 } 98 } 99 100 @Override 101 public ByteBuffer initialByteBuffer() throws IllegalStateException { 102 if (!handedOver.compareAndSet(false, true)) { 103 throw new IllegalStateException(); 104 } 105 return ByteBuffer.allocate(0); 106 } 107 108 @Override 109 public ByteBuffer read() throws IOException { 110 synchronized (readLock) { 111 checkOpen(); 112 synchronized (stateLock) { 113 if (!isInputOpen) { 114 return null; 115 } 116 } 117 ByteBuffer r = currentRule().read(); 118 checkOpen(); 119 return r; 120 } 121 } 122 123 @Override 124 public long write(ByteBuffer[] src, int offset, int len) throws IOException { 125 synchronized (writeLock) { 126 checkOpen(); 127 synchronized (stateLock) { 128 if (!isOutputOpen) { 129 throw new ClosedChannelException(); 130 } 131 } 132 long n = 0; 133 for (int i = offset; i < offset + len && isOpen(); i++) { 134 ByteBuffer b = src[i]; 135 int rem = src[i].remaining(); 136 while (b.hasRemaining() && isOpen()) { 137 reader.readFrame(b, delegate); 138 } 139 n += rem; 140 } 141 checkOpen(); 142 return n; 143 } 144 } 145 146 public boolean isOpen() { 147 return !closed; 148 } 149 150 @Override 151 public void shutdownInput() throws IOException { 152 synchronized (stateLock) { 153 if (!isOpen()) { 154 throw new ClosedChannelException(); 155 } 156 isInputOpen = false; 157 } 158 } 159 160 @Override 161 public void shutdownOutput() throws IOException { 162 synchronized (stateLock) { 163 if (!isOpen()) { 164 throw new ClosedChannelException(); 165 } 166 isOutputOpen = false; 167 } 168 } 169 170 @Override 171 public void close() { 172 synchronized (stateLock) { 173 closed = true; 174 } 175 } 176 177 @Override 178 public String toString() { 179 return super.toString() + "[" + (closed ? "closed" : "open") + "]"; 180 } 181 182 private ReadRule currentRule() { 183 assert Thread.holdsLock(readLock); 184 while (!currentRule.applies()) { // There should be the terminal rule which always applies 185 currentRule = readScenario.next(); 186 } 187 return currentRule; 188 } 189 190 private void checkOpen() throws ClosedChannelException { 191 if (!isOpen()) { 192 throw new ClosedChannelException(); 193 } 194 } 195 196 @Override 197 public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) { 198 return delegate.expectations(timeout, unit); 199 } 200 201 private static class MockFrameConsumer extends FrameConsumer implements Mock { 202 203 private final Frame.Masker masker = new Frame.Masker(); 204 205 MockFrameConsumer(Iterable<InvocationExpectation> expectations) { 206 super(new MockMessageStreamConsumer(expectations)); 207 } 208 209 @Override 210 public void mask(boolean value) { 211 } 212 213 @Override 214 public void maskingKey(int value) { 215 masker.mask(value); 216 } 217 218 @Override 219 public void payloadData(ByteBuffer data) { 220 int p = data.position(); 221 int l = data.limit(); 222 masker.transferMasking(data, data); 223 // select(p, l, data); FIXME 224 super.payloadData(data); 225 } 226 227 @Override 228 public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) { 229 return ((Mock) getOutput()).expectations(timeout, unit); 230 } 231 } 232 233 private static final class MockMessageStreamConsumer implements MessageStreamConsumer, Mock { 234 235 private final InvocationChecker checker; 236 237 MockMessageStreamConsumer(Iterable<InvocationExpectation> expectations) { 238 checker = new InvocationChecker(expectations); 239 } 240 241 @Override 242 public void onText(MessagePart part, CharSequence data) { 243 checker.checkInvocation("onText", part, data); 244 } 245 246 @Override 247 public void onBinary(MessagePart part, ByteBuffer data) { 248 checker.checkInvocation("onBinary", part, data); 249 } 250 251 @Override 252 public void onPing(ByteBuffer data) { 253 checker.checkInvocation("onPing", data); 254 } 255 256 @Override 257 public void onPong(ByteBuffer data) { 258 checker.checkInvocation("onPong", data); 259 } 260 261 @Override 262 public void onClose(OptionalInt statusCode, CharSequence reason) { 263 checker.checkInvocation("onClose", statusCode, reason); 264 } 265 266 @Override 267 public void onError(Exception e) { 268 checker.checkInvocation("onError", e); 269 } 270 271 @Override 272 public void onComplete() { 273 checker.checkInvocation("onComplete"); 274 } 275 276 @Override 277 public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) { 278 return checker.expectations(timeout, unit); 279 } 280 } 281 282 public static final class Builder { 283 284 private final Frame.HeaderWriter b = new Frame.HeaderWriter(); 285 private final List<InvocationExpectation> expectations = new LinkedList<>(); 286 private final List<ReadRule> scenario = new LinkedList<>(); 287 288 Builder expectPing(F1<? super ByteBuffer, Boolean> predicate) { 289 InvocationExpectation e = new InvocationExpectation("onPing", 290 args -> predicate.apply((ByteBuffer) args[0])); 291 expectations.add(e); 292 return this; 293 } 294 295 Builder expectPong(F1<? super ByteBuffer, Boolean> predicate) { 296 InvocationExpectation e = new InvocationExpectation("onPong", 297 args -> predicate.apply((ByteBuffer) args[0])); 298 expectations.add(e); 299 return this; 300 } 301 302 Builder expectClose(F2<? super Integer, ? super String, Boolean> predicate) { 303 InvocationExpectation e = new InvocationExpectation("onClose", 304 args -> predicate.apply((Integer) args[0], (String) args[1])); 305 expectations.add(e); 306 return this; 307 } 308 309 Builder provideFrame(boolean fin, boolean rsv1, boolean rsv2, 310 boolean rsv3, Opcode opcode, ByteBuffer data) { 311 312 ByteBuffer b = ByteBuffer.allocate(MAX_HEADER_SIZE_BYTES + data.remaining()); 313 this.b.fin(fin).rsv1(rsv1).rsv2(rsv2).rsv3(rsv3).opcode(opcode).noMask() 314 .payloadLen(data.remaining()).write(b); 315 316 int p = data.position(); 317 int l = data.limit(); 318 b.put(data); 319 b.flip(); 320 // select(p, l, data); FIXME 321 322 ReadRule r = new ReadRule() { 323 324 private volatile boolean provided; 325 326 @Override 327 public CompletionStage<?> whenReady() { 328 return NOW; 329 } 330 331 @Override 332 public ByteBuffer read() throws IOException { 333 provided = true; 334 return data; 335 } 336 337 @Override 338 public boolean applies() { 339 return !provided; 340 } 341 }; 342 scenario.add(r); 343 return this; 344 } 345 346 Builder provideEos() { 347 ReadRule r = new ReadRule() { 348 349 @Override 350 public CompletionStage<?> whenReady() { 351 return NOW; 352 } 353 354 @Override 355 public ByteBuffer read() throws IOException { 356 return null; 357 } 358 359 @Override 360 public boolean applies() { 361 return true; 362 } 363 }; 364 scenario.add(r); 365 return this; 366 } 367 368 Builder provideException(Supplier<? extends IOException> s) { 369 return this; 370 } 371 372 MockChannel build() { 373 LinkedList<ReadRule> scenario = new LinkedList<>(this.scenario); 374 scenario.add(new Terminator()); 375 return new MockChannel(scenario, new LinkedList<>(expectations)); 376 } 377 } 378 379 private interface ReadRule { 380 381 /* 382 * Returns a CS which when completed means `read(ByteBuffer dst)` can be 383 * invoked 384 */ 385 CompletionStage<?> whenReady(); 386 387 ByteBuffer read() throws IOException; 388 389 /* 390 * Returns true if this rule still applies, otherwise returns false 391 */ 392 boolean applies(); 393 } 394 395 public static final class Terminator implements ReadRule { 396 397 @Override 398 public CompletionStage<?> whenReady() { 399 return NEVER; 400 } 401 402 @Override 403 public ByteBuffer read() { 404 return ByteBuffer.allocate(0); 405 } 406 407 @Override 408 public boolean applies() { 409 return true; 410 } 411 } 412 413 private static final CompletionStage<?> NOW = CompletableFuture.completedStage(null); 414 private static final CompletionStage<?> NEVER = new CompletableFuture(); 415 }