1 2 import java.util.Collections; 3 import java.util.HashSet; 4 import java.util.Set; 5 import java.util.concurrent.CountDownLatch; 6 import java.util.concurrent.TimeUnit; 7 import java.util.concurrent.atomic.AtomicBoolean; 8 import javax.management.MBeanServerConnection; 9 import javax.management.Notification; 10 import javax.management.NotificationListener; 11 import javax.management.ObjectName; 12 import javax.management.remote.JMXConnectionNotification; 13 import javax.management.remote.JMXConnector; 14 import javax.management.remote.JMXConnectorFactory; 15 import javax.management.remote.JMXServiceURL; 16 17 public class Client { 18 public static void run(String url) throws Exception { 19 final int notifEmittedCnt = 10; 20 final CountDownLatch counter = new CountDownLatch(notifEmittedCnt); 21 final Set<Long> seqSet = Collections.synchronizedSet(new HashSet<Long>()); 22 final AtomicBoolean duplNotification = new AtomicBoolean(); 23 24 JMXServiceURL serverUrl = new JMXServiceURL(url); 25 26 ObjectName name = new ObjectName("test", "foo", "bar"); 27 JMXConnector jmxConnector = JMXConnectorFactory.connect(serverUrl); 28 System.out.println("client connected"); 29 jmxConnector.addConnectionNotificationListener(new NotificationListener() { 30 @Override 31 public void handleNotification(Notification notification, Object handback) { 32 System.out.println("connection notification: " + notification); 33 if (!seqSet.add(notification.getSequenceNumber())) { 34 duplNotification.set(true); 35 } 36 if (notification.getType().equals(JMXConnectionNotification.NOTIFS_LOST)) { 37 long lostNotifs = ((Long)((JMXConnectionNotification)notification).getUserData()).longValue(); 38 for(int i=0;i<lostNotifs;i++) { 39 counter.countDown(); 40 } 41 } 42 } 43 }, null, null); 44 MBeanServerConnection jmxServer = jmxConnector.getMBeanServerConnection(); 45 46 jmxServer.addNotificationListener(name, new NotificationListener() { 47 @Override 48 public void handleNotification(Notification notification, Object handback) { 49 System.out.println("client got: " + notification); 50 if (!seqSet.add(notification.getSequenceNumber())) { 51 duplNotification.set(true); 52 } 53 counter.countDown(); 54 } 55 }, null, null); 56 57 System.out.println("client invoking foo (" + notifEmittedCnt + " times)"); 58 for(int i=0;i<notifEmittedCnt;i++) { 59 System.out.print("."); 60 jmxServer.invoke(name, "foo", new Object[]{}, new String[]{}); 61 } 62 System.out.println(); 63 try { 64 System.out.println("waiting for " + notifEmittedCnt + " notifications to arrive"); 65 if (!counter.await(30, TimeUnit.SECONDS)) { 66 throw new InterruptedException(); 67 } 68 if (duplNotification.get()) { 69 System.out.println("ERROR: received duplicated notifications"); 70 throw new Error("received duplicated notifications"); 71 } 72 System.out.println("\nshutting down client"); 73 } catch (InterruptedException e) { 74 System.out.println("ERROR: notification processing thread interrupted"); 75 throw new Error("notification thread interrupted unexpectedly"); 76 } 77 } 78 }