1 /* 2 * $Id$ 3 * 4 * Copyright (c) 1996, 2018, Oracle and/or its affiliates. All rights reserved. 5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 6 * 7 * This code is free software; you can redistribute it and/or modify it 8 * under the terms of the GNU General Public License version 2 only, as 9 * published by the Free Software Foundation. Oracle designates this 10 * particular file as subject to the "Classpath" exception as provided 11 * by Oracle in the LICENSE file that accompanied this code. 12 * 13 * This code is distributed in the hope that it will be useful, but WITHOUT 14 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 15 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 16 * version 2 for more details (a copy is included in the LICENSE file that 17 * accompanied this code). 18 * 19 * You should have received a copy of the GNU General Public License version 20 * 2 along with this work; if not, write to the Free Software Foundation, 21 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 22 * 23 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 24 * or visit www.oracle.com if you need additional information or have any 25 * questions. 26 */ 27 package com.sun.javatest.agent; 28 29 30 import java.io.InputStream; 31 import java.io.IOException; 32 import java.io.OutputStream; 33 import java.net.Socket; 34 import java.net.ServerSocket; 35 import java.util.Enumeration; 36 import java.util.Vector; 37 38 import com.sun.javatest.util.DynamicArray; 39 import java.io.InterruptedIOException; 40 41 /** 42 * A holding area in which to keep incoming requests from active agents 43 * until they are required. 44 */ 45 public class ActiveAgentPool 46 { 47 /** 48 * An exception which is thrown when no agent is available for use. 49 */ 50 public static class NoAgentException extends Exception 51 { 52 /** 53 * Create an exception to indicate that no agent is available for use. 54 * @param msg A string giving additional details. 55 */ 56 public NoAgentException(String msg) { 57 super(msg); 58 } 59 } 60 61 /** 62 * An Observer class to monitor activity of the active agent pool. 63 */ 64 public static interface Observer { 65 /** 66 * Called when a connection to an agent is added to the active agent pool. 67 * @param c The connection that has been added to the pool. 68 */ 69 void addedToPool(Connection c); 70 71 /** 72 * Called when a connection to an agent is removed from the active agent pool, 73 * because it is about to be used to handle a task. 74 * @param c The connection that has been removed from the pool. 75 */ 76 void removedFromPool(Connection c); 77 } 78 79 //-------------------------------------------------------------------------- 80 81 /** 82 * An entry requesting an active agent that is available for 83 * use. 84 */ 85 class Entry implements Connection { 86 Entry(Socket socket) throws IOException { 87 this.socket = socket; 88 socketInput = socket.getInputStream(); 89 socketOutput = socket.getOutputStream(); 90 } 91 92 public String getName() { 93 if (name == null) { 94 StringBuffer sb = new StringBuffer(32); 95 sb.append(socket.getInetAddress().getHostName()); 96 sb.append(",port="); 97 sb.append(socket.getPort()); 98 sb.append(",localport="); 99 sb.append(socket.getLocalPort()); 100 name = sb.toString(); 101 } 102 return name; 103 } 104 105 public synchronized InputStream getInputStream() { 106 // If there is no read outstanding in the watcher thread and 107 // no buffered data available take the fast way out and simply 108 // use the real socket stream. 109 if (!reading && data == null) 110 return socketInput; 111 112 // If there is a read outstanding in the watcher thread, or if there 113 // is already buffered data available, create a stream to return that 114 // data first. 115 return new InputStream() { 116 public int read() throws IOException { 117 // don't bother to optimize method this because stream should 118 // be wrapped in a BufferedInputStream 119 byte[] b = new byte[1]; 120 int n = read(b); 121 if (n == -1) { 122 return -1; 123 } 124 else { 125 n = 0xFF & b[0]; 126 return n; 127 } 128 } 129 public int read(byte[] buffer, int offset, int count) throws IOException { 130 if (count == 0) // we ought to check 131 return 0; 132 133 try { 134 // if the watcher thread has a read outstanding, wait for it to 135 // complete 136 waitWhileReading(); 137 // } 138 // catch (InterruptedException ignore) { 139 // } 140 // 141 // if (data == null) { 142 // // no data available: must have been used already; 143 // // simply delegate to socketInput 144 // return socketInput.read(buffer, offset, count); 145 // } 146 if (data == null) { 147 return new InterruptableReader().read(buffer, offset, count); 148 } 149 } 150 catch (InterruptedException ie) { 151 InterruptedIOException iio = 152 new InterruptedIOException("Test execution timeout"); 153 iio.fillInStackTrace(); 154 throw iio; 155 } 156 try { 157 if (data instanceof Integer) { 158 int i = ((Integer)data).intValue(); 159 if (i == -1) 160 return -1; 161 else { 162 buffer[offset] = (byte)i; 163 return 1; 164 } 165 } 166 else { 167 IOException e = (IOException)data; 168 e.fillInStackTrace(); 169 throw e; 170 } 171 } 172 finally { 173 data = null; 174 } 175 } 176 public void close() throws IOException { 177 socketInput.close(); 178 } 179 }; 180 } 181 182 /* 183 * This class made to read form socket input stream from separate thread. 184 * Thread, from which reading invokes (let's call it 'main'), waits 185 * before end of reading from socket. 186 * Waiting allows harness to interrupt 'main' thread and thus manage 187 * timeout situation correctly. 188 * The same thing is made for passive agent. 189 * See <code>PassiveConnectionFactory.nextConnection()</code> and 190 * <code>AgentManager.connectToPassiveAgent()</code> methods where 191 * InterruptableSocketConnection used instead of usual SocketConnection. 192 */ 193 private class InterruptableReader { 194 private IOException ioe; 195 private int n; 196 197 public int read(byte[] buffer, int offset, int count) 198 throws IOException, InterruptedException { 199 synchronized (Entry.this) { 200 ioe = null; 201 n = -1; 202 203 readInThread(buffer, offset, count); 204 waitWhileReading(); 205 206 if (ioe != null) { 207 throw ioe; 208 } 209 210 return n; 211 } 212 } 213 private void readInThread(byte[] buffer, int offset, int count) { 214 final byte[] b = buffer; 215 final int o = offset; 216 final int c = count; 217 218 Thread reader = new Thread() { 219 public void run() { 220 try { 221 n = socketInput.read(b, o, c); 222 } 223 catch (IOException io) { 224 ioe = io; 225 } 226 finally { 227 synchronized(Entry.this) { 228 reading = false; 229 Entry.this.notifyAll(); 230 } 231 } 232 } 233 }; 234 reading = true; 235 reader.start(); 236 } 237 } 238 239 public OutputStream getOutputStream() { 240 return socketOutput; 241 } 242 243 public synchronized void close() throws IOException { 244 socketInput.close(); 245 socketOutput.close(); 246 closed = true; 247 notifyAll(); 248 } 249 250 public synchronized boolean isClosed() { 251 return closed; 252 } 253 254 public synchronized void waitUntilClosed(int timeout) throws InterruptedException { 255 long now = System.currentTimeMillis(); 256 long end = now + timeout; 257 while (now < end && !closed) { 258 wait(end - now); 259 now = System.currentTimeMillis(); 260 } 261 } 262 263 void readAhead() { 264 synchronized (this) { 265 if (!entries.contains(this)) 266 // if this entry has already been removed from the agent pool, 267 // there is no need to monitor the socket, so exit without reading. 268 // This is an optimization only; the entry could be being removed 269 // right now, but the synchronized block we are in will handle 270 // everything OK. 271 return; 272 273 // mark this object as busy doing a read; other synchronized methods 274 // (ie getInputStream()) should take this into account 275 reading = true; 276 } 277 278 // initiate a blocking read call on the socket, in the hope of being 279 // notified if the socket gets closed prematurely. If it does 280 // (i.e. if the read terminates while the entry is still in the pool), 281 // the entry is removed from the pool and the socket closed. 282 // Otherwise, if the entry is removed from the pool while the read is blocked, 283 // then when the read terminates the data will be saved for use by the 284 // new owner (via getInputStream), and the thread will be marked as no 285 // longer doing a read. 286 try { 287 data = new Integer(socketInput.read()); 288 } 289 catch (IOException e) { 290 data = e; 291 } 292 finally { 293 synchronized (this) { 294 boolean ok = entries.remove(this); 295 if (ok) 296 // The read has unblocked prematurely and no one else 297 // owns the entry (since we managed to remove it ourselves. 298 // Drop the socket. 299 closeNoExceptions(this); 300 301 reading = false; 302 notifyAll(); 303 } 304 } 305 } 306 307 private synchronized void waitWhileReading() throws InterruptedException { 308 while (reading) 309 wait(); 310 } 311 312 private final Socket socket; 313 private InputStream socketInput; 314 private OutputStream socketOutput; 315 private String name; 316 private boolean reading; 317 private Object data; 318 private boolean closed; 319 } 320 321 322 class Entries { 323 synchronized boolean contains(Entry e) { 324 return v.contains(e); 325 } 326 327 synchronized Enumeration elements() { 328 return ((Vector)(v.clone())).elements(); 329 } 330 331 synchronized void add(final Entry e) { 332 v.addElement(e); 333 notifyAddedToPool(e); 334 notifyAll(); 335 Runnable r = new Runnable() { 336 public void run() { 337 e.readAhead(); 338 } 339 }; 340 Thread t = new Thread(r, "ActiveAgentPool.EntryWatcher" + entryWatcherCount++); 341 t.start(); 342 } 343 344 synchronized boolean remove(Entry e) { 345 if (v.contains(e)) { 346 v.removeElement(e); 347 notifyRemovedFromPool(e); 348 return true; 349 } 350 else 351 return false; 352 } 353 354 synchronized Entry next() { 355 Entry e = null; 356 if (v.size() > 0) { 357 e = v.elementAt(0); 358 v.removeElementAt(0); 359 notifyRemovedFromPool(e); 360 } 361 return e; 362 } 363 364 synchronized Entry next(int timeout) throws InterruptedException { 365 long end = System.currentTimeMillis() + timeout; 366 for (long t = timeout; t > 0; t = end - System.currentTimeMillis()) { 367 if (v.size() == 0) 368 wait(t); 369 370 Entry e = next(); 371 if (e != null) 372 return e; 373 } 374 return null; 375 } 376 377 synchronized void addObserver(Observer o) { 378 observers = DynamicArray.append(observers, o); 379 } 380 381 382 synchronized void deleteObserver(Observer o) { 383 observers = DynamicArray.remove(observers, o); 384 } 385 386 private synchronized void notifyAddedToPool(Entry e) { 387 for (int i = 0; i < observers.length; i++) { 388 observers[i].addedToPool(e); 389 } 390 } 391 392 private synchronized void notifyRemovedFromPool(Entry e) { 393 for (int i = 0; i < observers.length; i++) { 394 observers[i].removedFromPool(e); 395 } 396 } 397 398 private Vector<Entry> v = new Vector<>(); 399 private Observer[] observers = new Observer[0]; 400 } 401 402 /** 403 * Listen for requests from active agents. Active agents announce their 404 * willingness to work on behalf of a harness by contacting the harness 405 * on a nominated port. When a agent contacts the harness, it is put in 406 * a pool to be used when agent clients request an unspecified agent. 407 * @param port The port on which to listen for agents. 408 * @param timeout The maximum time to wait for a agent to contact the 409 * harness when one is needed. The timeout should be 410 * in milliseconds. 411 * @throws IOException if there a problems with any sockets 412 * while performing this operation. 413 */ 414 public synchronized void listen(int port, int timeout) throws IOException { 415 setListening(false); 416 setPort(port); 417 setTimeout(timeout); 418 setListening(true); 419 } 420 421 /** 422 * Get the port currently being used to listen for requests from active agents. 423 * @return The port being used, or Agent.defaultActivePort if no agent pool 424 * has been started. 425 * @see #setPort 426 */ 427 public synchronized int getPort() { 428 return (port == 0 && serverSocket != null ? 429 serverSocket.getLocalPort() : port); 430 } 431 432 433 /** 434 * Set the port currently to be used to listen for requests from active agents. 435 * @param port the port to be used 436 * @see #getPort 437 */ 438 public synchronized void setPort(int port) { 439 this.port = port; // takes effect on next setListening(true); 440 } 441 442 /** 443 * Get the timeout being used when waiting for requests from active agents. 444 * @return The timeout being used, in milliseconds, or 0 if no agent pool 445 * has been started. 446 * @see #setTimeout 447 */ 448 public synchronized int getTimeout() { 449 return timeout; 450 } 451 452 453 /** 454 * Set the timeout to be used when waiting for requests from active agents. 455 * @param timeout Ehe timeout, in milliseconds, to be used. 456 * @see #getTimeout 457 */ 458 public synchronized void setTimeout(int timeout) { 459 this.timeout = timeout; 460 } 461 462 /** 463 * Check whether the pool is currently listening for incoming requests. 464 * @return true if the pool is currently listening 465 * @see #setListening 466 */ 467 public synchronized boolean isListening() { 468 return (serverSocket != null); 469 } 470 471 /** 472 * Set whether or not the pool should be listening for incoming requests, 473 * on the appropriate port. 474 * If the pool is already in the appropriate state, this method has no effect. 475 * @param listen Set to true to ensure the pool is listening for incoming requests, 476 * and false otherwise. 477 * @throws IOException if any problems occur while opening or closing the 478 * socket on which the pool is listening for requests. 479 * @see #isListening 480 */ 481 public synchronized void setListening(boolean listen) throws IOException { 482 if (debug) 483 new Exception("ActiveAgentPool.setListening " + listen + ",port=" + port).printStackTrace(System.err); 484 485 if (listen) { 486 if (serverSocket != null) { 487 if (port == 0 || serverSocket.getLocalPort() == port) 488 return; 489 else 490 closeNoExceptions(serverSocket); 491 } 492 493 serverSocket = SocketConnection.createServerSocket(port); 494 495 Runnable r = new Runnable() { 496 public void run() { 497 acceptRequests(); 498 } 499 }; 500 Thread worker = new Thread(r, "ActiveAgentPool" + counter++); 501 worker.start(); 502 // could synchronize (wait()) with run() here 503 // if it should be really necessary 504 } 505 else { 506 if (serverSocket != null) 507 serverSocket.close(); 508 serverSocket = null; 509 // flush the agents that have already registered 510 Entry e; 511 while ((e = entries.next()) != null) 512 closeNoExceptions(e); 513 } 514 } 515 516 Entry nextAgent() throws NoAgentException, InterruptedException { 517 if (!isListening()) 518 throw new NoAgentException("AgentPool not listening"); 519 Entry e = entries.next(timeout); 520 if (e != null) 521 return e; 522 523 throw new NoAgentException("Timeout waiting for agent to become available"); 524 } 525 526 private void acceptRequests() { 527 ServerSocket ss; 528 // warning: serverSocket can be mutated by other methods, but we 529 // don't want to do the accept call in a synchronized block; 530 // after the accept call, we make sure that serverSocket is still 531 // what we think it is--if not, this specific thread instance is 532 // not longer current or required 533 synchronized (this) { 534 ss = serverSocket; 535 // could synchronize (notify()) with setListening() here 536 // if it should be really necessary 537 } 538 539 try { 540 int errors = 0; 541 542 while (errors < MAX_ERRORS) { 543 try { 544 545 // wait for connection or exception, whichever comes first 546 Socket s = ss.accept(); 547 548 // got connection: make sure we still want it, 549 // and if so, add it to pool and notify interested parties 550 synchronized (this) { 551 if (ss == serverSocket) 552 entries.add(new Entry(s)); 553 else { 554 closeNoExceptions(s); 555 return; 556 } 557 558 } 559 560 if (errors > 0) 561 errors--; // let #errors decay with each successful open 562 } 563 catch (IOException e) { 564 synchronized (this) { 565 if (ss != serverSocket) 566 return; 567 } 568 569 // perhaps need a better reporting channel here 570 System.err.println("error opening socket for remote socket pool"); 571 System.err.println(e.getMessage()); 572 errors++; 573 } 574 } 575 // perhaps need a better reporting channel here 576 System.err.println("too many errors opening socket for remote socket pool"); 577 System.err.println("server thread exiting"); 578 579 synchronized (this) { 580 if (serverSocket == ss) 581 serverSocket = null; 582 } 583 } 584 finally { 585 closeNoExceptions(ss); 586 } 587 588 } 589 590 /** 591 * Get an enumeration of the entries currently in the active agent pool. 592 */ 593 Enumeration elements() { 594 return entries.elements(); 595 } 596 597 /** 598 * Add an observer to monitor events. 599 * @param o The observer to be added. 600 */ 601 public void addObserver(Observer o) { 602 entries.addObserver(o); 603 } 604 605 606 /** 607 * Remove an observer that had been previously registered to monitor events. 608 * @param o The observer to be removed.. 609 */ 610 public void deleteObserver(Observer o) { 611 entries.deleteObserver(o); 612 } 613 614 private void closeNoExceptions(Entry e) { 615 try { 616 e.close(); 617 } 618 catch (IOException ignore) { 619 } 620 } 621 622 private void closeNoExceptions(Socket s) { 623 try { 624 s.close(); 625 } 626 catch (IOException ignore) { 627 } 628 } 629 630 private void closeNoExceptions(ServerSocket ss) { 631 try { 632 ss.close(); 633 } 634 catch (IOException ignore) { 635 } 636 } 637 638 private Thread worker; 639 private int counter; 640 private Entries entries = new Entries(); 641 private ServerSocket serverSocket; 642 private int timeout = 3*60*1000; // 3 minutes 643 private int port = Agent.defaultActivePort; 644 private final int MAX_ERRORS = 10; 645 private static int entryWatcherCount; 646 private static boolean debug = Boolean.getBoolean("debug.ActiveAgentPool"); 647 }