1 /*
   2  * Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import jdk.internal.net.http.common.Demand;
  29 import jdk.internal.net.http.common.FlowTube;
  30 import jdk.internal.net.http.common.Logger;
  31 import jdk.internal.net.http.common.Utils;
  32 import jdk.internal.net.http.websocket.RawChannel;
  33 
  34 import java.io.EOFException;
  35 import java.io.IOException;
  36 import java.lang.ref.Cleaner;
  37 import java.nio.ByteBuffer;
  38 import java.nio.channels.ClosedChannelException;
  39 import java.nio.channels.SelectionKey;
  40 import java.util.ArrayList;
  41 import java.util.List;
  42 import java.util.concurrent.ConcurrentLinkedQueue;
  43 import java.util.concurrent.Flow;
  44 import java.util.concurrent.atomic.AtomicBoolean;
  45 import java.util.concurrent.atomic.AtomicReference;
  46 import java.util.function.Supplier;
  47 import java.lang.System.Logger.Level;
  48 
  49 /*
  50  * I/O abstraction used to implement WebSocket.
  51  *
  52  */
  53 public class RawChannelTube implements RawChannel {
  54 
  55     final HttpConnection connection;
  56     final FlowTube tube;
  57     final WritePublisher writePublisher;
  58     final ReadSubscriber readSubscriber;
  59     final Supplier<ByteBuffer> initial;
  60     final AtomicBoolean inited = new AtomicBoolean();
  61     final AtomicBoolean outputClosed = new AtomicBoolean();
  62     final AtomicBoolean inputClosed = new AtomicBoolean();
  63     final AtomicBoolean closed = new AtomicBoolean();
  64     final String dbgTag;
  65     final Logger debug;
  66     private static final Cleaner cleaner =
  67             Utils.ASSERTIONSENABLED  && Utils.DEBUG_WS ? Cleaner.create() : null;
  68 
  69     RawChannelTube(HttpConnection connection,
  70                    Supplier<ByteBuffer> initial) {
  71         this.connection = connection;
  72         this.tube = connection.getConnectionFlow();
  73         this.initial = initial;
  74         this.writePublisher = new WritePublisher();
  75         this.readSubscriber = new ReadSubscriber();
  76         dbgTag = "[WebSocket] RawChannelTube(" + tube +")";
  77         debug = Utils.getWebSocketLogger(dbgTag::toString, Utils.DEBUG_WS);
  78         connection.client().webSocketOpen();
  79         connectFlows();
  80         if (Utils.ASSERTIONSENABLED && Utils.DEBUG_WS) {
  81             // this is just for debug...
  82             cleaner.register(this, new CleanupChecker(closed, debug));
  83         }
  84     }
  85 
  86     // Make sure no back reference to RawChannelTube can exist
  87     // from this class. In particular it would be dangerous
  88     // to reference connection, since connection has a reference
  89     // to SocketTube with which a RawChannelTube is registered.
  90     // Ditto for HttpClientImpl, which might have a back reference
  91     // to the connection.
  92     static final class CleanupChecker implements Runnable {
  93         final AtomicBoolean closed;
  94         final System.Logger debug;
  95         CleanupChecker(AtomicBoolean closed, System.Logger debug) {
  96             this.closed = closed;
  97             this.debug = debug;
  98         }
  99 
 100         @Override
 101         public void run() {
 102             if (!closed.get()) {
 103                 debug.log(Level.DEBUG,
 104                          "RawChannelTube was not closed before being released");
 105             }
 106         }
 107     }
 108 
 109     private void connectFlows() {
 110         if (debug.on()) debug.log("connectFlows");
 111         tube.connectFlows(writePublisher, readSubscriber);
 112     }
 113 
 114     class WriteSubscription implements Flow.Subscription {
 115         final Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 116         final Demand demand = new Demand();
 117         volatile boolean cancelled;
 118         WriteSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 119             this.subscriber = subscriber;
 120         }
 121         @Override
 122         public void request(long n) {
 123             if (debug.on()) debug.log("WriteSubscription::request %d", n);
 124             demand.increase(n);
 125             RawEvent event;
 126             while ((event = writePublisher.events.poll()) != null) {
 127                 if (debug.on()) debug.log("WriteSubscriber: handling event");
 128                 event.handle();
 129                 if (demand.isFulfilled()) break;
 130             }
 131         }
 132         @Override
 133         public void cancel() {
 134             cancelled = true;
 135             if (debug.on()) debug.log("WriteSubscription::cancel");
 136             shutdownOutput();
 137             RawEvent event;
 138             while ((event = writePublisher.events.poll()) != null) {
 139                 if (debug.on()) debug.log("WriteSubscriber: handling event");
 140                 event.handle();
 141             }
 142         }
 143     }
 144 
 145     class WritePublisher implements FlowTube.TubePublisher {
 146         final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
 147         volatile WriteSubscription writeSubscription;
 148         @Override
 149         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 150             if (debug.on()) debug.log("WritePublisher::subscribe");
 151             WriteSubscription subscription = new WriteSubscription(subscriber);
 152             subscriber.onSubscribe(subscription);
 153             writeSubscription = subscription;
 154         }
 155     }
 156 
 157     class ReadSubscriber implements  FlowTube.TubeSubscriber {
 158 
 159         volatile Flow.Subscription readSubscription;
 160         volatile boolean completed;
 161         long initialRequest;
 162         final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
 163         final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
 164         final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 165 
 166         void checkEvents() {
 167             Flow.Subscription subscription = readSubscription;
 168             if (subscription != null) {
 169                 Throwable error = errorRef.get();
 170                 while (!buffers.isEmpty() || error != null || closed.get() || completed) {
 171                     RawEvent event = events.poll();
 172                     if (event == null) break;
 173                     if (debug.on()) debug.log("ReadSubscriber: handling event");
 174                     event.handle();
 175                 }
 176             }
 177         }
 178 
 179         @Override
 180         public void onSubscribe(Flow.Subscription subscription) {
 181             //buffers.add(initial.get());
 182             long n;
 183             synchronized (this) {
 184                 readSubscription = subscription;
 185                 n = initialRequest;
 186                 initialRequest = 0;
 187             }
 188             if (debug.on()) debug.log("ReadSubscriber::onSubscribe");
 189             if (n > 0) {
 190                 Throwable error = errorRef.get();
 191                 if (error == null && !closed.get() && !completed) {
 192                     if (debug.on()) debug.log("readSubscription: requesting " + n);
 193                     subscription.request(n);
 194                 }
 195             }
 196             checkEvents();
 197         }
 198 
 199         @Override
 200         public void onNext(List<ByteBuffer> item) {
 201             if (debug.on()) debug.log(() -> "ReadSubscriber::onNext "
 202                     + Utils.remaining(item) + " bytes");
 203             buffers.addAll(item);
 204             checkEvents();
 205         }
 206 
 207         @Override
 208         public void onError(Throwable throwable) {
 209             if (closed.get() || errorRef.compareAndSet(null, throwable)) {
 210                 if (debug.on()) debug.log("ReadSubscriber::onError", throwable);
 211                 if (buffers.isEmpty()) {
 212                     checkEvents();
 213                     shutdownInput();
 214                 }
 215             }
 216         }
 217 
 218         @Override
 219         public void onComplete() {
 220             if (debug.on()) debug.log("ReadSubscriber::onComplete");
 221             completed = true;
 222             if (buffers.isEmpty()) {
 223                 checkEvents();
 224                 shutdownInput();
 225             }
 226         }
 227     }
 228 
 229 
 230     /*
 231      * Registers given event whose callback will be called once only (i.e.
 232      * register new event for each callback).
 233      *
 234      * Memory consistency effects: actions in a thread calling registerEvent
 235      * happen-before any subsequent actions in the thread calling event.handle
 236      */
 237     public void registerEvent(RawEvent event) throws IOException {
 238         int interestOps = event.interestOps();
 239         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
 240             if (debug.on()) debug.log("register write event");
 241             if (outputClosed.get()) throw new IOException("closed output");
 242             writePublisher.events.add(event);
 243             WriteSubscription writeSubscription = writePublisher.writeSubscription;
 244             if (writeSubscription != null) {
 245                 while (!writeSubscription.demand.isFulfilled()) {
 246                     event = writePublisher.events.poll();
 247                     if (event == null) break;
 248                     event.handle();
 249                 }
 250             }
 251         }
 252         if ((interestOps & SelectionKey.OP_READ) != 0) {
 253             if (debug.on()) debug.log("register read event");
 254             if (inputClosed.get()) throw new IOException("closed input");
 255             readSubscriber.events.add(event);
 256             readSubscriber.checkEvents();
 257             if (readSubscriber.buffers.isEmpty()
 258                     && !readSubscriber.events.isEmpty()) {
 259                 Flow.Subscription readSubscription =
 260                         readSubscriber.readSubscription;
 261                 if (readSubscription == null) {
 262                     synchronized (readSubscriber) {
 263                         readSubscription = readSubscriber.readSubscription;
 264                         if (readSubscription == null) {
 265                             readSubscriber.initialRequest = 1;
 266                             return;
 267                         }
 268                     }
 269                 }
 270                 assert  readSubscription != null;
 271                 if (debug.on()) debug.log("readSubscription: requesting 1");
 272                 readSubscription.request(1);
 273             }
 274         }
 275     }
 276 
 277     /**
 278      * Hands over the initial bytes. Once the bytes have been returned they are
 279      * no longer available and the method will throw an {@link
 280      * IllegalStateException} on each subsequent invocation.
 281      *
 282      * @return the initial bytes
 283      * @throws IllegalStateException
 284      *         if the method has been already invoked
 285      */
 286     public ByteBuffer initialByteBuffer() throws IllegalStateException {
 287         if (inited.compareAndSet(false, true)) {
 288             return initial.get();
 289         } else throw new IllegalStateException("initial buffer already drained");
 290     }
 291 
 292     /*
 293      * Returns a ByteBuffer with the data read or null if EOF is reached. Has no
 294      * remaining bytes if no data available at the moment.
 295      */
 296     public ByteBuffer read() throws IOException {
 297         if (debug.on()) debug.log("read");
 298         Flow.Subscription readSubscription = readSubscriber.readSubscription;
 299         if (readSubscription == null) return Utils.EMPTY_BYTEBUFFER;
 300         ByteBuffer buffer = readSubscriber.buffers.poll();
 301         if (buffer != null) {
 302             if (debug.on()) debug.log("read: " + buffer.remaining());
 303             return buffer;
 304         }
 305         Throwable error = readSubscriber.errorRef.get();
 306         if (error != null) error = Utils.getIOException(error);
 307         if (error instanceof EOFException) {
 308             if (debug.on()) debug.log("read: EOFException");
 309             shutdownInput();
 310             return null;
 311         }
 312         if (error != null) {
 313             if (debug.on()) debug.log("read: " + error);
 314             if (closed.get()) {
 315                 return null;
 316             }
 317             shutdownInput();
 318             throw Utils.getIOException(error);
 319         }
 320         if (readSubscriber.completed) {
 321             if (debug.on()) debug.log("read: EOF");
 322             shutdownInput();
 323             return null;
 324         }
 325         if (inputClosed.get()) {
 326             if (debug.on()) debug.log("read: CLOSED");
 327             throw new IOException("closed output");
 328         }
 329         if (debug.on()) debug.log("read: nothing to read");
 330         return Utils.EMPTY_BYTEBUFFER;
 331     }
 332 
 333     /*
 334      * Writes a sequence of bytes to this channel from a subsequence of the
 335      * given buffers.
 336      */
 337     public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
 338         if (outputClosed.get()) {
 339             if (debug.on()) debug.log("write: CLOSED");
 340             throw new IOException("closed output");
 341         }
 342         WriteSubscription writeSubscription =  writePublisher.writeSubscription;
 343         if (writeSubscription == null) {
 344             if (debug.on()) debug.log("write: unsubscribed: 0");
 345             return 0;
 346         }
 347         if (writeSubscription.cancelled) {
 348             if (debug.on()) debug.log("write: CANCELLED");
 349             shutdownOutput();
 350             throw new IOException("closed output");
 351         }
 352         if (writeSubscription.demand.tryDecrement()) {
 353             List<ByteBuffer> buffers = copy(srcs, offset, length);
 354             long res = Utils.remaining(buffers);
 355             if (debug.on()) debug.log("write: writing %d", res);
 356             writeSubscription.subscriber.onNext(buffers);
 357             return res;
 358         } else {
 359             if (debug.on()) debug.log("write: no demand: 0");
 360             return 0;
 361         }
 362     }
 363 
 364     /**
 365      * Shutdown the connection for reading without closing the channel.
 366      *
 367      * <p> Once shutdown for reading then further reads on the channel will
 368      * return {@code null}, the end-of-stream indication. If the input side of
 369      * the connection is already shutdown then invoking this method has no
 370      * effect.
 371      *
 372      * @throws ClosedChannelException
 373      *         If this channel is closed
 374      * @throws IOException
 375      *         If some other I/O error occurs
 376      */
 377     public void shutdownInput() {
 378         if (inputClosed.compareAndSet(false, true)) {
 379             if (debug.on()) debug.log("shutdownInput");
 380             // TransportImpl will eventually call RawChannel::close.
 381             // We must not call it here as this would close the socket
 382             // and can cause an exception to back fire before
 383             // TransportImpl and WebSocketImpl have updated their state.
 384         }
 385     }
 386 
 387     /**
 388      * Shutdown the connection for writing without closing the channel.
 389      *
 390      * <p> Once shutdown for writing then further attempts to write to the
 391      * channel will throw {@link ClosedChannelException}. If the output side of
 392      * the connection is already shutdown then invoking this method has no
 393      * effect.
 394      *
 395      * @throws ClosedChannelException
 396      *         If this channel is closed
 397      * @throws IOException
 398      *         If some other I/O error occurs
 399      */
 400     public void shutdownOutput() {
 401         if (outputClosed.compareAndSet(false, true)) {
 402             if (debug.on()) debug.log("shutdownOutput");
 403             // TransportImpl will eventually call RawChannel::close.
 404             // We must not call it here as this would close the socket
 405             // and can cause an exception to back fire before
 406             // TransportImpl and WebSocketImpl have updated their state.
 407         }
 408     }
 409 
 410     /**
 411      * Closes this channel.
 412      *
 413      * @throws IOException
 414      *         If an I/O error occurs
 415      */
 416     @Override
 417     public void close() {
 418         if (closed.compareAndSet(false, true)) {
 419             if (debug.on()) debug.log("close");
 420             connection.client().webSocketClose();
 421             connection.close();
 422         }
 423     }
 424 
 425     private static List<ByteBuffer> copy(ByteBuffer[] src, int offset, int len) {
 426         int count = Math.min(len, src.length - offset);
 427         if (count <= 0) return Utils.EMPTY_BB_LIST;
 428         if (count == 1) return List.of(Utils.copy(src[offset]));
 429         if (count == 2) return List.of(Utils.copy(src[offset]), Utils.copy(src[offset+1]));
 430         List<ByteBuffer> list = new ArrayList<>(count);
 431         for (int i = 0; i < count; i++) {
 432             list.add(Utils.copy(src[offset + i]));
 433         }
 434         return list;
 435     }
 436 }