1 /* 2 * Copyright (c) 2002, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package com.sun.jmx.remote.internal; 26 27 import java.io.IOException; 28 import java.io.NotSerializableException; 29 30 import java.util.ArrayList; 31 import java.util.HashMap; 32 import java.util.List; 33 import java.util.Map; 34 import java.util.concurrent.Executor; 35 36 import java.security.AccessControlContext; 37 import java.security.AccessController; 38 import java.security.PrivilegedAction; 39 import javax.security.auth.Subject; 40 41 import javax.management.Notification; 42 import javax.management.NotificationListener; 43 import javax.management.NotificationFilter; 44 import javax.management.ObjectName; 45 import javax.management.MBeanServerNotification; 46 import javax.management.InstanceNotFoundException; 47 import javax.management.ListenerNotFoundException; 48 49 import javax.management.remote.NotificationResult; 50 import javax.management.remote.TargetedNotification; 51 52 import com.sun.jmx.remote.util.ClassLogger; 53 import com.sun.jmx.remote.util.EnvHelp; 54 import java.rmi.UnmarshalException; 55 import sun.misc.ManagedLocalsThread; 56 57 58 public abstract class ClientNotifForwarder { 59 60 private final AccessControlContext acc; 61 62 public ClientNotifForwarder(Map<String, ?> env) { 63 this(null, env); 64 } 65 66 private static int threadId; 67 68 /* An Executor that allows at most one executing and one pending 69 Runnable. It uses at most one thread -- as soon as there is 70 no pending Runnable the thread can exit. Another thread is 71 created as soon as there is a new pending Runnable. This 72 Executor is adapted for use in a situation where each Runnable 73 usually schedules up another Runnable. On return from the 74 first one, the second one is immediately executed. So this 75 just becomes a complicated way to write a while loop, but with 76 the advantage that you can replace it with another Executor, 77 for instance one that you are using to execute a bunch of other 78 unrelated work. 79 80 You might expect that a java.util.concurrent.ThreadPoolExecutor 81 with corePoolSize=0 and maximumPoolSize=1 would have the same 82 behavior, but it does not. A ThreadPoolExecutor only creates 83 a new thread when a new task is submitted and the number of 84 existing threads is < corePoolSize. This can never happen when 85 corePoolSize=0, so new threads are never created. Surprising, 86 but there you are. 87 */ 88 private static class LinearExecutor implements Executor { 89 public synchronized void execute(Runnable command) { 90 if (this.command != null) 91 throw new IllegalArgumentException("More than one command"); 92 this.command = command; 93 if (thread == null) { 94 thread = new ManagedLocalsThread( 95 ()-> { 96 while (true) { 97 Runnable r; 98 synchronized (LinearExecutor.this) { 99 if (LinearExecutor.this.command == null) { 100 thread = null; 101 return; 102 } else { 103 r = LinearExecutor.this.command; 104 LinearExecutor.this.command = null; 105 } 106 } 107 r.run(); 108 } 109 }, 110 "ClientNotifForwarder-" + ++threadId 111 ); 112 thread.setDaemon(true); 113 thread.start(); 114 } 115 } 116 117 private Runnable command; 118 private Thread thread; 119 } 120 121 public ClientNotifForwarder(ClassLoader defaultClassLoader, Map<String, ?> env) { 122 maxNotifications = EnvHelp.getMaxFetchNotifNumber(env); 123 timeout = EnvHelp.getFetchTimeout(env); 124 125 /* You can supply an Executor in which the remote call to 126 fetchNotifications will be made. The Executor's execute 127 method reschedules another task, so you must not use 128 an Executor that executes tasks in the caller's thread. */ 129 Executor ex = (Executor) 130 env.get("jmx.remote.x.fetch.notifications.executor"); 131 if (ex == null) 132 ex = new LinearExecutor(); 133 else if (logger.traceOn()) 134 logger.trace("ClientNotifForwarder", "executor is " + ex); 135 136 this.defaultClassLoader = defaultClassLoader; 137 this.executor = ex; 138 this.acc = AccessController.getContext(); 139 } 140 141 /** 142 * Called to fetch notifications from a server. 143 */ 144 abstract protected NotificationResult fetchNotifs(long clientSequenceNumber, 145 int maxNotifications, 146 long timeout) 147 throws IOException, ClassNotFoundException; 148 149 abstract protected Integer addListenerForMBeanRemovedNotif() 150 throws IOException, InstanceNotFoundException; 151 152 abstract protected void removeListenerForMBeanRemovedNotif(Integer id) 153 throws IOException, InstanceNotFoundException, 154 ListenerNotFoundException; 155 156 /** 157 * Used to send out a notification about lost notifs 158 */ 159 abstract protected void lostNotifs(String message, long number); 160 161 162 public synchronized void addNotificationListener(Integer listenerID, 163 ObjectName name, 164 NotificationListener listener, 165 NotificationFilter filter, 166 Object handback, 167 Subject delegationSubject) 168 throws IOException, InstanceNotFoundException { 169 170 if (logger.traceOn()) { 171 logger.trace("addNotificationListener", 172 "Add the listener "+listener+" at "+name); 173 } 174 175 infoList.put(listenerID, 176 new ClientListenerInfo(listenerID, 177 name, 178 listener, 179 filter, 180 handback, 181 delegationSubject)); 182 183 184 init(false); 185 } 186 187 public synchronized Integer[] 188 removeNotificationListener(ObjectName name, 189 NotificationListener listener) 190 throws ListenerNotFoundException, IOException { 191 192 beforeRemove(); 193 194 if (logger.traceOn()) { 195 logger.trace("removeNotificationListener", 196 "Remove the listener "+listener+" from "+name); 197 } 198 199 List<Integer> ids = new ArrayList<Integer>(); 200 List<ClientListenerInfo> values = 201 new ArrayList<ClientListenerInfo>(infoList.values()); 202 for (int i=values.size()-1; i>=0; i--) { 203 ClientListenerInfo li = values.get(i); 204 205 if (li.sameAs(name, listener)) { 206 ids.add(li.getListenerID()); 207 208 infoList.remove(li.getListenerID()); 209 } 210 } 211 212 if (ids.isEmpty()) 213 throw new ListenerNotFoundException("Listener not found"); 214 215 return ids.toArray(new Integer[0]); 216 } 217 218 public synchronized Integer 219 removeNotificationListener(ObjectName name, 220 NotificationListener listener, 221 NotificationFilter filter, 222 Object handback) 223 throws ListenerNotFoundException, IOException { 224 225 if (logger.traceOn()) { 226 logger.trace("removeNotificationListener", 227 "Remove the listener "+listener+" from "+name); 228 } 229 230 beforeRemove(); 231 232 Integer id = null; 233 234 List<ClientListenerInfo> values = 235 new ArrayList<ClientListenerInfo>(infoList.values()); 236 for (int i=values.size()-1; i>=0; i--) { 237 ClientListenerInfo li = values.get(i); 238 if (li.sameAs(name, listener, filter, handback)) { 239 id=li.getListenerID(); 240 241 infoList.remove(id); 242 243 break; 244 } 245 } 246 247 if (id == null) 248 throw new ListenerNotFoundException("Listener not found"); 249 250 return id; 251 } 252 253 public synchronized Integer[] removeNotificationListener(ObjectName name) { 254 if (logger.traceOn()) { 255 logger.trace("removeNotificationListener", 256 "Remove all listeners registered at "+name); 257 } 258 259 List<Integer> ids = new ArrayList<Integer>(); 260 261 List<ClientListenerInfo> values = 262 new ArrayList<ClientListenerInfo>(infoList.values()); 263 for (int i=values.size()-1; i>=0; i--) { 264 ClientListenerInfo li = values.get(i); 265 if (li.sameAs(name)) { 266 ids.add(li.getListenerID()); 267 268 infoList.remove(li.getListenerID()); 269 } 270 } 271 272 return ids.toArray(new Integer[0]); 273 } 274 275 /* 276 * Called when a connector is doing reconnection. Like <code>postReconnection</code>, 277 * this method is intended to be called only by a client connector: 278 * <code>RMIConnector</code> and <code>ClientIntermediary</code>. 279 * Call this method will set the flag beingReconnection to <code>true</code>, 280 * and the thread used to fetch notifis will be stopped, a new thread can be 281 * created only after the method <code>postReconnection</code> is called. 282 * 283 * It is caller's responsiblity to not re-call this method before calling 284 * <code>postReconnection</code>. 285 */ 286 public synchronized ClientListenerInfo[] preReconnection() throws IOException { 287 if (state == TERMINATED || beingReconnected) { // should never 288 throw new IOException("Illegal state."); 289 } 290 291 final ClientListenerInfo[] tmp = 292 infoList.values().toArray(new ClientListenerInfo[0]); 293 294 295 beingReconnected = true; 296 297 infoList.clear(); 298 299 return tmp; 300 } 301 302 /** 303 * Called after reconnection is finished. 304 * This method is intended to be called only by a client connector: 305 * <code>RMIConnector</code> and <code>ClientIntermediary</code>. 306 */ 307 public synchronized void postReconnection(ClientListenerInfo[] listenerInfos) 308 throws IOException { 309 310 if (state == TERMINATED) { 311 return; 312 } 313 314 while (state == STOPPING) { 315 try { 316 wait(); 317 } catch (InterruptedException ire) { 318 IOException ioe = new IOException(ire.toString()); 319 EnvHelp.initCause(ioe, ire); 320 throw ioe; 321 } 322 } 323 324 final boolean trace = logger.traceOn(); 325 final int len = listenerInfos.length; 326 327 for (int i=0; i<len; i++) { 328 if (trace) { 329 logger.trace("addNotificationListeners", 330 "Add a listener at "+ 331 listenerInfos[i].getListenerID()); 332 } 333 334 infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]); 335 } 336 337 beingReconnected = false; 338 notifyAll(); 339 340 if (currentFetchThread == Thread.currentThread() || 341 state == STARTING || state == STARTED) { // doing or waiting reconnection 342 // only update mbeanRemovedNotifID 343 try { 344 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif(); 345 } catch (Exception e) { 346 final String msg = 347 "Failed to register a listener to the mbean " + 348 "server: the client will not do clean when an MBean " + 349 "is unregistered"; 350 if (logger.traceOn()) { 351 logger.trace("init", msg, e); 352 } 353 } 354 } else { 355 while (state == STOPPING) { 356 try { 357 wait(); 358 } catch (InterruptedException ire) { 359 IOException ioe = new IOException(ire.toString()); 360 EnvHelp.initCause(ioe, ire); 361 throw ioe; 362 } 363 } 364 365 if (listenerInfos.length > 0) { // old listeners are re-added 366 init(true); // not update clientSequenceNumber 367 } else if (infoList.size() > 0) { // only new listeners added during reconnection 368 init(false); // need update clientSequenceNumber 369 } 370 } 371 } 372 373 public synchronized void terminate() { 374 if (state == TERMINATED) { 375 return; 376 } 377 378 if (logger.traceOn()) { 379 logger.trace("terminate", "Terminating..."); 380 } 381 382 if (state == STARTED) { 383 infoList.clear(); 384 } 385 386 setState(TERMINATED); 387 } 388 389 390 // ------------------------------------------------- 391 // private classes 392 // ------------------------------------------------- 393 // 394 395 private class NotifFetcher implements Runnable { 396 397 private volatile boolean alreadyLogged = false; 398 399 private void logOnce(String msg, SecurityException x) { 400 if (alreadyLogged) return; 401 // Log only once. 402 logger.config("setContextClassLoader",msg); 403 if (x != null) logger.fine("setContextClassLoader", x); 404 alreadyLogged = true; 405 } 406 407 // Set new context class loader, returns previous one. 408 private final ClassLoader setContextClassLoader(final ClassLoader loader) { 409 final AccessControlContext ctxt = ClientNotifForwarder.this.acc; 410 // if ctxt is null, log a config message and throw a 411 // SecurityException. 412 if (ctxt == null) { 413 logOnce("AccessControlContext must not be null.",null); 414 throw new SecurityException("AccessControlContext must not be null"); 415 } 416 return AccessController.doPrivileged( 417 new PrivilegedAction<ClassLoader>() { 418 public ClassLoader run() { 419 try { 420 // get context class loader - may throw 421 // SecurityException - though unlikely. 422 final ClassLoader previous = 423 Thread.currentThread().getContextClassLoader(); 424 425 // if nothing needs to be done, break here... 426 if (loader == previous) return previous; 427 428 // reset context class loader - may throw 429 // SecurityException 430 Thread.currentThread().setContextClassLoader(loader); 431 return previous; 432 } catch (SecurityException x) { 433 logOnce("Permission to set ContextClassLoader missing. " + 434 "Notifications will not be dispatched. " + 435 "Please check your Java policy configuration: " + 436 x, x); 437 throw x; 438 } 439 } 440 }, ctxt); 441 } 442 443 public void run() { 444 final ClassLoader previous; 445 if (defaultClassLoader != null) { 446 previous = setContextClassLoader(defaultClassLoader); 447 } else { 448 previous = null; 449 } 450 try { 451 doRun(); 452 } finally { 453 if (defaultClassLoader != null) { 454 setContextClassLoader(previous); 455 } 456 } 457 } 458 459 private void doRun() { 460 synchronized (ClientNotifForwarder.this) { 461 currentFetchThread = Thread.currentThread(); 462 463 if (state == STARTING) { 464 setState(STARTED); 465 } 466 } 467 468 469 NotificationResult nr = null; 470 if (!shouldStop() && (nr = fetchNotifs()) != null) { 471 // nr == null means got exception 472 473 final TargetedNotification[] notifs = 474 nr.getTargetedNotifications(); 475 final int len = notifs.length; 476 final Map<Integer, ClientListenerInfo> listeners; 477 final Integer myListenerID; 478 479 long missed = 0; 480 481 synchronized(ClientNotifForwarder.this) { 482 // check sequence number. 483 // 484 if (clientSequenceNumber >= 0) { 485 missed = nr.getEarliestSequenceNumber() - 486 clientSequenceNumber; 487 } 488 489 clientSequenceNumber = nr.getNextSequenceNumber(); 490 491 listeners = new HashMap<Integer, ClientListenerInfo>(); 492 493 for (int i = 0 ; i < len ; i++) { 494 final TargetedNotification tn = notifs[i]; 495 final Integer listenerID = tn.getListenerID(); 496 497 // check if an mbean unregistration notif 498 if (!listenerID.equals(mbeanRemovedNotifID)) { 499 final ClientListenerInfo li = infoList.get(listenerID); 500 if (li != null) { 501 listeners.put(listenerID, li); 502 } 503 continue; 504 } 505 final Notification notif = tn.getNotification(); 506 final String unreg = 507 MBeanServerNotification.UNREGISTRATION_NOTIFICATION; 508 if (notif instanceof MBeanServerNotification && 509 notif.getType().equals(unreg)) { 510 511 MBeanServerNotification mbsn = 512 (MBeanServerNotification) notif; 513 ObjectName name = mbsn.getMBeanName(); 514 515 removeNotificationListener(name); 516 } 517 } 518 myListenerID = mbeanRemovedNotifID; 519 } 520 521 if (missed > 0) { 522 final String msg = 523 "May have lost up to " + missed + 524 " notification" + (missed == 1 ? "" : "s"); 525 lostNotifs(msg, missed); 526 logger.trace("NotifFetcher.run", msg); 527 } 528 529 // forward 530 for (int i = 0 ; i < len ; i++) { 531 final TargetedNotification tn = notifs[i]; 532 dispatchNotification(tn,myListenerID,listeners); 533 } 534 } 535 536 synchronized (ClientNotifForwarder.this) { 537 currentFetchThread = null; 538 } 539 540 if (nr == null) { 541 if (logger.traceOn()) { 542 logger.trace("NotifFetcher-run", 543 "Recieved null object as notifs, stops fetching because the " 544 + "notification server is terminated."); 545 } 546 } 547 if (nr == null || shouldStop()) { 548 // tell that the thread is REALLY stopped 549 setState(STOPPED); 550 551 try { 552 removeListenerForMBeanRemovedNotif(mbeanRemovedNotifID); 553 } catch (Exception e) { 554 if (logger.traceOn()) { 555 logger.trace("NotifFetcher-run", 556 "removeListenerForMBeanRemovedNotif", e); 557 } 558 } 559 } else { 560 executor.execute(this); 561 } 562 } 563 564 void dispatchNotification(TargetedNotification tn, 565 Integer myListenerID, 566 Map<Integer, ClientListenerInfo> listeners) { 567 final Notification notif = tn.getNotification(); 568 final Integer listenerID = tn.getListenerID(); 569 570 if (listenerID.equals(myListenerID)) return; 571 final ClientListenerInfo li = listeners.get(listenerID); 572 573 if (li == null) { 574 logger.trace("NotifFetcher.dispatch", 575 "Listener ID not in map"); 576 return; 577 } 578 579 NotificationListener l = li.getListener(); 580 Object h = li.getHandback(); 581 try { 582 l.handleNotification(notif, h); 583 } catch (RuntimeException e) { 584 final String msg = 585 "Failed to forward a notification " + 586 "to a listener"; 587 logger.trace("NotifFetcher-run", msg, e); 588 } 589 590 } 591 592 private NotificationResult fetchNotifs() { 593 try { 594 NotificationResult nr = ClientNotifForwarder.this. 595 fetchNotifs(clientSequenceNumber,maxNotifications, 596 timeout); 597 598 if (logger.traceOn()) { 599 logger.trace("NotifFetcher-run", 600 "Got notifications from the server: "+nr); 601 } 602 603 return nr; 604 } catch (ClassNotFoundException | NotSerializableException | UnmarshalException e) { 605 logger.trace("NotifFetcher.fetchNotifs", e); 606 return fetchOneNotif(); 607 } catch (IOException ioe) { 608 if (!shouldStop()) { 609 logger.error("NotifFetcher-run", 610 "Failed to fetch notification, " + 611 "stopping thread. Error is: " + ioe, ioe); 612 logger.debug("NotifFetcher-run",ioe); 613 } 614 615 // no more fetching 616 return null; 617 } 618 } 619 620 /* Fetch one notification when we suspect that it might be a 621 notification that we can't deserialize (because of a 622 missing class). First we ask for 0 notifications with 0 623 timeout. This allows us to skip sequence numbers for 624 notifications that don't match our filters. Then we ask 625 for one notification. If that produces a 626 ClassNotFoundException, NotSerializableException or 627 UnmarshalException, we increase our sequence number and ask again. 628 Eventually we will either get a successful notification, or a 629 return with 0 notifications. In either case we can return a 630 NotificationResult. This algorithm works (albeit less 631 well) even if the server implementation doesn't optimize a 632 request for 0 notifications to skip sequence numbers for 633 notifications that don't match our filters. 634 635 If we had at least one 636 ClassNotFoundException/NotSerializableException/UnmarshalException, 637 then we must emit a JMXConnectionNotification.LOST_NOTIFS. 638 */ 639 private NotificationResult fetchOneNotif() { 640 ClientNotifForwarder cnf = ClientNotifForwarder.this; 641 642 long startSequenceNumber = clientSequenceNumber; 643 644 int notFoundCount = 0; 645 646 NotificationResult result = null; 647 long firstEarliest = -1; 648 649 while (result == null && !shouldStop()) { 650 NotificationResult nr; 651 652 try { 653 // 0 notifs to update startSequenceNumber 654 nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L); 655 } catch (ClassNotFoundException e) { 656 logger.warning("NotifFetcher.fetchOneNotif", 657 "Impossible exception: " + e); 658 logger.debug("NotifFetcher.fetchOneNotif",e); 659 return null; 660 } catch (IOException e) { 661 if (!shouldStop()) 662 logger.trace("NotifFetcher.fetchOneNotif", e); 663 return null; 664 } 665 666 if (shouldStop() || nr == null) 667 return null; 668 669 startSequenceNumber = nr.getNextSequenceNumber(); 670 if (firstEarliest < 0) 671 firstEarliest = nr.getEarliestSequenceNumber(); 672 673 try { 674 // 1 notif to skip possible missing class 675 result = cnf.fetchNotifs(startSequenceNumber, 1, 0L); 676 } catch (ClassNotFoundException | NotSerializableException | UnmarshalException e) { 677 logger.warning("NotifFetcher.fetchOneNotif", 678 "Failed to deserialize a notification: "+e.toString()); 679 if (logger.traceOn()) { 680 logger.trace("NotifFetcher.fetchOneNotif", 681 "Failed to deserialize a notification.", e); 682 } 683 684 notFoundCount++; 685 startSequenceNumber++; 686 } catch (Exception e) { 687 if (!shouldStop()) 688 logger.trace("NotifFetcher.fetchOneNotif", e); 689 return null; 690 } 691 } 692 693 if (notFoundCount > 0) { 694 final String msg = 695 "Dropped " + notFoundCount + " notification" + 696 (notFoundCount == 1 ? "" : "s") + 697 " because classes were missing locally or incompatible"; 698 lostNotifs(msg, notFoundCount); 699 // Even if result.getEarliestSequenceNumber() is now greater than 700 // it was initially, meaning some notifs have been dropped 701 // from the buffer, we don't want the caller to see that 702 // because it is then likely to renotify about the lost notifs. 703 // So we put back the first value of earliestSequenceNumber 704 // that we saw. 705 if (result != null) { 706 result = new NotificationResult( 707 firstEarliest, result.getNextSequenceNumber(), 708 result.getTargetedNotifications()); 709 } 710 } 711 712 return result; 713 } 714 715 private boolean shouldStop() { 716 synchronized (ClientNotifForwarder.this) { 717 if (state != STARTED) { 718 return true; 719 } else if (infoList.size() == 0) { 720 // no more listener, stop fetching 721 setState(STOPPING); 722 723 return true; 724 } 725 726 return false; 727 } 728 } 729 } 730 731 732 // ------------------------------------------------- 733 // private methods 734 // ------------------------------------------------- 735 private synchronized void setState(int newState) { 736 if (state == TERMINATED) { 737 return; 738 } 739 740 state = newState; 741 this.notifyAll(); 742 } 743 744 /* 745 * Called to decide whether need to start a thread for fetching notifs. 746 * <P>The parameter reconnected will decide whether to initilize the clientSequenceNumber, 747 * initilaizing the clientSequenceNumber means to ignore all notifications arrived before. 748 * If it is reconnected, we will not initialize in order to get all notifications arrived 749 * during the reconnection. It may cause the newly registered listeners to receive some 750 * notifications arrived before its registray. 751 */ 752 private synchronized void init(boolean reconnected) throws IOException { 753 switch (state) { 754 case STARTED: 755 return; 756 case STARTING: 757 return; 758 case TERMINATED: 759 throw new IOException("The ClientNotifForwarder has been terminated."); 760 case STOPPING: 761 if (beingReconnected == true) { 762 // wait for another thread to do, which is doing reconnection 763 return; 764 } 765 766 while (state == STOPPING) { // make sure only one fetching thread. 767 try { 768 wait(); 769 } catch (InterruptedException ire) { 770 IOException ioe = new IOException(ire.toString()); 771 EnvHelp.initCause(ioe, ire); 772 773 throw ioe; 774 } 775 } 776 777 // re-call this method to check the state again, 778 // the state can be other value like TERMINATED. 779 init(reconnected); 780 781 return; 782 case STOPPED: 783 if (beingReconnected == true) { 784 // wait for another thread to do, which is doing reconnection 785 return; 786 } 787 788 if (logger.traceOn()) { 789 logger.trace("init", "Initializing..."); 790 } 791 792 // init the clientSequenceNumber if not reconnected. 793 if (!reconnected) { 794 try { 795 NotificationResult nr = fetchNotifs(-1, 0, 0); 796 797 if (state != STOPPED) { // JDK-8038940 798 // reconnection must happen during 799 // fetchNotifs(-1, 0, 0), and a new 800 // thread takes over the fetching job 801 return; 802 } 803 804 clientSequenceNumber = nr.getNextSequenceNumber(); 805 } catch (ClassNotFoundException e) { 806 // can't happen 807 logger.warning("init", "Impossible exception: "+ e); 808 logger.debug("init",e); 809 } 810 } 811 812 // for cleaning 813 try { 814 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif(); 815 } catch (Exception e) { 816 final String msg = 817 "Failed to register a listener to the mbean " + 818 "server: the client will not do clean when an MBean " + 819 "is unregistered"; 820 if (logger.traceOn()) { 821 logger.trace("init", msg, e); 822 } 823 } 824 825 setState(STARTING); 826 827 // start fetching 828 executor.execute(new NotifFetcher()); 829 830 return; 831 default: 832 // should not 833 throw new IOException("Unknown state."); 834 } 835 } 836 837 /** 838 * Import: should not remove a listener during reconnection, the reconnection 839 * needs to change the listener list and that will possibly make removal fail. 840 */ 841 private synchronized void beforeRemove() throws IOException { 842 while (beingReconnected) { 843 if (state == TERMINATED) { 844 throw new IOException("Terminated."); 845 } 846 847 try { 848 wait(); 849 } catch (InterruptedException ire) { 850 IOException ioe = new IOException(ire.toString()); 851 EnvHelp.initCause(ioe, ire); 852 853 throw ioe; 854 } 855 } 856 857 if (state == TERMINATED) { 858 throw new IOException("Terminated."); 859 } 860 } 861 862 // ------------------------------------------------- 863 // private variables 864 // ------------------------------------------------- 865 866 private final ClassLoader defaultClassLoader; 867 private final Executor executor; 868 869 private final Map<Integer, ClientListenerInfo> infoList = 870 new HashMap<Integer, ClientListenerInfo>(); 871 872 // notif stuff 873 private long clientSequenceNumber = -1; 874 private final int maxNotifications; 875 private final long timeout; 876 private Integer mbeanRemovedNotifID = null; 877 private Thread currentFetchThread; 878 879 // state 880 /** 881 * This state means that a thread is being created for fetching and forwarding notifications. 882 */ 883 private static final int STARTING = 0; 884 885 /** 886 * This state tells that a thread has been started for fetching and forwarding notifications. 887 */ 888 private static final int STARTED = 1; 889 890 /** 891 * This state means that the fetching thread is informed to stop. 892 */ 893 private static final int STOPPING = 2; 894 895 /** 896 * This state means that the fetching thread is already stopped. 897 */ 898 private static final int STOPPED = 3; 899 900 /** 901 * This state means that this object is terminated and no more thread will be created 902 * for fetching notifications. 903 */ 904 private static final int TERMINATED = 4; 905 906 private int state = STOPPED; 907 908 /** 909 * This variable is used to tell whether a connector (RMIConnector or ClientIntermediary) 910 * is doing reconnection. 911 * This variable will be set to true by the method <code>preReconnection</code>, and set 912 * to false by <code>postReconnection</code>. 913 * When beingReconnected == true, no thread will be created for fetching notifications. 914 */ 915 private boolean beingReconnected = false; 916 917 private static final ClassLogger logger = 918 new ClassLogger("javax.management.remote.misc", 919 "ClientNotifForwarder"); 920 }