1 /*
   2  * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.nio.ByteBuffer;
  29 import java.nio.CharBuffer;
  30 import java.nio.charset.CharacterCodingException;
  31 import java.nio.charset.Charset;
  32 import java.nio.charset.CharsetDecoder;
  33 import java.nio.charset.CoderResult;
  34 import java.nio.charset.CodingErrorAction;
  35 import java.util.List;
  36 import java.util.Objects;
  37 import java.util.concurrent.CompletableFuture;
  38 import java.util.concurrent.CompletionStage;
  39 import java.util.concurrent.ConcurrentLinkedDeque;
  40 import java.util.concurrent.Flow;
  41 import java.util.concurrent.Flow.Subscriber;
  42 import java.util.concurrent.Flow.Subscription;
  43 import java.util.concurrent.atomic.AtomicBoolean;
  44 import java.util.concurrent.atomic.AtomicLong;
  45 import java.util.concurrent.atomic.AtomicReference;
  46 import java.util.function.Function;
  47 import jdk.internal.net.http.common.Demand;
  48 import java.net.http.HttpResponse.BodySubscriber;
  49 import jdk.internal.net.http.common.MinimalFuture;
  50 import jdk.internal.net.http.common.SequentialScheduler;
  51 
  52 /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
  53 public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
  54         implements BodySubscriber<R> {
  55     private final CompletableFuture<R> cf = new MinimalFuture<>();
  56     private final S subscriber;
  57     private final Function<? super S, ? extends R> finisher;
  58     private final Charset charset;
  59     private final String eol;
  60     private final AtomicBoolean subscribed = new AtomicBoolean();
  61     private volatile LineSubscription downstream;
  62 
  63     private LineSubscriberAdapter(S subscriber,
  64                                   Function<? super S, ? extends R> finisher,
  65                                   Charset charset,
  66                                   String eol) {
  67         if (eol != null && eol.isEmpty())
  68             throw new IllegalArgumentException("empty line separator");
  69         this.subscriber = Objects.requireNonNull(subscriber);
  70         this.finisher = Objects.requireNonNull(finisher);
  71         this.charset = Objects.requireNonNull(charset);
  72         this.eol = eol;
  73     }
  74 
  75     @Override
  76     public void onSubscribe(Subscription subscription) {
  77         Objects.requireNonNull(subscription);
  78         if (!subscribed.compareAndSet(false, true)) {
  79             subscription.cancel();
  80             return;
  81         }
  82 
  83         downstream = LineSubscription.create(subscription,
  84                                              charset,
  85                                              eol,
  86                                              subscriber,
  87                                              cf);
  88         subscriber.onSubscribe(downstream);
  89     }
  90 
  91     @Override
  92     public void onNext(List<ByteBuffer> item) {
  93         Objects.requireNonNull(item);
  94         try {
  95             downstream.submit(item);
  96         } catch (Throwable t) {
  97             onError(t);
  98         }
  99     }
 100 
 101     @Override
 102     public void onError(Throwable throwable) {
 103         Objects.requireNonNull(throwable);
 104         try {
 105             downstream.signalError(throwable);
 106         } finally {
 107             cf.completeExceptionally(throwable);
 108         }
 109     }
 110 
 111     @Override
 112     public void onComplete() {
 113         try {
 114             downstream.signalComplete();
 115         } finally {
 116             cf.complete(finisher.apply(subscriber));
 117         }
 118     }
 119 
 120     @Override
 121     public CompletionStage<R> getBody() {
 122         return cf;
 123     }
 124 
 125     public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
 126     create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol)
 127     {
 128         if (eol != null && eol.isEmpty())
 129             throw new IllegalArgumentException("empty line separator");
 130         return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
 131                 Objects.requireNonNull(finisher),
 132                 Objects.requireNonNull(charset),
 133                 eol);
 134     }
 135 
 136     static final class LineSubscription implements Flow.Subscription {
 137         final Flow.Subscription upstreamSubscription;
 138         final CharsetDecoder decoder;
 139         final String newline;
 140         final Demand downstreamDemand;
 141         final ConcurrentLinkedDeque<ByteBuffer> queue;
 142         final SequentialScheduler scheduler;
 143         final Flow.Subscriber<? super String> upstream;
 144         final CompletableFuture<?> cf;
 145         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 146         private final AtomicLong demanded = new AtomicLong();
 147         private volatile boolean completed;
 148         private volatile boolean cancelled;
 149 
 150         private final char[] chars = new char[1024];
 151         private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
 152         private final CharBuffer buffer = CharBuffer.wrap(chars);
 153         private final StringBuilder builder = new StringBuilder();
 154         private String nextLine;
 155 
 156         private LineSubscription(Flow.Subscription s,
 157                                  CharsetDecoder dec,
 158                                  String separator,
 159                                  Flow.Subscriber<? super String> subscriber,
 160                                  CompletableFuture<?> completion) {
 161             downstreamDemand = new Demand();
 162             queue = new ConcurrentLinkedDeque<>();
 163             upstreamSubscription = Objects.requireNonNull(s);
 164             decoder = Objects.requireNonNull(dec);
 165             newline = separator;
 166             upstream = Objects.requireNonNull(subscriber);
 167             cf = Objects.requireNonNull(completion);
 168             scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
 169         }
 170 
 171         @Override
 172         public void request(long n) {
 173             if (cancelled) return;
 174             if (downstreamDemand.increase(n)) {
 175                 scheduler.runOrSchedule();
 176             }
 177         }
 178 
 179         @Override
 180         public void cancel() {
 181             cancelled = true;
 182             upstreamSubscription.cancel();
 183         }
 184 
 185         public void submit(List<ByteBuffer> list) {
 186             queue.addAll(list);
 187             demanded.decrementAndGet();
 188             scheduler.runOrSchedule();
 189         }
 190 
 191         public void signalComplete() {
 192             completed = true;
 193             scheduler.runOrSchedule();
 194         }
 195 
 196         public void signalError(Throwable error) {
 197             if (errorRef.compareAndSet(null,
 198                     Objects.requireNonNull(error))) {
 199                 scheduler.runOrSchedule();
 200             }
 201         }
 202 
 203         // This method looks at whether some bytes where left over (in leftover)
 204         // from decoding the previous buffer when the previous buffer was in
 205         // underflow. If so, it takes bytes one by one from the new buffer 'in'
 206         // and combines them with the leftover bytes until 'in' is exhausted or a
 207         // character was produced in 'out', resolving the previous underflow.
 208         // Returns true if the buffer is still in underflow, false otherwise.
 209         // However, in both situation some chars might have been produced in 'out'.
 210         private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
 211                 throws CharacterCodingException {
 212             int limit = leftover.position();
 213             if (limit == 0) {
 214                 // no leftover
 215                 return false;
 216             } else {
 217                 CoderResult res = null;
 218                 while (in.hasRemaining()) {
 219                     leftover.position(limit);
 220                     leftover.limit(++limit);
 221                     leftover.put(in.get());
 222                     leftover.position(0);
 223                     res = decoder.decode(leftover, out,
 224                             endOfInput && !in.hasRemaining());
 225                     int remaining = leftover.remaining();
 226                     if (remaining > 0) {
 227                         assert leftover.position() == 0;
 228                         leftover.position(remaining);
 229                     } else {
 230                         leftover.position(0);
 231                     }
 232                     leftover.limit(leftover.capacity());
 233                     if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
 234                         continue;
 235                     }
 236                     if (res.isError()) {
 237                         res.throwException();
 238                     }
 239                     assert !res.isOverflow();
 240                     return false;
 241                 }
 242                 return !endOfInput;
 243             }
 244         }
 245 
 246         // extract characters from start to end and remove them from
 247         // the StringBuilder
 248         private static String take(StringBuilder b, int start, int end) {
 249             assert start == 0;
 250             String line;
 251             if (end == start) return "";
 252             line = b.substring(start, end);
 253             b.delete(start, end);
 254             return line;
 255         }
 256 
 257         // finds end of line, returns -1 if not found, or the position after
 258         // the line delimiter if found, removing the delimiter in the process.
 259         private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
 260             int len = b.length();
 261             if (eol != null) { // delimiter explicitly specified
 262                 int i = b.indexOf(eol);
 263                 if (i >= 0) {
 264                     // remove the delimiter and returns the position
 265                     // of the char after it.
 266                     b.delete(i, i + eol.length());
 267                     return i;
 268                 }
 269             } else { // no delimiter specified, behaves as BufferedReader::readLine
 270                 boolean crfound = false;
 271                 for (int i = 0; i < len; i++) {
 272                     char c = b.charAt(i);
 273                     if (c == '\n') {
 274                         // '\n' or '\r\n' found.
 275                         // remove the delimiter and returns the position
 276                         // of the char after it.
 277                         b.delete(crfound ? i - 1 : i, i + 1);
 278                         return crfound ? i - 1 : i;
 279                     } else if (crfound) {
 280                         // previous char was '\r', c != '\n'
 281                         assert i != 0;
 282                         // remove the delimiter and returns the position
 283                         // of the char after it.
 284                         b.delete(i - 1, i);
 285                         return i - 1;
 286                     }
 287                     crfound = c == '\r';
 288                 }
 289                 if (crfound && endOfInput) {
 290                     // remove the delimiter and returns the position
 291                     // of the char after it.
 292                     b.delete(len - 1, len);
 293                     return len - 1;
 294                 }
 295             }
 296             return endOfInput && len > 0 ? len : -1;
 297         }
 298 
 299         // Looks at whether the StringBuilder contains a line.
 300         // Returns null if more character are needed.
 301         private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
 302             int next = endOfLine(b, eol, endOfInput);
 303             return (next > -1) ? take(b, 0, next) : null;
 304         }
 305 
 306         // Attempts to read the next line. Returns the next line if
 307         // the delimiter was found, null otherwise. The delimiters are
 308         // consumed.
 309         private String nextLine()
 310                 throws CharacterCodingException {
 311             assert nextLine == null;
 312             LINES:
 313             while (nextLine == null) {
 314                 boolean endOfInput = completed && queue.isEmpty();
 315                 nextLine = nextLine(builder, newline,
 316                         endOfInput && leftover.position() == 0);
 317                 if (nextLine != null) return nextLine;
 318                 ByteBuffer b;
 319                 BUFFERS:
 320                 while ((b = queue.peek()) != null) {
 321                     if (!b.hasRemaining()) {
 322                         queue.poll();
 323                         continue BUFFERS;
 324                     }
 325                     BYTES:
 326                     while (b.hasRemaining()) {
 327                         buffer.position(0);
 328                         buffer.limit(buffer.capacity());
 329                         boolean endofInput = completed && queue.size() <= 1;
 330                         if (isUnderFlow(b, buffer, endofInput)) {
 331                             assert !b.hasRemaining();
 332                             if (buffer.position() > 0) {
 333                                 buffer.flip();
 334                                 builder.append(buffer);
 335                             }
 336                             continue BUFFERS;
 337                         }
 338                         CoderResult res = decoder.decode(b, buffer, endofInput);
 339                         if (res.isError()) res.throwException();
 340                         if (buffer.position() > 0) {
 341                             buffer.flip();
 342                             builder.append(buffer);
 343                             continue LINES;
 344                         }
 345                         if (res.isUnderflow() && b.hasRemaining()) {
 346                             //System.out.println("underflow: adding " + b.remaining() + " bytes");
 347                             leftover.put(b);
 348                             assert !b.hasRemaining();
 349                             continue BUFFERS;
 350                         }
 351                     }
 352                 }
 353 
 354                 assert queue.isEmpty();
 355                 if (endOfInput) {
 356                     // Time to cleanup: there may be some undecoded leftover bytes
 357                     // We need to flush them out.
 358                     // The decoder has been configured to replace malformed/unmappable
 359                     // chars with some replacement, in order to behave like
 360                     // InputStreamReader.
 361                     leftover.flip();
 362                     buffer.position(0);
 363                     buffer.limit(buffer.capacity());
 364 
 365                     // decode() must be called just before flush, even if there
 366                     // is nothing to decode. We must do this even if leftover
 367                     // has no remaining bytes.
 368                     CoderResult res = decoder.decode(leftover, buffer, endOfInput);
 369                     if (buffer.position() > 0) {
 370                         buffer.flip();
 371                         builder.append(buffer);
 372                     }
 373                     if (res.isError()) res.throwException();
 374 
 375                     // Now call decoder.flush()
 376                     buffer.position(0);
 377                     buffer.limit(buffer.capacity());
 378                     res = decoder.flush(buffer);
 379                     if (buffer.position() > 0) {
 380                         buffer.flip();
 381                         builder.append(buffer);
 382                     }
 383                     if (res.isError()) res.throwException();
 384 
 385                     // It's possible that we reach here twice - just for the
 386                     // purpose of checking that no bytes were left over, so
 387                     // we reset leftover/decoder to make the function reentrant.
 388                     leftover.position(0);
 389                     leftover.limit(leftover.capacity());
 390                     decoder.reset();
 391 
 392                     // if some chars were produced then this call will
 393                     // return them.
 394                     return nextLine = nextLine(builder, newline, endOfInput);
 395                 }
 396                 return null;
 397             }
 398             return null;
 399         }
 400 
 401         // The main sequential scheduler loop.
 402         private void loop() {
 403             try {
 404                 while (!cancelled) {
 405                     Throwable error = errorRef.get();
 406                     if (error != null) {
 407                         cancelled = true;
 408                         scheduler.stop();
 409                         upstream.onError(error);
 410                         cf.completeExceptionally(error);
 411                         return;
 412                     }
 413                     if (nextLine == null) nextLine = nextLine();
 414                     if (nextLine == null) {
 415                         if (completed) {
 416                             scheduler.stop();
 417                             if (leftover.position() != 0) {
 418                                 // Underflow: not all bytes could be
 419                                 // decoded, but no more bytes will be coming.
 420                                 // This should not happen as we should already
 421                                 // have got a MalformedInputException, or
 422                                 // replaced the unmappable chars.
 423                                 errorRef.compareAndSet(null,
 424                                         new IllegalStateException(
 425                                                 "premature end of input ("
 426                                                         + leftover.position()
 427                                                         + " undecoded bytes)"));
 428                                 continue;
 429                             } else {
 430                                 upstream.onComplete();
 431                             }
 432                             return;
 433                         } else if (demanded.get() == 0
 434                                 && !downstreamDemand.isFulfilled()) {
 435                             long incr = Math.max(1, downstreamDemand.get());
 436                             demanded.addAndGet(incr);
 437                             upstreamSubscription.request(incr);
 438                             continue;
 439                         } else return;
 440                     }
 441                     assert nextLine != null;
 442                     assert newline != null && !nextLine.endsWith(newline)
 443                             || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
 444                     if (downstreamDemand.tryDecrement()) {
 445                         String forward = nextLine;
 446                         nextLine = null;
 447                         upstream.onNext(forward);
 448                     } else return; // no demand: come back later
 449                 }
 450             } catch (Throwable t) {
 451                 try {
 452                     upstreamSubscription.cancel();
 453                 } finally {
 454                     signalError(t);
 455                 }
 456             }
 457         }
 458 
 459         static LineSubscription create(Flow.Subscription s,
 460                                        Charset charset,
 461                                        String lineSeparator,
 462                                        Flow.Subscriber<? super String> upstream,
 463                                        CompletableFuture<?> cf) {
 464             return new LineSubscription(Objects.requireNonNull(s),
 465                     Objects.requireNonNull(charset).newDecoder()
 466                             // use the same decoder configuration than
 467                             // java.io.InputStreamReader
 468                             .onMalformedInput(CodingErrorAction.REPLACE)
 469                             .onUnmappableCharacter(CodingErrorAction.REPLACE),
 470                     lineSeparator,
 471                     Objects.requireNonNull(upstream),
 472                     Objects.requireNonNull(cf));
 473         }
 474     }
 475 }
 476