1 /*
   2  * $Id$
   3  *
   4  * Copyright (c) 2001, 2011, Oracle and/or its affiliates. All rights reserved.
   5  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   6  *
   7  * This code is free software; you can redistribute it and/or modify it
   8  * under the terms of the GNU General Public License version 2 only, as
   9  * published by the Free Software Foundation.  Oracle designates this
  10  * particular file as subject to the "Classpath" exception as provided
  11  * by Oracle in the LICENSE file that accompanied this code.
  12  *
  13  * This code is distributed in the hope that it will be useful, but WITHOUT
  14  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  15  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  16  * version 2 for more details (a copy is included in the LICENSE file that
  17  * accompanied this code).
  18  *
  19  * You should have received a copy of the GNU General Public License version
  20  * 2 along with this work; if not, write to the Free Software Foundation,
  21  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  22  *
  23  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  24  * or visit www.oracle.com if you need additional information or have any
  25  * questions.
  26  */
  27 package com.sun.javatest.util;
  28 
  29 import java.util.Iterator;
  30 
  31 /**
  32  * An iterator that can read ahead of the current position, either for
  33  * performance reasons or to help find out the number of items returned by an
  34  * iterator before accessing them.
  35  */
  36 public class ReadAheadIterator implements Iterator
  37 {
  38     /**
  39      * A constant indicating that no read ahead is required.
  40      * @see #ReadAheadIterator
  41      */
  42     public static final int NONE = 0;
  43 
  44     /**
  45      * A constant indicating that limited read ahead is required.
  46      * @see #ReadAheadIterator
  47      */
  48     public static final int LIMITED = 1;
  49 
  50     /**
  51      * A constant indicating that full read ahead is required.
  52      * @see #ReadAheadIterator
  53      */
  54     public static final int FULL = 2;
  55 
  56     /**
  57      * Create a ReadAheadIterator.
  58      * @param source The iterator from which to read ahead
  59      * @param mode A value indicating the type of read ahead required.
  60      * @see #NONE
  61      * @see #LIMITED
  62      * @see #FULL
  63      */
  64     public ReadAheadIterator(Iterator source, int mode) {
  65         this(source, mode, DEFAULT_LIMITED_READAHEAD);
  66     }
  67 
  68     /**
  69      * Create a ReadAheadIterator.
  70      * @param source The iterator from which to read ahead.
  71      * @param mode A value indicating the type of read ahead required.
  72      * @param amount A value indicating the amount of read ahead required,
  73      * if the mode is set to LIMITED. If the mode is NON or FULL, this
  74      * parameter will be ignored.
  75      * @see #NONE
  76      * @see #LIMITED
  77      * @see #FULL
  78      */
  79     public ReadAheadIterator(Iterator source, int mode, int amount) {
  80         this.source = source;
  81         setMode(mode, amount);
  82     }
  83 
  84     /**
  85      * Check if all available items from the underlying source iterator
  86      * have been read.
  87      * @return true if all available items from the underlying source iterator
  88      * have been read.
  89      */
  90     public synchronized boolean isReadAheadComplete() {
  91         return (worker == null ? !source.hasNext() : !sourceHasNext);
  92     }
  93 
  94     /**
  95      * Get the number of items read (so far) from the underlying source iterator.
  96      * If the read ahead has not yet completed, this will be a partial count of
  97      * the total set of items available to be read.  If the read ahead is complete,
  98      * the value will be the total number of items returned from the underlying
  99      * source iterator.
 100      * @return the number of items (so far) from the underlying source iterator.
 101      * @see #isReadAheadComplete
 102      */
 103     public synchronized int getItemsFoundCount() {
 104         return (usedCount + queue.size());
 105     }
 106 
 107     /**
 108      * Are there items that have not be read-ahead yet?
 109      * @return True if the source has no more elements, false otherwise.
 110      * @deprecated Use hasNext().
 111      */
 112     public synchronized boolean isSourceExhausted() {
 113         return (worker == null ? !source.hasNext() : !sourceHasNext);
 114     }
 115 
 116     /**
 117      * How many elements have been distributed through <code>getNext()</code>.
 118      * @return number of used elements, greater-than or equal-to zero
 119      * @deprecated Will not be supported in the future.
 120      * @see #getItemsFoundCount
 121      */
 122     public synchronized int getUsedElementCount() {
 123         return usedCount;
 124     }
 125 
 126     /**
 127      * Number of items have been read-ahead by not distributed.
 128      * @return number of items ready to be distributed, greater-than
 129      *         or equal-to zero
 130      * @deprecated Will not be supported in the future.
 131      */
 132     public synchronized int getOutputQueueSize() {
 133         return queue.size();
 134     }
 135 
 136     /**
 137      * Set the type and/or amount of read ahead required.
 138      * @param mode A value indicating the type of read ahead required.
 139      * @param amount A value indicating the amount of read ahead required,
 140      * if the mode is set to LIMITED. If the mode is NON or FULL, this
 141      * parameter will be ignored.
 142      */
 143     synchronized void setMode(int mode, int amount) {
 144         switch (mode) {
 145         case NONE:
 146             minQueueSize = 0;
 147             maxQueueSize = 0;
 148             if (worker != null) {
 149                 worker = null;
 150                 notifyAll();  // wake up worker if necessary
 151             }
 152             break;
 153 
 154         case LIMITED:
 155             if (amount <= 0)
 156                 throw new IllegalArgumentException();
 157             minQueueSize = Math.min(10, amount);
 158             maxQueueSize = amount;
 159             break;
 160 
 161         case FULL:
 162             minQueueSize = 10;
 163             maxQueueSize = Integer.MAX_VALUE;
 164             break;
 165 
 166         default:
 167             throw new IllegalArgumentException();
 168         }
 169     }
 170 
 171     public synchronized boolean hasNext() {
 172         return (queue.size() > 0
 173                 || (worker == null ? source.hasNext() : sourceHasNext));
 174     }
 175 
 176     public synchronized Object next() {
 177         // see if there are items in the read ahead queue
 178         Object result = queue.remove();
 179 
 180         if (result == null) {
 181             // queue is empty: check whether to read source directly, or rely on the worker thread
 182             if (maxQueueSize == 0)
 183                 // no read ahead, so don't start worker; use source directly
 184                 result = source.next();
 185             else {
 186                 if (worker == null) {
 187                     // only start a worker if there are items for it to read
 188                     sourceHasNext = source.hasNext();
 189                     if (sourceHasNext) {
 190                         // there is more to be read, so start a worker to read it
 191                         worker = new Thread("ReadAheadIterator" + (workerNum++)) {
 192                                 public void run() {
 193                                     readAhead();
 194                                 }
 195                             };
 196                         worker.start();
 197                     }
 198                 }
 199                 else {
 200                     // ensure worker is awake
 201                     notifyAll();
 202                 }
 203 
 204                 // wait for the worker to deliver some results
 205                 while (sourceHasNext && queue.isEmpty()) {
 206                     try {
 207                         wait();
 208                     }
 209                     catch (InterruptedException e) {
 210                         // should not happen, but if it does, propogate the interrupt
 211                         Thread.currentThread().interrupt();
 212                     }
 213                 }
 214 
 215                 result = queue.remove();
 216             }
 217         }
 218         else if (sourceHasNext && (queue.size() < minQueueSize)) {
 219             // we've got something from the queue, but the queue is getting empty,
 220             // so ensure worker is awake
 221             notifyAll();
 222         }
 223 
 224         if (result != null)
 225             usedCount++;
 226 
 227         return result;
 228     }
 229 
 230     /**
 231      * @throws UnsupportedOperationException
 232      */
 233     public void remove() {
 234         throw new UnsupportedOperationException();
 235     }
 236 
 237     /**
 238      * The body of the worker thread which is used to perform the read ahead.
 239      * While the worker is running, it notionally "owns" the source iterator.
 240      * As such, read ahead from the source is not synchronized; instead,
 241      * just the updates to the queue and other monitored data with the results
 242      * of the read ahead are synchronized. This ensure minimum latency on the
 243      * main monitor lock.
 244      */
 245     private void readAhead() {
 246         final Thread thisThread = Thread.currentThread();
 247         boolean keepReading;
 248 
 249         // check whether the thread is really required
 250         synchronized (this) {
 251             keepReading = (sourceHasNext && (thisThread == worker));
 252         }
 253 
 254         try {
 255             while (keepReading) {
 256                 // sourceHasNext is true, which means there is another item
 257                 // to be read, so read it, and also check whether there is
 258                 // another item after that
 259                 Object srcNext  = source.next();
 260                 boolean srcHasNext = source.hasNext();
 261 
 262                 // get the lock to update the queue and sourceHasNext;
 263                 // check that the worker is still required; and
 264                 // wait (if necessary) for the queue to empty a bit
 265                 synchronized (this) {
 266                     queue.insert(srcNext);
 267                     sourceHasNext = srcHasNext;
 268                     notifyAll();
 269 
 270                     keepReading = (sourceHasNext && (thisThread == worker));
 271 
 272                     while (queue.size() >= maxQueueSize && keepReading) {
 273                         wait();
 274                         keepReading = (sourceHasNext && (thisThread == worker));
 275                     }
 276                 }
 277             }
 278         }
 279         catch (InterruptedException e) {
 280             // ignore
 281         }
 282         finally {
 283             // if this is still the main worker thread, zap the
 284             // reference to the thread, to help GC.
 285             synchronized (this) {
 286                 if (thisThread == worker)
 287                     worker = null;
 288             }
 289         }
 290     }
 291 
 292     //------------------------------------------------------------------------------------------
 293     //
 294     // Instance variables: access to all of these (except source) must be synchronized.
 295 
 296     /**
 297      * The queue to hold the items that have been read from the underlying source iterator.
 298      */
 299     private final Fifo queue = new Fifo();
 300 
 301     /**
 302      * The underlying source iterator.  If the worker thread is running, it alone
 303      * should access this iterator; otherwise, access to this should be synchronized,
 304      * along with everything else.
 305      * @see #worker
 306      */
 307     private final Iterator source;
 308 
 309     /**
 310      * A value indicating whether the underlying source iterator has more values to be read.
 311      * Use this instead of source.hasNext() when the worker thread is running.
 312      */
 313     private boolean sourceHasNext;
 314 
 315     /**
 316      * A minimum size for the queue. If the queue falls below this size, and if there
 317      * are more items to be read, the worker thread will be woken up to replenish the queue.
 318      * This may happen if the mode is set to PARTIAL and the worker thread fills the queue.
 319      */
 320     private int minQueueSize;
 321 
 322     /**
 323      * Set a maximum size for the queue, which is derived from the type and amount of
 324      * read ahead, given to setMode.
 325      * If the worker thread determines the queue size is bigger than this value, it will
 326      * wait until the size goes below minQueueSize.
 327      */
 328     private int maxQueueSize;
 329 
 330     /**
 331      * The number of items (i.e. not including null) returned from next().
 332      */
 333     private int usedCount;
 334 
 335     /**
 336      * The worker thread that does the read ahead. While it is set, the worker thread
 337      * should be the only one to access the underlying source iterator, which it will
 338      * do unsynchronized.
 339      */
 340     private Thread worker;
 341 
 342     /**
 343      * A counter for generating names for worker threads.
 344      */
 345     private static int workerNum;
 346 
 347     /**
 348      * The default amount for LIMITED read ahead.
 349      */
 350     private static final int DEFAULT_LIMITED_READAHEAD = 100;
 351 }