src/java.naming/share/classes/com/sun/jndi/ldap/LdapRequest.java

Print this page

        

@@ -27,49 +27,53 @@
 
 import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.naming.CommunicationException;
+import java.util.concurrent.TimeUnit;
 
 final class LdapRequest {
 
-    LdapRequest next;   // Set/read in synchronized Connection methods
-    int msgId;          // read-only
+    private final static BerDecoder EOF = new BerDecoder(new byte[]{}, -1, 0);
 
-    private int gotten = 0;
-    private BlockingQueue<BerDecoder> replies;
-    private int highWatermark = -1;
-    private boolean cancelled = false;
-    private boolean pauseAfterReceipt = false;
-    private boolean completed = false;
+    LdapRequest next;   // Set/read in synchronized Connection methods
+    final int msgId;          // read-only
 
-    LdapRequest(int msgId, boolean pause) {
-        this(msgId, pause, -1);
-    }
+    private final BlockingQueue<BerDecoder> replies;   // pending replies, plus the EOF marker
+    private volatile boolean cancelled;
+    private volatile boolean closed;   // set in close(); read unlocked via isClosed()
+    private volatile boolean completed;
+    private final boolean pauseAfterReceipt;
 
     LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
         this.msgId = msgId;
         this.pauseAfterReceipt = pause;
         if (replyQueueCapacity == -1) {
-            this.replies = new LinkedBlockingQueue<BerDecoder>();
+            this.replies = new LinkedBlockingQueue<>();   // -1: no capacity limit requested
         } else {
-            this.replies =
-                new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
-            highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
+            this.replies = new LinkedBlockingQueue<>(Math.max(1, 8 * replyQueueCapacity / 10)); // 80%, at least 1: a capacity of 0 is rejected
         }
     }
 
-    synchronized void cancel() {
+    void cancel() {
         cancelled = true;
+        replies.offer(EOF);   // non-blocking; the EOF marker wakes a reader waiting in getReplyBer()
+    }
 
-        // Unblock reader of pending request
-        // Should only ever have at most one waiter
-        notify();
+    synchronized void close() {
+        closed = true;
+        replies.offer(EOF);   // same EOF marker as cancel(); dropped silently if the queue is full
+    }
+
+    private boolean isClosed() {   // closed, and the queue is empty or has EOF at its head
+        return closed && (replies.size() == 0 || replies.peek() == EOF);
     }
 
     synchronized boolean addReplyBer(BerDecoder ber) {
-        if (cancelled) {
+        // check the closed boolean value here as we don't want anything
+        // to be added to the queue after close() has been called.
+        if (cancelled || closed) {
             return false;
         }
 
         // Add a new reply to the queue of unprocessed replies.
         try {

@@ -85,36 +89,37 @@
             completed = (ber.peekByte() == LdapClient.LDAP_REP_RESULT);
         } catch (IOException e) {
             // ignore
         }
         ber.reset();
+        return pauseAfterReceipt;
+    }
 
-        notify(); // notify anyone waiting for reply
-        /*
-         * If a queue capacity has been set then trigger a pause when the
-         * queue has filled to 80% capacity. Later, when the queue has drained
-         * then the reader gets unpaused.
-         */
-        if (highWatermark != -1 && replies.size() >= highWatermark) {
-            return true; // trigger the pause
+    BerDecoder getReplyBer(long millis) throws CommunicationException,
+                                               InterruptedException {
+        if (cancelled) {
+            throw new CommunicationException("Request: " + msgId +
+                " cancelled");
         }
-        return pauseAfterReceipt;
+        if (isClosed()) {
+            return null;   // closed and no real reply left to hand out
     }
 
-    synchronized BerDecoder getReplyBer() throws CommunicationException {
+        BerDecoder result = millis > 0 ?
+                replies.poll(millis, TimeUnit.MILLISECONDS) : replies.take();   // millis <= 0: wait without a timeout
+
         if (cancelled) {
             throw new CommunicationException("Request: " + msgId +
                 " cancelled");
         }
 
-        /*
-         * Remove a reply if the queue is not empty.
-         * poll returns null if queue is empty.
-         */
-        BerDecoder reply = replies.poll();
-        return reply;
+        if (result == EOF) {   // test the dequeued element itself so a real reply taken just before EOF is not dropped
+            return null;
+        }
+
+        return result;   // a reply, or null when poll() timed out
     }
 
-    synchronized boolean hasSearchCompleted() {
+    boolean hasSearchCompleted() {   // returns the volatile 'completed' flag
         return completed;
     }
 }