1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.net.URI; 31 import java.nio.ByteBuffer; 32 import java.util.ArrayList; 33 import java.util.List; 34 import java.util.Map; 35 import java.net.InetSocketAddress; 36 import java.util.Objects; 37 import java.util.concurrent.Flow; 38 39 import jdk.incubator.http.Http1Exchange.Http1BodySubscriber; 40 import jdk.incubator.http.internal.common.HttpHeadersImpl; 41 import jdk.incubator.http.internal.common.Log; 42 import jdk.incubator.http.internal.common.Utils; 43 44 import static java.nio.charset.StandardCharsets.US_ASCII; 45 46 /** 47 * An HTTP/1.1 request. 48 */ 49 class Http1Request { 50 private final HttpRequestImpl request; 51 private final Http1Exchange<?> http1Exchange; 52 private final HttpConnection connection; 53 private final HttpRequest.BodyPublisher requestPublisher; 54 private final HttpHeaders userHeaders; 55 private final HttpHeadersImpl systemHeaders; 56 private volatile boolean streaming; 57 private volatile long contentLength; 58 59 Http1Request(HttpRequestImpl request, 60 Http1Exchange<?> http1Exchange) 61 throws IOException 62 { 63 this.request = request; 64 this.http1Exchange = http1Exchange; 65 this.connection = http1Exchange.connection(); 66 this.requestPublisher = request.requestPublisher; // may be null 67 this.userHeaders = request.getUserHeaders(); 68 this.systemHeaders = request.getSystemHeaders(); 69 } 70 71 private void logHeaders(String completeHeaders) { 72 if (Log.headers()) { 73 //StringBuilder sb = new StringBuilder(256); 74 //sb.append("REQUEST HEADERS:\n"); 75 //Log.dumpHeaders(sb, " ", systemHeaders); 76 //Log.dumpHeaders(sb, " ", userHeaders); 77 //Log.logHeaders(sb.toString()); 78 79 String s = completeHeaders.replaceAll("\r\n", "\n"); 80 Log.logHeaders("REQUEST HEADERS:\n" + s); 81 } 82 } 83 84 private void collectHeaders0(StringBuilder sb) { 85 collectHeaders1(sb, systemHeaders); 86 collectHeaders1(sb, userHeaders); 87 sb.append("\r\n"); 88 } 89 90 private void collectHeaders1(StringBuilder sb, HttpHeaders headers) { 91 for (Map.Entry<String,List<String>> entry : headers.map().entrySet()) { 92 String key = entry.getKey(); 93 List<String> values = entry.getValue(); 94 for (String value : values) { 95 sb.append(key).append(": ").append(value).append("\r\n"); 96 } 97 } 98 } 99 100 private String getPathAndQuery(URI uri) { 101 String path = uri.getPath(); 102 String query = uri.getQuery(); 103 if (path == null || path.equals("")) { 104 path = "/"; 105 } 106 if (query == null) { 107 query = ""; 108 } 109 if (query.equals("")) { 110 return path; 111 } else { 112 return path + "?" + query; 113 } 114 } 115 116 private String authorityString(InetSocketAddress addr) { 117 return addr.getHostString() + ":" + addr.getPort(); 118 } 119 120 private String hostString() { 121 URI uri = request.uri(); 122 int port = uri.getPort(); 123 String host = uri.getHost(); 124 125 boolean defaultPort; 126 if (port == -1) { 127 defaultPort = true; 128 } else if (request.secure()) { 129 defaultPort = port == 443; 130 } else { 131 defaultPort = port == 80; 132 } 133 134 if (defaultPort) { 135 return host; 136 } else { 137 return host + ":" + Integer.toString(port); 138 } 139 } 140 141 private String requestURI() { 142 URI uri = request.uri(); 143 String method = request.method(); 144 145 if ((request.proxy() == null && !method.equals("CONNECT")) 146 || request.isWebSocket()) { 147 return getPathAndQuery(uri); 148 } 149 if (request.secure()) { 150 if (request.method().equals("CONNECT")) { 151 // use authority for connect itself 152 return authorityString(request.authority()); 153 } else { 154 // requests over tunnel do not require full URL 155 return getPathAndQuery(uri); 156 } 157 } 158 if (request.method().equals("CONNECT")) { 159 // use authority for connect itself 160 return authorityString(request.authority()); 161 } 162 163 return uri == null? authorityString(request.authority()) : uri.toString(); 164 } 165 166 private boolean finished; 167 168 synchronized boolean finished() { 169 return finished; 170 } 171 172 synchronized void setFinished() { 173 finished = true; 174 } 175 176 List<ByteBuffer> headers() { 177 if (Log.requests() && request != null) { 178 Log.logRequest(request.toString()); 179 } 180 String uriString = requestURI(); 181 StringBuilder sb = new StringBuilder(64); 182 sb.append(request.method()) 183 .append(' ') 184 .append(uriString) 185 .append(" HTTP/1.1\r\n"); 186 187 URI uri = request.uri(); 188 if (uri != null) { 189 systemHeaders.setHeader("Host", hostString()); 190 } 191 if (request == null || requestPublisher == null) { 192 // Not a user request, or maybe a method, e.g. GET, with no body. 193 contentLength = 0; 194 } else { 195 contentLength = requestPublisher.contentLength(); 196 } 197 198 if (contentLength == 0) { 199 systemHeaders.setHeader("Content-Length", "0"); 200 } else if (contentLength > 0) { 201 systemHeaders.setHeader("Content-Length", Long.toString(contentLength)); 202 streaming = false; 203 } else { 204 streaming = true; 205 systemHeaders.setHeader("Transfer-encoding", "chunked"); 206 } 207 collectHeaders0(sb); 208 String hs = sb.toString(); 209 logHeaders(hs); 210 ByteBuffer b = ByteBuffer.wrap(hs.getBytes(US_ASCII)); 211 return List.of(b); 212 } 213 214 Http1BodySubscriber continueRequest() { 215 Http1BodySubscriber subscriber; 216 if (streaming) { 217 subscriber = new StreamSubscriber(); 218 requestPublisher.subscribe(subscriber); 219 } else { 220 if (contentLength == 0) 221 return null; 222 223 subscriber = new FixedContentSubscriber(); 224 requestPublisher.subscribe(subscriber); 225 } 226 return subscriber; 227 } 228 229 class StreamSubscriber extends Http1BodySubscriber { 230 231 @Override 232 public void onSubscribe(Flow.Subscription subscription) { 233 if (this.subscription != null) { 234 Throwable t = new IllegalStateException("already subscribed"); 235 http1Exchange.appendToOutgoing(t); 236 } else { 237 this.subscription = subscription; 238 } 239 } 240 241 @Override 242 public void onNext(ByteBuffer item) { 243 Objects.requireNonNull(item); 244 if (complete) { 245 Throwable t = new IllegalStateException("subscription already completed"); 246 http1Exchange.appendToOutgoing(t); 247 } else { 248 int chunklen = item.remaining(); 249 ArrayList<ByteBuffer> l = new ArrayList<>(3); 250 l.add(getHeader(chunklen)); 251 l.add(item); 252 l.add(ByteBuffer.wrap(CRLF)); 253 http1Exchange.appendToOutgoing(l); 254 } 255 } 256 257 @Override 258 public void onError(Throwable throwable) { 259 if (complete) 260 return; 261 262 subscription.cancel(); 263 http1Exchange.appendToOutgoing(throwable); 264 } 265 266 @Override 267 public void onComplete() { 268 if (complete) { 269 Throwable t = new IllegalStateException("subscription already completed"); 270 http1Exchange.appendToOutgoing(t); 271 } else { 272 ArrayList<ByteBuffer> l = new ArrayList<>(2); 273 l.add(ByteBuffer.wrap(EMPTY_CHUNK_BYTES)); 274 l.add(ByteBuffer.wrap(CRLF)); 275 complete = true; 276 //setFinished(); 277 http1Exchange.appendToOutgoing(l); 278 http1Exchange.appendToOutgoing(COMPLETED); 279 setFinished(); // TODO: before or after,? does it matter? 280 281 } 282 } 283 } 284 285 class FixedContentSubscriber extends Http1BodySubscriber { 286 287 private volatile long contentWritten; 288 289 @Override 290 public void onSubscribe(Flow.Subscription subscription) { 291 if (this.subscription != null) { 292 Throwable t = new IllegalStateException("already subscribed"); 293 http1Exchange.appendToOutgoing(t); 294 } else { 295 this.subscription = subscription; 296 } 297 } 298 299 @Override 300 public void onNext(ByteBuffer item) { 301 debug.log(Level.DEBUG, "onNext"); 302 Objects.requireNonNull(item); 303 if (complete) { 304 Throwable t = new IllegalStateException("subscription already completed"); 305 http1Exchange.appendToOutgoing(t); 306 } else { 307 long writing = item.remaining(); 308 long written = (contentWritten += writing); 309 310 if (written > contentLength) { 311 subscription.cancel(); 312 String msg = connection.getConnectionFlow() 313 + " [" + Thread.currentThread().getName() +"] " 314 + "Too many bytes in request body. Expected: " 315 + contentLength + ", got: " + written; 316 http1Exchange.appendToOutgoing(new IOException(msg)); 317 } else { 318 http1Exchange.appendToOutgoing(List.of(item)); 319 } 320 } 321 } 322 323 @Override 324 public void onError(Throwable throwable) { 325 debug.log(Level.DEBUG, "onError"); 326 if (complete) // TODO: error? 327 return; 328 329 subscription.cancel(); 330 http1Exchange.appendToOutgoing(throwable); 331 } 332 333 @Override 334 public void onComplete() { 335 debug.log(Level.DEBUG, "onComplete"); 336 if (complete) { 337 Throwable t = new IllegalStateException("subscription already completed"); 338 http1Exchange.appendToOutgoing(t); 339 } else { 340 complete = true; 341 long written = contentWritten; 342 if (contentLength > written) { 343 subscription.cancel(); 344 Throwable t = new IOException(connection.getConnectionFlow() 345 + " [" + Thread.currentThread().getName() +"] " 346 + "Too few bytes returned by the publisher (" 347 + written + "/" 348 + contentLength + ")"); 349 http1Exchange.appendToOutgoing(t); 350 } else { 351 http1Exchange.appendToOutgoing(COMPLETED); 352 } 353 } 354 } 355 } 356 357 private static final byte[] CRLF = {'\r', '\n'}; 358 private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'}; 359 360 /** Returns a header for a particular chunk size */ 361 private static ByteBuffer getHeader(int size) { 362 String hexStr = Integer.toHexString(size); 363 byte[] hexBytes = hexStr.getBytes(US_ASCII); 364 byte[] header = new byte[hexStr.length()+2]; 365 System.arraycopy(hexBytes, 0, header, 0, hexBytes.length); 366 header[hexBytes.length] = CRLF[0]; 367 header[hexBytes.length+1] = CRLF[1]; 368 return ByteBuffer.wrap(header); 369 } 370 371 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 372 final System.Logger debug = Utils.getDebugLogger(this::toString, DEBUG); 373 374 }