< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java

Print this page




   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;

  29 import java.nio.ByteBuffer;
  30 import java.util.Optional;

  31 import java.util.function.Consumer;
  32 import jdk.incubator.http.internal.common.Utils;
  33 
  34 /**
  35  * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
  36  *
  37  * Call pushBody() to read the body (blocking). Data and errors are provided
  38  * to the given Consumers. After the final buffer has been delivered, an
      * empty Optional is delivered to signal the end of the body.
  39  */
  40 class ResponseContent {
  41 
  42     final HttpResponse.BodyProcessor<?> pusher;
  43     final HttpConnection connection;

  44     final int contentLength;
  45     ByteBuffer buffer;
  46     //ByteBuffer lastBufferUsed;
  47     final ResponseHeaders headers;
  48     private final Consumer<Optional<ByteBuffer>> dataConsumer;
  49     private final Consumer<IOException> errorConsumer;
  50     private final HttpClientImpl client;
  51     // this needs to run before we complete the body
  52     // so that connection can be returned to pool
  53     private final Runnable onFinished;

  54 
  55     ResponseContent(HttpConnection connection,
  56                     int contentLength,
  57                     ResponseHeaders h,
  58                     HttpResponse.BodyProcessor<?> userProcessor,
  59                     Consumer<Optional<ByteBuffer>> dataConsumer,
  60                     Consumer<IOException> errorConsumer,
  61                     Runnable onFinished)
  62     {
  63         this.pusher = (HttpResponse.BodyProcessor)userProcessor;
  64         this.connection = connection;
  65         this.contentLength = contentLength;
  66         this.headers = h;
  67         this.dataConsumer = dataConsumer;
  68         this.errorConsumer = errorConsumer;
  69         this.client = connection.client;
  70         this.onFinished = onFinished;

  71     }
  72 
  73     static final int LF = 10;
  74     static final int CR = 13;
  75     static final int SP = 0x20;
  76     static final int BUF_SIZE = 1024;
  77 
  78     boolean chunkedContent, chunkedContentInitialized;
  79 
  80     private boolean contentChunked() throws IOException {
  81         if (chunkedContentInitialized) {
  82             return chunkedContent;
  83         }
  84         if (contentLength == -1) {
  85             String tc = headers.firstValue("Transfer-Encoding")
  86                                .orElse("");
  87             if (!tc.equals("")) {
  88                 if (tc.equalsIgnoreCase("chunked")) {
  89                     chunkedContent = true;
  90                 } else {
  91                     throw new IOException("invalid content");
  92                 }
  93             } else {
  94                 chunkedContent = false;
  95             }
  96         }
  97         chunkedContentInitialized = true;
  98         return chunkedContent;
  99     }
 100 
 101     /**
 102      * Entry point for pusher. b is an initial ByteBuffer that may
 103      * have some data in it. When this method returns, the body
 104      * has been fully processed.
 105      */
 106     void pushBody(ByteBuffer b) {
 107         try {
 108             // TODO: check status





 109             if (contentChunked()) {
 110                 pushBodyChunked(b);
 111             } else {
 112                 pushBodyFixed(b);
 113             }
 114         } catch (IOException t) {
 115             errorConsumer.accept(t);















































































 116         }
 117     }
 118 
 119     // reads and returns chunklen. Position of chunkbuf is first byte
 120     // of chunk on return. chunklen includes the CR LF at end of chunk
 121     int readChunkLen() throws IOException {
 122         chunklen = 0;
 123         boolean cr = false;
 124         while (true) {
 125             getHunk();
 126             int c = chunkbuf.get();
 127             if (cr) {
 128                 if (c == LF) {
 129                     return chunklen + 2;
 130                 } else {
 131                     throw new IOException("invalid chunk header");
 132                 }
 133             }
 134             if (c == CR) {
 135                 cr = true;
 136             } else {
 137                 int digit = toDigit(c);
 138                 chunklen = chunklen * 16 + digit;
 139             }
 140         }

 141     }
 142 
 143     int chunklen = -1;      // number of bytes in chunk (fixed)
 144     int bytesremaining;     // number of bytes in chunk left to be read incl CRLF
 145     int bytesread;
 146     ByteBuffer chunkbuf;    // initialise
 147 
 148     // make sure we have at least 1 byte to look at
 149     private void getHunk() throws IOException {
 150         if (chunkbuf == null || !chunkbuf.hasRemaining()) {
 151             chunkbuf = connection.read();
 152         }














 153     }
 154 
 155     private void consumeBytes(int n) throws IOException {
 156         getHunk();
 157         while (n > 0) {
 158             int e = Math.min(chunkbuf.remaining(), n);
 159             chunkbuf.position(chunkbuf.position() + e);
 160             n -= e;
 161             if (n > 0) {
 162                 getHunk();
 163             }
 164         }


 165     }
 166 
 167     /**
 168      * Returns a ByteBuffer containing a chunk of data or a "hunk" of data
 169      * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
 170      * ByteBuffer returned is obtained from response processor.



 171      */
 172     ByteBuffer readChunkedBuffer() throws IOException {
 173         if (chunklen == -1) {
 174             // new chunk
 175             chunklen = readChunkLen() - 2;
 176             bytesremaining =  chunklen;
 177             if (chunklen == 0) {
 178                 consumeBytes(2);
 179                 return null;















 180             }















 181         }
 182 
 183         getHunk();
 184         bytesread = chunkbuf.remaining();
 185         ByteBuffer returnBuffer = Utils.getBuffer();
 186         int space = returnBuffer.remaining();
 187 
 188         int bytes2Copy = Math.min(bytesread, Math.min(bytesremaining, space));
 189         Utils.copy(chunkbuf, returnBuffer, bytes2Copy);
 190         returnBuffer.flip();
 191         bytesremaining -= bytes2Copy;
 192         if (bytesremaining == 0) {
 193             consumeBytes(2);




















 194             chunklen = -1;




 195         }
 196         return returnBuffer;
 197     }
 198 
 199     ByteBuffer initialBuffer;
 200     int fixedBytesReturned;
 201 
 202     //ByteBuffer getResidue() {
 203         //return lastBufferUsed;
 204     //}
 205 
 206     private void compactBuffer(ByteBuffer buf) {
 207         buf.compact()
 208            .flip();
 209     }
 210 
 211     /**
 212      * Copies inbuf (numBytes from its position) to new buffer. The returned
 213      * buffer's position is zero and limit is at end (numBytes)
 214      */
 215     private ByteBuffer copyBuffer(ByteBuffer inbuf, int numBytes) {
 216         ByteBuffer b1 = Utils.getBuffer();
 217         assert b1.remaining() >= numBytes;
 218         byte[] b = b1.array();
 219         inbuf.get(b, 0, numBytes);
 220         b1.limit(numBytes);
 221         return b1;
 222     }
 223 
 224     private void pushBodyChunked(ByteBuffer b) throws IOException {
 225         chunkbuf = b;
 226         while (true) {
 227             ByteBuffer b1 = readChunkedBuffer();




 228             if (b1 != null) {

 229                 if (b1.hasRemaining()) {
 230                     dataConsumer.accept(Optional.of(b1));



 231                 }

 232             } else {
 233                 onFinished.run();
 234                 dataConsumer.accept(Optional.empty());
 235                 return;
 236             }
 237         }
 238     }
 239 
 240     private int toDigit(int b) throws IOException {
 241         if (b >= 0x30 && b <= 0x39) {
 242             return b - 0x30;
 243         }
 244         if (b >= 0x41 && b <= 0x46) {
 245             return b - 0x41 + 10;
 246         }
 247         if (b >= 0x61 && b <= 0x66) {
 248             return b - 0x61 + 10;
 249         }
 250         throw new IOException("Invalid chunk header byte " + b);
 251     }
 252 
 253     private void pushBodyFixed(ByteBuffer b) throws IOException {
 254         int remaining = contentLength;
 255         while (b.hasRemaining() && remaining > 0) {
 256             ByteBuffer buffer = Utils.getBuffer();
 257             int amount = Math.min(b.remaining(), remaining);
 258             Utils.copy(b, buffer, amount);
 259             remaining -= amount;
 260             buffer.flip();
 261             dataConsumer.accept(Optional.of(buffer));
 262         }
 263         while (remaining > 0) {
 264             ByteBuffer buffer = connection.read();
 265             if (buffer == null)
 266                 throw new IOException("connection closed");
 267 
 268             int bytesread = buffer.remaining();
 269             // assume for now that pipelining not implemented
 270             if (bytesread > remaining) {
 271                 throw new IOException("too many bytes read");
 272             }
 273             remaining -= bytesread;
 274             dataConsumer.accept(Optional.of(buffer));











































 275         }























 276         onFinished.run();
 277         dataConsumer.accept(Optional.empty());













 278     }
 279 }


   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;
  32 import java.util.List;
  33 import java.util.function.Consumer;
  34 import jdk.incubator.http.internal.common.Utils;
  35 
  36 /**
  37  * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
  38  *
  39  * Call getBodyParser() to obtain a parser that is fed the received byte
  40  * buffers and forwards the decoded body to the BodySubscriber.
  41  */
  42 class ResponseContent {
  43 
  44     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  45 
  46     final HttpResponse.BodySubscriber<?> pusher;
  47     final int contentLength;
  48     final HttpHeaders headers;





  49     // this needs to run before we complete the body
  50     // so that connection can be returned to pool
  51     private final Runnable onFinished;
  52     private final String dbgTag;
  53 
  54     ResponseContent(HttpConnection connection,
  55                     int contentLength,
  56                     HttpHeaders h,
  57                     HttpResponse.BodySubscriber<?> userSubscriber,


  58                     Runnable onFinished)
  59     {
  60         this.pusher = userSubscriber;

  61         this.contentLength = contentLength;
  62         this.headers = h;



  63         this.onFinished = onFinished;
  64         this.dbgTag = connection.dbgString() + "/ResponseContent";
  65     }
  66 
  67     static final int LF = 10;
  68     static final int CR = 13;


  69 
  70     private boolean chunkedContent, chunkedContentInitialized;
  71 
  72     boolean contentChunked() throws IOException {
  73         if (chunkedContentInitialized) {
  74             return chunkedContent;
  75         }
  76         if (contentLength == -1) {
  77             String tc = headers.firstValue("Transfer-Encoding")
  78                                .orElse("");
  79             if (!tc.equals("")) {
  80                 if (tc.equalsIgnoreCase("chunked")) {
  81                     chunkedContent = true;
  82                 } else {
  83                     throw new IOException("invalid content");
  84                 }
  85             } else {
  86                 chunkedContent = false;
  87             }
  88         }
  89         chunkedContentInitialized = true;
  90         return chunkedContent;
  91     }
  92 
         /**
          * A body parser: a consumer of received byte buffers that must first
          * be given the subscription it operates under via onSubscribe.
          * Implemented in this file by ChunkedBodyParser and
          * FixedLengthBodyParser.
          */
  93     interface BodyParser extends Consumer<ByteBuffer> {
             // Hands the parser its subscription; both implementations in this
             // file forward it to the BodySubscriber.
  94         void onSubscribe(AbstractSubscription sub);
  95     }
  96 
  97     // Returns a parser that will take care of parsing the received byte
  98     // buffers and forward them to the BodySubscriber.
  99     // When the parser is done, it will call onComplete.
 100     // If parsing was successful, the throwable parameter will be null.
 101     // Otherwise it will be the exception that occurred
 102     // Note: revisit: it might be better to use a CompletableFuture than
 103     //       a completion handler.
 104     BodyParser getBodyParser(Consumer<Throwable> onComplete)
 105         throws IOException {
 106         if (contentChunked()) {
 107             return new ChunkedBodyParser(onComplete);
 108         } else {
 109             return new FixedLengthBodyParser(contentLength, onComplete);
 110         }
 111     }
 112 
 113 
 114     static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
 115     class ChunkedBodyParser implements BodyParser {
 116         final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
 117         final Consumer<Throwable> onComplete;
 118         final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
 119         final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
 120 
 121         volatile Throwable closedExceptionally;
 122         volatile int partialChunklen = 0; // partially read chunk len
 123         volatile int chunklen = -1;  // number of bytes in chunk
 124         volatile int bytesremaining;  // number of bytes in chunk left to be read incl CRLF
 125         volatile boolean cr = false;  // tryReadChunkLength has found CR
 126         volatile int bytesToConsume;  // number of bytes that still need to be consumed before proceeding
 127         volatile ChunkState state = ChunkState.READING_LENGTH; // current state
 128         volatile AbstractSubscription sub;
 129         ChunkedBodyParser(Consumer<Throwable> onComplete) {
 130             this.onComplete = onComplete;
 131         }
 132 
 133         String dbgString() {
 134             return dbgTag;
 135         }
 136 
 137         @Override
 138         public void onSubscribe(AbstractSubscription sub) {
 139             debug.log(Level.DEBUG, () ->  "onSubscribe: "
 140                         + pusher.getClass().getName());
 141             pusher.onSubscribe(this.sub = sub);
 142         }
 143 
             /**
              * Feeds one received buffer to the chunked decoder.
              *
              * Input is ignored once a previous call has failed. Otherwise
              * hunks are parsed out of {@code b} until it is exhausted or the
              * final chunk has been parsed; the hunks collected during this
              * call are delivered to the subscriber in a single onNext call,
              * for which one unit of demand is taken. When the final chunk is
              * parsed, onFinished is run, the subscriber is completed and the
              * completion handler is invoked.
              *
              * Any Throwable raised along the way is recorded and, unless
              * completion was already signalled, passed to the completion
              * handler.
              *
              * @param b buffer holding newly received bytes
              */
 144         @Override
 145         public void accept(ByteBuffer b) {
 146             if (closedExceptionally != null) {
 147                 debug.log(Level.DEBUG, () ->  "already closed: "
 148                             + closedExceptionally);
 149                 return;
 150             }
                 // set once the subscriber has been completed, so that the
                 // catch block does not invoke the completion handler with an
                 // error after that point
 151             boolean completed = false;
 152             try {
                     // hunks parsed during this call, delivered as one batch
 153                 List<ByteBuffer> out = new ArrayList<>();
 154                 do {
 155                     if (tryPushOneHunk(b, out))  {
 156                         // We're done! (true if the final chunk was parsed).
 157                         if (!out.isEmpty()) {
 158                             // push what we have and complete
 159                             // only reduce demand if we actually push something.
 160                             // we would not have come here if there was no
 161                             // demand.
 162                             boolean hasDemand = sub.demand().tryDecrement();
 163                             assert hasDemand;
 164                             pusher.onNext(out);
 165                         }
 166                         debug.log(Level.DEBUG, () ->  "done!");
 167                         assert closedExceptionally == null;
 168                         assert state == ChunkState.DONE;
                             // onFinished must run before the subscriber is completed
 169                         onFinished.run();
 170                         pusher.onComplete();
 171                         completed = true;
 172                         onComplete.accept(closedExceptionally); // should be null
 173                         break;
 174                     }
 175                     // the buffer may contain several hunks, and therefore
 176                     // we must loop while it's not exhausted.
 177                 } while (b.hasRemaining());
 178 
 179                 if (!completed && !out.isEmpty()) {
 180                     // push what we have.
 181                     // only reduce demand if we actually push something.
 182                     // we would not have come here if there was no
 183                     // demand.
 184                     boolean hasDemand = sub.demand().tryDecrement();
 185                     assert hasDemand;
 186                     pusher.onNext(out);
 187                 }
 188                 assert state == ChunkState.DONE || !b.hasRemaining();
 189             } catch(Throwable t) {
                     // remember the failure so that later buffers are ignored
 190                 closedExceptionally = t;
 191                 if (!completed) onComplete.accept(t);
 192             }
 193         }
 194 
 195         // reads and returns chunklen. Position of chunkbuf is first byte
 196         // of chunk on return. chunklen includes the CR LF at end of chunk
 197         // returns -1 if needs more bytes
 198         private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
 199             assert state == ChunkState.READING_LENGTH;
 200             while (chunkbuf.hasRemaining()) {

 201                 int c = chunkbuf.get();
 202                 if (cr) {
 203                     if (c == LF) {
 204                         return partialChunklen;
 205                     } else {
 206                         throw new IOException("invalid chunk header");
 207                     }
 208                 }
 209                 if (c == CR) {
 210                     cr = true;
 211                 } else {
 212                     int digit = toDigit(c);
 213                     partialChunklen = partialChunklen * 16 + digit;
 214                 }
 215             }
 216             return -1;
 217         }
 218 




 219 
 220         // try to consume as many bytes as specified by bytesToConsume.
 221         // returns the number of bytes that still need to be consumed.
 222         // In practice this method is only called to consume one CRLF pair
 223         // with bytesToConsume set to 2, so it will only return 0 (if completed),
 224         // 1, or 2 (if chunkbuf doesn't have the 2 chars).
 225         private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
 226             int n = bytesToConsume;
 227             if (n > 0) {
 228                 int e = Math.min(chunkbuf.remaining(), n);
 229 
 230                 // verifies some assertions
 231                 // this methods is called only to consume CRLF
 232                 if (Utils.ASSERTIONSENABLED) {
 233                     assert n <= 2 && e <= 2;
 234                     ByteBuffer tmp = chunkbuf.slice();
 235                     // if n == 2 assert that we will first consume CR
 236                     assert (n == 2 && e > 0) ? tmp.get() == CR : true;
 237                     // if n == 1 || n == 2 && e == 2 assert that we then consume LF
 238                     assert (n == 1 || e == 2) ? tmp.get() == LF : true;
 239                 }
 240 




 241                 chunkbuf.position(chunkbuf.position() + e);
 242                 n -= e;
 243                 bytesToConsume = n;


 244             }
 245             assert n >= 0;
 246             return n;
 247         }
 248 
 249         /**
 250          * Returns a ByteBuffer containing chunk of data or a "hunk" of data
 251          * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
 252          * If the given chunk does not have enough data this method return
 253          * an empty ByteBuffer (READMORE).
 254          * If we encounter the final chunk (an empty chunk) this method
 255          * returns null.
 256          */
 257         ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
 258             int unfulfilled = bytesremaining;
 259             int toconsume = bytesToConsume;
 260             ChunkState st = state;
 261             if (st == ChunkState.READING_LENGTH && chunklen == -1) {
 262                 debug.log(Level.DEBUG, () ->  "Trying to read chunk len"
 263                         + " (remaining in buffer:"+chunk.remaining()+")");
 264                 int clen = chunklen = tryReadChunkLen(chunk);
 265                 if (clen == -1) return READMORE;
 266                 debug.log(Level.DEBUG, "Got chunk len %d", clen);
 267                 cr = false; partialChunklen = 0;
 268                 unfulfilled = bytesremaining =  clen;
 269                 if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
 270                 else st = state = ChunkState.READING_DATA; // read the data
 271             }
 272 
 273             if (toconsume > 0) {
 274                 debug.log(Level.DEBUG,
 275                         "Trying to consume bytes: %d (remaining in buffer: %s)",
 276                         toconsume, chunk.remaining());
 277                 if (tryConsumeBytes(chunk) > 0) {
 278                     return READMORE;
 279                 }
 280             }
 281 
 282             toconsume = bytesToConsume;
 283             assert toconsume == 0;
 284 
 285 
 286             if (st == ChunkState.READING_LENGTH) {
 287                 // we will come here only if chunklen was 0, after having
 288                 // consumed the trailing CRLF
 289                 int clen = chunklen;
 290                 assert clen == 0;
 291                 debug.log(Level.DEBUG, "No more chunks: %d", clen);
 292                 // the DONE state is not really needed but it helps with
 293                 // assertions...
 294                 state = ChunkState.DONE;
 295                 return null;
 296             }
 297 
 298             int clen = chunklen;
 299             assert clen > 0;
 300             assert st == ChunkState.READING_DATA;
 301 
 302             ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
 303             if (unfulfilled > 0) {
 304                 int bytesread = chunk.remaining();
 305                 debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
 306                           bytesread, unfulfilled);
 307 
 308                 int bytes2return = Math.min(bytesread, unfulfilled);
 309                 debug.log(Level.DEBUG,  "Returning chunk bytes: %d", bytes2return);
 310                 returnBuffer = Utils.slice(chunk, bytes2return);
 311                 unfulfilled = bytesremaining -= bytes2return;
 312                 if (unfulfilled == 0) bytesToConsume = 2;
 313             }
 314 
 315             assert unfulfilled >= 0;
 316 
 317             if (unfulfilled == 0) {
 318                 debug.log(Level.DEBUG,
 319                         "No more bytes to read - %d yet to consume.",
 320                         unfulfilled);
 321                 // check whether the trailing CRLF is consumed, try to
 322                 // consume it if not. If tryConsumeBytes needs more bytes
 323                 // then we will come back here later - skipping the block
 324                 // that reads data because remaining==0, and finding
 325                 // that the two bytes are now consumed.
 326                 if (tryConsumeBytes(chunk) == 0) {
 327                     // we're done for this chunk! reset all states and
 328                     // prepare to read the next chunk.
 329                     chunklen = -1;
 330                     partialChunklen = 0;
 331                     cr = false;
 332                     state = ChunkState.READING_LENGTH;
 333                     debug.log(Level.DEBUG, "Ready to read next chunk");
 334                 }

 335             }
 336             if (returnBuffer == READMORE) {
 337                 debug.log(Level.DEBUG, "Need more data");









 338             }
 339             return returnBuffer;











 340         }
 341 
 342 
 343         // Attempt to parse and push one hunk from the buffer.
 344         // Returns true if the final chunk was parsed.
 345         // Returns false if we need to push more chunks.
 346         private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
 347                 throws IOException {
 348             assert state != ChunkState.DONE;
 349             ByteBuffer b1 = tryReadOneHunk(b);
 350             if (b1 != null) {
 351                 //assert b1.hasRemaining() || b1 == READMORE;
 352                 if (b1.hasRemaining()) {
 353                     debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
 354                               b1.remaining());
 355                     out.add(b1);
 356                     debug.log(Level.DEBUG, "Chunk sent.");
 357                 }
 358                 return false; // we haven't parsed the final chunk yet.
 359             } else {
 360                 return true; // we're done! the final chunk was parsed.



 361             }
 362         }
 363 
 364         private int toDigit(int b) throws IOException {
 365             if (b >= 0x30 && b <= 0x39) {
 366                 return b - 0x30;
 367             }
 368             if (b >= 0x41 && b <= 0x46) {
 369                 return b - 0x41 + 10;
 370             }
 371             if (b >= 0x61 && b <= 0x66) {
 372                 return b - 0x61 + 10;
 373             }
 374             throw new IOException("Invalid chunk header byte " + b);
 375         }
 376 



















 377     }
 378 
 379     class FixedLengthBodyParser implements BodyParser {
 380         final int contentLength;
 381         final Consumer<Throwable> onComplete;
 382         final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
 383         final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
 384         volatile int remaining;
 385         volatile Throwable closedExceptionally;
 386         volatile AbstractSubscription sub;
 387         FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
 388             this.contentLength = this.remaining = contentLength;
 389             this.onComplete = onComplete;
 390         }
 391 
 392         String dbgString() {
 393             return dbgTag;
 394         }
 395 
 396         @Override
 397         public void onSubscribe(AbstractSubscription sub) {
 398             debug.log(Level.DEBUG, () -> "length="
 399                         + contentLength +", onSubscribe: "
 400                         + pusher.getClass().getName());
 401             pusher.onSubscribe(this.sub = sub);
 402             try {
 403                 if (contentLength == 0) {
 404                     pusher.onComplete();
 405                     onComplete.accept(null);
 406                 }
 407             } catch (Throwable t) {
 408                 closedExceptionally = t;
 409                 try {
 410                     pusher.onError(t);
 411                 } finally {
 412                     onComplete.accept(t);
 413                 }
 414             }
 415         }
 416 
             /**
              * Feeds one received buffer to the fixed-length decoder.
              *
              * Input is ignored once a previous call has failed, or once a
              * non-empty body has been fully received. Otherwise at most
              * {@code remaining} bytes of {@code b} are sliced off and
              * delivered to the subscriber in one onNext call, for which one
              * unit of demand is taken. When no bytes remain to be received,
              * onFinished is run, the subscriber is completed and the
              * completion handler is invoked.
              *
              * Any Throwable raised along the way is recorded and, unless
              * completion was already signalled, passed to the completion
              * handler.
              *
              * @param b buffer holding newly received bytes
              */
 417         @Override
 418         public void accept(ByteBuffer b) {
 419             if (closedExceptionally != null) {
 420                 debug.log(Level.DEBUG, () -> "already closed: "
 421                             + closedExceptionally);
 422                 return;
 423             }
                 // set once the subscriber has been completed, so that the
                 // catch block does not invoke the completion handler with an
                 // error after that point
 424             boolean completed = false;
 425             try {
 426                 int unfulfilled = remaining;
 427                 debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
 428                         b.remaining(), unfulfilled, contentLength);
 429                 assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
 430 
                     // a non-empty body that has already been fully received:
                     // nothing more to do
 431                 if (unfulfilled == 0 && contentLength > 0) return;
 432 
 433                 if (b.hasRemaining() && unfulfilled > 0) {
 434                     // only reduce demand if we actually push something.
 435                     // we would not have come here if there was no
 436                     // demand.
 437                     boolean hasDemand = sub.demand().tryDecrement();
 438                     assert hasDemand;
                         // never hand out more than what is left of the body
 439                     int amount = Math.min(b.remaining(), unfulfilled);
 440                     unfulfilled = remaining -= amount;
 441                     ByteBuffer buffer = Utils.slice(b, amount);
 442                     pusher.onNext(List.of(buffer));
 443                 }
 444                 if (unfulfilled == 0) {
 445                     // We're done! All data has been received.
 446                     assert closedExceptionally == null;
                         // onFinished must run before the subscriber is completed
 447                     onFinished.run();
 448                     pusher.onComplete();
 449                     completed = true;
 450                     onComplete.accept(closedExceptionally); // should be null
 451                 } else {
 452                     assert b.remaining() == 0;
 453                 }
 454             } catch (Throwable t) {
 455                 debug.log(Level.DEBUG, "Unexpected exception", t);
                     // remember the failure so that later buffers are ignored
 456                 closedExceptionally = t;
 457                 if (!completed) {
 458                     onComplete.accept(t);
 459                 }
 460             }
 461         }
 462     }
 463 }
< prev index next >