1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.net.URI; 30 import java.nio.ByteBuffer; 31 import java.util.List; 32 import java.util.Map; 33 import java.util.Set; 34 import java.net.InetSocketAddress; 35 import jdk.incubator.http.HttpConnection.Mode; 36 import java.nio.charset.StandardCharsets; 37 import static java.nio.charset.StandardCharsets.US_ASCII; 38 import java.util.concurrent.CompletableFuture; 39 import java.util.concurrent.CompletionException; 40 import java.util.concurrent.Flow; 41 42 import jdk.incubator.http.internal.common.HttpHeadersImpl; 43 import jdk.incubator.http.internal.common.Log; 44 import jdk.incubator.http.internal.common.MinimalFuture; 45 import jdk.incubator.http.internal.common.Utils; 46 47 /** 48 * A HTTP/1.1 request. 49 * 50 * send() -> Writes the request + body to the given channel, in one blocking 51 * operation. 52 */ 53 class Http1Request { 54 final HttpClientImpl client; 55 final HttpRequestImpl request; 56 final HttpConnection chan; 57 // Multiple buffers are used to hold different parts of request 58 // See line 206 and below for description 59 final ByteBuffer[] buffers; 60 final HttpRequest.BodyProcessor requestProc; 61 final HttpHeaders userHeaders; 62 final HttpHeadersImpl systemHeaders; 63 boolean streaming; 64 long contentLength; 65 final CompletableFuture<Void> cf; 66 67 Http1Request(HttpRequestImpl request, HttpClientImpl client, HttpConnection connection) 68 throws IOException 69 { 70 this.client = client; 71 this.request = request; 72 this.chan = connection; 73 buffers = new ByteBuffer[5]; // TODO: check 74 this.requestProc = request.requestProcessor; 75 this.userHeaders = request.getUserHeaders(); 76 this.systemHeaders = request.getSystemHeaders(); 77 this.cf = new MinimalFuture<>(); 78 } 79 80 private void logHeaders() throws IOException { 81 StringBuilder sb = new StringBuilder(256); 82 sb.append("REQUEST HEADERS:\n"); 83 Log.dumpHeaders(sb, " ", systemHeaders); 84 
Log.dumpHeaders(sb, " ", userHeaders); 85 Log.logHeaders(sb.toString()); 86 } 87 88 private void dummy(long x) { 89 // not used in this class 90 } 91 92 private void collectHeaders0() throws IOException { 93 if (Log.headers()) { 94 logHeaders(); 95 } 96 StringBuilder sb = new StringBuilder(256); 97 collectHeaders1(sb, request, systemHeaders); 98 collectHeaders1(sb, request, userHeaders); 99 sb.append("\r\n"); 100 String headers = sb.toString(); 101 buffers[1] = ByteBuffer.wrap(headers.getBytes(StandardCharsets.US_ASCII)); 102 } 103 104 private void collectHeaders1(StringBuilder sb, 105 HttpRequestImpl request, 106 HttpHeaders headers) 107 throws IOException 108 { 109 Map<String,List<String>> h = headers.map(); 110 Set<Map.Entry<String,List<String>>> entries = h.entrySet(); 111 112 for (Map.Entry<String,List<String>> entry : entries) { 113 String key = entry.getKey(); 114 List<String> values = entry.getValue(); 115 for (String value : values) { 116 sb.append(key) 117 .append(": ") 118 .append(value) 119 .append("\r\n"); 120 } 121 } 122 } 123 124 private String getPathAndQuery(URI uri) { 125 String path = uri.getPath(); 126 String query = uri.getQuery(); 127 if (path == null || path.equals("")) { 128 path = "/"; 129 } 130 if (query == null) { 131 query = ""; 132 } 133 if (query.equals("")) { 134 return path; 135 } else { 136 return path + "?" 
+ query; 137 } 138 } 139 149 boolean defaultPort; 150 if (port == -1) { 151 defaultPort = true; 152 } else if (request.secure()) { 153 defaultPort = port == 443; 154 } else { 155 defaultPort = port == 80; 156 } 157 158 if (defaultPort) { 159 return host; 160 } else { 161 return host + ":" + Integer.toString(port); 162 } 163 } 164 165 private String requestURI() { 166 URI uri = request.uri(); 167 String method = request.method(); 168 169 if ((request.proxy(client) == null && !method.equals("CONNECT")) 170 || request.isWebSocket()) { 171 return getPathAndQuery(uri); 172 } 173 if (request.secure()) { 174 if (request.method().equals("CONNECT")) { 175 // use authority for connect itself 176 return authorityString(request.authority()); 177 } else { 178 // requests over tunnel do not require full URL 179 return getPathAndQuery(uri); 180 } 181 } 182 return uri == null? authorityString(request.authority()) : uri.toString(); 183 } 184 185 void sendHeadersOnly() throws IOException { 186 collectHeaders(); 187 chan.write(buffers, 0, 2); 188 } 189 190 void sendRequest() throws IOException { 191 collectHeaders(); 192 chan.configureMode(Mode.BLOCKING); 193 if (contentLength == 0) { 194 chan.write(buffers, 0, 2); 195 } else if (contentLength > 0) { 196 writeFixedContent(true); 197 } else { 198 writeStreamedContent(true); 199 } 200 setFinished(); 201 } 202 203 private boolean finished; 204 205 synchronized boolean finished() { 206 return finished; 207 } 208 209 synchronized void setFinished() { 210 finished = true; 211 } 212 213 private void collectHeaders() throws IOException { 214 if (Log.requests() && request != null) { 215 Log.logRequest(request.toString()); 216 } 217 String uriString = requestURI(); 218 StringBuilder sb = new StringBuilder(64); 219 sb.append(request.method()) 220 .append(' ') 221 .append(uriString) 222 .append(" HTTP/1.1\r\n"); 223 String cmd = sb.toString(); 224 225 buffers[0] = ByteBuffer.wrap(cmd.getBytes(StandardCharsets.US_ASCII)); 226 URI uri = 
request.uri(); 227 if (uri != null) { 228 systemHeaders.setHeader("Host", hostString()); 229 } 230 if (request == null) { 231 // this is not a user request. No content 232 contentLength = 0; 233 } else { 234 contentLength = requestProc.contentLength(); 235 } 236 237 if (contentLength == 0) { 238 systemHeaders.setHeader("Content-Length", "0"); 239 collectHeaders0(); 240 } else if (contentLength > 0) { 241 /* [0] request line [1] headers [2] body */ 242 systemHeaders.setHeader("Content-Length", 243 Integer.toString((int) contentLength)); 244 streaming = false; 245 collectHeaders0(); 246 buffers[2] = getBuffer(); 247 } else { 248 /* Chunked: 249 * 250 * [0] request line [1] headers [2] chunk header [3] chunk data [4] 251 * final chunk header and trailing CRLF of previous chunks 252 * 253 * 2,3,4 used repeatedly */ 254 streaming = true; 255 systemHeaders.setHeader("Transfer-encoding", "chunked"); 256 collectHeaders0(); 257 buffers[3] = getBuffer(); 258 } 259 } 260 261 private ByteBuffer getBuffer() { 262 return ByteBuffer.allocate(Utils.BUFSIZE); 263 } 264 265 // The following two methods used by Http1Exchange to handle expect continue 266 267 void continueRequest() throws IOException { 268 if (streaming) { 269 writeStreamedContent(false); 270 } else { 271 writeFixedContent(false); 272 } 273 setFinished(); 274 } 275 276 class StreamSubscriber implements Flow.Subscriber<ByteBuffer> { 277 volatile Flow.Subscription subscription; 278 volatile boolean includeHeaders; 279 280 StreamSubscriber(boolean includeHeaders) { 281 this.includeHeaders = includeHeaders; 282 } 283 284 @Override 285 public void onSubscribe(Flow.Subscription subscription) { 286 if (this.subscription != null) { 287 throw new IllegalStateException("already subscribed"); 288 } 289 this.subscription = subscription; 290 subscription.request(1); 291 } 292 293 @Override 294 public void onNext(ByteBuffer item) { 295 int startbuf, nbufs; 296 297 if (cf.isDone()) { 298 throw new IllegalStateException("subscription 
already completed"); 299 } 300 301 if (includeHeaders) { 302 startbuf = 0; 303 nbufs = 5; 304 } else { 305 startbuf = 2; 306 nbufs = 3; 307 } 308 int chunklen = item.remaining(); 309 buffers[3] = item; 310 buffers[2] = getHeader(chunklen); 311 buffers[4] = CRLF_BUFFER(); 312 try { 313 chan.write(buffers, startbuf, nbufs); 314 } catch (IOException e) { 315 subscription.cancel(); 316 cf.completeExceptionally(e); 317 } 318 includeHeaders = false; 319 subscription.request(1); 320 } 321 322 @Override 323 public void onError(Throwable throwable) { 324 if (cf.isDone()) { 325 return; 326 } 327 subscription.cancel(); 328 cf.completeExceptionally(throwable); 329 } 330 331 @Override 332 public void onComplete() { 333 if (cf.isDone()) { 334 throw new IllegalStateException("subscription already completed"); 335 } 336 buffers[3] = EMPTY_CHUNK_HEADER(); 337 buffers[4] = CRLF_BUFFER(); 338 try { 339 chan.write(buffers, 3, 2); 340 } catch (IOException ex) { 341 cf.completeExceptionally(ex); 342 return; 343 } 344 cf.complete(null); 345 } 346 } 347 348 private void waitForCompletion() throws IOException { 349 try { 350 cf.join(); 351 } catch (CompletionException e) { 352 throw Utils.getIOException(e); 353 } 354 } 355 356 /* Entire request is sent, or just body only */ 357 private void writeStreamedContent(boolean includeHeaders) 358 throws IOException 359 { 360 StreamSubscriber subscriber = new StreamSubscriber(includeHeaders); 361 requestProc.subscribe(subscriber); 362 waitForCompletion(); 363 } 364 365 class FixedContentSubscriber implements Flow.Subscriber<ByteBuffer> 366 { 367 volatile Flow.Subscription subscription; 368 volatile boolean includeHeaders; 369 volatile long contentWritten = 0; 370 371 FixedContentSubscriber(boolean includeHeaders) { 372 this.includeHeaders = includeHeaders; 373 } 374 375 @Override 376 public void onSubscribe(Flow.Subscription subscription) { 377 if (this.subscription != null) { 378 throw new IllegalStateException("already subscribed"); 379 } 380 
this.subscription = subscription; 381 subscription.request(1); 382 } 383 384 @Override 385 public void onNext(ByteBuffer item) { 386 int startbuf, nbufs; 387 long headersLength; 388 389 if (includeHeaders) { 390 startbuf = 0; 391 nbufs = 3; 392 headersLength = buffers[0].remaining() + buffers[1].remaining(); 393 } else { 394 startbuf = 2; 395 nbufs = 1; 396 headersLength = 0; 397 } 398 buffers[2] = item; 399 try { 400 long writing = buffers[2].remaining() + headersLength; 401 contentWritten += buffers[2].remaining(); 402 chan.checkWrite(writing, buffers, startbuf, nbufs); 403 404 if (contentWritten > contentLength) { 405 String msg = "Too many bytes in request body. Expected: " + 406 Long.toString(contentLength) + " Sent: " + 407 Long.toString(contentWritten); 408 throw new IOException(msg); 409 } 410 subscription.request(1); 411 } catch (IOException e) { 412 subscription.cancel(); 413 cf.completeExceptionally(e); 414 } 415 } 416 417 @Override 418 public void onError(Throwable throwable) { 419 if (cf.isDone()) { 420 return; 421 } 422 subscription.cancel(); 423 cf.completeExceptionally(throwable); 424 } 425 426 @Override 427 public void onComplete() { 428 if (cf.isDone()) { 429 throw new IllegalStateException("subscription already completed"); 430 } 431 432 if (contentLength > contentWritten) { 433 subscription.cancel(); 434 Exception e = new IOException("Too few bytes returned by the processor"); 435 cf.completeExceptionally(e); 436 } else { 437 cf.complete(null); 438 } 439 } 440 } 441 442 /* Entire request is sent, or just body only */ 443 private void writeFixedContent(boolean includeHeaders) 444 throws IOException { 445 if (contentLength == 0) { 446 return; 447 } 448 FixedContentSubscriber subscriber = new FixedContentSubscriber(includeHeaders); 449 requestProc.subscribe(subscriber); 450 waitForCompletion(); 451 } 452 453 private static final byte[] CRLF = {'\r', '\n'}; 454 private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'}; 455 456 private 
ByteBuffer CRLF_BUFFER() { 457 return ByteBuffer.wrap(CRLF); 458 } 459 460 private ByteBuffer EMPTY_CHUNK_HEADER() { 461 return ByteBuffer.wrap(EMPTY_CHUNK_BYTES); 462 } 463 464 /* Returns a header for a particular chunk size */ 465 private static ByteBuffer getHeader(int size){ 466 String hexStr = Integer.toHexString(size); 467 byte[] hexBytes = hexStr.getBytes(US_ASCII); 468 byte[] header = new byte[hexStr.length()+2]; 469 System.arraycopy(hexBytes, 0, header, 0, hexBytes.length); 470 header[hexBytes.length] = CRLF[0]; 471 header[hexBytes.length+1] = CRLF[1]; 472 return ByteBuffer.wrap(header); 473 } 474 } | 1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
*/

package jdk.incubator.http;

import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.Flow;

import jdk.incubator.http.Http1Exchange.Http1BodySubscriber;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.Utils;

import static java.nio.charset.StandardCharsets.US_ASCII;

/**
 * An HTTP/1.1 request: produces the serialized request line and headers, and
 * the subscribers that feed the request body to the owning exchange.
 */
class Http1Request {
    private final HttpRequestImpl request;
    private final Http1Exchange<?> http1Exchange;
    private final HttpConnection connection;
    private final HttpRequest.BodyPublisher requestPublisher;
    private final HttpHeaders userHeaders;
    private final HttpHeadersImpl systemHeaders;
    private volatile boolean streaming;
    private volatile long contentLength;

    Http1Request(HttpRequestImpl request,
                 Http1Exchange<?> http1Exchange)
        throws IOException
    {
        this.request = request;
        this.http1Exchange = http1Exchange;
        this.connection = http1Exchange.connection();
        this.requestPublisher = request.requestPublisher;  // may be null
        this.userHeaders = request.getUserHeaders();
        this.systemHeaders = request.getSystemHeaders();
    }

    /** Logs the already-serialized header text, with CRLF shown as plain newlines. */
    private void logHeaders(String completeHeaders) {
        if (!Log.headers()) {
            return;
        }
        String printable = completeHeaders.replace("\r\n", "\n");
        Log.logHeaders("REQUEST HEADERS:\n" + printable);
    }

    /** Appends system headers, user headers and the terminating blank line. */
    private void collectHeaders0(StringBuilder sb) {
        collectHeaders1(sb, systemHeaders);
        collectHeaders1(sb, userHeaders);
        sb.append("\r\n");
    }

    /** Appends one {@code name: value\r\n} line for every value of every header. */
    private void collectHeaders1(StringBuilder sb, HttpHeaders headers) {
        for (Map.Entry<String,List<String>> header : headers.map().entrySet()) {
            String name = header.getKey();
            for (String value : header.getValue()) {
                sb.append(name).append(": ").append(value).append("\r\n");
            }
        }
    }

    /** Returns the path (or "/" when absent), followed by "?query" when a query exists. */
    private String getPathAndQuery(URI uri) {
        String path = uri.getPath();
        String query = uri.getQuery();
        if (path == null || path.equals("")) {
            path = "/";
        }
        if (query == null || query.equals("")) {
            return path;
        }
        return path + "?" + query;
    }

    // NOTE(review): the declaration of authorityString() and the first lines
    // of hostString() (up to the defaultPort declaration) were elided from the
    // reviewed text. They are reconstructed here from the call sites and from
    // the visible tail of hostString() -- verify against the repository.
    private String authorityString(InetSocketAddress addr) {
        return addr.getHostString() + ":" + addr.getPort();
    }

    /** Returns the Host header value; the port is left out when it is the scheme default. */
    private String hostString() {
        URI uri = request.uri();
        int port = uri.getPort();
        String host = uri.getHost();

        boolean defaultPort;
        if (port == -1) {
            defaultPort = true;
        } else if (request.secure()) {
            defaultPort = port == 443;
        } else {
            defaultPort = port == 80;
        }

        return defaultPort ? host : host + ":" + Integer.toString(port);
    }

    /** Chooses the request-target form for the request line. */
    private String requestURI() {
        URI uri = request.uri();
        String method = request.method();
        boolean isConnect = method.equals("CONNECT");

        if ((request.proxy() == null && !isConnect) || request.isWebSocket()) {
            return getPathAndQuery(uri);
        }
        if (isConnect) {
            // use authority for connect itself, secure or not
            return authorityString(request.authority());
        }
        if (request.secure()) {
            // requests over tunnel do not require full URL
            return getPathAndQuery(uri);
        }
        if (uri == null) {
            return authorityString(request.authority());
        }
        return uri.toString();
    }

    private boolean finished;

    synchronized boolean finished() {
        return finished;
    }

    synchronized void setFinished() {
        finished = true;
    }

    /**
     * Serializes the request line and all headers into a single buffer.
     * As a side effect this records the body's content length and whether
     * the body will be sent chunked.
     */
    List<ByteBuffer> headers() {
        if (Log.requests() && request != null) {
            Log.logRequest(request.toString());
        }
        String target = requestURI();
        StringBuilder out = new StringBuilder(64);
        out.append(request.method())
           .append(' ')
           .append(target)
           .append(" HTTP/1.1\r\n");

        if (request.uri() != null) {
            systemHeaders.setHeader("Host", hostString());
        }

        long length;
        if (request == null || requestPublisher == null) {
            // Not a user request, or maybe a method, e.g. GET, with no body.
            length = 0;
        } else {
            length = requestPublisher.contentLength();
        }
        contentLength = length;

        if (length < 0) {
            // unknown length: chunked transfer
            streaming = true;
            systemHeaders.setHeader("Transfer-encoding", "chunked");
        } else if (length == 0) {
            systemHeaders.setHeader("Content-Length", "0");
        } else {
            systemHeaders.setHeader("Content-Length", Long.toString(length));
            streaming = false;
        }

        collectHeaders0(out);
        String serialized = out.toString();
        logHeaders(serialized);
        return List.of(ByteBuffer.wrap(serialized.getBytes(US_ASCII)));
    }

    /**
     * Subscribes a body subscriber to the request publisher and returns it,
     * or returns null when the request has a zero-length, non-chunked body.
     */
    Http1BodySubscriber continueRequest() {
        final boolean chunked = streaming;
        if (!chunked && contentLength == 0) {
            return null;
        }
        Http1BodySubscriber subscriber =
                chunked ? new StreamSubscriber() : new FixedContentSubscriber();
        requestPublisher.subscribe(subscriber);
        return subscriber;
    }

    /** Forwards every published buffer to the exchange framed as one chunk. */
    class StreamSubscriber extends Http1BodySubscriber {

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription == null) {
                this.subscription = subscription;
                return;
            }
            Throwable t = new IllegalStateException("already subscribed");
            http1Exchange.appendToOutgoing(t);
        }

        @Override
        public void onNext(ByteBuffer item) {
            Objects.requireNonNull(item);
            if (complete) {
                Throwable t = new IllegalStateException("subscription already completed");
                http1Exchange.appendToOutgoing(t);
                return;
            }
            // chunk-size line, chunk data, trailing CRLF
            ArrayList<ByteBuffer> chunk = new ArrayList<>(3);
            chunk.add(getHeader(item.remaining()));
            chunk.add(item);
            chunk.add(ByteBuffer.wrap(CRLF));
            http1Exchange.appendToOutgoing(chunk);
        }

        @Override
        public void onError(Throwable throwable) {
            if (!complete) {
                subscription.cancel();
                http1Exchange.appendToOutgoing(throwable);
            }
        }

        @Override
        public void onComplete() {
            if (complete) {
                Throwable t = new IllegalStateException("subscription already completed");
                http1Exchange.appendToOutgoing(t);
                return;
            }
            // zero-length chunk followed by the final CRLF
            ArrayList<ByteBuffer> lastChunk = new ArrayList<>(2);
            lastChunk.add(ByteBuffer.wrap(EMPTY_CHUNK_BYTES));
            lastChunk.add(ByteBuffer.wrap(CRLF));
            complete = true;
            http1Exchange.appendToOutgoing(lastChunk);
            http1Exchange.appendToOutgoing(COMPLETED);
            setFinished();  // TODO: before or after,? does it matter?
        }
    }

    /** Forwards published buffers unframed, checking the total against contentLength. */
    class FixedContentSubscriber extends Http1BodySubscriber {

        private volatile long contentWritten;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription == null) {
                this.subscription = subscription;
                return;
            }
            Throwable t = new IllegalStateException("already subscribed");
            http1Exchange.appendToOutgoing(t);
        }

        @Override
        public void onNext(ByteBuffer item) {
            debug.log(Level.DEBUG, "onNext");
            Objects.requireNonNull(item);
            if (complete) {
                Throwable t = new IllegalStateException("subscription already completed");
                http1Exchange.appendToOutgoing(t);
                return;
            }

            long writing = item.remaining();
            long written = contentWritten + writing;
            contentWritten = written;

            if (written <= contentLength) {
                http1Exchange.appendToOutgoing(List.of(item));
                return;
            }

            // the publisher produced more than it announced
            subscription.cancel();
            String msg = connection.getConnectionFlow()
                          + " [" + Thread.currentThread().getName() +"] "
                          + "Too many bytes in request body. Expected: "
                          + contentLength + ", got: " + written;
            http1Exchange.appendToOutgoing(new IOException(msg));
        }

        @Override
        public void onError(Throwable throwable) {
            debug.log(Level.DEBUG, "onError");
            if (!complete) {    // TODO: error?
                subscription.cancel();
                http1Exchange.appendToOutgoing(throwable);
            }
        }

        @Override
        public void onComplete() {
            debug.log(Level.DEBUG, "onComplete");
            if (complete) {
                Throwable t = new IllegalStateException("subscription already completed");
                http1Exchange.appendToOutgoing(t);
                return;
            }

            complete = true;
            long written = contentWritten;
            if (contentLength <= written) {
                http1Exchange.appendToOutgoing(COMPLETED);
                return;
            }

            // the publisher finished before delivering the announced length
            subscription.cancel();
            Throwable t = new IOException(connection.getConnectionFlow()
                                 + " [" + Thread.currentThread().getName() +"] "
                                 + "Too few bytes returned by the publisher ("
                                          + written + "/"
                                          + contentLength + ")");
            http1Exchange.appendToOutgoing(t);
        }
    }

    private static final byte[] CRLF = {'\r', '\n'};
    private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};

    /** Returns the chunk-size line (lower-case hex size followed by CRLF) for a chunk. */
    private static ByteBuffer getHeader(int size) {
        byte[] hexDigits = Integer.toHexString(size).getBytes(US_ASCII);
        ByteBuffer header = ByteBuffer.allocate(hexDigits.length + 2);
        header.put(hexDigits);
        header.put(CRLF[0]);
        header.put(CRLF[1]);
        header.flip();  // position 0, limit == capacity, as a wrapped array would be
        return header;
    }

    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    final System.Logger debug  = Utils.getDebugLogger(this::toString, DEBUG);

}