src/java.naming/share/classes/com/sun/jndi/ldap/LdapRequest.java

Print this page

        

*** 27,75 **** import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import javax.naming.CommunicationException; final class LdapRequest { ! LdapRequest next; // Set/read in synchronized Connection methods ! int msgId; // read-only ! private int gotten = 0; ! private BlockingQueue<BerDecoder> replies; ! private int highWatermark = -1; ! private boolean cancelled = false; ! private boolean pauseAfterReceipt = false; ! private boolean completed = false; ! LdapRequest(int msgId, boolean pause) { ! this(msgId, pause, -1); ! } LdapRequest(int msgId, boolean pause, int replyQueueCapacity) { this.msgId = msgId; this.pauseAfterReceipt = pause; if (replyQueueCapacity == -1) { ! this.replies = new LinkedBlockingQueue<BerDecoder>(); } else { ! this.replies = ! new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity); ! highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity } } ! synchronized void cancel() { cancelled = true; ! // Unblock reader of pending request ! // Should only ever have at most one waiter ! notify(); } synchronized boolean addReplyBer(BerDecoder ber) { ! if (cancelled) { return false; } // Add a new reply to the queue of unprocessed replies. try { --- 27,79 ---- import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import javax.naming.CommunicationException; + import java.util.concurrent.TimeUnit; final class LdapRequest { ! private final static BerDecoder EOF = new BerDecoder(new byte[]{}, -1, 0); ! LdapRequest next; // Set/read in synchronized Connection methods ! final int msgId; // read-only ! private final BlockingQueue<BerDecoder> replies; ! private volatile boolean cancelled; ! private volatile boolean closed; ! private volatile boolean completed; ! private final boolean pauseAfterReceipt; LdapRequest(int msgId, boolean pause, int replyQueueCapacity) { this.msgId = msgId; this.pauseAfterReceipt = pause; if (replyQueueCapacity == -1) { ! this.replies = new LinkedBlockingQueue<>(); } else { ! this.replies = new LinkedBlockingQueue<>(8 * replyQueueCapacity / 10); } } ! void cancel() { cancelled = true; + replies.offer(EOF); + } ! synchronized void close() { ! closed = true; ! replies.offer(EOF); ! } ! ! private boolean isClosed() { ! return closed && (replies.size() == 0 || replies.peek() == EOF); } synchronized boolean addReplyBer(BerDecoder ber) { ! // check the closed boolean value here as we don't want anything ! // to be added to the queue after close() has been called. ! if (cancelled || closed) { return false; } // Add a new reply to the queue of unprocessed replies. try {
*** 85,120 **** completed = (ber.peekByte() == LdapClient.LDAP_REP_RESULT); } catch (IOException e) { // ignore } ber.reset(); ! notify(); // notify anyone waiting for reply ! /* ! * If a queue capacity has been set then trigger a pause when the ! * queue has filled to 80% capacity. Later, when the queue has drained ! * then the reader gets unpaused. ! */ ! if (highWatermark != -1 && replies.size() >= highWatermark) { ! return true; // trigger the pause } ! return pauseAfterReceipt; } ! synchronized BerDecoder getReplyBer() throws CommunicationException { if (cancelled) { throw new CommunicationException("Request: " + msgId + " cancelled"); } ! /* ! * Remove a reply if the queue is not empty. ! * poll returns null if queue is empty. ! */ ! BerDecoder reply = replies.poll(); ! return reply; } ! synchronized boolean hasSearchCompleted() { return completed; } } --- 89,121 ---- completed = (ber.peekByte() == LdapClient.LDAP_REP_RESULT); } catch (IOException e) { // ignore } ber.reset(); + return pauseAfterReceipt; + } ! BerDecoder getReplyBer(long millis) throws CommunicationException, ! InterruptedException { ! if (cancelled) { ! throw new CommunicationException("Request: " + msgId + ! " cancelled"); } ! if (isClosed()) { ! return null; } ! BerDecoder result = millis > 0 ? ! replies.poll(millis, TimeUnit.MILLISECONDS) : replies.take(); ! if (cancelled) { throw new CommunicationException("Request: " + msgId + " cancelled"); } ! return result == EOF ? null : result; } ! boolean hasSearchCompleted() { return completed; } }