< prev index next >
1 /*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 */
24 package java.net.http;
25
26 import java.io.IOException;
27 import java.net.InetSocketAddress;
28 import java.net.StandardSocketOptions;
29 import java.nio.ByteBuffer;
30 import java.nio.channels.SelectableChannel;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.SocketChannel;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.function.Consumer;
35
/**
 * Plain raw TCP connection direct to the destination. It has two modes:
 * 1) Blocking, used by http/1. In this case the connect is actually
 *    non-blocking but the request is sent blocking. The first byte of a
 *    response is received non-blocking and the remainder of the response
 *    is received blocking.
 * 2) Non-blocking, used by http/2. In this case the connection is actually
 *    opened blocking but all reads and writes are done non-blocking under
 *    the control of a Http2Connection object.
 */
46 class PlainHttpConnection extends HttpConnection implements AsyncConnection {
47
48 protected SocketChannel chan;
49 private volatile boolean connected;
50 private boolean closed;
51 Consumer<ByteBuffer> asyncReceiver;
52 Consumer<Throwable> errorReceiver;
53 Queue<ByteBuffer> asyncOutputQ;
54 final Object reading = new Object();
55 final Object writing = new Object();
56
57 @Override
58 public void startReading() {
59 try {
60 client.registerEvent(new ReadEvent());
61 } catch (IOException e) {
62 shutdown();
63 }
64 }
65
66 class ConnectEvent extends AsyncEvent {
67 CompletableFuture<Void> cf;
68
69 ConnectEvent(CompletableFuture<Void> cf) {
70 super(AsyncEvent.BLOCKING);
71 this.cf = cf;
72 }
73
74 @Override
75 public SelectableChannel channel() {
76 return chan;
77 }
78
79 @Override
80 public int interestOps() {
81 return SelectionKey.OP_CONNECT;
82 }
83
84 @Override
85 public void handle() {
86 try {
87 chan.finishConnect();
88 } catch (IOException e) {
89 cf.completeExceptionally(e);
90 }
91 connected = true;
92 cf.complete(null);
93 }
94
95 @Override
96 public void abort() {
97 close();
98 }
99 }
100
101 @Override
102 public CompletableFuture<Void> connectAsync() {
103 CompletableFuture<Void> plainFuture = new CompletableFuture<>();
104 try {
105 chan.configureBlocking(false);
106 chan.connect(address);
107 client.registerEvent(new ConnectEvent(plainFuture));
108 } catch (IOException e) {
109 plainFuture.completeExceptionally(e);
110 }
111 return plainFuture;
112 }
113
114 @Override
115 public void connect() throws IOException {
116 chan.connect(address);
117 connected = true;
118 }
119
120 @Override
121 SocketChannel channel() {
122 return chan;
123 }
124
125 PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
126 super(addr, client);
127 try {
128 this.chan = SocketChannel.open();
129 int bufsize = client.getReceiveBufferSize();
130 chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
131 } catch (IOException e) {
132 throw new InternalError(e);
133 }
134 }
135
136 @Override
137 long write(ByteBuffer[] buffers, int start, int number) throws IOException {
138 if (mode != Mode.ASYNC)
139 return chan.write(buffers, start, number);
140 // async
141 synchronized(writing) {
142 int qlen = asyncOutputQ.size();
143 ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
144 long n = Utils.remaining(bufs);
145 asyncOutputQ.putAll(bufs);
146 if (qlen == 0)
147 asyncOutput();
148 return n;
149 }
150 }
151
152 ByteBuffer asyncBuffer = null;
153
154 void asyncOutput() {
155 synchronized (writing) {
156 try {
157 while (true) {
158 if (asyncBuffer == null) {
159 asyncBuffer = asyncOutputQ.poll();
160 if (asyncBuffer == null) {
161 return;
162 }
163 }
164 if (!asyncBuffer.hasRemaining()) {
165 asyncBuffer = null;
166 continue;
167 }
168 int n = chan.write(asyncBuffer);
169 //System.err.printf("Written %d bytes to chan\n", n);
170 if (n == 0) {
171 client.registerEvent(new WriteEvent());
172 return;
173 }
174 }
175 } catch (IOException e) {
176 shutdown();
177 }
178 }
179 }
180
181 @Override
182 long write(ByteBuffer buffer) throws IOException {
183 if (mode != Mode.ASYNC)
184 return chan.write(buffer);
185 // async
186 synchronized(writing) {
187 int qlen = asyncOutputQ.size();
188 long n = buffer.remaining();
189 asyncOutputQ.put(buffer);
190 if (qlen == 0)
191 asyncOutput();
192 return n;
193 }
194 }
195
196 @Override
197 public String toString() {
198 return "PlainHttpConnection: " + super.toString();
199 }
200
201 /**
202 * Close this connection
203 */
204 @Override
205 public synchronized void close() {
206 if (closed)
207 return;
208 closed = true;
209 try {
210 Log.logError("Closing: " + toString());
211 //System.out.println("Closing: " + this);
212 chan.close();
213 } catch (IOException e) {}
214 }
215
216 @Override
217 protected ByteBuffer readImpl(int length) throws IOException {
218 ByteBuffer buf = getBuffer(); // TODO not using length
219 int n = chan.read(buf);
220 if (n == -1) {
221 return null;
222 }
223 buf.flip();
224 String s = "Receive (" + n + " bytes) ";
225 //debugPrint(s, buf);
226 return buf;
227 }
228
229 void shutdown() {
230 close();
231 errorReceiver.accept(new IOException("Connection aborted"));
232 }
233
234 void asyncRead() {
235 synchronized (reading) {
236 try {
237 while (true) {
238 ByteBuffer buf = getBuffer();
239 int n = chan.read(buf);
240 //System.err.printf("Read %d bytes from chan\n", n);
241 if (n == -1) {
242 throw new IOException();
243 }
244 if (n == 0) {
245 returnBuffer(buf);
246 return;
247 }
248 buf.flip();
249 asyncReceiver.accept(buf);
250 }
251 } catch (IOException e) {
252 shutdown();
253 }
254 }
255 }
256
257 @Override
258 protected int readImpl(ByteBuffer buf) throws IOException {
259 int mark = buf.position();
260 int n;
261 // FIXME: this hack works in conjunction with the corresponding change
262 // in java.net.http.RawChannel.registerEvent
263 if ((n = buffer.remaining()) != 0) {
264 buf.put(buffer);
265 } else {
266 n = chan.read(buf);
267 }
268 if (n == -1) {
269 return -1;
270 }
271 Utils.flipToMark(buf, mark);
272 String s = "Receive (" + n + " bytes) ";
273 //debugPrint(s, buf);
274 return n;
275 }
276
277 @Override
278 ConnectionPool.CacheKey cacheKey() {
279 return new ConnectionPool.CacheKey(address, null);
280 }
281
282 @Override
283 synchronized boolean connected() {
284 return connected;
285 }
286
287 // used for all output in HTTP/2
288 class WriteEvent extends AsyncEvent {
289 WriteEvent() {
290 super(0);
291 }
292
293 @Override
294 public SelectableChannel channel() {
295 return chan;
296 }
297
298 @Override
299 public int interestOps() {
300 return SelectionKey.OP_WRITE;
301 }
302
303 @Override
304 public void handle() {
305 asyncOutput();
306 }
307
308 @Override
309 public void abort() {
310 shutdown();
311 }
312 }
313
314 // used for all input in HTTP/2
315 class ReadEvent extends AsyncEvent {
316 ReadEvent() {
317 super(AsyncEvent.REPEATING); // && !BLOCKING
318 }
319
320 @Override
321 public SelectableChannel channel() {
322 return chan;
323 }
324
325 @Override
326 public int interestOps() {
327 return SelectionKey.OP_READ;
328 }
329
330 @Override
331 public void handle() {
332 asyncRead();
333 }
334
335 @Override
336 public void abort() {
337 shutdown();
338 }
339
340 }
341
342 // used in blocking channels only
343 class ReceiveResponseEvent extends AsyncEvent {
344 CompletableFuture<Void> cf;
345
346 ReceiveResponseEvent(CompletableFuture<Void> cf) {
347 super(AsyncEvent.BLOCKING);
348 this.cf = cf;
349 }
350 @Override
351 public SelectableChannel channel() {
352 return chan;
353 }
354
355 @Override
356 public void handle() {
357 cf.complete(null);
358 }
359
360 @Override
361 public int interestOps() {
362 return SelectionKey.OP_READ;
363 }
364
365 @Override
366 public void abort() {
367 close();
368 }
369 }
370
371 @Override
372 boolean isSecure() {
373 return false;
374 }
375
376 @Override
377 boolean isProxied() {
378 return false;
379 }
380
381 @Override
382 public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
383 Consumer<Throwable> errorReceiver) {
384 this.asyncReceiver = asyncReceiver;
385 this.errorReceiver = errorReceiver;
386 asyncOutputQ = new Queue<>();
387 asyncOutputQ.registerPutCallback(this::asyncOutput);
388 }
389
390 @Override
391 CompletableFuture<Void> whenReceivingResponse() {
392 CompletableFuture<Void> cf = new CompletableFuture<>();
393 try {
394 client.registerEvent(new ReceiveResponseEvent(cf));
395 } catch (IOException e) {
396 cf.completeExceptionally(e);
397 }
398 return cf;
399 }
400 }
< prev index next >