1 /*
   2  * Copyright (c) 2002, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package com.sun.jmx.remote.internal;
  26 
  27 import java.io.IOException;
  28 import java.io.NotSerializableException;
  29 
  30 import java.util.ArrayList;
  31 import java.util.HashMap;
  32 import java.util.List;
  33 import java.util.Map;
  34 import java.util.concurrent.Executor;
  35 
  36 import java.security.AccessControlContext;
  37 import java.security.AccessController;
  38 import java.security.PrivilegedAction;
  39 import javax.security.auth.Subject;
  40 
  41 import javax.management.Notification;
  42 import javax.management.NotificationListener;
  43 import javax.management.NotificationFilter;
  44 import javax.management.ObjectName;
  45 import javax.management.MBeanServerNotification;
  46 import javax.management.InstanceNotFoundException;
  47 import javax.management.ListenerNotFoundException;
  48 
  49 import javax.management.remote.NotificationResult;
  50 import javax.management.remote.TargetedNotification;
  51 
  52 import com.sun.jmx.remote.util.ClassLogger;
  53 import com.sun.jmx.remote.util.EnvHelp;
  54 import java.rmi.UnmarshalException;
  55 import sun.misc.ManagedLocalsThread;
  56 
  57 
  58 public abstract class ClientNotifForwarder {
  59 
  /**
   * Access-control context captured when this forwarder is constructed; the
   * fetch task passes it to {@code AccessController.doPrivileged} when it
   * swaps the thread's context class loader.
   */
  private final AccessControlContext acc;

  /**
   * Creates a forwarder without a default class loader by delegating to the
   * two-argument constructor with a {@code null} loader.
   *
   * @param env connector environment from which the fetch settings are read
   */
  public ClientNotifForwarder(Map<String, ?> env) {
      this(null, env);
  }
  65 
  66     private static int threadId;
  67 
  68     /* An Executor that allows at most one executing and one pending
  69        Runnable.  It uses at most one thread -- as soon as there is
  70        no pending Runnable the thread can exit.  Another thread is
  71        created as soon as there is a new pending Runnable.  This
  72        Executor is adapted for use in a situation where each Runnable
  73        usually schedules up another Runnable.  On return from the
  74        first one, the second one is immediately executed.  So this
  75        just becomes a complicated way to write a while loop, but with
  76        the advantage that you can replace it with another Executor,
  77        for instance one that you are using to execute a bunch of other
  78        unrelated work.
  79 
  80        You might expect that a java.util.concurrent.ThreadPoolExecutor
  81        with corePoolSize=0 and maximumPoolSize=1 would have the same
  82        behavior, but it does not.  A ThreadPoolExecutor only creates
  83        a new thread when a new task is submitted and the number of
  84        existing threads is < corePoolSize.  This can never happen when
  85        corePoolSize=0, so new threads are never created.  Surprising,
  86        but there you are.
  87     */
  88     private static class LinearExecutor implements Executor {
  89         public synchronized void execute(Runnable command) {
  90             if (this.command != null)
  91                 throw new IllegalArgumentException("More than one command");
  92             this.command = command;
  93             if (thread == null) {
  94                 thread = new ManagedLocalsThread(
  95                     ()-> {
  96                         while (true) {
  97                             Runnable r;
  98                             synchronized (LinearExecutor.this) {
  99                                 if (LinearExecutor.this.command == null) {
 100                                     thread = null;
 101                                     return;
 102                                 } else {
 103                                     r = LinearExecutor.this.command;
 104                                     LinearExecutor.this.command = null;
 105                                 }
 106                             }
 107                             r.run();
 108                         }
 109                     },
 110                     "ClientNotifForwarder-" + ++threadId
 111                 );
 112                 thread.setDaemon(true);
 113                 thread.start();
 114             }
 115         }
 116 
 117         private Runnable command;
 118         private Thread thread;
 119     }
 120 
 121     public ClientNotifForwarder(ClassLoader defaultClassLoader, Map<String, ?> env) {
 122         maxNotifications = EnvHelp.getMaxFetchNotifNumber(env);
 123         timeout = EnvHelp.getFetchTimeout(env);
 124 
 125         /* You can supply an Executor in which the remote call to
 126            fetchNotifications will be made.  The Executor's execute
 127            method reschedules another task, so you must not use
 128            an Executor that executes tasks in the caller's thread.  */
 129         Executor ex = (Executor)
 130             env.get("jmx.remote.x.fetch.notifications.executor");
 131         if (ex == null)
 132             ex = new LinearExecutor();
 133         else if (logger.traceOn())
 134             logger.trace("ClientNotifForwarder", "executor is " + ex);
 135 
 136         this.defaultClassLoader = defaultClassLoader;
 137         this.executor = ex;
 138         this.acc = AccessController.getContext();
 139     }
 140 
 141     /**
 142      * Called to fetch notifications from a server.
 143      */
 144     abstract protected NotificationResult fetchNotifs(long clientSequenceNumber,
 145                                                       int maxNotifications,
 146                                                       long timeout)
 147             throws IOException, ClassNotFoundException;
 148 
 149     abstract protected Integer addListenerForMBeanRemovedNotif()
 150         throws IOException, InstanceNotFoundException;
 151 
 152     abstract protected void removeListenerForMBeanRemovedNotif(Integer id)
 153         throws IOException, InstanceNotFoundException,
 154                ListenerNotFoundException;
 155 
 156     /**
 157      * Used to send out a notification about lost notifs
 158      */
 159     abstract protected void lostNotifs(String message, long number);
 160 
 161 
 162     public synchronized void addNotificationListener(Integer listenerID,
 163                                         ObjectName name,
 164                                         NotificationListener listener,
 165                                         NotificationFilter filter,
 166                                         Object handback,
 167                                         Subject delegationSubject)
 168             throws IOException, InstanceNotFoundException {
 169 
 170         if (logger.traceOn()) {
 171             logger.trace("addNotificationListener",
 172                          "Add the listener "+listener+" at "+name);
 173         }
 174 
 175         infoList.put(listenerID,
 176                      new ClientListenerInfo(listenerID,
 177                                             name,
 178                                             listener,
 179                                             filter,
 180                                             handback,
 181                                             delegationSubject));
 182 
 183 
 184         init(false);
 185     }
 186 
 187     public synchronized Integer[]
 188         removeNotificationListener(ObjectName name,
 189                                    NotificationListener listener)
 190         throws ListenerNotFoundException, IOException {
 191 
 192         beforeRemove();
 193 
 194         if (logger.traceOn()) {
 195             logger.trace("removeNotificationListener",
 196                          "Remove the listener "+listener+" from "+name);
 197         }
 198 
 199         List<Integer> ids = new ArrayList<Integer>();
 200         List<ClientListenerInfo> values =
 201                 new ArrayList<ClientListenerInfo>(infoList.values());
 202         for (int i=values.size()-1; i>=0; i--) {
 203             ClientListenerInfo li = values.get(i);
 204 
 205             if (li.sameAs(name, listener)) {
 206                 ids.add(li.getListenerID());
 207 
 208                 infoList.remove(li.getListenerID());
 209             }
 210         }
 211 
 212         if (ids.isEmpty())
 213             throw new ListenerNotFoundException("Listener not found");
 214 
 215         return ids.toArray(new Integer[0]);
 216     }
 217 
 218     public synchronized Integer
 219         removeNotificationListener(ObjectName name,
 220                                    NotificationListener listener,
 221                                    NotificationFilter filter,
 222                                    Object handback)
 223             throws ListenerNotFoundException, IOException {
 224 
 225         if (logger.traceOn()) {
 226             logger.trace("removeNotificationListener",
 227                          "Remove the listener "+listener+" from "+name);
 228         }
 229 
 230         beforeRemove();
 231 
 232         Integer id = null;
 233 
 234         List<ClientListenerInfo> values =
 235                 new ArrayList<ClientListenerInfo>(infoList.values());
 236         for (int i=values.size()-1; i>=0; i--) {
 237             ClientListenerInfo li = values.get(i);
 238             if (li.sameAs(name, listener, filter, handback)) {
 239                 id=li.getListenerID();
 240 
 241                 infoList.remove(id);
 242 
 243                 break;
 244             }
 245         }
 246 
 247         if (id == null)
 248             throw new ListenerNotFoundException("Listener not found");
 249 
 250         return id;
 251     }
 252 
 253     public synchronized Integer[] removeNotificationListener(ObjectName name) {
 254         if (logger.traceOn()) {
 255             logger.trace("removeNotificationListener",
 256                          "Remove all listeners registered at "+name);
 257         }
 258 
 259         List<Integer> ids = new ArrayList<Integer>();
 260 
 261         List<ClientListenerInfo> values =
 262                 new ArrayList<ClientListenerInfo>(infoList.values());
 263         for (int i=values.size()-1; i>=0; i--) {
 264             ClientListenerInfo li = values.get(i);
 265             if (li.sameAs(name)) {
 266                 ids.add(li.getListenerID());
 267 
 268                 infoList.remove(li.getListenerID());
 269             }
 270         }
 271 
 272         return ids.toArray(new Integer[0]);
 273     }
 274 
 275     /*
 276      * Called when a connector is doing reconnection. Like <code>postReconnection</code>,
 277      * this method is intended to be called only by a client connector:
 278      * <code>RMIConnector</code> and <code>ClientIntermediary</code>.
 279      * Call this method will set the flag beingReconnection to <code>true</code>,
 280      * and the thread used to fetch notifis will be stopped, a new thread can be
 281      * created only after the method <code>postReconnection</code> is called.
 282      *
 283      * It is caller's responsiblity to not re-call this method before calling
 284      * <code>postReconnection</code>.
 285      */
 286     public synchronized ClientListenerInfo[] preReconnection() throws IOException {
 287         if (state == TERMINATED || beingReconnected) { // should never
 288             throw new IOException("Illegal state.");
 289         }
 290 
 291         final ClientListenerInfo[] tmp =
 292             infoList.values().toArray(new ClientListenerInfo[0]);
 293 
 294 
 295         beingReconnected = true;
 296 
 297         infoList.clear();
 298 
 299         return tmp;
 300     }
 301 
 302     /**
 303      * Called after reconnection is finished.
 304      * This method is intended to be called only by a client connector:
 305      * <code>RMIConnector</code> and <code>ClientIntermediary</code>.
 306      */
 307     public synchronized void postReconnection(ClientListenerInfo[] listenerInfos)
 308         throws IOException {
 309 
 310         if (state == TERMINATED) {
 311             return;
 312         }
 313 
 314         while (state == STOPPING) {
 315             try {
 316                 wait();
 317             } catch (InterruptedException ire) {
 318                 IOException ioe = new IOException(ire.toString());
 319                 EnvHelp.initCause(ioe, ire);
 320                 throw ioe;
 321             }
 322         }
 323 
 324         final boolean trace = logger.traceOn();
 325         final int len   = listenerInfos.length;
 326 
 327         for (int i=0; i<len; i++) {
 328             if (trace) {
 329                 logger.trace("addNotificationListeners",
 330                              "Add a listener at "+
 331                              listenerInfos[i].getListenerID());
 332             }
 333 
 334             infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]);
 335         }
 336 
 337         beingReconnected = false;
 338         notifyAll();
 339 
 340         if (currentFetchThread == Thread.currentThread() ||
 341               state == STARTING || state == STARTED) { // doing or waiting reconnection
 342               // only update mbeanRemovedNotifID
 343             try {
 344                 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
 345             } catch (Exception e) {
 346                 final String msg =
 347                     "Failed to register a listener to the mbean " +
 348                     "server: the client will not do clean when an MBean " +
 349                     "is unregistered";
 350                 if (logger.traceOn()) {
 351                     logger.trace("init", msg, e);
 352                 }
 353             }
 354         } else {
 355               while (state == STOPPING) {
 356                   try {
 357                       wait();
 358                   } catch (InterruptedException ire) {
 359                       IOException ioe = new IOException(ire.toString());
 360                       EnvHelp.initCause(ioe, ire);
 361                       throw ioe;
 362                   }
 363               }
 364 
 365               if (listenerInfos.length > 0) { // old listeners are re-added
 366                   init(true); // not update clientSequenceNumber
 367               } else if (infoList.size() > 0) { // only new listeners added during reconnection
 368                   init(false); // need update clientSequenceNumber
 369               }
 370           }
 371     }
 372 
 373     public synchronized void terminate() {
 374         if (state == TERMINATED) {
 375             return;
 376         }
 377 
 378         if (logger.traceOn()) {
 379             logger.trace("terminate", "Terminating...");
 380         }
 381 
 382         if (state == STARTED) {
 383            infoList.clear();
 384         }
 385 
 386         setState(TERMINATED);
 387     }
 388 
 389 
 390     // -------------------------------------------------
 391     // private classes
 392     // -------------------------------------------------
 393     //
 394 
 395     private class NotifFetcher implements Runnable {
 396 
 397         private volatile boolean alreadyLogged = false;
 398 
 399         private void logOnce(String msg, SecurityException x) {
 400             if (alreadyLogged) return;
 401             // Log only once.
 402             logger.config("setContextClassLoader",msg);
 403             if (x != null) logger.fine("setContextClassLoader", x);
 404             alreadyLogged = true;
 405         }
 406 
 407         // Set new context class loader, returns previous one.
 408         private final ClassLoader setContextClassLoader(final ClassLoader loader) {
 409             final AccessControlContext ctxt = ClientNotifForwarder.this.acc;
 410             // if ctxt is null, log a config message and throw a
 411             // SecurityException.
 412             if (ctxt == null) {
 413                 logOnce("AccessControlContext must not be null.",null);
 414                 throw new SecurityException("AccessControlContext must not be null");
 415             }
 416             return AccessController.doPrivileged(
 417                 new PrivilegedAction<ClassLoader>() {
 418                     public ClassLoader run() {
 419                         try {
 420                             // get context class loader - may throw
 421                             // SecurityException - though unlikely.
 422                             final ClassLoader previous =
 423                                 Thread.currentThread().getContextClassLoader();
 424 
 425                             // if nothing needs to be done, break here...
 426                             if (loader == previous) return previous;
 427 
 428                             // reset context class loader - may throw
 429                             // SecurityException
 430                             Thread.currentThread().setContextClassLoader(loader);
 431                             return previous;
 432                         } catch (SecurityException x) {
 433                             logOnce("Permission to set ContextClassLoader missing. " +
 434                                     "Notifications will not be dispatched. " +
 435                                     "Please check your Java policy configuration: " +
 436                                     x, x);
 437                             throw x;
 438                         }
 439                     }
 440                 }, ctxt);
 441         }
 442 
 443         public void run() {
 444             final ClassLoader previous;
 445             if (defaultClassLoader != null) {
 446                 previous = setContextClassLoader(defaultClassLoader);
 447             } else {
 448                 previous = null;
 449             }
 450             try {
 451                 doRun();
 452             } finally {
 453                 if (defaultClassLoader != null) {
 454                     setContextClassLoader(previous);
 455                 }
 456             }
 457         }
 458 
 459         private void doRun() {
 460             synchronized (ClientNotifForwarder.this) {
 461                 currentFetchThread = Thread.currentThread();
 462 
 463                 if (state == STARTING) {
 464                     setState(STARTED);
 465                 }
 466             }
 467 
 468 
 469             NotificationResult nr = null;
 470             if (!shouldStop() && (nr = fetchNotifs()) != null) {
 471                 // nr == null means got exception
 472 
 473                 final TargetedNotification[] notifs =
 474                     nr.getTargetedNotifications();
 475                 final int len = notifs.length;
 476                 final Map<Integer, ClientListenerInfo> listeners;
 477                 final Integer myListenerID;
 478 
 479                 long missed = 0;
 480 
 481                 synchronized(ClientNotifForwarder.this) {
 482                     // check sequence number.
 483                     //
 484                     if (clientSequenceNumber >= 0) {
 485                         missed = nr.getEarliestSequenceNumber() -
 486                             clientSequenceNumber;
 487                     }
 488 
 489                     clientSequenceNumber = nr.getNextSequenceNumber();
 490 
 491                     listeners = new HashMap<Integer, ClientListenerInfo>();
 492 
 493                     for (int i = 0 ; i < len ; i++) {
 494                         final TargetedNotification tn = notifs[i];
 495                         final Integer listenerID = tn.getListenerID();
 496 
 497                         // check if an mbean unregistration notif
 498                         if (!listenerID.equals(mbeanRemovedNotifID)) {
 499                             final ClientListenerInfo li = infoList.get(listenerID);
 500                             if (li != null) {
 501                                 listeners.put(listenerID, li);
 502                             }
 503                             continue;
 504                         }
 505                         final Notification notif = tn.getNotification();
 506                         final String unreg =
 507                             MBeanServerNotification.UNREGISTRATION_NOTIFICATION;
 508                         if (notif instanceof MBeanServerNotification &&
 509                             notif.getType().equals(unreg)) {
 510 
 511                             MBeanServerNotification mbsn =
 512                                 (MBeanServerNotification) notif;
 513                             ObjectName name = mbsn.getMBeanName();
 514 
 515                             removeNotificationListener(name);
 516                         }
 517                     }
 518                     myListenerID = mbeanRemovedNotifID;
 519                 }
 520 
 521                 if (missed > 0) {
 522                     final String msg =
 523                         "May have lost up to " + missed +
 524                         " notification" + (missed == 1 ? "" : "s");
 525                     lostNotifs(msg, missed);
 526                     logger.trace("NotifFetcher.run", msg);
 527                 }
 528 
 529                 // forward
 530                 for (int i = 0 ; i < len ; i++) {
 531                     final TargetedNotification tn = notifs[i];
 532                     dispatchNotification(tn,myListenerID,listeners);
 533                 }
 534             }
 535 
 536             synchronized (ClientNotifForwarder.this) {
 537                 currentFetchThread = null;
 538             }
 539 
 540             if (nr == null) {
 541                 if (logger.traceOn()) {
 542                     logger.trace("NotifFetcher-run",
 543                             "Recieved null object as notifs, stops fetching because the "
 544                                     + "notification server is terminated.");
 545                 }
 546             }
 547             if (nr == null || shouldStop()) {
 548                 // tell that the thread is REALLY stopped
 549                 setState(STOPPED);
 550 
 551                 try {
 552                       removeListenerForMBeanRemovedNotif(mbeanRemovedNotifID);
 553                 } catch (Exception e) {
 554                     if (logger.traceOn()) {
 555                         logger.trace("NotifFetcher-run",
 556                                 "removeListenerForMBeanRemovedNotif", e);
 557                     }
 558                 }
 559             } else {
 560                 executor.execute(this);
 561             }
 562         }
 563 
 564         void dispatchNotification(TargetedNotification tn,
 565                                   Integer myListenerID,
 566                                   Map<Integer, ClientListenerInfo> listeners) {
 567             final Notification notif = tn.getNotification();
 568             final Integer listenerID = tn.getListenerID();
 569 
 570             if (listenerID.equals(myListenerID)) return;
 571             final ClientListenerInfo li = listeners.get(listenerID);
 572 
 573             if (li == null) {
 574                 logger.trace("NotifFetcher.dispatch",
 575                              "Listener ID not in map");
 576                 return;
 577             }
 578 
 579             NotificationListener l = li.getListener();
 580             Object h = li.getHandback();
 581             try {
 582                 l.handleNotification(notif, h);
 583             } catch (RuntimeException e) {
 584                 final String msg =
 585                     "Failed to forward a notification " +
 586                     "to a listener";
 587                 logger.trace("NotifFetcher-run", msg, e);
 588             }
 589 
 590         }
 591 
 592         private NotificationResult fetchNotifs() {
 593             try {
 594                 NotificationResult nr = ClientNotifForwarder.this.
 595                     fetchNotifs(clientSequenceNumber,maxNotifications,
 596                                 timeout);
 597 
 598                 if (logger.traceOn()) {
 599                     logger.trace("NotifFetcher-run",
 600                                  "Got notifications from the server: "+nr);
 601                 }
 602 
 603                 return nr;
 604             } catch (ClassNotFoundException | NotSerializableException | UnmarshalException e) {
 605                 logger.trace("NotifFetcher.fetchNotifs", e);
 606                 return fetchOneNotif();
 607             } catch (IOException ioe) {
 608                 if (!shouldStop()) {
 609                     logger.error("NotifFetcher-run",
 610                                  "Failed to fetch notification, " +
 611                                  "stopping thread. Error is: " + ioe, ioe);
 612                     logger.debug("NotifFetcher-run",ioe);
 613                 }
 614 
 615                 // no more fetching
 616                 return null;
 617             }
 618         }
 619 
 620         /* Fetch one notification when we suspect that it might be a
 621            notification that we can't deserialize (because of a
 622            missing class).  First we ask for 0 notifications with 0
 623            timeout.  This allows us to skip sequence numbers for
 624            notifications that don't match our filters.  Then we ask
 625            for one notification.  If that produces a
 626            ClassNotFoundException, NotSerializableException or
 627            UnmarshalException, we increase our sequence number and ask again.
 628            Eventually we will either get a successful notification, or a
 629            return with 0 notifications.  In either case we can return a
 630            NotificationResult.  This algorithm works (albeit less
 631            well) even if the server implementation doesn't optimize a
 632            request for 0 notifications to skip sequence numbers for
 633            notifications that don't match our filters.
 634 
 635            If we had at least one
 636            ClassNotFoundException/NotSerializableException/UnmarshalException,
 637            then we must emit a JMXConnectionNotification.LOST_NOTIFS.
 638         */
 639         private NotificationResult fetchOneNotif() {
 640             ClientNotifForwarder cnf = ClientNotifForwarder.this;
 641 
 642             long startSequenceNumber = clientSequenceNumber;
 643 
 644             int notFoundCount = 0;
 645 
 646             NotificationResult result = null;
 647             long firstEarliest = -1;
 648 
 649             while (result == null && !shouldStop()) {
 650                 NotificationResult nr;
 651 
 652                 try {
 653                     // 0 notifs to update startSequenceNumber
 654                     nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L);
 655                 } catch (ClassNotFoundException e) {
 656                     logger.warning("NotifFetcher.fetchOneNotif",
 657                                    "Impossible exception: " + e);
 658                     logger.debug("NotifFetcher.fetchOneNotif",e);
 659                     return null;
 660                 } catch (IOException e) {
 661                     if (!shouldStop())
 662                         logger.trace("NotifFetcher.fetchOneNotif", e);
 663                     return null;
 664                 }
 665 
 666                 if (shouldStop() || nr == null)
 667                     return null;
 668 
 669                 startSequenceNumber = nr.getNextSequenceNumber();
 670                 if (firstEarliest < 0)
 671                     firstEarliest = nr.getEarliestSequenceNumber();
 672 
 673                 try {
 674                     // 1 notif to skip possible missing class
 675                     result = cnf.fetchNotifs(startSequenceNumber, 1, 0L);
 676                 } catch (ClassNotFoundException | NotSerializableException | UnmarshalException e) {
 677                     logger.warning("NotifFetcher.fetchOneNotif",
 678                                    "Failed to deserialize a notification: "+e.toString());
 679                     if (logger.traceOn()) {
 680                         logger.trace("NotifFetcher.fetchOneNotif",
 681                                      "Failed to deserialize a notification.", e);
 682                     }
 683 
 684                     notFoundCount++;
 685                     startSequenceNumber++;
 686                 } catch (Exception e) {
 687                     if (!shouldStop())
 688                         logger.trace("NotifFetcher.fetchOneNotif", e);
 689                     return null;
 690                 }
 691             }
 692 
 693             if (notFoundCount > 0) {
 694                 final String msg =
 695                     "Dropped " + notFoundCount + " notification" +
 696                     (notFoundCount == 1 ? "" : "s") +
 697                     " because classes were missing locally or incompatible";
 698                 lostNotifs(msg, notFoundCount);
 699                 // Even if result.getEarliestSequenceNumber() is now greater than
 700                 // it was initially, meaning some notifs have been dropped
 701                 // from the buffer, we don't want the caller to see that
 702                 // because it is then likely to renotify about the lost notifs.
 703                 // So we put back the first value of earliestSequenceNumber
 704                 // that we saw.
 705                 if (result != null) {
 706                     result = new NotificationResult(
 707                             firstEarliest, result.getNextSequenceNumber(),
 708                             result.getTargetedNotifications());
 709                 }
 710             }
 711 
 712             return result;
 713         }
 714 
 715         private boolean shouldStop() {
 716             synchronized (ClientNotifForwarder.this) {
 717                 if (state != STARTED) {
 718                     return true;
 719                 } else if (infoList.size() == 0) {
 720                     // no more listener, stop fetching
 721                     setState(STOPPING);
 722 
 723                     return true;
 724                 }
 725 
 726                 return false;
 727             }
 728         }
 729     }
 730 
 731 
 732 // -------------------------------------------------
 733 // private methods
 734 // -------------------------------------------------
 735     private synchronized void setState(int newState) {
 736         if (state == TERMINATED) {
 737             return;
 738         }
 739 
 740         state = newState;
 741         this.notifyAll();
 742     }
 743 
 744     /*
 745      * Called to decide whether need to start a thread for fetching notifs.
 746      * <P>The parameter reconnected will decide whether to initilize the clientSequenceNumber,
 747      * initilaizing the clientSequenceNumber means to ignore all notifications arrived before.
 748      * If it is reconnected, we will not initialize in order to get all notifications arrived
 749      * during the reconnection. It may cause the newly registered listeners to receive some
 750      * notifications arrived before its registray.
 751      */
 752     private synchronized void init(boolean reconnected) throws IOException {
 753         switch (state) {
 754         case STARTED:
 755             return;
 756         case STARTING:
 757             return;
 758         case TERMINATED:
 759             throw new IOException("The ClientNotifForwarder has been terminated.");
 760         case STOPPING:
 761             if (beingReconnected == true) {
 762                 // wait for another thread to do, which is doing reconnection
 763                 return;
 764             }
 765 
 766             while (state == STOPPING) { // make sure only one fetching thread.
 767                 try {
 768                     wait();
 769                 } catch (InterruptedException ire) {
 770                     IOException ioe = new IOException(ire.toString());
 771                     EnvHelp.initCause(ioe, ire);
 772 
 773                     throw ioe;
 774                 }
 775             }
 776 
 777             // re-call this method to check the state again,
 778             // the state can be other value like TERMINATED.
 779             init(reconnected);
 780 
 781             return;
 782         case STOPPED:
 783             if (beingReconnected == true) {
 784                 // wait for another thread to do, which is doing reconnection
 785                 return;
 786             }
 787 
 788             if (logger.traceOn()) {
 789                 logger.trace("init", "Initializing...");
 790             }
 791 
 792             // init the clientSequenceNumber if not reconnected.
 793             if (!reconnected) {
 794                 try {
 795                     NotificationResult nr = fetchNotifs(-1, 0, 0);
 796 
 797                     if (state != STOPPED) { // JDK-8038940
 798                                             // reconnection must happen during
 799                                             // fetchNotifs(-1, 0, 0), and a new
 800                                             // thread takes over the fetching job
 801                         return;
 802                     }
 803 
 804                     clientSequenceNumber = nr.getNextSequenceNumber();
 805                 } catch (ClassNotFoundException e) {
 806                     // can't happen
 807                     logger.warning("init", "Impossible exception: "+ e);
 808                     logger.debug("init",e);
 809                 }
 810             }
 811 
 812             // for cleaning
 813             try {
 814                 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
 815             } catch (Exception e) {
 816                 final String msg =
 817                     "Failed to register a listener to the mbean " +
 818                     "server: the client will not do clean when an MBean " +
 819                     "is unregistered";
 820                 if (logger.traceOn()) {
 821                     logger.trace("init", msg, e);
 822                 }
 823             }
 824 
 825             setState(STARTING);
 826 
 827             // start fetching
 828             executor.execute(new NotifFetcher());
 829 
 830             return;
 831         default:
 832             // should not
 833             throw new IOException("Unknown state.");
 834         }
 835     }
 836 
 837     /**
 838      * Import: should not remove a listener during reconnection, the reconnection
 839      * needs to change the listener list and that will possibly make removal fail.
 840      */
 841     private synchronized void beforeRemove() throws IOException {
 842         while (beingReconnected) {
 843             if (state == TERMINATED) {
 844                 throw new IOException("Terminated.");
 845             }
 846 
 847             try {
 848                 wait();
 849             } catch (InterruptedException ire) {
 850                 IOException ioe = new IOException(ire.toString());
 851                 EnvHelp.initCause(ioe, ire);
 852 
 853                 throw ioe;
 854             }
 855         }
 856 
 857         if (state == TERMINATED) {
 858             throw new IOException("Terminated.");
 859         }
 860     }
 861 
 862 // -------------------------------------------------
 863 // private variables
 864 // -------------------------------------------------
 865 
 866     private final ClassLoader defaultClassLoader;
 867     private final Executor executor;
 868 
 869     private final Map<Integer, ClientListenerInfo> infoList =
 870             new HashMap<Integer, ClientListenerInfo>();
 871 
 872     // notif stuff
 873     private long clientSequenceNumber = -1;
 874     private final int maxNotifications;
 875     private final long timeout;
 876     private Integer mbeanRemovedNotifID = null;
 877     private Thread currentFetchThread;
 878 
 879     // state
 880     /**
 881      * This state means that a thread is being created for fetching and forwarding notifications.
 882      */
 883     private static final int STARTING = 0;
 884 
 885     /**
 886      * This state tells that a thread has been started for fetching and forwarding notifications.
 887      */
 888     private static final int STARTED = 1;
 889 
 890     /**
 891      * This state means that the fetching thread is informed to stop.
 892      */
 893     private static final int STOPPING = 2;
 894 
 895     /**
 896      * This state means that the fetching thread is already stopped.
 897      */
 898     private static final int STOPPED = 3;
 899 
 900     /**
 901      * This state means that this object is terminated and no more thread will be created
 902      * for fetching notifications.
 903      */
 904     private static final int TERMINATED = 4;
 905 
 906     private int state = STOPPED;
 907 
 908     /**
 909      * This variable is used to tell whether a connector (RMIConnector or ClientIntermediary)
 910      * is doing reconnection.
 911      * This variable will be set to true by the method <code>preReconnection</code>, and set
 912      * to false by <code>postReconnection</code>.
 913      * When beingReconnected == true, no thread will be created for fetching notifications.
 914      */
 915     private boolean beingReconnected = false;
 916 
 917     private static final ClassLogger logger =
 918         new ClassLogger("javax.management.remote.misc",
 919                         "ClientNotifForwarder");
 920 }