1 /* 2 * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.nio.ByteBuffer; 29 import java.nio.CharBuffer; 30 import java.nio.charset.CharacterCodingException; 31 import java.nio.charset.Charset; 32 import java.nio.charset.CharsetDecoder; 33 import java.nio.charset.CoderResult; 34 import java.nio.charset.CodingErrorAction; 35 import java.util.List; 36 import java.util.Objects; 37 import java.util.concurrent.CompletableFuture; 38 import java.util.concurrent.CompletionStage; 39 import java.util.concurrent.ConcurrentLinkedDeque; 40 import java.util.concurrent.Flow; 41 import java.util.concurrent.Flow.Subscriber; 42 import java.util.concurrent.Flow.Subscription; 43 import java.util.concurrent.atomic.AtomicBoolean; 44 import java.util.concurrent.atomic.AtomicLong; 45 import java.util.concurrent.atomic.AtomicReference; 46 import java.util.function.Function; 47 import jdk.internal.net.http.common.Demand; 48 import java.net.http.HttpResponse.BodySubscriber; 49 import jdk.internal.net.http.common.MinimalFuture; 50 import jdk.internal.net.http.common.SequentialScheduler; 51 52 /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */ 53 public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R> 54 implements BodySubscriber<R> { 55 private final CompletableFuture<R> cf = new MinimalFuture<>(); 56 private final S subscriber; 57 private final Function<? super S, ? extends R> finisher; 58 private final Charset charset; 59 private final String eol; 60 private final AtomicBoolean subscribed = new AtomicBoolean(); 61 private volatile LineSubscription downstream; 62 63 private LineSubscriberAdapter(S subscriber, 64 Function<? super S, ? extends R> finisher, 65 Charset charset, 66 String eol) { 67 if (eol != null && eol.isEmpty()) 68 throw new IllegalArgumentException("empty line separator"); 69 this.subscriber = Objects.requireNonNull(subscriber); 70 this.finisher = Objects.requireNonNull(finisher); 71 this.charset = Objects.requireNonNull(charset); 72 this.eol = eol; 73 } 74 75 @Override 76 public void onSubscribe(Subscription subscription) { 77 Objects.requireNonNull(subscription); 78 if (!subscribed.compareAndSet(false, true)) { 79 subscription.cancel(); 80 return; 81 } 82 83 downstream = LineSubscription.create(subscription, 84 charset, 85 eol, 86 subscriber, 87 cf); 88 subscriber.onSubscribe(downstream); 89 } 90 91 @Override 92 public void onNext(List<ByteBuffer> item) { 93 Objects.requireNonNull(item); 94 try { 95 downstream.submit(item); 96 } catch (Throwable t) { 97 onError(t); 98 } 99 } 100 101 @Override 102 public void onError(Throwable throwable) { 103 Objects.requireNonNull(throwable); 104 try { 105 downstream.signalError(throwable); 106 } finally { 107 cf.completeExceptionally(throwable); 108 } 109 } 110 111 @Override 112 public void onComplete() { 113 try { 114 downstream.signalComplete(); 115 } finally { 116 cf.complete(finisher.apply(subscriber)); 117 } 118 } 119 120 @Override 121 public CompletionStage<R> getBody() { 122 return cf; 123 } 124 125 public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R> 126 create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol) 127 { 128 if (eol != null && eol.isEmpty()) 129 throw new IllegalArgumentException("empty line separator"); 130 return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber), 131 Objects.requireNonNull(finisher), 132 Objects.requireNonNull(charset), 133 eol); 134 } 135 136 static final class LineSubscription implements Flow.Subscription { 137 final Flow.Subscription upstreamSubscription; 138 final CharsetDecoder decoder; 139 final String newline; 140 final Demand downstreamDemand; 141 final ConcurrentLinkedDeque<ByteBuffer> queue; 142 final SequentialScheduler scheduler; 143 final Flow.Subscriber<? super String> upstream; 144 final CompletableFuture<?> cf; 145 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); 146 private final AtomicLong demanded = new AtomicLong(); 147 private volatile boolean completed; 148 private volatile boolean cancelled; 149 150 private final char[] chars = new char[1024]; 151 private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]); 152 private final CharBuffer buffer = CharBuffer.wrap(chars); 153 private final StringBuilder builder = new StringBuilder(); 154 private String nextLine; 155 156 private LineSubscription(Flow.Subscription s, 157 CharsetDecoder dec, 158 String separator, 159 Flow.Subscriber<? super String> subscriber, 160 CompletableFuture<?> completion) { 161 downstreamDemand = new Demand(); 162 queue = new ConcurrentLinkedDeque<>(); 163 upstreamSubscription = Objects.requireNonNull(s); 164 decoder = Objects.requireNonNull(dec); 165 newline = separator; 166 upstream = Objects.requireNonNull(subscriber); 167 cf = Objects.requireNonNull(completion); 168 scheduler = SequentialScheduler.synchronizedScheduler(this::loop); 169 } 170 171 @Override 172 public void request(long n) { 173 if (cancelled) return; 174 if (downstreamDemand.increase(n)) { 175 scheduler.runOrSchedule(); 176 } 177 } 178 179 @Override 180 public void cancel() { 181 cancelled = true; 182 upstreamSubscription.cancel(); 183 } 184 185 public void submit(List<ByteBuffer> list) { 186 queue.addAll(list); 187 demanded.decrementAndGet(); 188 scheduler.runOrSchedule(); 189 } 190 191 public void signalComplete() { 192 completed = true; 193 scheduler.runOrSchedule(); 194 } 195 196 public void signalError(Throwable error) { 197 if (errorRef.compareAndSet(null, 198 Objects.requireNonNull(error))) { 199 scheduler.runOrSchedule(); 200 } 201 } 202 203 // This method looks at whether some bytes where left over (in leftover) 204 // from decoding the previous buffer when the previous buffer was in 205 // underflow. If so, it takes bytes one by one from the new buffer 'in' 206 // and combines them with the leftover bytes until 'in' is exhausted or a 207 // character was produced in 'out', resolving the previous underflow. 208 // Returns true if the buffer is still in underflow, false otherwise. 209 // However, in both situation some chars might have been produced in 'out'. 210 private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput) 211 throws CharacterCodingException { 212 int limit = leftover.position(); 213 if (limit == 0) { 214 // no leftover 215 return false; 216 } else { 217 CoderResult res = null; 218 while (in.hasRemaining()) { 219 leftover.position(limit); 220 leftover.limit(++limit); 221 leftover.put(in.get()); 222 leftover.position(0); 223 res = decoder.decode(leftover, out, 224 endOfInput && !in.hasRemaining()); 225 int remaining = leftover.remaining(); 226 if (remaining > 0) { 227 assert leftover.position() == 0; 228 leftover.position(remaining); 229 } else { 230 leftover.position(0); 231 } 232 leftover.limit(leftover.capacity()); 233 if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) { 234 continue; 235 } 236 if (res.isError()) { 237 res.throwException(); 238 } 239 assert !res.isOverflow(); 240 return false; 241 } 242 return !endOfInput; 243 } 244 } 245 246 // extract characters from start to end and remove them from 247 // the StringBuilder 248 private static String take(StringBuilder b, int start, int end) { 249 assert start == 0; 250 String line; 251 if (end == start) return ""; 252 line = b.substring(start, end); 253 b.delete(start, end); 254 return line; 255 } 256 257 // finds end of line, returns -1 if not found, or the position after 258 // the line delimiter if found, removing the delimiter in the process. 259 private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) { 260 int len = b.length(); 261 if (eol != null) { // delimiter explicitly specified 262 int i = b.indexOf(eol); 263 if (i >= 0) { 264 // remove the delimiter and returns the position 265 // of the char after it. 266 b.delete(i, i + eol.length()); 267 return i; 268 } 269 } else { // no delimiter specified, behaves as BufferedReader::readLine 270 boolean crfound = false; 271 for (int i = 0; i < len; i++) { 272 char c = b.charAt(i); 273 if (c == '\n') { 274 // '\n' or '\r\n' found. 275 // remove the delimiter and returns the position 276 // of the char after it. 277 b.delete(crfound ? i - 1 : i, i + 1); 278 return crfound ? i - 1 : i; 279 } else if (crfound) { 280 // previous char was '\r', c != '\n' 281 assert i != 0; 282 // remove the delimiter and returns the position 283 // of the char after it. 284 b.delete(i - 1, i); 285 return i - 1; 286 } 287 crfound = c == '\r'; 288 } 289 if (crfound && endOfInput) { 290 // remove the delimiter and returns the position 291 // of the char after it. 292 b.delete(len - 1, len); 293 return len - 1; 294 } 295 } 296 return endOfInput && len > 0 ? len : -1; 297 } 298 299 // Looks at whether the StringBuilder contains a line. 300 // Returns null if more character are needed. 301 private static String nextLine(StringBuilder b, String eol, boolean endOfInput) { 302 int next = endOfLine(b, eol, endOfInput); 303 return (next > -1) ? take(b, 0, next) : null; 304 } 305 306 // Attempts to read the next line. Returns the next line if 307 // the delimiter was found, null otherwise. The delimiters are 308 // consumed. 309 private String nextLine() 310 throws CharacterCodingException { 311 assert nextLine == null; 312 LINES: 313 while (nextLine == null) { 314 boolean endOfInput = completed && queue.isEmpty(); 315 nextLine = nextLine(builder, newline, 316 endOfInput && leftover.position() == 0); 317 if (nextLine != null) return nextLine; 318 ByteBuffer b; 319 BUFFERS: 320 while ((b = queue.peek()) != null) { 321 if (!b.hasRemaining()) { 322 queue.poll(); 323 continue BUFFERS; 324 } 325 BYTES: 326 while (b.hasRemaining()) { 327 buffer.position(0); 328 buffer.limit(buffer.capacity()); 329 boolean endofInput = completed && queue.size() <= 1; 330 if (isUnderFlow(b, buffer, endofInput)) { 331 assert !b.hasRemaining(); 332 if (buffer.position() > 0) { 333 buffer.flip(); 334 builder.append(buffer); 335 } 336 continue BUFFERS; 337 } 338 CoderResult res = decoder.decode(b, buffer, endofInput); 339 if (res.isError()) res.throwException(); 340 if (buffer.position() > 0) { 341 buffer.flip(); 342 builder.append(buffer); 343 continue LINES; 344 } 345 if (res.isUnderflow() && b.hasRemaining()) { 346 //System.out.println("underflow: adding " + b.remaining() + " bytes"); 347 leftover.put(b); 348 assert !b.hasRemaining(); 349 continue BUFFERS; 350 } 351 } 352 } 353 354 assert queue.isEmpty(); 355 if (endOfInput) { 356 // Time to cleanup: there may be some undecoded leftover bytes 357 // We need to flush them out. 358 // The decoder has been configured to replace malformed/unmappable 359 // chars with some replacement, in order to behave like 360 // InputStreamReader. 361 leftover.flip(); 362 buffer.position(0); 363 buffer.limit(buffer.capacity()); 364 365 // decode() must be called just before flush, even if there 366 // is nothing to decode. We must do this even if leftover 367 // has no remaining bytes. 368 CoderResult res = decoder.decode(leftover, buffer, endOfInput); 369 if (buffer.position() > 0) { 370 buffer.flip(); 371 builder.append(buffer); 372 } 373 if (res.isError()) res.throwException(); 374 375 // Now call decoder.flush() 376 buffer.position(0); 377 buffer.limit(buffer.capacity()); 378 res = decoder.flush(buffer); 379 if (buffer.position() > 0) { 380 buffer.flip(); 381 builder.append(buffer); 382 } 383 if (res.isError()) res.throwException(); 384 385 // It's possible that we reach here twice - just for the 386 // purpose of checking that no bytes were left over, so 387 // we reset leftover/decoder to make the function reentrant. 388 leftover.position(0); 389 leftover.limit(leftover.capacity()); 390 decoder.reset(); 391 392 // if some chars were produced then this call will 393 // return them. 394 return nextLine = nextLine(builder, newline, endOfInput); 395 } 396 return null; 397 } 398 return null; 399 } 400 401 // The main sequential scheduler loop. 402 private void loop() { 403 try { 404 while (!cancelled) { 405 Throwable error = errorRef.get(); 406 if (error != null) { 407 cancelled = true; 408 scheduler.stop(); 409 upstream.onError(error); 410 cf.completeExceptionally(error); 411 return; 412 } 413 if (nextLine == null) nextLine = nextLine(); 414 if (nextLine == null) { 415 if (completed) { 416 scheduler.stop(); 417 if (leftover.position() != 0) { 418 // Underflow: not all bytes could be 419 // decoded, but no more bytes will be coming. 420 // This should not happen as we should already 421 // have got a MalformedInputException, or 422 // replaced the unmappable chars. 423 errorRef.compareAndSet(null, 424 new IllegalStateException( 425 "premature end of input (" 426 + leftover.position() 427 + " undecoded bytes)")); 428 continue; 429 } else { 430 upstream.onComplete(); 431 } 432 return; 433 } else if (demanded.get() == 0 434 && !downstreamDemand.isFulfilled()) { 435 long incr = Math.max(1, downstreamDemand.get()); 436 demanded.addAndGet(incr); 437 upstreamSubscription.request(incr); 438 continue; 439 } else return; 440 } 441 assert nextLine != null; 442 assert newline != null && !nextLine.endsWith(newline) 443 || !nextLine.endsWith("\n") || !nextLine.endsWith("\r"); 444 if (downstreamDemand.tryDecrement()) { 445 String forward = nextLine; 446 nextLine = null; 447 upstream.onNext(forward); 448 } else return; // no demand: come back later 449 } 450 } catch (Throwable t) { 451 try { 452 upstreamSubscription.cancel(); 453 } finally { 454 signalError(t); 455 } 456 } 457 } 458 459 static LineSubscription create(Flow.Subscription s, 460 Charset charset, 461 String lineSeparator, 462 Flow.Subscriber<? super String> upstream, 463 CompletableFuture<?> cf) { 464 return new LineSubscription(Objects.requireNonNull(s), 465 Objects.requireNonNull(charset).newDecoder() 466 // use the same decoder configuration than 467 // java.io.InputStreamReader 468 .onMalformedInput(CodingErrorAction.REPLACE) 469 .onUnmappableCharacter(CodingErrorAction.REPLACE), 470 lineSeparator, 471 Objects.requireNonNull(upstream), 472 Objects.requireNonNull(cf)); 473 } 474 } 475 } 476