< prev index next >
src/share/classes/com/sun/jndi/ldap/LdapRequest.java
Print this page
rev 13664 : 8139965: Hang seen when using com.sun.jndi.ldap.search.replyQueueSize
Reviewed-by: dfuchs
*** 27,83 ****
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.naming.CommunicationException;
final class LdapRequest {
! LdapRequest next; // Set/read in synchronized Connection methods
! int msgId; // read-only
! private int gotten = 0;
! private BlockingQueue<BerDecoder> replies;
! private int highWatermark = -1;
! private boolean cancelled = false;
! private boolean pauseAfterReceipt = false;
! private boolean completed = false;
! LdapRequest(int msgId, boolean pause) {
! this(msgId, pause, -1);
! }
LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
this.msgId = msgId;
this.pauseAfterReceipt = pause;
if (replyQueueCapacity == -1) {
! this.replies = new LinkedBlockingQueue<BerDecoder>();
} else {
! this.replies =
! new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
! highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
}
}
! synchronized void cancel() {
cancelled = true;
! // Unblock reader of pending request
! // Should only ever have atmost one waiter
! notify();
}
! synchronized boolean addReplyBer(BerDecoder ber) {
! if (cancelled) {
! return false;
}
! // Add a new reply to the queue of unprocessed replies.
! try {
! replies.put(ber);
! } catch (InterruptedException e) {
! // ignore
}
// peek at the BER buffer to check if it is a SearchResultDone PDU
try {
ber.parseSeq(null);
--- 27,80 ----
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.naming.CommunicationException;
+ import java.util.concurrent.TimeUnit;
final class LdapRequest {
! private final static BerDecoder EOF = new BerDecoder(new byte[]{}, -1, 0);
! LdapRequest next; // Set/read in synchronized Connection methods
! final int msgId; // read-only
! private final BlockingQueue<BerDecoder> replies;
! private volatile boolean cancelled;
! private volatile boolean closed;
! private volatile boolean completed;
! private final boolean pauseAfterReceipt;
LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
this.msgId = msgId;
this.pauseAfterReceipt = pause;
if (replyQueueCapacity == -1) {
! this.replies = new LinkedBlockingQueue<>();
} else {
! this.replies = new LinkedBlockingQueue<>(8 * replyQueueCapacity / 10);
}
}
! void cancel() {
cancelled = true;
+ replies.offer(EOF);
+ }
! synchronized void close() {
! closed = true;
! replies.offer(EOF);
}
! private boolean isClosed() {
! return closed && (replies.size() == 0 || replies.peek() == EOF);
}
! synchronized boolean addReplyBer(BerDecoder ber) {
! // check the closed boolean value here as we don't want anything
! // to be added to the queue after close() has been called.
! if (cancelled || closed) {
! return false;
}
// peek at the BER buffer to check if it is a SearchResultDone PDU
try {
ber.parseSeq(null);
*** 86,120 ****
} catch (IOException e) {
// ignore
}
ber.reset();
! notify(); // notify anyone waiting for reply
! /*
! * If a queue capacity has been set then trigger a pause when the
! * queue has filled to 80% capacity. Later, when the queue has drained
! * then the reader gets unpaused.
! */
! if (highWatermark != -1 && replies.size() >= highWatermark) {
! return true; // trigger the pause
}
return pauseAfterReceipt;
}
! synchronized BerDecoder getReplyBer() throws CommunicationException {
if (cancelled) {
throw new CommunicationException("Request: " + msgId +
" cancelled");
}
! /*
! * Remove a reply if the queue is not empty.
! * poll returns null if queue is empty.
! */
! BerDecoder reply = replies.poll();
! return reply;
}
! synchronized boolean hasSearchCompleted() {
return completed;
}
}
--- 83,122 ----
} catch (IOException e) {
// ignore
}
ber.reset();
! // Add a new reply to the queue of unprocessed replies.
! try {
! replies.put(ber);
! } catch (InterruptedException e) {
! // ignore
}
+
return pauseAfterReceipt;
}
! BerDecoder getReplyBer(long millis) throws CommunicationException,
! InterruptedException {
! if (cancelled) {
! throw new CommunicationException("Request: " + msgId +
! " cancelled");
! }
! if (isClosed()) {
! return null;
! }
!
! BerDecoder result = millis > 0 ?
! replies.poll(millis, TimeUnit.MILLISECONDS) : replies.take();
!
if (cancelled) {
throw new CommunicationException("Request: " + msgId +
" cancelled");
}
! return result == EOF ? null : result;
}
! boolean hasSearchCompleted() {
return completed;
}
}
< prev index next >