1 /* 2 * Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import jdk.internal.net.http.common.Demand; 29 import jdk.internal.net.http.common.FlowTube; 30 import jdk.internal.net.http.common.Logger; 31 import jdk.internal.net.http.common.Utils; 32 import jdk.internal.net.http.websocket.RawChannel; 33 34 import java.io.EOFException; 35 import java.io.IOException; 36 import java.lang.ref.Cleaner; 37 import java.nio.ByteBuffer; 38 import java.nio.channels.ClosedChannelException; 39 import java.nio.channels.SelectionKey; 40 import java.util.ArrayList; 41 import java.util.List; 42 import java.util.concurrent.ConcurrentLinkedQueue; 43 import java.util.concurrent.Flow; 44 import java.util.concurrent.atomic.AtomicBoolean; 45 import java.util.concurrent.atomic.AtomicReference; 46 import java.util.function.Supplier; 47 import java.lang.System.Logger.Level; 48 49 /* 50 * I/O abstraction used to implement WebSocket. 51 * 52 */ 53 public class RawChannelTube implements RawChannel { 54 55 final HttpConnection connection; 56 final FlowTube tube; 57 final WritePublisher writePublisher; 58 final ReadSubscriber readSubscriber; 59 final Supplier<ByteBuffer> initial; 60 final AtomicBoolean inited = new AtomicBoolean(); 61 final AtomicBoolean outputClosed = new AtomicBoolean(); 62 final AtomicBoolean inputClosed = new AtomicBoolean(); 63 final AtomicBoolean closed = new AtomicBoolean(); 64 final String dbgTag; 65 final Logger debug; 66 private static final Cleaner cleaner = 67 Utils.ASSERTIONSENABLED && Utils.DEBUG_WS ? Cleaner.create() : null; 68 69 RawChannelTube(HttpConnection connection, 70 Supplier<ByteBuffer> initial) { 71 this.connection = connection; 72 this.tube = connection.getConnectionFlow(); 73 this.initial = initial; 74 this.writePublisher = new WritePublisher(); 75 this.readSubscriber = new ReadSubscriber(); 76 dbgTag = "[WebSocket] RawChannelTube(" + tube +")"; 77 debug = Utils.getWebSocketLogger(dbgTag::toString, Utils.DEBUG_WS); 78 connection.client().webSocketOpen(); 79 connectFlows(); 80 if (Utils.ASSERTIONSENABLED && Utils.DEBUG_WS) { 81 // this is just for debug... 82 cleaner.register(this, new CleanupChecker(closed, debug)); 83 } 84 } 85 86 // Make sure no back reference to RawChannelTube can exist 87 // from this class. In particular it would be dangerous 88 // to reference connection, since connection has a reference 89 // to SocketTube with which a RawChannelTube is registered. 90 // Ditto for HttpClientImpl, which might have a back reference 91 // to the connection. 92 static final class CleanupChecker implements Runnable { 93 final AtomicBoolean closed; 94 final System.Logger debug; 95 CleanupChecker(AtomicBoolean closed, System.Logger debug) { 96 this.closed = closed; 97 this.debug = debug; 98 } 99 100 @Override 101 public void run() { 102 if (!closed.get()) { 103 debug.log(Level.DEBUG, 104 "RawChannelTube was not closed before being released"); 105 } 106 } 107 } 108 109 private void connectFlows() { 110 if (debug.on()) debug.log("connectFlows"); 111 tube.connectFlows(writePublisher, readSubscriber); 112 } 113 114 class WriteSubscription implements Flow.Subscription { 115 final Flow.Subscriber<? super List<ByteBuffer>> subscriber; 116 final Demand demand = new Demand(); 117 volatile boolean cancelled; 118 WriteSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 119 this.subscriber = subscriber; 120 } 121 @Override 122 public void request(long n) { 123 if (debug.on()) debug.log("WriteSubscription::request %d", n); 124 demand.increase(n); 125 RawEvent event; 126 while ((event = writePublisher.events.poll()) != null) { 127 if (debug.on()) debug.log("WriteSubscriber: handling event"); 128 event.handle(); 129 if (demand.isFulfilled()) break; 130 } 131 } 132 @Override 133 public void cancel() { 134 cancelled = true; 135 if (debug.on()) debug.log("WriteSubscription::cancel"); 136 shutdownOutput(); 137 RawEvent event; 138 while ((event = writePublisher.events.poll()) != null) { 139 if (debug.on()) debug.log("WriteSubscriber: handling event"); 140 event.handle(); 141 } 142 } 143 } 144 145 class WritePublisher implements FlowTube.TubePublisher { 146 final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>(); 147 volatile WriteSubscription writeSubscription; 148 @Override 149 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 150 if (debug.on()) debug.log("WritePublisher::subscribe"); 151 WriteSubscription subscription = new WriteSubscription(subscriber); 152 subscriber.onSubscribe(subscription); 153 writeSubscription = subscription; 154 } 155 } 156 157 class ReadSubscriber implements FlowTube.TubeSubscriber { 158 159 volatile Flow.Subscription readSubscription; 160 volatile boolean completed; 161 long initialRequest; 162 final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>(); 163 final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>(); 164 final AtomicReference<Throwable> errorRef = new AtomicReference<>(); 165 166 void checkEvents() { 167 Flow.Subscription subscription = readSubscription; 168 if (subscription != null) { 169 Throwable error = errorRef.get(); 170 while (!buffers.isEmpty() || error != null || closed.get() || completed) { 171 RawEvent event = events.poll(); 172 if (event == null) break; 173 if (debug.on()) debug.log("ReadSubscriber: handling event"); 174 event.handle(); 175 } 176 } 177 } 178 179 @Override 180 public void onSubscribe(Flow.Subscription subscription) { 181 //buffers.add(initial.get()); 182 long n; 183 synchronized (this) { 184 readSubscription = subscription; 185 n = initialRequest; 186 initialRequest = 0; 187 } 188 if (debug.on()) debug.log("ReadSubscriber::onSubscribe"); 189 if (n > 0) { 190 Throwable error = errorRef.get(); 191 if (error == null && !closed.get() && !completed) { 192 if (debug.on()) debug.log("readSubscription: requesting " + n); 193 subscription.request(n); 194 } 195 } 196 checkEvents(); 197 } 198 199 @Override 200 public void onNext(List<ByteBuffer> item) { 201 if (debug.on()) debug.log(() -> "ReadSubscriber::onNext " 202 + Utils.remaining(item) + " bytes"); 203 buffers.addAll(item); 204 checkEvents(); 205 } 206 207 @Override 208 public void onError(Throwable throwable) { 209 if (closed.get() || errorRef.compareAndSet(null, throwable)) { 210 if (debug.on()) debug.log("ReadSubscriber::onError", throwable); 211 if (buffers.isEmpty()) { 212 checkEvents(); 213 shutdownInput(); 214 } 215 } 216 } 217 218 @Override 219 public void onComplete() { 220 if (debug.on()) debug.log("ReadSubscriber::onComplete"); 221 completed = true; 222 if (buffers.isEmpty()) { 223 checkEvents(); 224 shutdownInput(); 225 } 226 } 227 } 228 229 230 /* 231 * Registers given event whose callback will be called once only (i.e. 232 * register new event for each callback). 233 * 234 * Memory consistency effects: actions in a thread calling registerEvent 235 * happen-before any subsequent actions in the thread calling event.handle 236 */ 237 public void registerEvent(RawEvent event) throws IOException { 238 int interestOps = event.interestOps(); 239 if ((interestOps & SelectionKey.OP_WRITE) != 0) { 240 if (debug.on()) debug.log("register write event"); 241 if (outputClosed.get()) throw new IOException("closed output"); 242 writePublisher.events.add(event); 243 WriteSubscription writeSubscription = writePublisher.writeSubscription; 244 if (writeSubscription != null) { 245 while (!writeSubscription.demand.isFulfilled()) { 246 event = writePublisher.events.poll(); 247 if (event == null) break; 248 event.handle(); 249 } 250 } 251 } 252 if ((interestOps & SelectionKey.OP_READ) != 0) { 253 if (debug.on()) debug.log("register read event"); 254 if (inputClosed.get()) throw new IOException("closed input"); 255 readSubscriber.events.add(event); 256 readSubscriber.checkEvents(); 257 if (readSubscriber.buffers.isEmpty() 258 && !readSubscriber.events.isEmpty()) { 259 Flow.Subscription readSubscription = 260 readSubscriber.readSubscription; 261 if (readSubscription == null) { 262 synchronized (readSubscriber) { 263 readSubscription = readSubscriber.readSubscription; 264 if (readSubscription == null) { 265 readSubscriber.initialRequest = 1; 266 return; 267 } 268 } 269 } 270 assert readSubscription != null; 271 if (debug.on()) debug.log("readSubscription: requesting 1"); 272 readSubscription.request(1); 273 } 274 } 275 } 276 277 /** 278 * Hands over the initial bytes. Once the bytes have been returned they are 279 * no longer available and the method will throw an {@link 280 * IllegalStateException} on each subsequent invocation. 281 * 282 * @return the initial bytes 283 * @throws IllegalStateException 284 * if the method has been already invoked 285 */ 286 public ByteBuffer initialByteBuffer() throws IllegalStateException { 287 if (inited.compareAndSet(false, true)) { 288 return initial.get(); 289 } else throw new IllegalStateException("initial buffer already drained"); 290 } 291 292 /* 293 * Returns a ByteBuffer with the data read or null if EOF is reached. Has no 294 * remaining bytes if no data available at the moment. 295 */ 296 public ByteBuffer read() throws IOException { 297 if (debug.on()) debug.log("read"); 298 Flow.Subscription readSubscription = readSubscriber.readSubscription; 299 if (readSubscription == null) return Utils.EMPTY_BYTEBUFFER; 300 ByteBuffer buffer = readSubscriber.buffers.poll(); 301 if (buffer != null) { 302 if (debug.on()) debug.log("read: " + buffer.remaining()); 303 return buffer; 304 } 305 Throwable error = readSubscriber.errorRef.get(); 306 if (error != null) error = Utils.getIOException(error); 307 if (error instanceof EOFException) { 308 if (debug.on()) debug.log("read: EOFException"); 309 shutdownInput(); 310 return null; 311 } 312 if (error != null) { 313 if (debug.on()) debug.log("read: " + error); 314 if (closed.get()) { 315 return null; 316 } 317 shutdownInput(); 318 throw Utils.getIOException(error); 319 } 320 if (readSubscriber.completed) { 321 if (debug.on()) debug.log("read: EOF"); 322 shutdownInput(); 323 return null; 324 } 325 if (inputClosed.get()) { 326 if (debug.on()) debug.log("read: CLOSED"); 327 throw new IOException("closed output"); 328 } 329 if (debug.on()) debug.log("read: nothing to read"); 330 return Utils.EMPTY_BYTEBUFFER; 331 } 332 333 /* 334 * Writes a sequence of bytes to this channel from a subsequence of the 335 * given buffers. 336 */ 337 public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { 338 if (outputClosed.get()) { 339 if (debug.on()) debug.log("write: CLOSED"); 340 throw new IOException("closed output"); 341 } 342 WriteSubscription writeSubscription = writePublisher.writeSubscription; 343 if (writeSubscription == null) { 344 if (debug.on()) debug.log("write: unsubscribed: 0"); 345 return 0; 346 } 347 if (writeSubscription.cancelled) { 348 if (debug.on()) debug.log("write: CANCELLED"); 349 shutdownOutput(); 350 throw new IOException("closed output"); 351 } 352 if (writeSubscription.demand.tryDecrement()) { 353 List<ByteBuffer> buffers = copy(srcs, offset, length); 354 long res = Utils.remaining(buffers); 355 if (debug.on()) debug.log("write: writing %d", res); 356 writeSubscription.subscriber.onNext(buffers); 357 return res; 358 } else { 359 if (debug.on()) debug.log("write: no demand: 0"); 360 return 0; 361 } 362 } 363 364 /** 365 * Shutdown the connection for reading without closing the channel. 366 * 367 * <p> Once shutdown for reading then further reads on the channel will 368 * return {@code null}, the end-of-stream indication. If the input side of 369 * the connection is already shutdown then invoking this method has no 370 * effect. 371 * 372 * @throws ClosedChannelException 373 * If this channel is closed 374 * @throws IOException 375 * If some other I/O error occurs 376 */ 377 public void shutdownInput() { 378 if (inputClosed.compareAndSet(false, true)) { 379 if (debug.on()) debug.log("shutdownInput"); 380 // TransportImpl will eventually call RawChannel::close. 381 // We must not call it here as this would close the socket 382 // and can cause an exception to back fire before 383 // TransportImpl and WebSocketImpl have updated their state. 384 } 385 } 386 387 /** 388 * Shutdown the connection for writing without closing the channel. 389 * 390 * <p> Once shutdown for writing then further attempts to write to the 391 * channel will throw {@link ClosedChannelException}. If the output side of 392 * the connection is already shutdown then invoking this method has no 393 * effect. 394 * 395 * @throws ClosedChannelException 396 * If this channel is closed 397 * @throws IOException 398 * If some other I/O error occurs 399 */ 400 public void shutdownOutput() { 401 if (outputClosed.compareAndSet(false, true)) { 402 if (debug.on()) debug.log("shutdownOutput"); 403 // TransportImpl will eventually call RawChannel::close. 404 // We must not call it here as this would close the socket 405 // and can cause an exception to back fire before 406 // TransportImpl and WebSocketImpl have updated their state. 407 } 408 } 409 410 /** 411 * Closes this channel. 412 * 413 * @throws IOException 414 * If an I/O error occurs 415 */ 416 @Override 417 public void close() { 418 if (closed.compareAndSet(false, true)) { 419 if (debug.on()) debug.log("close"); 420 connection.client().webSocketClose(); 421 connection.close(); 422 } 423 } 424 425 private static List<ByteBuffer> copy(ByteBuffer[] src, int offset, int len) { 426 int count = Math.min(len, src.length - offset); 427 if (count <= 0) return Utils.EMPTY_BB_LIST; 428 if (count == 1) return List.of(Utils.copy(src[offset])); 429 if (count == 2) return List.of(Utils.copy(src[offset]), Utils.copy(src[offset+1])); 430 List<ByteBuffer> list = new ArrayList<>(count); 431 for (int i = 0; i < count; i++) { 432 list.add(Utils.copy(src[offset + i])); 433 } 434 return list; 435 } 436 }