1 /*
   2  * $Id$
   3  *
   4  * Copyright (c) 1996, 2018, Oracle and/or its affiliates. All rights reserved.
   5  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   6  *
   7  * This code is free software; you can redistribute it and/or modify it
   8  * under the terms of the GNU General Public License version 2 only, as
   9  * published by the Free Software Foundation.  Oracle designates this
  10  * particular file as subject to the "Classpath" exception as provided
  11  * by Oracle in the LICENSE file that accompanied this code.
  12  *
  13  * This code is distributed in the hope that it will be useful, but WITHOUT
  14  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  15  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  16  * version 2 for more details (a copy is included in the LICENSE file that
  17  * accompanied this code).
  18  *
  19  * You should have received a copy of the GNU General Public License version
  20  * 2 along with this work; if not, write to the Free Software Foundation,
  21  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  22  *
  23  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  24  * or visit www.oracle.com if you need additional information or have any
  25  * questions.
  26  */
  27 package com.sun.javatest.agent;
  28 
  29 
  30 import java.io.InputStream;
  31 import java.io.IOException;
  32 import java.io.OutputStream;
  33 import java.net.Socket;
  34 import java.net.ServerSocket;
  35 import java.util.Enumeration;
  36 import java.util.Vector;
  37 
  38 import com.sun.javatest.util.DynamicArray;
  39 import java.io.InterruptedIOException;
  40 
  41 /**
  42  * A holding area in which to keep incoming requests from active agents
  43  * until they are required.
  44  */
  45 public class ActiveAgentPool
  46 {
  47     /**
  48      * An exception which is thrown when no agent is available for use.
  49      */
  50     public static class NoAgentException extends Exception
  51     {
  52         /**
  53          * Create an exception to indicate that no agent is available for use.
  54          * @param msg A string giving additional details.
  55          */
  56         public NoAgentException(String msg) {
  57             super(msg);
  58         }
  59     }
  60 
  61     /**
  62      * An Observer class to monitor activity of the active agent pool.
  63      */
  64     public static interface Observer {
  65         /**
  66          * Called when a connection to an agent is added to the active agent pool.
  67          * @param c The connection that has been added to the pool.
  68          */
  69         void addedToPool(Connection c);
  70 
  71         /**
  72          * Called when a connection to an agent is removed from the active agent pool,
  73          * because it is about to be used to handle a task.
  74          * @param c The connection that has been removed from the pool.
  75          */
  76         void removedFromPool(Connection c);
  77     }
  78 
  79     //--------------------------------------------------------------------------
  80 
  81     /**
  82      * An entry requesting an active agent that is available for
  83      * use.
  84      */
  85     class Entry implements Connection {
  86         Entry(Socket socket) throws IOException {
  87             this.socket = socket;
  88             socketInput = socket.getInputStream();
  89             socketOutput = socket.getOutputStream();
  90         }
  91 
  92         public String getName() {
  93             if (name == null) {
  94                 StringBuffer sb = new StringBuffer(32);
  95                 sb.append(socket.getInetAddress().getHostName());
  96                 sb.append(",port=");
  97                 sb.append(socket.getPort());
  98                 sb.append(",localport=");
  99                 sb.append(socket.getLocalPort());
 100                 name = sb.toString();
 101             }
 102             return name;
 103         }
 104 
 105         public synchronized InputStream getInputStream() {
 106             // If there is no read outstanding in the watcher thread and
 107             // no buffered data available take the fast way out and simply
 108             // use the real socket stream.
 109             if (!reading && data == null)
 110                 return socketInput;
 111 
 112             // If there is a read outstanding in the watcher  thread, or if there
 113             // is already buffered data available, create a stream to return that
 114             // data first.
 115             return new InputStream() {
 116                 public int read() throws IOException {
 117                     // don't bother to optimize method this because stream should
 118                     // be wrapped in a BufferedInputStream
 119                     byte[] b = new byte[1];
 120                     int n = read(b);
 121                     if (n == -1) {
 122                         return -1;
 123                     }
 124                     else {
 125                         n = 0xFF & b[0];
 126                         return n;
 127                     }
 128                 }
 129                 public int read(byte[] buffer, int offset, int count) throws IOException {
 130                     if (count == 0) // we ought to check
 131                         return 0;
 132 
 133                     try {
 134                         // if the watcher thread has a read outstanding, wait for it to
 135                         // complete
 136                         waitWhileReading();
 137 //                  }
 138 //                  catch (InterruptedException ignore) {
 139 //                  }
 140 //
 141 //                        if (data == null) {
 142 //                            // no data available: must have been used already;
 143 //                            // simply delegate to socketInput
 144 //                            return socketInput.read(buffer, offset, count);
 145 //                        }
 146                         if (data == null) {
 147                             return new InterruptableReader().read(buffer, offset, count);
 148                         }
 149                     }
 150                     catch (InterruptedException ie) {
 151                         InterruptedIOException iio =
 152                                 new InterruptedIOException("Test execution timeout");
 153                         iio.fillInStackTrace();
 154                         throw iio;
 155                     }
 156                     try {
 157                         if (data instanceof Integer) {
 158                             int i = ((Integer)data).intValue();
 159                             if (i == -1)
 160                                 return -1;
 161                             else {
 162                                 buffer[offset] = (byte)i;
 163                                 return 1;
 164                             }
 165                         }
 166                         else {
 167                             IOException e = (IOException)data;
 168                             e.fillInStackTrace();
 169                             throw e;
 170                         }
 171                     }
 172                     finally {
 173                         data = null;
 174                     }
 175                 }
 176                 public void close() throws IOException {
 177                     socketInput.close();
 178                 }
 179             };
 180         }
 181 
 182         /*
 183          * This class made to read form socket input stream from separate thread.
 184          * Thread, from which reading invokes (let's call it 'main'), waits
 185          * before end of reading from socket.
 186          * Waiting allows harness to interrupt 'main' thread and thus manage
 187          * timeout situation correctly.
 188          * The same thing is made for passive agent.
 189          * See <code>PassiveConnectionFactory.nextConnection()</code> and
 190          * <code>AgentManager.connectToPassiveAgent()</code> methods where
 191          * InterruptableSocketConnection used instead of usual SocketConnection.
 192          */
 193         private class InterruptableReader {
 194             private IOException ioe;
 195             private int n;
 196 
 197             public int read(byte[] buffer, int offset, int count)
 198                     throws IOException, InterruptedException {
 199                 synchronized (Entry.this) {
 200                     ioe = null;
 201                     n = -1;
 202 
 203                     readInThread(buffer, offset, count);
 204                     waitWhileReading();
 205 
 206                     if (ioe != null) {
 207                         throw ioe;
 208                     }
 209 
 210                     return n;
 211                 }
 212             }
 213             private void readInThread(byte[] buffer, int offset, int count) {
 214                 final byte[] b = buffer;
 215                 final int o = offset;
 216                 final int c = count;
 217 
 218                 Thread reader = new Thread() {
 219                     public void run() {
 220                         try {
 221                             n = socketInput.read(b, o, c);
 222                         }
 223                         catch (IOException io) {
 224                             ioe = io;
 225                         }
 226                         finally {
 227                             synchronized(Entry.this) {
 228                                 reading = false;
 229                                 Entry.this.notifyAll();
 230                             }
 231                         }
 232                     }
 233                 };
 234                 reading = true;
 235                 reader.start();
 236             }
 237         }
 238 
 239         public OutputStream getOutputStream() {
 240             return socketOutput;
 241         }
 242 
 243         public synchronized void close() throws IOException {
 244             socketInput.close();
 245             socketOutput.close();
 246             closed = true;
 247             notifyAll();
 248         }
 249 
 250         public synchronized boolean isClosed() {
 251             return closed;
 252         }
 253 
 254         public synchronized void waitUntilClosed(int timeout) throws InterruptedException {
 255             long now = System.currentTimeMillis();
 256             long end = now + timeout;
 257             while (now < end && !closed) {
 258                 wait(end - now);
 259                 now = System.currentTimeMillis();
 260             }
 261         }
 262 
 263         void readAhead() {
 264             synchronized (this) {
 265                 if (!entries.contains(this))
 266                     // if this entry has already been removed from the agent pool,
 267                     // there is no need to monitor the socket, so exit without reading.
 268                     // This is an optimization only; the entry could be being removed
 269                     // right now, but the synchronized block we are in will handle
 270                     // everything OK.
 271                     return;
 272 
 273                 // mark this object as busy doing a read; other synchronized methods
 274                 // (ie getInputStream()) should take this into account
 275                 reading = true;
 276             }
 277 
 278             // initiate a blocking read call on the socket, in the hope of being
 279             // notified if the socket gets closed prematurely. If it does
 280             // (i.e. if the read terminates while the entry is still in the pool),
 281             // the entry is removed from the pool and the socket closed.
 282             // Otherwise, if the entry is removed from the pool while the read is blocked,
 283             // then when the read terminates the data will be saved for use by the
 284             // new owner (via getInputStream), and the thread will be marked as no
 285             // longer doing a read.
 286             try {
 287                 data = new Integer(socketInput.read());
 288             }
 289             catch (IOException e) {
 290                 data = e;
 291             }
 292             finally {
 293                 synchronized (this) {
 294                     boolean ok = entries.remove(this);
 295                     if (ok)
 296                         // The read has unblocked prematurely and no one else
 297                         // owns the entry (since we managed to remove it ourselves.
 298                         // Drop the socket.
 299                         closeNoExceptions(this);
 300 
 301                     reading = false;
 302                     notifyAll();
 303                 }
 304             }
 305         }
 306 
 307         private synchronized void waitWhileReading() throws InterruptedException {
 308             while (reading)
 309                 wait();
 310         }
 311 
 312         private final Socket socket;
 313         private InputStream socketInput;
 314         private OutputStream socketOutput;
 315         private String name;
 316         private boolean reading;
 317         private Object data;
 318         private boolean closed;
 319     }
 320 
 321 
 322     class Entries {
 323         synchronized boolean contains(Entry e) {
 324             return v.contains(e);
 325         }
 326 
 327         synchronized Enumeration elements() {
 328             return ((Vector)(v.clone())).elements();
 329         }
 330 
 331         synchronized void add(final Entry e) {
 332             v.addElement(e);
 333             notifyAddedToPool(e);
 334             notifyAll();
 335             Runnable r = new Runnable() {
 336                 public void run() {
 337                     e.readAhead();
 338                 }
 339             };
 340             Thread t = new Thread(r, "ActiveAgentPool.EntryWatcher" + entryWatcherCount++);
 341             t.start();
 342         }
 343 
 344         synchronized boolean remove(Entry e) {
 345             if (v.contains(e)) {
 346                 v.removeElement(e);
 347                 notifyRemovedFromPool(e);
 348                 return true;
 349             }
 350             else
 351                 return false;
 352         }
 353 
 354         synchronized Entry next() {
 355             Entry e = null;
 356             if (v.size() > 0) {
 357                 e = v.elementAt(0);
 358                 v.removeElementAt(0);
 359                 notifyRemovedFromPool(e);
 360             }
 361             return e;
 362         }
 363 
 364         synchronized Entry next(int timeout) throws InterruptedException {
 365             long end = System.currentTimeMillis() + timeout;
 366             for (long t = timeout; t > 0;  t = end - System.currentTimeMillis()) {
 367                 if (v.size() == 0)
 368                     wait(t);
 369 
 370                 Entry e = next();
 371                 if (e != null)
 372                     return e;
 373             }
 374             return null;
 375         }
 376 
 377         synchronized void addObserver(Observer o) {
 378             observers = DynamicArray.append(observers, o);
 379         }
 380 
 381 
 382         synchronized void deleteObserver(Observer o) {
 383             observers = DynamicArray.remove(observers, o);
 384         }
 385 
 386         private synchronized void notifyAddedToPool(Entry e) {
 387             for (int i = 0; i < observers.length; i++) {
 388                 observers[i].addedToPool(e);
 389             }
 390         }
 391 
 392         private synchronized void notifyRemovedFromPool(Entry e) {
 393             for (int i = 0; i < observers.length; i++) {
 394                 observers[i].removedFromPool(e);
 395             }
 396         }
 397 
 398         private Vector<Entry> v = new Vector<>();
 399         private Observer[] observers = new Observer[0];
 400     }
 401 
 402     /**
 403      * Listen for requests from active agents. Active agents announce their
 404      * willingness to work on behalf of a harness by contacting the harness
 405      * on a nominated port.  When a agent contacts the harness, it is put in
 406      * a pool to be used when agent clients request an unspecified agent.
 407      * @param port      The port on which to listen for agents.
 408      * @param timeout   The maximum time to wait for a agent to contact the
 409      *                  harness when one is needed. The timeout should be
 410      *                  in milliseconds.
 411      * @throws IOException if there a problems with any sockets
 412      *                  while performing this operation.
 413      */
 414     public synchronized void listen(int port, int timeout) throws IOException {
 415         setListening(false);
 416         setPort(port);
 417         setTimeout(timeout);
 418         setListening(true);
 419     }
 420 
 421     /**
 422      * Get the port currently being used to listen for requests from active agents.
 423      * @return The port being used, or Agent.defaultActivePort if no agent pool
 424      * has been started.
 425      * @see #setPort
 426      */
 427     public synchronized int getPort()  {
 428         return (port == 0 && serverSocket != null ?
 429                 serverSocket.getLocalPort() : port);
 430     }
 431 
 432 
 433     /**
 434      * Set the port currently to be used to listen for requests from active agents.
 435      * @param port the port to be used
 436      * @see #getPort
 437      */
 438     public synchronized void setPort(int port) {
 439         this.port = port; // takes effect on next setListening(true);
 440     }
 441 
 442     /**
 443      * Get the timeout being used when waiting for requests from active agents.
 444      * @return The timeout being used, in milliseconds, or 0 if no agent pool
 445      * has been started.
 446      * @see #setTimeout
 447      */
 448     public synchronized int getTimeout()  {
 449         return timeout;
 450     }
 451 
 452 
 453     /**
 454      * Set the timeout to be used when waiting for requests from active agents.
 455      * @param timeout Ehe timeout, in milliseconds, to be used.
 456      * @see #getTimeout
 457      */
 458     public synchronized void setTimeout(int timeout) {
 459         this.timeout = timeout;
 460     }
 461 
 462     /**
 463      * Check whether the pool is currently listening for incoming requests.
 464      * @return true if the pool is currently listening
 465      * @see #setListening
 466      */
 467     public synchronized boolean isListening() {
 468         return (serverSocket != null);
 469     }
 470 
 471     /**
 472      * Set whether or not the pool should be listening for incoming requests,
 473      * on the appropriate port.
 474      * If the pool is already in the appropriate state, this method has no effect.
 475      * @param listen Set to true to ensure the pool is listening for incoming requests,
 476      *                  and false otherwise.
 477      * @throws IOException if any problems occur while opening or closing the
 478      *                  socket on which the pool is listening for requests.
 479      * @see #isListening
 480      */
 481     public synchronized void setListening(boolean listen) throws IOException {
 482         if (debug)
 483             new Exception("ActiveAgentPool.setListening " + listen + ",port=" + port).printStackTrace(System.err);
 484 
 485         if (listen) {
 486             if (serverSocket != null) {
 487                 if (port == 0 || serverSocket.getLocalPort() == port)
 488                     return;
 489                 else
 490                     closeNoExceptions(serverSocket);
 491             }
 492 
 493             serverSocket = SocketConnection.createServerSocket(port);
 494 
 495             Runnable r = new Runnable() {
 496                 public void run() {
 497                     acceptRequests();
 498                 }
 499             };
 500             Thread worker = new Thread(r, "ActiveAgentPool" + counter++);
 501             worker.start();
 502             // could synchronize (wait()) with run() here
 503             // if it should be really necessary
 504         }
 505         else {
 506             if (serverSocket != null)
 507                 serverSocket.close();
 508             serverSocket = null;
 509             // flush the agents that have already registered
 510             Entry e;
 511             while ((e = entries.next()) != null)
 512                 closeNoExceptions(e);
 513         }
 514     }
 515 
 516     Entry nextAgent() throws NoAgentException, InterruptedException {
 517         if (!isListening())
 518             throw new NoAgentException("AgentPool not listening");
 519         Entry e = entries.next(timeout);
 520         if (e != null)
 521             return e;
 522 
 523         throw new NoAgentException("Timeout waiting for agent to become available");
 524     }
 525 
 526     private void acceptRequests() {
 527         ServerSocket ss;
 528         // warning: serverSocket can be mutated by other methods, but we
 529         // don't want to do the accept call in a synchronized block;
 530         // after the accept call, we make sure that serverSocket is still
 531         // what we think it is--if not, this specific thread instance is
 532         // not longer current or required
 533         synchronized (this) {
 534             ss = serverSocket;
 535             // could synchronize (notify()) with setListening() here
 536             // if it should be really necessary
 537         }
 538 
 539         try {
 540             int errors = 0;
 541 
 542             while (errors < MAX_ERRORS) {
 543                 try {
 544 
 545                     // wait for connection or exception, whichever comes first
 546                     Socket s = ss.accept();
 547 
 548                     // got connection: make sure we still want it,
 549                     // and if so, add it to pool and notify interested parties
 550                     synchronized (this) {
 551                         if (ss == serverSocket)
 552                             entries.add(new Entry(s));
 553                         else {
 554                             closeNoExceptions(s);
 555                             return;
 556                         }
 557 
 558                     }
 559 
 560                     if (errors > 0)
 561                         errors--; // let #errors decay with each successful open
 562                 }
 563                 catch (IOException e) {
 564                     synchronized (this) {
 565                         if (ss != serverSocket)
 566                             return;
 567                     }
 568 
 569                     // perhaps need a better reporting channel here
 570                     System.err.println("error opening socket for remote socket pool");
 571                     System.err.println(e.getMessage());
 572                     errors++;
 573                 }
 574             }
 575             // perhaps need a better reporting channel here
 576             System.err.println("too many errors opening socket for remote socket pool");
 577             System.err.println("server thread exiting");
 578 
 579             synchronized (this) {
 580                 if (serverSocket == ss)
 581                     serverSocket = null;
 582             }
 583         }
 584         finally {
 585             closeNoExceptions(ss);
 586         }
 587 
 588     }
 589 
 590     /**
 591      * Get an enumeration of the entries currently in the active agent pool.
 592      */
 593     Enumeration elements() {
 594         return entries.elements();
 595     }
 596 
 597     /**
 598      * Add an observer to monitor events.
 599      * @param o The observer to be added.
 600      */
 601     public void addObserver(Observer o) {
 602         entries.addObserver(o);
 603     }
 604 
 605 
 606     /**
 607      * Remove an observer that had been previously registered to monitor events.
 608      * @param o The observer to be removed..
 609      */
 610     public void deleteObserver(Observer o) {
 611         entries.deleteObserver(o);
 612     }
 613 
 614     private void closeNoExceptions(Entry e) {
 615         try {
 616             e.close();
 617         }
 618         catch (IOException ignore) {
 619         }
 620     }
 621 
 622     private void closeNoExceptions(Socket s) {
 623         try {
 624             s.close();
 625         }
 626         catch (IOException ignore) {
 627         }
 628     }
 629 
 630     private void closeNoExceptions(ServerSocket ss) {
 631         try {
 632             ss.close();
 633         }
 634         catch (IOException ignore) {
 635         }
 636     }
 637 
 638     private Thread worker;
 639     private int counter;
 640     private Entries entries = new Entries();
 641     private ServerSocket serverSocket;
 642     private int timeout = 3*60*1000;  // 3 minutes
 643     private int port = Agent.defaultActivePort;
 644     private final int MAX_ERRORS = 10;
 645     private static int entryWatcherCount;
 646     private static boolean debug = Boolean.getBoolean("debug.ActiveAgentPool");
 647 }