< prev index next >

src/share/classes/com/sun/jndi/ldap/LdapRequest.java

Print this page
rev 13664 : 8139965: Hang seen when using com.sun.jndi.ldap.search.replyQueueSize
Reviewed-by: dfuchs

@@ -27,57 +27,54 @@
 
 import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.naming.CommunicationException;
+import java.util.concurrent.TimeUnit;
 
 final class LdapRequest {
 
-    LdapRequest next;   // Set/read in synchronized Connection methods
-    int msgId;          // read-only
+    private final static BerDecoder EOF = new BerDecoder(new byte[]{}, -1, 0);
 
-    private int gotten = 0;
-    private BlockingQueue<BerDecoder> replies;
-    private int highWatermark = -1;
-    private boolean cancelled = false;
-    private boolean pauseAfterReceipt = false;
-    private boolean completed = false;
+    LdapRequest next;   // Set/read in synchronized Connection methods
+    final int msgId;    // read-only
 
-    LdapRequest(int msgId, boolean pause) {
-        this(msgId, pause, -1);
-    }
+    private final BlockingQueue<BerDecoder> replies;
+    private volatile boolean cancelled;
+    private volatile boolean closed;
+    private volatile boolean completed;
+    private final boolean pauseAfterReceipt;
 
     LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
         this.msgId = msgId;
         this.pauseAfterReceipt = pause;
         if (replyQueueCapacity == -1) {
-            this.replies = new LinkedBlockingQueue<BerDecoder>();
+            this.replies = new LinkedBlockingQueue<>();
         } else {
-            this.replies =
-                new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
-            highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
+            this.replies = new LinkedBlockingQueue<>(8 * replyQueueCapacity / 10);
         }
     }
 
-    synchronized void cancel() {
+    void cancel() {
         cancelled = true;
+        replies.offer(EOF);
+    }
 
-        // Unblock reader of pending request
-        // Should only ever have atmost one waiter
-        notify();
+    synchronized void close() {
+        closed = true;
+        replies.offer(EOF);
     }
 
-    synchronized boolean addReplyBer(BerDecoder ber) {
-        if (cancelled) {
-            return false;
+    private boolean isClosed() {
+        return closed && (replies.size() == 0 || replies.peek() == EOF);
         }
 
-        // Add a new reply to the queue of unprocessed replies.
-        try {
-            replies.put(ber);
-        } catch (InterruptedException e) {
-            // ignore
+    synchronized boolean addReplyBer(BerDecoder ber) {
+        // check the closed boolean value here as we don't want anything
+        // to be added to the queue after close() has been called.
+        if (cancelled || closed) {
+            return false;
         }
 
         // peek at the BER buffer to check if it is a SearchResultDone PDU
         try {
             ber.parseSeq(null);

@@ -86,35 +83,40 @@
         } catch (IOException e) {
             // ignore
         }
         ber.reset();
 
-        notify(); // notify anyone waiting for reply
-        /*
-         * If a queue capacity has been set then trigger a pause when the
-         * queue has filled to 80% capacity. Later, when the queue has drained
-         * then the reader gets unpaused.
-         */
-        if (highWatermark != -1 && replies.size() >= highWatermark) {
-            return true; // trigger the pause
+        // Add a new reply to the queue of unprocessed replies.
+        try {
+            replies.put(ber);
+        } catch (InterruptedException e) {
+            // ignore
         }
+
         return pauseAfterReceipt;
     }
 
-    synchronized BerDecoder getReplyBer() throws CommunicationException {
+    BerDecoder getReplyBer(long millis) throws CommunicationException,
+                                               InterruptedException {
+        if (cancelled) {
+            throw new CommunicationException("Request: " + msgId +
+                " cancelled");
+        }
+        if (isClosed()) {
+            return null;
+        }
+
+        BerDecoder result = millis > 0 ?
+                replies.poll(millis, TimeUnit.MILLISECONDS) : replies.take();
+
         if (cancelled) {
             throw new CommunicationException("Request: " + msgId +
                 " cancelled");
         }
 
-        /*
-         * Remove a reply if the queue is not empty.
-         * poll returns null if queue is empty.
-         */
-        BerDecoder reply = replies.poll();
-        return reply;
+        return result == EOF ? null : result;
     }
 
-    synchronized boolean hasSearchCompleted() {
+    boolean hasSearchCompleted() {
         return completed;
     }
 }
< prev index next >