1 /*
   2  * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 package jdk.incubator.http.internal.websocket;
  24 
  25 import jdk.incubator.http.WebSocket.MessagePart;
  26 import jdk.incubator.http.internal.websocket.Frame.Opcode;
  27 import jdk.incubator.http.internal.websocket.TestSupport.F1;
  28 import jdk.incubator.http.internal.websocket.TestSupport.F2;
  29 import jdk.incubator.http.internal.websocket.TestSupport.InvocationChecker;
  30 import jdk.incubator.http.internal.websocket.TestSupport.InvocationExpectation;
  31 import jdk.incubator.http.internal.websocket.TestSupport.Mock;
  32 
  33 import java.io.IOException;
  34 import java.nio.ByteBuffer;
  35 import java.nio.channels.ClosedChannelException;
  36 import java.nio.channels.SelectionKey;
  37 import java.util.Iterator;
  38 import java.util.LinkedList;
  39 import java.util.List;
  40 import java.util.OptionalInt;
  41 import java.util.concurrent.CompletableFuture;
  42 import java.util.concurrent.CompletionStage;
  43 import java.util.concurrent.Executor;
  44 import java.util.concurrent.Executors;
  45 import java.util.concurrent.TimeUnit;
  46 import java.util.concurrent.atomic.AtomicBoolean;
  47 import java.util.function.Supplier;
  48 
  49 import static jdk.incubator.http.internal.websocket.Frame.MAX_HEADER_SIZE_BYTES;
  50 
  51 final class MockChannel implements RawChannel, Mock {
  52 
  53     /* Reads and writes must be able to be served concurrently, thus 2 threads */ // TODO: test this
  54     private final Executor executor = Executors.newFixedThreadPool(2);
  55     private final Object stateLock = new Object();
  56     private final Object readLock = new Object();
  57     private final Object writeLock = new Object();
  58     private volatile boolean closed;
  59     private boolean isInputOpen = true;
  60     private boolean isOutputOpen = true;
  61     private final Frame.Reader reader = new Frame.Reader();
  62     private final MockFrameConsumer delegate;
  63     private final Iterator<ReadRule> readScenario;
  64     private ReadRule currentRule;
  65     private final AtomicBoolean handedOver = new AtomicBoolean();
  66 
  67     private MockChannel(Iterable<ReadRule> scenario,
  68                         Iterable<InvocationExpectation> expectations) {
  69         Iterator<ReadRule> iterator = scenario.iterator();
  70         if (!iterator.hasNext()) {
  71             throw new RuntimeException();
  72         }
  73         this.readScenario = iterator;
  74         this.currentRule = iterator.next();
  75         this.delegate = new MockFrameConsumer(expectations);
  76     }
  77 
  78     @Override
  79     public void registerEvent(RawEvent event) throws IOException {
  80         int ops = event.interestOps();
  81         if ((ops & SelectionKey.OP_WRITE) != 0) {
  82             synchronized (stateLock) {
  83                 checkOpen();
  84                 executor.execute(event::handle);
  85             }
  86         } else if ((ops & SelectionKey.OP_READ) != 0) {
  87             CompletionStage<?> cs;
  88             synchronized (readLock) {
  89                 cs = currentRule().whenReady();
  90                 synchronized (stateLock) {
  91                     checkOpen();
  92                     cs.thenRun(() -> executor.execute(event::handle));
  93                 }
  94             }
  95         } else {
  96             throw new RuntimeException("Unexpected registration: " + ops);
  97         }
  98     }
  99 
 100     @Override
 101     public ByteBuffer initialByteBuffer() throws IllegalStateException {
 102         if (!handedOver.compareAndSet(false, true)) {
 103             throw new IllegalStateException();
 104         }
 105         return ByteBuffer.allocate(0);
 106     }
 107 
 108     @Override
 109     public ByteBuffer read() throws IOException {
 110         synchronized (readLock) {
 111             checkOpen();
 112             synchronized (stateLock) {
 113                 if (!isInputOpen) {
 114                     return null;
 115                 }
 116             }
 117             ByteBuffer r = currentRule().read();
 118             checkOpen();
 119             return r;
 120         }
 121     }
 122 
 123     @Override
 124     public long write(ByteBuffer[] src, int offset, int len) throws IOException {
 125         synchronized (writeLock) {
 126             checkOpen();
 127             synchronized (stateLock) {
 128                 if (!isOutputOpen) {
 129                     throw new ClosedChannelException();
 130                 }
 131             }
 132             long n = 0;
 133             for (int i = offset; i < offset + len && isOpen(); i++) {
 134                 ByteBuffer b = src[i];
 135                 int rem = src[i].remaining();
 136                 while (b.hasRemaining() && isOpen()) {
 137                     reader.readFrame(b, delegate);
 138                 }
 139                 n += rem;
 140             }
 141             checkOpen();
 142             return n;
 143         }
 144     }
 145 
 146     public boolean isOpen() {
 147         return !closed;
 148     }
 149 
 150     @Override
 151     public void shutdownInput() throws IOException {
 152         synchronized (stateLock) {
 153             if (!isOpen()) {
 154                 throw new ClosedChannelException();
 155             }
 156             isInputOpen = false;
 157         }
 158     }
 159 
 160     @Override
 161     public void shutdownOutput() throws IOException {
 162         synchronized (stateLock) {
 163             if (!isOpen()) {
 164                 throw new ClosedChannelException();
 165             }
 166             isOutputOpen = false;
 167         }
 168     }
 169 
 170     @Override
 171     public void close() {
 172         synchronized (stateLock) {
 173             closed = true;
 174         }
 175     }
 176 
 177     @Override
 178     public String toString() {
 179         return super.toString() + "[" + (closed ? "closed" : "open") + "]";
 180     }
 181 
 182     private ReadRule currentRule() {
 183         assert Thread.holdsLock(readLock);
 184         while (!currentRule.applies()) { // There should be the terminal rule which always applies
 185             currentRule = readScenario.next();
 186         }
 187         return currentRule;
 188     }
 189 
 190     private void checkOpen() throws ClosedChannelException {
 191         if (!isOpen()) {
 192             throw new ClosedChannelException();
 193         }
 194     }
 195 
 196     @Override
 197     public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) {
 198         return delegate.expectations(timeout, unit);
 199     }
 200 
 201     private static class MockFrameConsumer extends FrameConsumer implements Mock {
 202 
 203         private final Frame.Masker masker = new Frame.Masker();
 204 
 205         MockFrameConsumer(Iterable<InvocationExpectation> expectations) {
 206             super(new MockMessageStreamConsumer(expectations));
 207         }
 208 
 209         @Override
 210         public void mask(boolean value) {
 211         }
 212 
 213         @Override
 214         public void maskingKey(int value) {
 215             masker.mask(value);
 216         }
 217 
 218         @Override
 219         public void payloadData(ByteBuffer data) {
 220             int p = data.position();
 221             int l = data.limit();
 222             masker.transferMasking(data, data);
 223 //            select(p, l, data); FIXME
 224             super.payloadData(data);
 225         }
 226 
 227         @Override
 228         public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) {
 229             return ((Mock) getOutput()).expectations(timeout, unit);
 230         }
 231     }
 232 
 233     private static final class MockMessageStreamConsumer implements MessageStreamConsumer, Mock {
 234 
 235         private final InvocationChecker checker;
 236 
 237         MockMessageStreamConsumer(Iterable<InvocationExpectation> expectations) {
 238             checker = new InvocationChecker(expectations);
 239         }
 240 
 241         @Override
 242         public void onText(MessagePart part, CharSequence data) {
 243             checker.checkInvocation("onText", part, data);
 244         }
 245 
 246         @Override
 247         public void onBinary(MessagePart part, ByteBuffer data) {
 248             checker.checkInvocation("onBinary", part, data);
 249         }
 250 
 251         @Override
 252         public void onPing(ByteBuffer data) {
 253             checker.checkInvocation("onPing", data);
 254         }
 255 
 256         @Override
 257         public void onPong(ByteBuffer data) {
 258             checker.checkInvocation("onPong", data);
 259         }
 260 
 261         @Override
 262         public void onClose(OptionalInt statusCode, CharSequence reason) {
 263             checker.checkInvocation("onClose", statusCode, reason);
 264         }
 265 
 266         @Override
 267         public void onError(Exception e) {
 268             checker.checkInvocation("onError", e);
 269         }
 270 
 271         @Override
 272         public void onComplete() {
 273             checker.checkInvocation("onComplete");
 274         }
 275 
 276         @Override
 277         public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) {
 278             return checker.expectations(timeout, unit);
 279         }
 280     }
 281 
 282     public static final class Builder {
 283 
 284         private final Frame.HeaderWriter b = new Frame.HeaderWriter();
 285         private final List<InvocationExpectation> expectations = new LinkedList<>();
 286         private final List<ReadRule> scenario = new LinkedList<>();
 287 
 288         Builder expectPing(F1<? super ByteBuffer, Boolean> predicate) {
 289             InvocationExpectation e = new InvocationExpectation("onPing",
 290                     args -> predicate.apply((ByteBuffer) args[0]));
 291             expectations.add(e);
 292             return this;
 293         }
 294 
 295         Builder expectPong(F1<? super ByteBuffer, Boolean> predicate) {
 296             InvocationExpectation e = new InvocationExpectation("onPong",
 297                     args -> predicate.apply((ByteBuffer) args[0]));
 298             expectations.add(e);
 299             return this;
 300         }
 301 
 302         Builder expectClose(F2<? super Integer, ? super String, Boolean> predicate) {
 303             InvocationExpectation e = new InvocationExpectation("onClose",
 304                     args -> predicate.apply((Integer) args[0], (String) args[1]));
 305             expectations.add(e);
 306             return this;
 307         }
 308 
 309         Builder provideFrame(boolean fin, boolean rsv1, boolean rsv2,
 310                              boolean rsv3, Opcode opcode, ByteBuffer data) {
 311 
 312             ByteBuffer b = ByteBuffer.allocate(MAX_HEADER_SIZE_BYTES + data.remaining());
 313             this.b.fin(fin).rsv1(rsv1).rsv2(rsv2).rsv3(rsv3).opcode(opcode).noMask()
 314                     .payloadLen(data.remaining()).write(b);
 315 
 316             int p = data.position();
 317             int l = data.limit();
 318             b.put(data);
 319             b.flip();
 320 //            select(p, l, data); FIXME
 321 
 322             ReadRule r = new ReadRule() {
 323 
 324                 private volatile boolean provided;
 325 
 326                 @Override
 327                 public CompletionStage<?> whenReady() {
 328                     return NOW;
 329                 }
 330 
 331                 @Override
 332                 public ByteBuffer read() throws IOException {
 333                     provided = true;
 334                     return data;
 335                 }
 336 
 337                 @Override
 338                 public boolean applies() {
 339                     return !provided;
 340                 }
 341             };
 342             scenario.add(r);
 343             return this;
 344         }
 345 
 346         Builder provideEos() {
 347             ReadRule r = new ReadRule() {
 348 
 349                 @Override
 350                 public CompletionStage<?> whenReady() {
 351                     return NOW;
 352                 }
 353 
 354                 @Override
 355                 public ByteBuffer read() throws IOException {
 356                     return null;
 357                 }
 358 
 359                 @Override
 360                 public boolean applies() {
 361                     return true;
 362                 }
 363             };
 364             scenario.add(r);
 365             return this;
 366         }
 367 
 368         Builder provideException(Supplier<? extends IOException> s) {
 369             return this;
 370         }
 371 
 372         MockChannel build() {
 373             LinkedList<ReadRule> scenario = new LinkedList<>(this.scenario);
 374             scenario.add(new Terminator());
 375             return new MockChannel(scenario, new LinkedList<>(expectations));
 376         }
 377     }
 378 
 379     private interface ReadRule {
 380 
 381         /*
 382          * Returns a CS which when completed means `read(ByteBuffer dst)` can be
 383          * invoked
 384          */
 385         CompletionStage<?> whenReady();
 386 
 387         ByteBuffer read() throws IOException;
 388 
 389         /*
 390          * Returns true if this rule still applies, otherwise returns false
 391          */
 392         boolean applies();
 393     }
 394 
 395     public static final class Terminator implements ReadRule {
 396 
 397         @Override
 398         public CompletionStage<?> whenReady() {
 399             return NEVER;
 400         }
 401 
 402         @Override
 403         public ByteBuffer read() {
 404             return ByteBuffer.allocate(0);
 405         }
 406 
 407         @Override
 408         public boolean applies() {
 409             return true;
 410         }
 411     }
 412 
 413     private static final CompletionStage<?> NOW = CompletableFuture.completedStage(null);
 414     private static final CompletionStage<?> NEVER = new CompletableFuture();
 415 }