Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/sun/rmi/transport/tcp/TCPTransport.java
+++ new/src/share/classes/sun/rmi/transport/tcp/TCPTransport.java
1 1 /*
2 2 * Copyright (c) 1996, 2005, Oracle and/or its affiliates. All rights reserved.
3 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 4 *
5 5 * This code is free software; you can redistribute it and/or modify it
6 6 * under the terms of the GNU General Public License version 2 only, as
7 7 * published by the Free Software Foundation. Oracle designates this
8 8 * particular file as subject to the "Classpath" exception as provided
9 9 * by Oracle in the LICENSE file that accompanied this code.
10 10 *
11 11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 14 * version 2 for more details (a copy is included in the LICENSE file that
15 15 * accompanied this code).
16 16 *
17 17 * You should have received a copy of the GNU General Public License version
18 18 * 2 along with this work; if not, write to the Free Software Foundation,
19 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 20 *
21 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 22 * or visit www.oracle.com if you need additional information or have any
23 23 * questions.
24 24 */
25 25 package sun.rmi.transport.tcp;
26 26
27 27 import java.lang.ref.Reference;
28 28 import java.lang.ref.SoftReference;
29 29 import java.lang.ref.WeakReference;
30 30 import java.lang.reflect.InvocationTargetException;
31 31 import java.io.DataInputStream;
32 32 import java.io.DataOutputStream;
33 33 import java.io.IOException;
34 34 import java.io.InputStream;
35 35 import java.io.OutputStream;
36 36 import java.io.BufferedInputStream;
37 37 import java.io.BufferedOutputStream;
38 38 import java.net.InetAddress;
39 39 import java.net.ServerSocket;
40 40 import java.net.Socket;
41 41 import java.rmi.RemoteException;
42 42 import java.rmi.server.ExportException;
43 43 import java.rmi.server.LogStream;
44 44 import java.rmi.server.RMIFailureHandler;
45 45 import java.rmi.server.RMISocketFactory;
46 46 import java.rmi.server.RemoteCall;
47 47 import java.rmi.server.ServerNotActiveException;
48 48 import java.rmi.server.UID;
49 49 import java.security.AccessControlContext;
50 50 import java.security.AccessController;
51 51 import java.util.ArrayList;
52 52 import java.util.LinkedList;
53 53 import java.util.List;
54 54 import java.util.Map;
55 55 import java.util.WeakHashMap;
56 56 import java.util.logging.Level;
57 57 import java.util.concurrent.ExecutorService;
58 58 import java.util.concurrent.RejectedExecutionException;
59 59 import java.util.concurrent.SynchronousQueue;
60 60 import java.util.concurrent.ThreadFactory;
61 61 import java.util.concurrent.ThreadPoolExecutor;
62 62 import java.util.concurrent.TimeUnit;
63 63 import java.util.concurrent.atomic.AtomicInteger;
64 64 import sun.rmi.runtime.Log;
65 65 import sun.rmi.runtime.NewThreadAction;
66 66 import sun.rmi.transport.Channel;
67 67 import sun.rmi.transport.Connection;
68 68 import sun.rmi.transport.DGCAckHandler;
69 69 import sun.rmi.transport.Endpoint;
70 70 import sun.rmi.transport.StreamRemoteCall;
71 71 import sun.rmi.transport.Target;
72 72 import sun.rmi.transport.Transport;
73 73 import sun.rmi.transport.TransportConstants;
74 74 import sun.rmi.transport.proxy.HttpReceiveSocket;
75 75 import sun.security.action.GetIntegerAction;
76 76 import sun.security.action.GetLongAction;
77 77 import sun.security.action.GetPropertyAction;
78 78
79 79 /**
80 80 * TCPTransport is the socket-based implementation of the RMI Transport
81 81 * abstraction.
82 82 *
83 83 * @author Ann Wollrath
84 84 * @author Peter Jones
85 85 */
86 86 public class TCPTransport extends Transport {
87 87
88 88 /* tcp package log */
89 89 static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp",
90 90 LogStream.parseLevel(AccessController.doPrivileged(
91 91 new GetPropertyAction("sun.rmi.transport.tcp.logLevel"))));
92 92
93 93 /** maximum number of connection handler threads */
94 94 private static final int maxConnectionThreads = // default no limit
95 95 AccessController.doPrivileged(
96 96 new GetIntegerAction("sun.rmi.transport.tcp.maxConnectionThreads",
97 97 Integer.MAX_VALUE));
98 98
99 99 /** keep alive time for idle connection handler threads */
100 100 private static final long threadKeepAliveTime = // default 1 minute
101 101 AccessController.doPrivileged(
102 102 new GetLongAction("sun.rmi.transport.tcp.threadKeepAliveTime",
103 103 60000));
104 104
105 105 /** thread pool for connection handlers */
106 106 private static final ExecutorService connectionThreadPool =
107 107 new ThreadPoolExecutor(0, maxConnectionThreads,
108 108 threadKeepAliveTime, TimeUnit.MILLISECONDS,
109 109 new SynchronousQueue<Runnable>(),
110 110 new ThreadFactory() {
111 111 public Thread newThread(Runnable runnable) {
↓ open down ↓ |
111 lines elided |
↑ open up ↑ |
112 112 return AccessController.doPrivileged(new NewThreadAction(
113 113 runnable, "TCP Connection(idle)", true, true));
114 114 }
115 115 });
116 116
117 117 /** total connections handled */
118 118 private static final AtomicInteger connectionCount = new AtomicInteger(0);
119 119
120 120 /** client host for the current thread's connection */
121 121 private static final ThreadLocal<ConnectionHandler>
122 - threadConnectionHandler = new ThreadLocal<ConnectionHandler>();
122 + threadConnectionHandler = new ThreadLocal<>();
123 123
124 124 /** endpoints for this transport */
125 125 private final LinkedList<TCPEndpoint> epList;
126 126 /** number of objects exported on this transport */
127 127 private int exportCount = 0;
128 128 /** server socket for this transport */
129 129 private ServerSocket server = null;
130 130 /** table mapping endpoints to channels */
131 131 private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable =
132 - new WeakHashMap<TCPEndpoint,Reference<TCPChannel>>();
132 + new WeakHashMap<>();
133 133
134 134 static final RMISocketFactory defaultSocketFactory =
135 135 RMISocketFactory.getDefaultSocketFactory();
136 136
137 137 /** number of milliseconds in accepted-connection timeout.
138 138 * Warning: this should be greater than 15 seconds (the client-side
139 139 * timeout), and defaults to 2 hours.
140 140 * The maximum representable value is slightly more than 24 days
141 141 * and 20 hours.
142 142 */
143 143 private static final int connectionReadTimeout = // default 2 hours
144 144 AccessController.doPrivileged(
145 145 new GetIntegerAction("sun.rmi.transport.tcp.readTimeout",
146 146 2 * 3600 * 1000));
147 147
148 148 /**
149 149 * Constructs a TCPTransport.
150 150 */
151 151 TCPTransport(LinkedList<TCPEndpoint> epList) {
152 152 // assert ((epList.size() != null) && (epList.size() >= 1))
153 153 this.epList = epList;
154 154 if (tcpLog.isLoggable(Log.BRIEF)) {
155 155 tcpLog.log(Log.BRIEF, "Version = " +
156 156 TransportConstants.Version + ", ep = " + getEndpoint());
157 157 }
158 158 }
159 159
160 160 /**
161 161 * Closes all cached connections in every channel subordinated to this
162 162 * transport. Currently, this only closes outgoing connections.
163 163 */
164 164 public void shedConnectionCaches() {
165 165 List<TCPChannel> channels;
166 166 synchronized (channelTable) {
167 167 channels = new ArrayList<TCPChannel>(channelTable.values().size());
168 168 for (Reference<TCPChannel> ref : channelTable.values()) {
169 169 TCPChannel ch = ref.get();
170 170 if (ch != null) {
171 171 channels.add(ch);
172 172 }
173 173 }
174 174 }
175 175 for (TCPChannel channel : channels) {
176 176 channel.shedCache();
177 177 }
178 178 }
179 179
180 180 /**
181 181 * Returns a <I>Channel</I> that generates connections to the
182 182 * endpoint <I>ep</I>. A Channel is an object that creates and
183 183 * manages connections of a particular type to some particular
184 184 * address space.
185 185 * @param ep the endpoint to which connections will be generated.
186 186 * @return the channel or null if the transport cannot
187 187 * generate connections to this endpoint
188 188 */
189 189 public TCPChannel getChannel(Endpoint ep) {
190 190 TCPChannel ch = null;
191 191 if (ep instanceof TCPEndpoint) {
192 192 synchronized (channelTable) {
193 193 Reference<TCPChannel> ref = channelTable.get(ep);
194 194 if (ref != null) {
195 195 ch = ref.get();
196 196 }
197 197 if (ch == null) {
198 198 TCPEndpoint tcpEndpoint = (TCPEndpoint) ep;
199 199 ch = new TCPChannel(this, tcpEndpoint);
200 200 channelTable.put(tcpEndpoint,
201 201 new WeakReference<TCPChannel>(ch));
202 202 }
203 203 }
204 204 }
205 205 return ch;
206 206 }
207 207
208 208 /**
209 209 * Removes the <I>Channel</I> that generates connections to the
210 210 * endpoint <I>ep</I>.
211 211 */
212 212 public void free(Endpoint ep) {
213 213 if (ep instanceof TCPEndpoint) {
214 214 synchronized (channelTable) {
215 215 Reference<TCPChannel> ref = channelTable.remove(ep);
216 216 if (ref != null) {
217 217 TCPChannel channel = ref.get();
218 218 if (channel != null) {
219 219 channel.shedCache();
220 220 }
221 221 }
222 222 }
223 223 }
224 224 }
225 225
226 226 /**
227 227 * Export the object so that it can accept incoming calls.
228 228 */
229 229 public void exportObject(Target target) throws RemoteException {
230 230 /*
231 231 * Ensure that a server socket is listening, and count this
232 232 * export while synchronized to prevent the server socket from
233 233 * being closed due to concurrent unexports.
234 234 */
235 235 synchronized (this) {
236 236 listen();
237 237 exportCount++;
238 238 }
239 239
240 240 /*
241 241 * Try to add the Target to the exported object table; keep
242 242 * counting this export (to keep server socket open) only if
243 243 * that succeeds.
244 244 */
245 245 boolean ok = false;
246 246 try {
247 247 super.exportObject(target);
248 248 ok = true;
249 249 } finally {
250 250 if (!ok) {
251 251 synchronized (this) {
252 252 decrementExportCount();
253 253 }
254 254 }
255 255 }
256 256 }
257 257
258 258 protected synchronized void targetUnexported() {
259 259 decrementExportCount();
260 260 }
261 261
262 262 /**
263 263 * Decrements the count of exported objects, closing the current
264 264 * server socket if the count reaches zero.
265 265 **/
266 266 private void decrementExportCount() {
267 267 assert Thread.holdsLock(this);
268 268 exportCount--;
269 269 if (exportCount == 0 && getEndpoint().getListenPort() != 0) {
270 270 ServerSocket ss = server;
271 271 server = null;
272 272 try {
273 273 ss.close();
274 274 } catch (IOException e) {
275 275 }
276 276 }
277 277 }
278 278
279 279 /**
280 280 * Verify that the current access control context has permission to
281 281 * accept the connection being dispatched by the current thread.
282 282 */
283 283 protected void checkAcceptPermission(AccessControlContext acc) {
284 284 SecurityManager sm = System.getSecurityManager();
285 285 if (sm == null) {
286 286 return;
287 287 }
288 288 ConnectionHandler h = threadConnectionHandler.get();
289 289 if (h == null) {
290 290 throw new Error(
291 291 "checkAcceptPermission not in ConnectionHandler thread");
292 292 }
293 293 h.checkAcceptPermission(sm, acc);
294 294 }
295 295
296 296 private TCPEndpoint getEndpoint() {
297 297 synchronized (epList) {
298 298 return epList.getLast();
299 299 }
300 300 }
301 301
302 302 /**
303 303 * Listen on transport's endpoint.
304 304 */
305 305 private void listen() throws RemoteException {
306 306 assert Thread.holdsLock(this);
307 307 TCPEndpoint ep = getEndpoint();
308 308 int port = ep.getPort();
309 309
310 310 if (server == null) {
311 311 if (tcpLog.isLoggable(Log.BRIEF)) {
312 312 tcpLog.log(Log.BRIEF,
313 313 "(port " + port + ") create server socket");
314 314 }
315 315
316 316 try {
317 317 server = ep.newServerSocket();
318 318 /*
319 319 * Don't retry ServerSocket if creation fails since
320 320 * "port in use" will cause export to hang if an
321 321 * RMIFailureHandler is not installed.
322 322 */
323 323 Thread t = AccessController.doPrivileged(
324 324 new NewThreadAction(new AcceptLoop(server),
325 325 "TCP Accept-" + port, true));
326 326 t.start();
327 327 } catch (java.net.BindException e) {
328 328 throw new ExportException("Port already in use: " + port, e);
329 329 } catch (IOException e) {
330 330 throw new ExportException("Listen failed on port: " + port, e);
331 331 }
332 332
333 333 } else {
334 334 // otherwise verify security access to existing server socket
335 335 SecurityManager sm = System.getSecurityManager();
336 336 if (sm != null) {
337 337 sm.checkListen(port);
338 338 }
339 339 }
340 340 }
341 341
342 342 /**
343 343 * Worker for accepting connections from a server socket.
344 344 **/
345 345 private class AcceptLoop implements Runnable {
346 346
347 347 private final ServerSocket serverSocket;
348 348
349 349 // state for throttling loop on exceptions (local to accept thread)
350 350 private long lastExceptionTime = 0L;
351 351 private int recentExceptionCount;
352 352
353 353 AcceptLoop(ServerSocket serverSocket) {
354 354 this.serverSocket = serverSocket;
355 355 }
356 356
357 357 public void run() {
358 358 try {
359 359 executeAcceptLoop();
360 360 } finally {
361 361 try {
362 362 /*
363 363 * Only one accept loop is started per server
364 364 * socket, so after no more connections will be
365 365 * accepted, ensure that the server socket is no
366 366 * longer listening.
367 367 */
368 368 serverSocket.close();
369 369 } catch (IOException e) {
370 370 }
371 371 }
372 372 }
373 373
374 374 /**
375 375 * Accepts connections from the server socket and executes
376 376 * handlers for them in the thread pool.
377 377 **/
378 378 private void executeAcceptLoop() {
379 379 if (tcpLog.isLoggable(Log.BRIEF)) {
380 380 tcpLog.log(Log.BRIEF, "listening on port " +
381 381 getEndpoint().getPort());
382 382 }
383 383
384 384 while (true) {
385 385 Socket socket = null;
386 386 try {
387 387 socket = serverSocket.accept();
388 388
389 389 /*
390 390 * Find client host name (or "0.0.0.0" if unknown)
391 391 */
392 392 InetAddress clientAddr = socket.getInetAddress();
393 393 String clientHost = (clientAddr != null
394 394 ? clientAddr.getHostAddress()
395 395 : "0.0.0.0");
396 396
397 397 /*
398 398 * Execute connection handler in the thread pool,
399 399 * which uses non-system threads.
400 400 */
401 401 try {
402 402 connectionThreadPool.execute(
403 403 new ConnectionHandler(socket, clientHost));
404 404 } catch (RejectedExecutionException e) {
405 405 closeSocket(socket);
406 406 tcpLog.log(Log.BRIEF,
407 407 "rejected connection from " + clientHost);
408 408 }
409 409
410 410 } catch (Throwable t) {
411 411 try {
412 412 /*
413 413 * If the server socket has been closed, such
414 414 * as because there are no more exported
415 415 * objects, then we expect accept to throw an
416 416 * exception, so just terminate normally.
417 417 */
418 418 if (serverSocket.isClosed()) {
419 419 break;
420 420 }
421 421
422 422 try {
423 423 if (tcpLog.isLoggable(Level.WARNING)) {
424 424 tcpLog.log(Level.WARNING,
425 425 "accept loop for " + serverSocket +
426 426 " throws", t);
427 427 }
428 428 } catch (Throwable tt) {
429 429 }
430 430 } finally {
431 431 /*
432 432 * Always close the accepted socket (if any)
433 433 * if an exception occurs, but only after
434 434 * logging an unexpected exception.
435 435 */
436 436 if (socket != null) {
437 437 closeSocket(socket);
438 438 }
439 439 }
440 440
441 441 /*
442 442 * In case we're running out of file descriptors,
443 443 * release resources held in caches.
444 444 */
445 445 if (!(t instanceof SecurityException)) {
446 446 try {
447 447 TCPEndpoint.shedConnectionCaches();
448 448 } catch (Throwable tt) {
449 449 }
450 450 }
451 451
452 452 /*
453 453 * A NoClassDefFoundError can occur if no file
454 454 * descriptors are available, in which case this
455 455 * loop should not terminate.
456 456 */
457 457 if (t instanceof Exception ||
458 458 t instanceof OutOfMemoryError ||
459 459 t instanceof NoClassDefFoundError)
460 460 {
461 461 if (!continueAfterAcceptFailure(t)) {
462 462 return;
463 463 }
464 464 // continue loop
465 465 } else {
466 466 throw (Error) t;
467 467 }
468 468 }
469 469 }
470 470 }
471 471
472 472 /**
473 473 * Returns true if the accept loop should continue after the
474 474 * specified exception has been caught, or false if the accept
475 475 * loop should terminate (closing the server socket). If
476 476 * there is an RMIFailureHandler, this method returns the
477 477 * result of passing the specified exception to it; otherwise,
478 478 * this method always returns true, after sleeping to throttle
479 479 * the accept loop if necessary.
480 480 **/
481 481 private boolean continueAfterAcceptFailure(Throwable t) {
482 482 RMIFailureHandler fh = RMISocketFactory.getFailureHandler();
483 483 if (fh != null) {
484 484 return fh.failure(t instanceof Exception ? (Exception) t :
485 485 new InvocationTargetException(t));
486 486 } else {
487 487 throttleLoopOnException();
488 488 return true;
489 489 }
490 490 }
491 491
492 492 /**
493 493 * Throttles the accept loop after an exception has been
494 494 * caught: if a burst of 10 exceptions in 5 seconds occurs,
495 495 * then wait for 10 seconds to curb busy CPU usage.
496 496 **/
497 497 private void throttleLoopOnException() {
498 498 long now = System.currentTimeMillis();
499 499 if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) {
500 500 // last exception was long ago (or this is the first)
501 501 lastExceptionTime = now;
502 502 recentExceptionCount = 0;
503 503 } else {
504 504 // exception burst window was started recently
505 505 if (++recentExceptionCount >= 10) {
506 506 try {
507 507 Thread.sleep(10000);
508 508 } catch (InterruptedException ignore) {
509 509 }
510 510 }
511 511 }
512 512 }
513 513 }
514 514
515 515 /** close socket and eat exception */
516 516 private static void closeSocket(Socket sock) {
517 517 try {
518 518 sock.close();
519 519 } catch (IOException ex) {
520 520 // eat exception
521 521 }
522 522 }
523 523
524 524 /**
525 525 * handleMessages decodes transport operations and handles messages
526 526 * appropriately. If an exception occurs during message handling,
527 527 * the socket is closed.
528 528 */
529 529 void handleMessages(Connection conn, boolean persistent) {
530 530 int port = getEndpoint().getPort();
531 531
532 532 try {
533 533 DataInputStream in = new DataInputStream(conn.getInputStream());
534 534 do {
535 535 int op = in.read(); // transport op
536 536 if (op == -1) {
537 537 if (tcpLog.isLoggable(Log.BRIEF)) {
538 538 tcpLog.log(Log.BRIEF, "(port " +
539 539 port + ") connection closed");
540 540 }
541 541 break;
542 542 }
543 543
544 544 if (tcpLog.isLoggable(Log.BRIEF)) {
545 545 tcpLog.log(Log.BRIEF, "(port " + port +
546 546 ") op = " + op);
547 547 }
548 548
549 549 switch (op) {
550 550 case TransportConstants.Call:
551 551 // service incoming RMI call
552 552 RemoteCall call = new StreamRemoteCall(conn);
553 553 if (serviceCall(call) == false)
554 554 return;
555 555 break;
556 556
557 557 case TransportConstants.Ping:
558 558 // send ack for ping
559 559 DataOutputStream out =
560 560 new DataOutputStream(conn.getOutputStream());
561 561 out.writeByte(TransportConstants.PingAck);
562 562 conn.releaseOutputStream();
563 563 break;
564 564
565 565 case TransportConstants.DGCAck:
566 566 DGCAckHandler.received(UID.read(in));
567 567 break;
568 568
569 569 default:
570 570 throw new IOException("unknown transport op " + op);
571 571 }
572 572 } while (persistent);
573 573
574 574 } catch (IOException e) {
575 575 // exception during processing causes connection to close (below)
576 576 if (tcpLog.isLoggable(Log.BRIEF)) {
577 577 tcpLog.log(Log.BRIEF, "(port " + port +
578 578 ") exception: ", e);
579 579 }
580 580 } finally {
581 581 try {
582 582 conn.close();
583 583 } catch (IOException ex) {
584 584 // eat exception
585 585 }
586 586 }
587 587 }
588 588
589 589 /**
590 590 * Returns the client host for the current thread's connection. Throws
591 591 * ServerNotActiveException if no connection is active for this thread.
592 592 */
593 593 public static String getClientHost() throws ServerNotActiveException {
594 594 ConnectionHandler h = threadConnectionHandler.get();
595 595 if (h != null) {
596 596 return h.getClientHost();
597 597 } else {
598 598 throw new ServerNotActiveException("not in a remote call");
599 599 }
600 600 }
601 601
602 602 /**
603 603 * Services messages on accepted connection
604 604 */
605 605 private class ConnectionHandler implements Runnable {
606 606
607 607 /** int value of "POST" in ASCII (Java's specified data formats
608 608 * make this once-reviled tactic again socially acceptable) */
609 609 private static final int POST = 0x504f5354;
610 610
611 611 /** most recently accept-authorized AccessControlContext */
612 612 private AccessControlContext okContext;
613 613 /** cache of accept-authorized AccessControlContexts */
614 614 private Map<AccessControlContext,
615 615 Reference<AccessControlContext>> authCache;
616 616 /** security manager which authorized contexts in authCache */
617 617 private SecurityManager cacheSecurityManager = null;
618 618
619 619 private Socket socket;
620 620 private String remoteHost;
621 621
622 622 ConnectionHandler(Socket socket, String remoteHost) {
623 623 this.socket = socket;
624 624 this.remoteHost = remoteHost;
625 625 }
626 626
627 627 String getClientHost() {
628 628 return remoteHost;
629 629 }
630 630
631 631 /**
632 632 * Verify that the given AccessControlContext has permission to
633 633 * accept this connection.
634 634 */
635 635 void checkAcceptPermission(SecurityManager sm,
636 636 AccessControlContext acc)
637 637 {
638 638 /*
639 639 * Note: no need to synchronize on cache-related fields, since this
640 640 * method only gets called from the ConnectionHandler's thread.
641 641 */
642 642 if (sm != cacheSecurityManager) {
643 643 okContext = null;
644 644 authCache = new WeakHashMap<AccessControlContext,
645 645 Reference<AccessControlContext>>();
646 646 cacheSecurityManager = sm;
647 647 }
648 648 if (acc.equals(okContext) || authCache.containsKey(acc)) {
649 649 return;
650 650 }
651 651 InetAddress addr = socket.getInetAddress();
652 652 String host = (addr != null) ? addr.getHostAddress() : "*";
653 653
654 654 sm.checkAccept(host, socket.getPort());
655 655
656 656 authCache.put(acc, new SoftReference<AccessControlContext>(acc));
657 657 okContext = acc;
658 658 }
659 659
660 660 public void run() {
661 661 Thread t = Thread.currentThread();
662 662 String name = t.getName();
663 663 try {
664 664 t.setName("RMI TCP Connection(" +
665 665 connectionCount.incrementAndGet() +
666 666 ")-" + remoteHost);
667 667 run0();
668 668 } finally {
669 669 t.setName(name);
670 670 }
671 671 }
672 672
673 673 private void run0() {
674 674 TCPEndpoint endpoint = getEndpoint();
675 675 int port = endpoint.getPort();
676 676
677 677 threadConnectionHandler.set(this);
678 678
679 679 // set socket to disable Nagle's algorithm (always send
680 680 // immediately)
681 681 // TBD: should this be left up to socket factory instead?
682 682 try {
683 683 socket.setTcpNoDelay(true);
684 684 } catch (Exception e) {
685 685 // if we fail to set this, ignore and proceed anyway
686 686 }
687 687 // set socket to timeout after excessive idle time
688 688 try {
689 689 if (connectionReadTimeout > 0)
690 690 socket.setSoTimeout(connectionReadTimeout);
691 691 } catch (Exception e) {
692 692 // too bad, continue anyway
693 693 }
694 694
695 695 try {
696 696 InputStream sockIn = socket.getInputStream();
697 697 InputStream bufIn = sockIn.markSupported()
698 698 ? sockIn
699 699 : new BufferedInputStream(sockIn);
700 700
701 701 // Read magic (or HTTP wrapper)
702 702 bufIn.mark(4);
703 703 DataInputStream in = new DataInputStream(bufIn);
704 704 int magic = in.readInt();
705 705
706 706 if (magic == POST) {
707 707 tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call");
708 708
709 709 // It's really a HTTP-wrapped request. Repackage
710 710 // the socket in a HttpReceiveSocket, reinitialize
711 711 // sockIn and in, and reread magic.
712 712 bufIn.reset(); // unread "POST"
713 713
714 714 try {
715 715 socket = new HttpReceiveSocket(socket, bufIn, null);
716 716 remoteHost = "0.0.0.0";
717 717 sockIn = socket.getInputStream();
718 718 bufIn = new BufferedInputStream(sockIn);
719 719 in = new DataInputStream(bufIn);
720 720 magic = in.readInt();
721 721
722 722 } catch (IOException e) {
723 723 throw new RemoteException("Error HTTP-unwrapping call",
724 724 e);
725 725 }
726 726 }
727 727 // bufIn's mark will invalidate itself when it overflows
728 728 // so it doesn't have to be turned off
729 729
730 730 // read and verify transport header
731 731 short version = in.readShort();
732 732 if (magic != TransportConstants.Magic ||
733 733 version != TransportConstants.Version) {
734 734 // protocol mismatch detected...
735 735 // just close socket: this would recurse if we marshal an
736 736 // exception to the client and the protocol at other end
737 737 // doesn't match.
738 738 closeSocket(socket);
739 739 return;
740 740 }
741 741
742 742 OutputStream sockOut = socket.getOutputStream();
743 743 BufferedOutputStream bufOut =
744 744 new BufferedOutputStream(sockOut);
745 745 DataOutputStream out = new DataOutputStream(bufOut);
746 746
747 747 int remotePort = socket.getPort();
748 748
749 749 if (tcpLog.isLoggable(Log.BRIEF)) {
750 750 tcpLog.log(Log.BRIEF, "accepted socket from [" +
751 751 remoteHost + ":" + remotePort + "]");
752 752 }
753 753
754 754 TCPEndpoint ep;
755 755 TCPChannel ch;
756 756 TCPConnection conn;
757 757
758 758 // send ack (or nack) for protocol
759 759 byte protocol = in.readByte();
760 760 switch (protocol) {
761 761 case TransportConstants.SingleOpProtocol:
762 762 // no ack for protocol
763 763
764 764 // create dummy channel for receiving messages
765 765 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
766 766 endpoint.getClientSocketFactory(),
767 767 endpoint.getServerSocketFactory());
768 768 ch = new TCPChannel(TCPTransport.this, ep);
769 769 conn = new TCPConnection(ch, socket, bufIn, bufOut);
770 770
771 771 // read input messages
772 772 handleMessages(conn, false);
773 773 break;
774 774
775 775 case TransportConstants.StreamProtocol:
776 776 // send ack
777 777 out.writeByte(TransportConstants.ProtocolAck);
778 778
779 779 // suggest endpoint (in case client doesn't know host name)
780 780 if (tcpLog.isLoggable(Log.VERBOSE)) {
781 781 tcpLog.log(Log.VERBOSE, "(port " + port +
782 782 ") " + "suggesting " + remoteHost + ":" +
783 783 remotePort);
784 784 }
785 785
786 786 out.writeUTF(remoteHost);
787 787 out.writeInt(remotePort);
788 788 out.flush();
789 789
790 790 // read and discard (possibly bogus) endpoint
791 791 // REMIND: would be faster to read 2 bytes then skip N+4
792 792 String clientHost = in.readUTF();
793 793 int clientPort = in.readInt();
794 794 if (tcpLog.isLoggable(Log.VERBOSE)) {
795 795 tcpLog.log(Log.VERBOSE, "(port " + port +
796 796 ") client using " + clientHost + ":" + clientPort);
797 797 }
798 798
799 799 // create dummy channel for receiving messages
800 800 // (why not use clientHost and clientPort?)
801 801 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
802 802 endpoint.getClientSocketFactory(),
803 803 endpoint.getServerSocketFactory());
804 804 ch = new TCPChannel(TCPTransport.this, ep);
805 805 conn = new TCPConnection(ch, socket, bufIn, bufOut);
806 806
807 807 // read input messages
808 808 handleMessages(conn, true);
809 809 break;
810 810
811 811 case TransportConstants.MultiplexProtocol:
812 812 if (tcpLog.isLoggable(Log.VERBOSE)) {
813 813 tcpLog.log(Log.VERBOSE, "(port " + port +
814 814 ") accepting multiplex protocol");
815 815 }
816 816
817 817 // send ack
818 818 out.writeByte(TransportConstants.ProtocolAck);
819 819
820 820 // suggest endpoint (in case client doesn't already have one)
821 821 if (tcpLog.isLoggable(Log.VERBOSE)) {
822 822 tcpLog.log(Log.VERBOSE, "(port " + port +
823 823 ") suggesting " + remoteHost + ":" + remotePort);
824 824 }
825 825
826 826 out.writeUTF(remoteHost);
827 827 out.writeInt(remotePort);
828 828 out.flush();
829 829
830 830 // read endpoint client has decided to use
831 831 ep = new TCPEndpoint(in.readUTF(), in.readInt(),
832 832 endpoint.getClientSocketFactory(),
833 833 endpoint.getServerSocketFactory());
834 834 if (tcpLog.isLoggable(Log.VERBOSE)) {
835 835 tcpLog.log(Log.VERBOSE, "(port " +
836 836 port + ") client using " +
837 837 ep.getHost() + ":" + ep.getPort());
838 838 }
839 839
840 840 ConnectionMultiplexer multiplexer;
841 841 synchronized (channelTable) {
842 842 // create or find channel for this endpoint
843 843 ch = getChannel(ep);
844 844 multiplexer =
845 845 new ConnectionMultiplexer(ch, bufIn, sockOut,
846 846 false);
847 847 ch.useMultiplexer(multiplexer);
848 848 }
849 849 multiplexer.run();
850 850 break;
851 851
852 852 default:
853 853 // protocol not understood, send nack and close socket
854 854 out.writeByte(TransportConstants.ProtocolNack);
855 855 out.flush();
856 856 break;
857 857 }
858 858
859 859 } catch (IOException e) {
860 860 // socket in unknown state: destroy socket
861 861 tcpLog.log(Log.BRIEF, "terminated with exception:", e);
862 862 } finally {
863 863 closeSocket(socket);
864 864 }
865 865 }
866 866 }
867 867 }
↓ open down ↓ |
725 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX