1 /*
   2  * $Id$
   3  *
   4  * Copyright (c) 2001, 2011, Oracle and/or its affiliates. All rights reserved.
   5  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   6  *
   7  * This code is free software; you can redistribute it and/or modify it
   8  * under the terms of the GNU General Public License version 2 only, as
   9  * published by the Free Software Foundation.  Oracle designates this
  10  * particular file as subject to the "Classpath" exception as provided
  11  * by Oracle in the LICENSE file that accompanied this code.
  12  *
  13  * This code is distributed in the hope that it will be useful, but WITHOUT
  14  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  15  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  16  * version 2 for more details (a copy is included in the LICENSE file that
  17  * accompanied this code).
  18  *
  19  * You should have received a copy of the GNU General Public License version
  20  * 2 along with this work; if not, write to the Free Software Foundation,
  21  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  22  *
  23  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  24  * or visit www.oracle.com if you need additional information or have any
  25  * questions.
  26  */
  27 package com.sun.javatest.util;
  28 
  29 import com.sun.javatest.TestResult;
  30 
  31 import java.util.Iterator;
  32 
  33 /**
  34  * An iterator that can read ahead of the current position, either for
  35  * performance reasons or to help find out the number of items returned by an
  36  * iterator before accessing them.
  37  */
  38 public class ReadAheadIterator implements Iterator<TestResult>
  39 {
  40     /**
  41      * A constant indicating that no read ahead is required.
  42      * @see #ReadAheadIterator
  43      */
  44     public static final int NONE = 0;
  45 
  46     /**
  47      * A constant indicating that limited read ahead is required.
  48      * @see #ReadAheadIterator
  49      */
  50     public static final int LIMITED = 1;
  51 
  52     /**
  53      * A constant indicating that full read ahead is required.
  54      * @see #ReadAheadIterator
  55      */
  56     public static final int FULL = 2;
  57 
  58     /**
  59      * Create a ReadAheadIterator.
  60      * @param source The iterator from which to read ahead
  61      * @param mode A value indicating the type of read ahead required.
  62      * @see #NONE
  63      * @see #LIMITED
  64      * @see #FULL
  65      */
  66     public ReadAheadIterator(Iterator<TestResult> source, int mode) {
  67         this(source, mode, DEFAULT_LIMITED_READAHEAD);
  68     }
  69 
  70     /**
  71      * Create a ReadAheadIterator.
  72      * @param source The iterator from which to read ahead.
  73      * @param mode A value indicating the type of read ahead required.
  74      * @param amount A value indicating the amount of read ahead required,
  75      * if the mode is set to LIMITED. If the mode is NON or FULL, this
  76      * parameter will be ignored.
  77      * @see #NONE
  78      * @see #LIMITED
  79      * @see #FULL
  80      */
  81     public ReadAheadIterator(Iterator<TestResult> source, int mode, int amount) {
  82         this.source = source;
  83         setMode(mode, amount);
  84     }
  85 
  86     /**
  87      * Check if all available items from the underlying source iterator
  88      * have been read.
  89      * @return true if all available items from the underlying source iterator
  90      * have been read.
  91      */
  92     public synchronized boolean isReadAheadComplete() {
  93         return (worker == null ? !source.hasNext() : !sourceHasNext);
  94     }
  95 
  96     /**
  97      * Get the number of items read (so far) from the underlying source iterator.
  98      * If the read ahead has not yet completed, this will be a partial count of
  99      * the total set of items available to be read.  If the read ahead is complete,
 100      * the value will be the total number of items returned from the underlying
 101      * source iterator.
 102      * @return the number of items (so far) from the underlying source iterator.
 103      * @see #isReadAheadComplete
 104      */
 105     public synchronized int getItemsFoundCount() {
 106         return (usedCount + queue.size());
 107     }
 108 
 109     /**
 110      * Are there items that have not be read-ahead yet?
 111      * @return True if the source has no more elements, false otherwise.
 112      * @deprecated Use hasNext().
 113      */
 114     public synchronized boolean isSourceExhausted() {
 115         return (worker == null ? !source.hasNext() : !sourceHasNext);
 116     }
 117 
 118     /**
 119      * How many elements have been distributed through <code>getNext()</code>.
 120      * @return number of used elements, greater-than or equal-to zero
 121      * @deprecated Will not be supported in the future.
 122      * @see #getItemsFoundCount
 123      */
 124     public synchronized int getUsedElementCount() {
 125         return usedCount;
 126     }
 127 
 128     /**
 129      * Number of items have been read-ahead by not distributed.
 130      * @return number of items ready to be distributed, greater-than
 131      *         or equal-to zero
 132      * @deprecated Will not be supported in the future.
 133      */
 134     public synchronized int getOutputQueueSize() {
 135         return queue.size();
 136     }
 137 
 138     /**
 139      * Set the type and/or amount of read ahead required.
 140      * @param mode A value indicating the type of read ahead required.
 141      * @param amount A value indicating the amount of read ahead required,
 142      * if the mode is set to LIMITED. If the mode is NON or FULL, this
 143      * parameter will be ignored.
 144      */
 145     synchronized void setMode(int mode, int amount) {
 146         switch (mode) {
 147         case NONE:
 148             minQueueSize = 0;
 149             maxQueueSize = 0;
 150             if (worker != null) {
 151                 worker = null;
 152                 notifyAll();  // wake up worker if necessary
 153             }
 154             break;
 155 
 156         case LIMITED:
 157             if (amount <= 0)
 158                 throw new IllegalArgumentException();
 159             minQueueSize = Math.min(10, amount);
 160             maxQueueSize = amount;
 161             break;
 162 
 163         case FULL:
 164             minQueueSize = 10;
 165             maxQueueSize = Integer.MAX_VALUE;
 166             break;
 167 
 168         default:
 169             throw new IllegalArgumentException();
 170         }
 171     }
 172 
 173     public synchronized boolean hasNext() {
 174         return (queue.size() > 0
 175                 || (worker == null ? source.hasNext() : sourceHasNext));
 176     }
 177 
 178     public synchronized TestResult next() {
 179         // see if there are items in the read ahead queue
 180         TestResult result = queue.remove();
 181 
 182         if (result == null) {
 183             // queue is empty: check whether to read source directly, or rely on the worker thread
 184             if (maxQueueSize == 0)
 185                 // no read ahead, so don't start worker; use source directly
 186                 result = source.next();
 187             else {
 188                 if (worker == null) {
 189                     // only start a worker if there are items for it to read
 190                     sourceHasNext = source.hasNext();
 191                     if (sourceHasNext) {
 192                         // there is more to be read, so start a worker to read it
 193                         worker = new Thread("ReadAheadIterator" + (workerNum++)) {
 194                                 public void run() {
 195                                     readAhead();
 196                                 }
 197                             };
 198                         worker.start();
 199                     }
 200                 }
 201                 else {
 202                     // ensure worker is awake
 203                     notifyAll();
 204                 }
 205 
 206                 // wait for the worker to deliver some results
 207                 while (sourceHasNext && queue.isEmpty()) {
 208                     try {
 209                         wait();
 210                     }
 211                     catch (InterruptedException e) {
 212                         // should not happen, but if it does, propogate the interrupt
 213                         Thread.currentThread().interrupt();
 214                     }
 215                 }
 216 
 217                 result = queue.remove();
 218             }
 219         }
 220         else if (sourceHasNext && (queue.size() < minQueueSize)) {
 221             // we've got something from the queue, but the queue is getting empty,
 222             // so ensure worker is awake
 223             notifyAll();
 224         }
 225 
 226         if (result != null)
 227             usedCount++;
 228 
 229         return result;
 230     }
 231 
 232     /**
 233      * @throws UnsupportedOperationException
 234      */
 235     public void remove() {
 236         throw new UnsupportedOperationException();
 237     }
 238 
 239     /**
 240      * The body of the worker thread which is used to perform the read ahead.
 241      * While the worker is running, it notionally "owns" the source iterator.
 242      * As such, read ahead from the source is not synchronized; instead,
 243      * just the updates to the queue and other monitored data with the results
 244      * of the read ahead are synchronized. This ensure minimum latency on the
 245      * main monitor lock.
 246      */
 247     private void readAhead() {
 248         final Thread thisThread = Thread.currentThread();
 249         boolean keepReading;
 250 
 251         // check whether the thread is really required
 252         synchronized (this) {
 253             keepReading = (sourceHasNext && (thisThread == worker));
 254         }
 255 
 256         try {
 257             while (keepReading) {
 258                 // sourceHasNext is true, which means there is another item
 259                 // to be read, so read it, and also check whether there is
 260                 // another item after that
 261                 TestResult srcNext  = source.next();
 262                 boolean srcHasNext = source.hasNext();
 263 
 264                 // get the lock to update the queue and sourceHasNext;
 265                 // check that the worker is still required; and
 266                 // wait (if necessary) for the queue to empty a bit
 267                 synchronized (this) {
 268                     queue.insert(srcNext);
 269                     sourceHasNext = srcHasNext;
 270                     notifyAll();
 271 
 272                     keepReading = (sourceHasNext && (thisThread == worker));
 273 
 274                     while (queue.size() >= maxQueueSize && keepReading) {
 275                         wait();
 276                         keepReading = (sourceHasNext && (thisThread == worker));
 277                     }
 278                 }
 279             }
 280         }
 281         catch (InterruptedException e) {
 282             // ignore
 283         }
 284         finally {
 285             // if this is still the main worker thread, zap the
 286             // reference to the thread, to help GC.
 287             synchronized (this) {
 288                 if (thisThread == worker)
 289                     worker = null;
 290             }
 291         }
 292     }
 293 
 294     //------------------------------------------------------------------------------------------
 295     //
 296     // Instance variables: access to all of these (except source) must be synchronized.
 297 
 298     /**
 299      * The queue to hold the items that have been read from the underlying source iterator.
 300      */
 301     private final Fifo<TestResult> queue = new Fifo<>();
 302 
 303     /**
 304      * The underlying source iterator.  If the worker thread is running, it alone
 305      * should access this iterator; otherwise, access to this should be synchronized,
 306      * along with everything else.
 307      * @see #worker
 308      */
 309     private final Iterator<TestResult> source;
 310 
 311     /**
 312      * A value indicating whether the underlying source iterator has more values to be read.
 313      * Use this instead of source.hasNext() when the worker thread is running.
 314      */
 315     private boolean sourceHasNext;
 316 
 317     /**
 318      * A minimum size for the queue. If the queue falls below this size, and if there
 319      * are more items to be read, the worker thread will be woken up to replenish the queue.
 320      * This may happen if the mode is set to PARTIAL and the worker thread fills the queue.
 321      */
 322     private int minQueueSize;
 323 
 324     /**
 325      * Set a maximum size for the queue, which is derived from the type and amount of
 326      * read ahead, given to setMode.
 327      * If the worker thread determines the queue size is bigger than this value, it will
 328      * wait until the size goes below minQueueSize.
 329      */
 330     private int maxQueueSize;
 331 
 332     /**
 333      * The number of items (i.e. not including null) returned from next().
 334      */
 335     private int usedCount;
 336 
 337     /**
 338      * The worker thread that does the read ahead. While it is set, the worker thread
 339      * should be the only one to access the underlying source iterator, which it will
 340      * do unsynchronized.
 341      */
 342     private Thread worker;
 343 
 344     /**
 345      * A counter for generating names for worker threads.
 346      */
 347     private static int workerNum;
 348 
 349     /**
 350      * The default amount for LIMITED read ahead.
 351      */
 352     private static final int DEFAULT_LIMITED_READAHEAD = 100;
 353 }