1 /* 2 * $Id$ 3 * 4 * Copyright (c) 2001, 2011, Oracle and/or its affiliates. All rights reserved. 5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 6 * 7 * This code is free software; you can redistribute it and/or modify it 8 * under the terms of the GNU General Public License version 2 only, as 9 * published by the Free Software Foundation. Oracle designates this 10 * particular file as subject to the "Classpath" exception as provided 11 * by Oracle in the LICENSE file that accompanied this code. 12 * 13 * This code is distributed in the hope that it will be useful, but WITHOUT 14 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 15 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 16 * version 2 for more details (a copy is included in the LICENSE file that 17 * accompanied this code). 18 * 19 * You should have received a copy of the GNU General Public License version 20 * 2 along with this work; if not, write to the Free Software Foundation, 21 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 22 * 23 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 24 * or visit www.oracle.com if you need additional information or have any 25 * questions. 26 */ 27 package com.sun.javatest.util; 28 29 import java.util.Iterator; 30 31 /** 32 * An iterator that can read ahead of the current position, either for 33 * performance reasons or to help find out the number of items returned by an 34 * iterator before accessing them. 35 */ 36 public class ReadAheadIterator implements Iterator 37 { 38 /** 39 * A constant indicating that no read ahead is required. 40 * @see #ReadAheadIterator 41 */ 42 public static final int NONE = 0; 43 44 /** 45 * A constant indicating that limited read ahead is required. 46 * @see #ReadAheadIterator 47 */ 48 public static final int LIMITED = 1; 49 50 /** 51 * A constant indicating that full read ahead is required. 52 * @see #ReadAheadIterator 53 */ 54 public static final int FULL = 2; 55 56 /** 57 * Create a ReadAheadIterator. 58 * @param source The iterator from which to read ahead 59 * @param mode A value indicating the type of read ahead required. 60 * @see #NONE 61 * @see #LIMITED 62 * @see #FULL 63 */ 64 public ReadAheadIterator(Iterator source, int mode) { 65 this(source, mode, DEFAULT_LIMITED_READAHEAD); 66 } 67 68 /** 69 * Create a ReadAheadIterator. 70 * @param source The iterator from which to read ahead. 71 * @param mode A value indicating the type of read ahead required. 72 * @param amount A value indicating the amount of read ahead required, 73 * if the mode is set to LIMITED. If the mode is NON or FULL, this 74 * parameter will be ignored. 75 * @see #NONE 76 * @see #LIMITED 77 * @see #FULL 78 */ 79 public ReadAheadIterator(Iterator source, int mode, int amount) { 80 this.source = source; 81 setMode(mode, amount); 82 } 83 84 /** 85 * Check if all available items from the underlying source iterator 86 * have been read. 87 * @return true if all available items from the underlying source iterator 88 * have been read. 89 */ 90 public synchronized boolean isReadAheadComplete() { 91 return (worker == null ? !source.hasNext() : !sourceHasNext); 92 } 93 94 /** 95 * Get the number of items read (so far) from the underlying source iterator. 96 * If the read ahead has not yet completed, this will be a partial count of 97 * the total set of items available to be read. If the read ahead is complete, 98 * the value will be the total number of items returned from the underlying 99 * source iterator. 100 * @return the number of items (so far) from the underlying source iterator. 101 * @see #isReadAheadComplete 102 */ 103 public synchronized int getItemsFoundCount() { 104 return (usedCount + queue.size()); 105 } 106 107 /** 108 * Are there items that have not be read-ahead yet? 109 * @return True if the source has no more elements, false otherwise. 110 * @deprecated Use hasNext(). 111 */ 112 public synchronized boolean isSourceExhausted() { 113 return (worker == null ? !source.hasNext() : !sourceHasNext); 114 } 115 116 /** 117 * How many elements have been distributed through <code>getNext()</code>. 118 * @return number of used elements, greater-than or equal-to zero 119 * @deprecated Will not be supported in the future. 120 * @see #getItemsFoundCount 121 */ 122 public synchronized int getUsedElementCount() { 123 return usedCount; 124 } 125 126 /** 127 * Number of items have been read-ahead by not distributed. 128 * @return number of items ready to be distributed, greater-than 129 * or equal-to zero 130 * @deprecated Will not be supported in the future. 131 */ 132 public synchronized int getOutputQueueSize() { 133 return queue.size(); 134 } 135 136 /** 137 * Set the type and/or amount of read ahead required. 138 * @param mode A value indicating the type of read ahead required. 139 * @param amount A value indicating the amount of read ahead required, 140 * if the mode is set to LIMITED. If the mode is NON or FULL, this 141 * parameter will be ignored. 142 */ 143 synchronized void setMode(int mode, int amount) { 144 switch (mode) { 145 case NONE: 146 minQueueSize = 0; 147 maxQueueSize = 0; 148 if (worker != null) { 149 worker = null; 150 notifyAll(); // wake up worker if necessary 151 } 152 break; 153 154 case LIMITED: 155 if (amount <= 0) 156 throw new IllegalArgumentException(); 157 minQueueSize = Math.min(10, amount); 158 maxQueueSize = amount; 159 break; 160 161 case FULL: 162 minQueueSize = 10; 163 maxQueueSize = Integer.MAX_VALUE; 164 break; 165 166 default: 167 throw new IllegalArgumentException(); 168 } 169 } 170 171 public synchronized boolean hasNext() { 172 return (queue.size() > 0 173 || (worker == null ? source.hasNext() : sourceHasNext)); 174 } 175 176 public synchronized Object next() { 177 // see if there are items in the read ahead queue 178 Object result = queue.remove(); 179 180 if (result == null) { 181 // queue is empty: check whether to read source directly, or rely on the worker thread 182 if (maxQueueSize == 0) 183 // no read ahead, so don't start worker; use source directly 184 result = source.next(); 185 else { 186 if (worker == null) { 187 // only start a worker if there are items for it to read 188 sourceHasNext = source.hasNext(); 189 if (sourceHasNext) { 190 // there is more to be read, so start a worker to read it 191 worker = new Thread("ReadAheadIterator" + (workerNum++)) { 192 public void run() { 193 readAhead(); 194 } 195 }; 196 worker.start(); 197 } 198 } 199 else { 200 // ensure worker is awake 201 notifyAll(); 202 } 203 204 // wait for the worker to deliver some results 205 while (sourceHasNext && queue.isEmpty()) { 206 try { 207 wait(); 208 } 209 catch (InterruptedException e) { 210 // should not happen, but if it does, propogate the interrupt 211 Thread.currentThread().interrupt(); 212 } 213 } 214 215 result = queue.remove(); 216 } 217 } 218 else if (sourceHasNext && (queue.size() < minQueueSize)) { 219 // we've got something from the queue, but the queue is getting empty, 220 // so ensure worker is awake 221 notifyAll(); 222 } 223 224 if (result != null) 225 usedCount++; 226 227 return result; 228 } 229 230 /** 231 * @throws UnsupportedOperationException 232 */ 233 public void remove() { 234 throw new UnsupportedOperationException(); 235 } 236 237 /** 238 * The body of the worker thread which is used to perform the read ahead. 239 * While the worker is running, it notionally "owns" the source iterator. 240 * As such, read ahead from the source is not synchronized; instead, 241 * just the updates to the queue and other monitored data with the results 242 * of the read ahead are synchronized. This ensure minimum latency on the 243 * main monitor lock. 244 */ 245 private void readAhead() { 246 final Thread thisThread = Thread.currentThread(); 247 boolean keepReading; 248 249 // check whether the thread is really required 250 synchronized (this) { 251 keepReading = (sourceHasNext && (thisThread == worker)); 252 } 253 254 try { 255 while (keepReading) { 256 // sourceHasNext is true, which means there is another item 257 // to be read, so read it, and also check whether there is 258 // another item after that 259 Object srcNext = source.next(); 260 boolean srcHasNext = source.hasNext(); 261 262 // get the lock to update the queue and sourceHasNext; 263 // check that the worker is still required; and 264 // wait (if necessary) for the queue to empty a bit 265 synchronized (this) { 266 queue.insert(srcNext); 267 sourceHasNext = srcHasNext; 268 notifyAll(); 269 270 keepReading = (sourceHasNext && (thisThread == worker)); 271 272 while (queue.size() >= maxQueueSize && keepReading) { 273 wait(); 274 keepReading = (sourceHasNext && (thisThread == worker)); 275 } 276 } 277 } 278 } 279 catch (InterruptedException e) { 280 // ignore 281 } 282 finally { 283 // if this is still the main worker thread, zap the 284 // reference to the thread, to help GC. 285 synchronized (this) { 286 if (thisThread == worker) 287 worker = null; 288 } 289 } 290 } 291 292 //------------------------------------------------------------------------------------------ 293 // 294 // Instance variables: access to all of these (except source) must be synchronized. 295 296 /** 297 * The queue to hold the items that have been read from the underlying source iterator. 298 */ 299 private final Fifo queue = new Fifo(); 300 301 /** 302 * The underlying source iterator. If the worker thread is running, it alone 303 * should access this iterator; otherwise, access to this should be synchronized, 304 * along with everything else. 305 * @see #worker 306 */ 307 private final Iterator source; 308 309 /** 310 * A value indicating whether the underlying source iterator has more values to be read. 311 * Use this instead of source.hasNext() when the worker thread is running. 312 */ 313 private boolean sourceHasNext; 314 315 /** 316 * A minimum size for the queue. If the queue falls below this size, and if there 317 * are more items to be read, the worker thread will be woken up to replenish the queue. 318 * This may happen if the mode is set to PARTIAL and the worker thread fills the queue. 319 */ 320 private int minQueueSize; 321 322 /** 323 * Set a maximum size for the queue, which is derived from the type and amount of 324 * read ahead, given to setMode. 325 * If the worker thread determines the queue size is bigger than this value, it will 326 * wait until the size goes below minQueueSize. 327 */ 328 private int maxQueueSize; 329 330 /** 331 * The number of items (i.e. not including null) returned from next(). 332 */ 333 private int usedCount; 334 335 /** 336 * The worker thread that does the read ahead. While it is set, the worker thread 337 * should be the only one to access the underlying source iterator, which it will 338 * do unsynchronized. 339 */ 340 private Thread worker; 341 342 /** 343 * A counter for generating names for worker threads. 344 */ 345 private static int workerNum; 346 347 /** 348 * The default amount for LIMITED read ahead. 349 */ 350 private static final int DEFAULT_LIMITED_READAHEAD = 100; 351 }