src/java.naming/share/classes/com/sun/jndi/ldap/LdapRequest.java
Print this page
@@ -27,49 +27,53 @@
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.naming.CommunicationException;
+import java.util.concurrent.TimeUnit;
final class LdapRequest {
- LdapRequest next; // Set/read in synchronized Connection methods
- int msgId; // read-only
+ private final static BerDecoder EOF = new BerDecoder(new byte[]{}, -1, 0);
- private int gotten = 0;
- private BlockingQueue<BerDecoder> replies;
- private int highWatermark = -1;
- private boolean cancelled = false;
- private boolean pauseAfterReceipt = false;
- private boolean completed = false;
+ LdapRequest next; // Set/read in synchronized Connection methods
+ final int msgId; // read-only
- LdapRequest(int msgId, boolean pause) {
- this(msgId, pause, -1);
- }
+ private final BlockingQueue<BerDecoder> replies;
+ private volatile boolean cancelled;
+ private boolean closed;
+ private volatile boolean completed;
+ private final boolean pauseAfterReceipt;
LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
this.msgId = msgId;
this.pauseAfterReceipt = pause;
if (replyQueueCapacity == -1) {
- this.replies = new LinkedBlockingQueue<BerDecoder>();
+ this.replies = new LinkedBlockingQueue<>();
} else {
- this.replies =
- new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
- highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
+ this.replies = new LinkedBlockingQueue<>(8 * replyQueueCapacity / 10);
}
}
- synchronized void cancel() {
+ void cancel() {
cancelled = true;
+ replies.offer(EOF);
+ }
- // Unblock reader of pending request
- // Should only ever have at most one waiter
- notify();
+ synchronized void close() {
+ closed = true;
+ replies.offer(EOF);
+ }
+
+ private boolean isClosed() {
+ return closed && (replies.size() == 0 || replies.peek() == EOF);
}
synchronized boolean addReplyBer(BerDecoder ber) {
- if (cancelled) {
+ // check the closed boolean value here as we don't want anything
+ // to be added to the queue after close() has been called.
+ if (cancelled || closed) {
return false;
}
// Add a new reply to the queue of unprocessed replies.
try {
@@ -85,36 +89,37 @@
completed = (ber.peekByte() == LdapClient.LDAP_REP_RESULT);
} catch (IOException e) {
// ignore
}
ber.reset();
+ return pauseAfterReceipt;
+ }
- notify(); // notify anyone waiting for reply
- /*
- * If a queue capacity has been set then trigger a pause when the
- * queue has filled to 80% capacity. Later, when the queue has drained
- * then the reader gets unpaused.
- */
- if (highWatermark != -1 && replies.size() >= highWatermark) {
- return true; // trigger the pause
+ BerDecoder getReplyBer(long millis) throws CommunicationException,
+ InterruptedException {
+ if (cancelled) {
+ throw new CommunicationException("Request: " + msgId +
+ " cancelled");
}
- return pauseAfterReceipt;
+ if (isClosed()) {
+ return null;
}
- synchronized BerDecoder getReplyBer() throws CommunicationException {
+ BerDecoder result = millis > 0 ?
+ replies.poll(millis, TimeUnit.MILLISECONDS) : replies.take();
+
if (cancelled) {
throw new CommunicationException("Request: " + msgId +
" cancelled");
}
- /*
- * Remove a reply if the queue is not empty.
- * poll returns null if queue is empty.
- */
- BerDecoder reply = replies.poll();
- return reply;
+ if (isClosed()) {
+ return null;
+ }
+
+ return result;
}
- synchronized boolean hasSearchCompleted() {
+ boolean hasSearchCompleted() {
return completed;
}
}