1 /*
   2  * Copyright (c) 2004, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test
  26  * @bug 6190873
  27  * @summary Tests that thread creation can use a user-supplied Executor
  28  * @author Eamonn McManus
  29  * @modules java.management
  30  * @run clean ExecutorTest
  31  * @run build ExecutorTest
  32  * @run main ExecutorTest
  33  */
  34 
  35 import java.lang.reflect.*;
  36 import java.net.MalformedURLException;
  37 
  38 import java.util.*;
  39 import java.util.concurrent.*;
  40 
  41 import javax.management.*;
  42 import javax.management.remote.*;
  43 
  44 /*
  45   When you create a JMXConnector client, you can supply a
  46   "fetch-notifications Executor", which is a
  47   java.util.concurrent.Executor that will be used each time the
  48   connector client wants to call RMIConnection.fetchNotifications.
  49   This is a hook that allows users to make that potentially-blocking
  50   call from within a thread pool or the like.  If you have very many
  51   connections, you can potentially share the work of
  52   fetchNotifications calls among a number of threads that is less than
  53   the number of connections, decreasing thread usage at the expense of
  54   increased latency.
  55 
  56   This test checks that the environment property does in fact work.
  57   It creates a connection without that property and ensures that
  58   notification sending does in fact work (with the default Executor).
  59   Then it creates a connection with the property set to an Executor
  60   that records how many times its execute method is invoked.
  61   Notifications are sent one by one and each time the test waits for
  62   the notification to be received.  This implies that
  63   fetchNotifications will be executed at least as many times as there
  64   are notifications.  Since each fetchNotifications call is supposed
  65   to happen as an Executor.execute call, if Executor.execute has been
  66   called fewer times then there were notifications, we know that the
  67   Executor is not being used correctly.
  68  */
  69 public class ExecutorTest {
  70     private static final String EXECUTOR_PROPERTY =
  71         "jmx.remote.x.fetch.notifications.executor";
  72     private static final String NOTIF_TYPE = "test.type";
  73 
  74     public static void main(String[] args) throws Exception {
  75         String lastfail = null;
  76         for (String proto : new String[] {"rmi", "iiop", "jmxmp"}) {
  77             JMXServiceURL url = new JMXServiceURL(proto, null, 0);
  78             JMXConnectorServer cs;
  79             MBeanServer mbs = MBeanServerFactory.newMBeanServer();
  80             try {
  81                 // Create server just to see if the protocol is supported
  82                 cs = JMXConnectorServerFactory.newJMXConnectorServer(url,
  83                                                                      null,
  84                                                                      mbs);
  85             } catch (MalformedURLException e) {
  86                 System.out.println();
  87                 System.out.println("Ignoring protocol: " + proto);
  88                 continue;
  89             }
  90             String fail;
  91             try {
  92                 fail = test(proto);
  93                 if (fail != null)
  94                     System.out.println("TEST FAILED: " + fail);
  95             } catch (Exception e) {
  96                 e.printStackTrace(System.out);
  97                 fail = e.toString();
  98             }
  99             if (lastfail == null)
 100                 lastfail = fail;
 101         }
 102         if (lastfail == null)
 103             return;
 104         System.out.println();
 105         System.out.println("TEST FAILED");
 106         throw new Exception("Test failed: " + lastfail);
 107     }
 108 
 109     private static enum TestType {NO_EXECUTOR, NULL_EXECUTOR, COUNT_EXECUTOR};
 110 
 111     private static String test(String proto) throws Exception {
 112         System.out.println();
 113         System.out.println("TEST: " + proto);
 114         ClassLoader myClassLoader =
 115             CountInvocationHandler.class.getClassLoader();
 116         ExecutorService wrappedExecutor = Executors.newCachedThreadPool();
 117         CountInvocationHandler executorHandler =
 118             new CountInvocationHandler(wrappedExecutor);
 119         Executor countExecutor = (Executor)
 120             Proxy.newProxyInstance(myClassLoader,
 121                                    new Class<?>[] {Executor.class},
 122                                    executorHandler);
 123 
 124         JMXServiceURL url = new JMXServiceURL(proto, null, 0);
 125 
 126         for (TestType test : TestType.values()) {
 127             Map<String, Executor> env = new HashMap<String, Executor>();
 128             switch (test) {
 129             case NO_EXECUTOR:
 130                 System.out.println("Test with no executor in env");
 131                 break;
 132             case NULL_EXECUTOR:
 133                 System.out.println("Test with null executor in env");
 134                 env.put(EXECUTOR_PROPERTY, null);
 135                 break;
 136             case COUNT_EXECUTOR:
 137                 System.out.println("Test with non-null executor in env");
 138                 env.put(EXECUTOR_PROPERTY, countExecutor);
 139                 break;
 140             }
 141             MBeanServer mbs = MBeanServerFactory.newMBeanServer();
 142             ObjectName emitName = new ObjectName("blah:type=Emitter");
 143             mbs.registerMBean(new Emitter(), emitName);
 144             JMXConnectorServer cs =
 145                 JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
 146             cs.start();
 147             JMXServiceURL addr = cs.getAddress();
 148             JMXConnector cc = JMXConnectorFactory.connect(addr, env);
 149             MBeanServerConnection mbsc = cc.getMBeanServerConnection();
 150             EmitterMBean emitter = (EmitterMBean)
 151                 MBeanServerInvocationHandler.newProxyInstance(mbsc,
 152                                                               emitName,
 153                                                               EmitterMBean.class,
 154                                                               false);
 155             SemaphoreListener listener = new SemaphoreListener();
 156             NotificationFilterSupport filter = new NotificationFilterSupport();
 157             filter.enableType(NOTIF_TYPE);
 158             mbsc.addNotificationListener(emitName, listener, filter, null);
 159             final int NOTIF_COUNT = 10;
 160             for (int i = 0; i < NOTIF_COUNT; i++) {
 161                 emitter.emit();
 162                 listener.await();
 163             }
 164             Thread.sleep(1);
 165             listener.checkUnavailable();
 166             System.out.println("Got notifications");
 167             cc.close();
 168             cs.stop();
 169             if (test == TestType.COUNT_EXECUTOR) {
 170                 Method m = Executor.class.getMethod("execute", Runnable.class);
 171                 Integer count = executorHandler.methodCount.get(m);
 172                 if (count == null || count < NOTIF_COUNT)
 173                     return "Incorrect method count for execute: " + count;
 174                 System.out.println("Executor was called enough times");
 175             }
 176         }
 177 
 178         wrappedExecutor.shutdown();
 179         return null;
 180     }
 181 
 182     /* Simple MBean that sends a notification every time we ask it to.  */
 183     public static interface EmitterMBean {
 184         public void emit();
 185     }
 186 
 187     public static class Emitter
 188             extends NotificationBroadcasterSupport implements EmitterMBean {
 189         public void emit() {
 190             sendNotification(new Notification(NOTIF_TYPE, this, seq++));
 191         }
 192 
 193         private long seq = 1;
 194     }
 195 
 196     /* Simple NotificationListener that allows you to wait until a
 197        notification has been received.  Since it uses a semaphore, you
 198        can wait either before or after the notification has in fact
 199        been received and it will work in either case.  */
 200     private static class SemaphoreListener implements NotificationListener {
 201         void await() throws InterruptedException {
 202             semaphore.acquire();
 203         }
 204 
 205         /* Ensure no extra notifications were received.  If we can acquire
 206            the semaphore, that means its release() method was called more
 207            times than its acquire() method, which means there were too
 208            many notifications.  */
 209         void checkUnavailable() throws Exception {
 210             if (semaphore.tryAcquire())
 211                 throw new Exception("Got extra notifications!");
 212         }
 213 
 214         public void handleNotification(Notification n, Object h) {
 215             semaphore.release();
 216         }
 217 
 218         private final Semaphore semaphore = new Semaphore(0);
 219     }
 220 
 221     /* Generic InvocationHandler that forwards all methods to a wrapped
 222        object, but counts for each method how many times it was invoked.  */
 223     private static class CountInvocationHandler implements InvocationHandler {
 224         final Map<Method, Integer> methodCount =
 225             new HashMap<Method, Integer>();
 226         private final Object wrapped;
 227 
 228         public CountInvocationHandler(Object wrapped) {
 229             this.wrapped = wrapped;
 230         }
 231 
 232         public Object invoke(Object proxy, Method method, Object[] args)
 233                 throws Throwable {
 234             synchronized (methodCount) {
 235                 Integer count = methodCount.get(method);
 236                 if (count == null)
 237                     count = 0;
 238                 methodCount.put(method, count + 1);
 239             }
 240             return method.invoke(wrapped, (Object[]) args);
 241         }
 242     }
 243 }