1 
   2 import java.util.Collections;
   3 import java.util.HashSet;
   4 import java.util.Set;
   5 import java.util.concurrent.CountDownLatch;
   6 import java.util.concurrent.TimeUnit;
   7 import java.util.concurrent.atomic.AtomicBoolean;
   8 import javax.management.MBeanServerConnection;
   9 import javax.management.Notification;
  10 import javax.management.NotificationListener;
  11 import javax.management.ObjectName;
  12 import javax.management.remote.JMXConnectionNotification;
  13 import javax.management.remote.JMXConnector;
  14 import javax.management.remote.JMXConnectorFactory;
  15 import javax.management.remote.JMXServiceURL;
  16 
  17 public class Client {
  18     public static void run(String url) throws Exception {
  19         final int notifEmittedCnt = 10;
  20         final CountDownLatch counter = new CountDownLatch(notifEmittedCnt);
  21         final Set<Long> seqSet = Collections.synchronizedSet(new HashSet<Long>());
  22         final AtomicBoolean duplNotification = new AtomicBoolean();
  23 
  24         JMXServiceURL serverUrl = new JMXServiceURL(url);
  25 
  26         ObjectName name = new ObjectName("test", "foo", "bar");
  27         JMXConnector jmxConnector = JMXConnectorFactory.connect(serverUrl);
  28         System.out.println("client connected");
  29         jmxConnector.addConnectionNotificationListener(new NotificationListener() {
  30             @Override
  31             public void handleNotification(Notification notification, Object handback) {
  32                 System.out.println("connection notification: " + notification);
  33                 if (!seqSet.add(notification.getSequenceNumber())) {
  34                     duplNotification.set(true);
  35                 }
  36                 if (notification.getType().equals(JMXConnectionNotification.NOTIFS_LOST)) {
  37                     long lostNotifs = ((Long)((JMXConnectionNotification)notification).getUserData()).longValue();
  38                     for(int i=0;i<lostNotifs;i++) {
  39                         counter.countDown();
  40                     }
  41                 }
  42             }
  43         }, null, null);
  44         MBeanServerConnection jmxServer = jmxConnector.getMBeanServerConnection();
  45 
  46         jmxServer.addNotificationListener(name, new NotificationListener() {
  47             @Override
  48             public void handleNotification(Notification notification, Object handback) {
  49                 System.out.println("client got: " + notification);
  50                 if (!seqSet.add(notification.getSequenceNumber())) {
  51                     duplNotification.set(true);
  52                 }
  53                 counter.countDown();
  54             }
  55         }, null, null);
  56 
  57         System.out.println("client invoking foo (" + notifEmittedCnt + " times)");
  58         for(int i=0;i<notifEmittedCnt;i++) {
  59             System.out.print(".");
  60             jmxServer.invoke(name, "foo", new Object[]{}, new String[]{});
  61         }
  62         System.out.println();
  63         try {
  64             System.out.println("waiting for " + notifEmittedCnt + " notifications to arrive");
  65             if (!counter.await(30, TimeUnit.SECONDS)) {
  66                 throw new InterruptedException();
  67             }
  68             if (duplNotification.get()) {
  69                 System.out.println("ERROR: received duplicated notifications");
  70                 throw new Error("received duplicated notifications");
  71             }
  72             System.out.println("\nshutting down client");
  73         } catch (InterruptedException e) {
  74             System.out.println("ERROR: notification processing thread interrupted");
  75             throw new Error("notification thread interrupted unexpectedly");
  76         }
  77     }
  78 }