9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.corba.se.impl.transport; 27 28 import java.io.IOException; 29 import java.nio.channels.ClosedChannelException; 30 import java.nio.channels.SelectableChannel; 31 import java.nio.channels.SelectionKey; 32 import java.nio.channels.Selector; 33 import java.util.ArrayList; 34 import java.util.HashMap; 35 import java.util.Map; 36 import java.util.Iterator; 37 import java.util.List; 38 39 import com.sun.corba.se.pept.broker.Broker; 40 import com.sun.corba.se.pept.transport.Acceptor; 41 import com.sun.corba.se.pept.transport.Connection; 42 import com.sun.corba.se.pept.transport.EventHandler; 43 import com.sun.corba.se.pept.transport.ListenerThread; 44 import com.sun.corba.se.pept.transport.ReaderThread; 45 46 import com.sun.corba.se.spi.logging.CORBALogDomains; 47 import com.sun.corba.se.spi.orb.ORB; 48 import com.sun.corba.se.spi.orbutil.threadpool.Work; 49 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException; 50 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 51 52 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 53 import com.sun.corba.se.impl.orbutil.ORBUtility; 54 55 /** 56 * @author Harold Carr 57 */ 58 class SelectorImpl 94 95 public long getTimeout() 96 { 97 return timeout; 98 } 99 100 public void registerInterestOps(EventHandler eventHandler) 101 { 102 if (orb.transportDebugFlag) { 103 dprint(".registerInterestOps:-> " + eventHandler); 104 } 105 106 SelectionKey selectionKey = eventHandler.getSelectionKey(); 107 if (selectionKey.isValid()) { 108 int ehOps = eventHandler.getInterestOps(); 109 SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps); 110 synchronized(interestOpsList) { 111 interestOpsList.add(keyAndOp); 112 } 113 // tell Selector Thread there's an update to a SelectorKey's Ops 114 selector.wakeup(); 115 } 116 else { 117 wrapper.selectionKeyInvalid(eventHandler.toString()); 118 if (orb.transportDebugFlag) { 119 dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler); 120 } 121 } 122 123 if (orb.transportDebugFlag) { 124 dprint(".registerInterestOps:<- "); 125 } 126 } 127 128 public void registerForEvent(EventHandler eventHandler) 129 { 130 if (orb.transportDebugFlag) { 131 dprint(".registerForEvent: " + eventHandler); 132 } 133 134 if (isClosed()) { 135 if (orb.transportDebugFlag) { 169 { 170 if (orb.transportDebugFlag) { 171 dprint(".unregisterForEvent: " + eventHandler); 172 } 173 174 if (isClosed()) { 175 if (orb.transportDebugFlag) { 176 dprint(".unregisterForEvent: closed: " + eventHandler); 177 } 178 return; 179 } 180 181 if (eventHandler.shouldUseSelectThreadToWait()) { 182 SelectionKey selectionKey ; 183 synchronized(deferredRegistrations) { 184 selectionKey = eventHandler.getSelectionKey(); 185 } 186 if (selectionKey != null) { 187 selectionKey.cancel(); 188 } 189 selector.wakeup(); 190 return; 191 } 192 193 switch (eventHandler.getInterestOps()) { 194 case SelectionKey.OP_ACCEPT : 195 destroyListenerThread(eventHandler); 196 break; 197 case SelectionKey.OP_READ : 198 destroyReaderThread(eventHandler); 199 break; 200 default: 201 if (orb.transportDebugFlag) { 202 dprint(".unregisterForEvent: default: " + eventHandler); 203 } 204 throw new RuntimeException( 205 "SelectorImpl.uregisterForEvent: unknown interest ops"); 206 } 207 } 208 209 public void close() 222 setClosed(true); 223 224 Iterator i; 225 226 // Kill listeners. 227 228 i = listenerThreads.values().iterator(); 229 while (i.hasNext()) { 230 ListenerThread listenerThread = (ListenerThread) i.next(); 231 listenerThread.close(); 232 } 233 234 // Kill readers. 235 236 i = readerThreads.values().iterator(); 237 while (i.hasNext()) { 238 ReaderThread readerThread = (ReaderThread) i.next(); 239 readerThread.close(); 240 } 241 242 // Selector 243 244 try { 245 if (selector != null) { 246 // wakeup Selector thread to process close request 247 selector.wakeup(); 248 } 249 } catch (Throwable t) { 250 if (orb.transportDebugFlag) { 251 dprint(".close: selector.close: " + t); 252 } 253 } 254 } 255 256 /////////////////////////////////////////////////// 257 // 258 // Thread methods. 259 // 260 261 public void run() 262 { 263 setName("SelectorThread"); 264 while (!closed) { 265 try { 266 int n = 0; 267 if (timeout == 0 && orb.transportDebugFlag) { 268 dprint(".run: Beginning of selection cycle"); 269 } 270 handleDeferredRegistrations(); 271 enableInterestOps(); 272 try { 273 n = selector.select(timeout); 274 } catch (IOException e) { 275 if (orb.transportDebugFlag) { 276 dprint(".run: selector.select: " + e); 277 } 278 } 279 if (closed) { 280 selector.close(); 281 if (orb.transportDebugFlag) { 282 dprint(".run: closed - .run return"); 283 } 284 return; 285 } 286 /* 287 if (timeout == 0 && orb.transportDebugFlag) { 288 dprint(".run: selector.select() returned: " + n); 289 } 290 if (n == 0) { 291 continue; 292 } 293 */ 294 Iterator iterator = selector.selectedKeys().iterator(); 295 if (orb.transportDebugFlag) { 296 if (iterator.hasNext()) { 297 dprint(".run: n = " + n); 298 } 299 } 300 while (iterator.hasNext()) { 301 SelectionKey selectionKey = (SelectionKey) iterator.next(); 302 iterator.remove(); 303 EventHandler eventHandler = (EventHandler) 304 selectionKey.attachment(); 305 try { 306 eventHandler.handleEvent(); 307 } catch (Throwable t) { 308 if (orb.transportDebugFlag) { 309 dprint(".run: eventHandler.handleEvent", t); 310 } 311 } 312 } 313 if (timeout == 0 && orb.transportDebugFlag) { 314 dprint(".run: End of selection cycle"); 315 } 316 } catch (Throwable t) { 317 // IMPORTANT: ignore all errors so the select thread keeps running. 318 // Otherwise a guaranteed hang. 319 if (orb.transportDebugFlag) { 320 dprint(".run: ignoring", t); 321 } 322 } 323 } 324 } 325 326 ///////////////////////////////////////////////////// 327 // 328 // Implementation. 329 // 330 331 private synchronized boolean isClosed () 332 { 333 return closed; 334 } 335 336 private synchronized void setClosed(boolean closed) 337 { 338 this.closed = closed; 339 } 340 341 private void startSelector() 342 { 343 try { 344 selector = Selector.open(); 345 } catch (IOException e) { 346 if (orb.transportDebugFlag) { 347 dprint(".startSelector: Selector.open: IOException: " + e); 348 } 349 // REVISIT - better handling/reporting 350 RuntimeException rte = 351 new RuntimeException(".startSelector: Selector.open exception"); 352 rte.initCause(e); 353 throw rte; 354 } 355 setDaemon(true); 356 start(); 357 selectorStarted = true; 358 if (orb.transportDebugFlag) { 359 dprint(".startSelector: selector.start completed."); 360 } 361 } 362 363 private void handleDeferredRegistrations() 364 { 365 synchronized (deferredRegistrations) { 366 int deferredListSize = deferredRegistrations.size(); 367 for (int i = 0; i < deferredListSize; i++) { 368 EventHandler eventHandler = 369 (EventHandler)deferredRegistrations.get(i); 370 if (orb.transportDebugFlag) { 371 dprint(".handleDeferredRegistrations: " + eventHandler); 372 } 373 SelectableChannel channel = eventHandler.getChannel(); 374 SelectionKey selectionKey = null; 375 try { 376 selectionKey = 377 channel.register(selector, 378 eventHandler.getInterestOps(), 379 (Object)eventHandler); 380 } catch (ClosedChannelException e) { 381 if (orb.transportDebugFlag) { 382 dprint(".handleDeferredRegistrations: " + e); 383 } 384 } 385 eventHandler.setSelectionKey(selectionKey); 386 } 387 deferredRegistrations.clear(); 388 } 389 } 390 391 private void enableInterestOps() 392 { 393 synchronized (interestOpsList) { 394 int listSize = interestOpsList.size(); 395 if (listSize > 0) { 396 if (orb.transportDebugFlag) { 397 dprint(".enableInterestOps:->"); 398 } 399 SelectionKey selectionKey = null; 400 SelectionKeyAndOp keyAndOp = null; 401 int keyOp, selectionKeyOps = 0; 402 for (int i = 0; i < listSize; i++) { | 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.corba.se.impl.transport; 27 28 import java.io.IOException; 29 import java.net.ServerSocket; 30 import java.nio.channels.ClosedChannelException; 31 import java.nio.channels.SelectableChannel; 32 import java.nio.channels.ServerSocketChannel; 33 import java.nio.channels.SelectionKey; 34 import java.nio.channels.Selector; 35 import java.nio.channels.ClosedSelectorException; 36 import java.util.ArrayList; 37 import java.util.HashMap; 38 import java.util.Map; 39 import java.util.Iterator; 40 import java.util.List; 41 42 43 import com.sun.corba.se.pept.broker.Broker; 44 import com.sun.corba.se.pept.transport.Acceptor; 45 import com.sun.corba.se.pept.transport.Connection; 46 import com.sun.corba.se.pept.transport.EventHandler; 47 import com.sun.corba.se.pept.transport.ListenerThread; 48 import com.sun.corba.se.pept.transport.ReaderThread; 49 50 import com.sun.corba.se.spi.logging.CORBALogDomains; 51 import com.sun.corba.se.spi.orb.ORB; 52 import com.sun.corba.se.spi.orbutil.threadpool.Work; 53 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException; 54 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 55 56 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 57 import com.sun.corba.se.impl.orbutil.ORBUtility; 58 59 /** 60 * @author Harold Carr 61 */ 62 class SelectorImpl 98 99 public long getTimeout() 100 { 101 return timeout; 102 } 103 104 public void registerInterestOps(EventHandler eventHandler) 105 { 106 if (orb.transportDebugFlag) { 107 dprint(".registerInterestOps:-> " + eventHandler); 108 } 109 110 SelectionKey selectionKey = eventHandler.getSelectionKey(); 111 if (selectionKey.isValid()) { 112 int ehOps = eventHandler.getInterestOps(); 113 SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps); 114 synchronized(interestOpsList) { 115 interestOpsList.add(keyAndOp); 116 } 117 // tell Selector Thread there's an update to a SelectorKey's Ops 118 try { 119 if (selector != null) { 120 // wakeup Selector thread to process close request 121 selector.wakeup(); 122 } 123 } catch (Throwable t) { 124 if (orb.transportDebugFlag) { 125 dprint(".registerInterestOps: selector.wakeup: ", t); 126 } 127 } 128 } 129 else { 130 wrapper.selectionKeyInvalid(eventHandler.toString()); 131 if (orb.transportDebugFlag) { 132 dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler); 133 } 134 } 135 136 if (orb.transportDebugFlag) { 137 dprint(".registerInterestOps:<- "); 138 } 139 } 140 141 public void registerForEvent(EventHandler eventHandler) 142 { 143 if (orb.transportDebugFlag) { 144 dprint(".registerForEvent: " + eventHandler); 145 } 146 147 if (isClosed()) { 148 if (orb.transportDebugFlag) { 182 { 183 if (orb.transportDebugFlag) { 184 dprint(".unregisterForEvent: " + eventHandler); 185 } 186 187 if (isClosed()) { 188 if (orb.transportDebugFlag) { 189 dprint(".unregisterForEvent: closed: " + eventHandler); 190 } 191 return; 192 } 193 194 if (eventHandler.shouldUseSelectThreadToWait()) { 195 SelectionKey selectionKey ; 196 synchronized(deferredRegistrations) { 197 selectionKey = eventHandler.getSelectionKey(); 198 } 199 if (selectionKey != null) { 200 selectionKey.cancel(); 201 } 202 if (selector != null) { 203 selector.wakeup(); 204 } 205 return; 206 } 207 208 switch (eventHandler.getInterestOps()) { 209 case SelectionKey.OP_ACCEPT : 210 destroyListenerThread(eventHandler); 211 break; 212 case SelectionKey.OP_READ : 213 destroyReaderThread(eventHandler); 214 break; 215 default: 216 if (orb.transportDebugFlag) { 217 dprint(".unregisterForEvent: default: " + eventHandler); 218 } 219 throw new RuntimeException( 220 "SelectorImpl.uregisterForEvent: unknown interest ops"); 221 } 222 } 223 224 public void close() 237 setClosed(true); 238 239 Iterator i; 240 241 // Kill listeners. 242 243 i = listenerThreads.values().iterator(); 244 while (i.hasNext()) { 245 ListenerThread listenerThread = (ListenerThread) i.next(); 246 listenerThread.close(); 247 } 248 249 // Kill readers. 250 251 i = readerThreads.values().iterator(); 252 while (i.hasNext()) { 253 ReaderThread readerThread = (ReaderThread) i.next(); 254 readerThread.close(); 255 } 256 257 clearDeferredRegistrations(); 258 259 // Selector 260 261 try { 262 if (selector != null) { 263 // wakeup Selector thread to process close request 264 selector.wakeup(); 265 } 266 } catch (Throwable t) { 267 if (orb.transportDebugFlag) { 268 dprint(".close: selector.wakeup: ", t); 269 } 270 } 271 } 272 273 /////////////////////////////////////////////////// 274 // 275 // Thread methods. 276 // 277 278 public void run() 279 { 280 setName("SelectorThread"); 281 while (!closed) { 282 try { 283 int n = 0; 284 if (timeout == 0 && orb.transportDebugFlag) { 285 dprint(".run: Beginning of selection cycle"); 286 } 287 handleDeferredRegistrations(); 288 enableInterestOps(); 289 try { 290 n = selector.select(timeout); 291 } catch (IOException e) { 292 if (orb.transportDebugFlag) { 293 dprint(".run: selector.select: ", e); 294 } 295 } catch (ClosedSelectorException csEx) { 296 if (orb.transportDebugFlag) { 297 dprint(".run: selector.select: ", csEx); 298 } 299 break; 300 } 301 if (closed) { 302 break; 303 } 304 /* 305 if (timeout == 0 && orb.transportDebugFlag) { 306 dprint(".run: selector.select() returned: " + n); 307 } 308 if (n == 0) { 309 continue; 310 } 311 */ 312 Iterator iterator = selector.selectedKeys().iterator(); 313 if (orb.transportDebugFlag) { 314 if (iterator.hasNext()) { 315 dprint(".run: n = " + n); 316 } 317 } 318 while (iterator.hasNext()) { 319 SelectionKey selectionKey = (SelectionKey) iterator.next(); 320 iterator.remove(); 321 EventHandler eventHandler = (EventHandler) 322 selectionKey.attachment(); 323 try { 324 eventHandler.handleEvent(); 325 } catch (Throwable t) { 326 if (orb.transportDebugFlag) { 327 dprint(".run: eventHandler.handleEvent", t); 328 } 329 } 330 } 331 if (timeout == 0 && orb.transportDebugFlag) { 332 dprint(".run: End of selection cycle"); 333 } 334 } catch (Throwable t) { 335 // IMPORTANT: ignore all errors so the select thread keeps running. 336 // Otherwise a guaranteed hang. 337 if (orb.transportDebugFlag) { 338 dprint(".run: ignoring", t); 339 } 340 } 341 } 342 try { 343 if (selector != null) { 344 if (orb.transportDebugFlag) { 345 dprint(".run: selector.close "); 346 } 347 selector.close(); 348 } 349 } catch (Throwable t) { 350 if (orb.transportDebugFlag) { 351 dprint(".run: selector.close: ", t); 352 } 353 } 354 } 355 356 ///////////////////////////////////////////////////// 357 // 358 // Implementation. 359 // 360 361 private void clearDeferredRegistrations() { 362 synchronized (deferredRegistrations) { 363 int deferredListSize = deferredRegistrations.size(); 364 if (orb.transportDebugFlag) { 365 dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize); 366 } 367 for (int i = 0; i < deferredListSize; i++) { 368 EventHandler eventHandler = 369 (EventHandler)deferredRegistrations.get(i); 370 if (orb.transportDebugFlag) { 371 dprint(".clearDeferredRegistrations: " + eventHandler); 372 } 373 SelectableChannel channel = eventHandler.getChannel(); 374 SelectionKey selectionKey = null; 375 376 try { 377 if (orb.transportDebugFlag) { 378 dprint(".clearDeferredRegistrations:close channel == " 379 + channel); 380 dprint(".clearDeferredRegistrations:close channel class == " 381 + channel.getClass().getName()); 382 } 383 channel.close(); 384 selectionKey = eventHandler.getSelectionKey(); 385 if (selectionKey != null) { 386 selectionKey.cancel(); 387 selectionKey.attach(null); 388 } 389 } catch (IOException ioEx) { 390 if (orb.transportDebugFlag) { 391 dprint(".clearDeferredRegistrations: ", ioEx); 392 } 393 } 394 } 395 deferredRegistrations.clear(); 396 } 397 } 398 399 private synchronized boolean isClosed () 400 { 401 return closed; 402 } 403 404 private synchronized void setClosed(boolean closed) 405 { 406 this.closed = closed; 407 } 408 409 private void startSelector() 410 { 411 try { 412 selector = Selector.open(); 413 } catch (IOException e) { 414 if (orb.transportDebugFlag) { 415 dprint(".startSelector: Selector.open: IOException: ", e); 416 } 417 // REVISIT - better handling/reporting 418 RuntimeException rte = 419 new RuntimeException(".startSelector: Selector.open exception"); 420 rte.initCause(e); 421 throw rte; 422 } 423 setDaemon(true); 424 start(); 425 selectorStarted = true; 426 if (orb.transportDebugFlag) { 427 dprint(".startSelector: selector.start completed."); 428 } 429 } 430 431 private void handleDeferredRegistrations() 432 { 433 synchronized (deferredRegistrations) { 434 int deferredListSize = deferredRegistrations.size(); 435 for (int i = 0; i < deferredListSize; i++) { 436 EventHandler eventHandler = 437 (EventHandler)deferredRegistrations.get(i); 438 if (orb.transportDebugFlag) { 439 dprint(".handleDeferredRegistrations: " + eventHandler); 440 } 441 SelectableChannel channel = eventHandler.getChannel(); 442 SelectionKey selectionKey = null; 443 try { 444 selectionKey = 445 channel.register(selector, 446 eventHandler.getInterestOps(), 447 (Object)eventHandler); 448 } catch (ClosedChannelException e) { 449 if (orb.transportDebugFlag) { 450 dprint(".handleDeferredRegistrations: ", e); 451 } 452 } 453 eventHandler.setSelectionKey(selectionKey); 454 } 455 deferredRegistrations.clear(); 456 } 457 } 458 459 private void enableInterestOps() 460 { 461 synchronized (interestOpsList) { 462 int listSize = interestOpsList.size(); 463 if (listSize > 0) { 464 if (orb.transportDebugFlag) { 465 dprint(".enableInterestOps:->"); 466 } 467 SelectionKey selectionKey = null; 468 SelectionKeyAndOp keyAndOp = null; 469 int keyOp, selectionKeyOps = 0; 470 for (int i = 0; i < listSize; i++) { |