/*
* $Id$
*
* Copyright (c) 1996, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package com.sun.javatest.agent;
import java.io.InputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.ServerSocket;
import java.util.Enumeration;
import java.util.Vector;
import com.sun.javatest.util.DynamicArray;
import java.io.InterruptedIOException;
/**
* A holding area in which to keep incoming requests from active agents
* until they are required.
*/
public class ActiveAgentPool
{
/**
* An exception which is thrown when no agent is available for use.
*/
public static class NoAgentException extends Exception
{
/**
* Create an exception to indicate that no agent is available for use.
* @param msg A string giving additional details.
*/
public NoAgentException(String msg) {
super(msg);
}
}
/**
* An Observer class to monitor activity of the active agent pool.
*/
public static interface Observer {
/**
* Called when a connection to an agent is added to the active agent pool.
* @param c The connection that has been added to the pool.
*/
void addedToPool(Connection c);
/**
* Called when a connection to an agent is removed from the active agent pool,
* because it is about to be used to handle a task.
* @param c The connection that has been removed from the pool.
*/
void removedFromPool(Connection c);
}
//--------------------------------------------------------------------------
/**
* An entry requesting an active agent that is available for
* use.
*/
class Entry implements Connection {
Entry(Socket socket) throws IOException {
this.socket = socket;
socketInput = socket.getInputStream();
socketOutput = socket.getOutputStream();
}
public String getName() {
if (name == null) {
StringBuffer sb = new StringBuffer(32);
sb.append(socket.getInetAddress().getHostName());
sb.append(",port=");
sb.append(socket.getPort());
sb.append(",localport=");
sb.append(socket.getLocalPort());
name = sb.toString();
}
return name;
}
public synchronized InputStream getInputStream() {
// If there is no read outstanding in the watcher thread and
// no buffered data available take the fast way out and simply
// use the real socket stream.
if (!reading && data == null)
return socketInput;
// If there is a read outstanding in the watcher thread, or if there
// is already buffered data available, create a stream to return that
// data first.
return new InputStream() {
public int read() throws IOException {
// don't bother to optimize method this because stream should
// be wrapped in a BufferedInputStream
byte[] b = new byte[1];
int n = read(b);
if (n == -1) {
return -1;
}
else {
n = 0xFF & b[0];
return n;
}
}
public int read(byte[] buffer, int offset, int count) throws IOException {
if (count == 0) // we ought to check
return 0;
try {
// if the watcher thread has a read outstanding, wait for it to
// complete
waitWhileReading();
// }
// catch (InterruptedException ignore) {
// }
//
// if (data == null) {
// // no data available: must have been used already;
// // simply delegate to socketInput
// return socketInput.read(buffer, offset, count);
// }
if (data == null) {
return new InterruptableReader().read(buffer, offset, count);
}
}
catch (InterruptedException ie) {
InterruptedIOException iio =
new InterruptedIOException("Test execution timeout");
iio.fillInStackTrace();
throw iio;
}
try {
if (data instanceof Integer) {
int i = ((Integer)data).intValue();
if (i == -1)
return -1;
else {
buffer[offset] = (byte)i;
return 1;
}
}
else {
IOException e = (IOException)data;
e.fillInStackTrace();
throw e;
}
}
finally {
data = null;
}
}
public void close() throws IOException {
socketInput.close();
}
};
}
/*
* This class made to read form socket input stream from separate thread.
* Thread, from which reading invokes (let's call it 'main'), waits
* before end of reading from socket.
* Waiting allows harness to interrupt 'main' thread and thus manage
* timeout situation correctly.
* The same thing is made for passive agent.
* See PassiveConnectionFactory.nextConnection()
and
* AgentManager.connectToPassiveAgent()
methods where
* InterruptableSocketConnection used instead of usual SocketConnection.
*/
private class InterruptableReader {
private IOException ioe;
private int n;
public int read(byte[] buffer, int offset, int count)
throws IOException, InterruptedException {
synchronized (Entry.this) {
ioe = null;
n = -1;
readInThread(buffer, offset, count);
waitWhileReading();
if (ioe != null) {
throw ioe;
}
return n;
}
}
private void readInThread(byte[] buffer, int offset, int count) {
final byte[] b = buffer;
final int o = offset;
final int c = count;
Thread reader = new Thread() {
public void run() {
try {
n = socketInput.read(b, o, c);
}
catch (IOException io) {
ioe = io;
}
finally {
synchronized(Entry.this) {
reading = false;
Entry.this.notifyAll();
}
}
}
};
reading = true;
reader.start();
}
}
public OutputStream getOutputStream() {
return socketOutput;
}
public synchronized void close() throws IOException {
socketInput.close();
socketOutput.close();
closed = true;
notifyAll();
}
public synchronized boolean isClosed() {
return closed;
}
public synchronized void waitUntilClosed(int timeout) throws InterruptedException {
long now = System.currentTimeMillis();
long end = now + timeout;
while (now < end && !closed) {
wait(end - now);
now = System.currentTimeMillis();
}
}
void readAhead() {
synchronized (this) {
if (!entries.contains(this))
// if this entry has already been removed from the agent pool,
// there is no need to monitor the socket, so exit without reading.
// This is an optimization only; the entry could be being removed
// right now, but the synchronized block we are in will handle
// everything OK.
return;
// mark this object as busy doing a read; other synchronized methods
// (ie getInputStream()) should take this into account
reading = true;
}
// initiate a blocking read call on the socket, in the hope of being
// notified if the socket gets closed prematurely. If it does
// (i.e. if the read terminates while the entry is still in the pool),
// the entry is removed from the pool and the socket closed.
// Otherwise, if the entry is removed from the pool while the read is blocked,
// then when the read terminates the data will be saved for use by the
// new owner (via getInputStream), and the thread will be marked as no
// longer doing a read.
try {
data = new Integer(socketInput.read());
}
catch (IOException e) {
data = e;
}
finally {
synchronized (this) {
boolean ok = entries.remove(this);
if (ok)
// The read has unblocked prematurely and no one else
// owns the entry (since we managed to remove it ourselves.
// Drop the socket.
closeNoExceptions(this);
reading = false;
notifyAll();
}
}
}
private synchronized void waitWhileReading() throws InterruptedException {
while (reading)
wait();
}
private final Socket socket;
private InputStream socketInput;
private OutputStream socketOutput;
private String name;
private boolean reading;
private Object data;
private boolean closed;
}
class Entries {
synchronized boolean contains(Entry e) {
return v.contains(e);
}
synchronized Enumeration elements() {
return ((Vector)(v.clone())).elements();
}
synchronized void add(final Entry e) {
v.addElement(e);
notifyAddedToPool(e);
notifyAll();
Runnable r = new Runnable() {
public void run() {
e.readAhead();
}
};
Thread t = new Thread(r, "ActiveAgentPool.EntryWatcher" + entryWatcherCount++);
t.start();
}
synchronized boolean remove(Entry e) {
if (v.contains(e)) {
v.removeElement(e);
notifyRemovedFromPool(e);
return true;
}
else
return false;
}
synchronized Entry next() {
Entry e = null;
if (v.size() > 0) {
e = v.elementAt(0);
v.removeElementAt(0);
notifyRemovedFromPool(e);
}
return e;
}
synchronized Entry next(int timeout) throws InterruptedException {
long end = System.currentTimeMillis() + timeout;
for (long t = timeout; t > 0; t = end - System.currentTimeMillis()) {
if (v.size() == 0)
wait(t);
Entry e = next();
if (e != null)
return e;
}
return null;
}
synchronized void addObserver(Observer o) {
observers = DynamicArray.append(observers, o);
}
synchronized void deleteObserver(Observer o) {
observers = DynamicArray.remove(observers, o);
}
private synchronized void notifyAddedToPool(Entry e) {
for (int i = 0; i < observers.length; i++) {
observers[i].addedToPool(e);
}
}
private synchronized void notifyRemovedFromPool(Entry e) {
for (int i = 0; i < observers.length; i++) {
observers[i].removedFromPool(e);
}
}
private Vector v = new Vector<>();
private Observer[] observers = new Observer[0];
}
/**
* Listen for requests from active agents. Active agents announce their
* willingness to work on behalf of a harness by contacting the harness
* on a nominated port. When a agent contacts the harness, it is put in
* a pool to be used when agent clients request an unspecified agent.
* @param port The port on which to listen for agents.
* @param timeout The maximum time to wait for a agent to contact the
* harness when one is needed. The timeout should be
* in milliseconds.
* @throws IOException if there a problems with any sockets
* while performing this operation.
*/
public synchronized void listen(int port, int timeout) throws IOException {
setListening(false);
setPort(port);
setTimeout(timeout);
setListening(true);
}
/**
* Get the port currently being used to listen for requests from active agents.
* @return The port being used, or Agent.defaultActivePort if no agent pool
* has been started.
* @see #setPort
*/
public synchronized int getPort() {
return (port == 0 && serverSocket != null ?
serverSocket.getLocalPort() : port);
}
/**
* Set the port currently to be used to listen for requests from active agents.
* @param port the port to be used
* @see #getPort
*/
public synchronized void setPort(int port) {
this.port = port; // takes effect on next setListening(true);
}
/**
* Get the timeout being used when waiting for requests from active agents.
* @return The timeout being used, in milliseconds, or 0 if no agent pool
* has been started.
* @see #setTimeout
*/
public synchronized int getTimeout() {
return timeout;
}
/**
* Set the timeout to be used when waiting for requests from active agents.
* @param timeout Ehe timeout, in milliseconds, to be used.
* @see #getTimeout
*/
public synchronized void setTimeout(int timeout) {
this.timeout = timeout;
}
/**
* Check whether the pool is currently listening for incoming requests.
* @return true if the pool is currently listening
* @see #setListening
*/
public synchronized boolean isListening() {
return (serverSocket != null);
}
/**
* Set whether or not the pool should be listening for incoming requests,
* on the appropriate port.
* If the pool is already in the appropriate state, this method has no effect.
* @param listen Set to true to ensure the pool is listening for incoming requests,
* and false otherwise.
* @throws IOException if any problems occur while opening or closing the
* socket on which the pool is listening for requests.
* @see #isListening
*/
public synchronized void setListening(boolean listen) throws IOException {
if (debug)
new Exception("ActiveAgentPool.setListening " + listen + ",port=" + port).printStackTrace(System.err);
if (listen) {
if (serverSocket != null) {
if (port == 0 || serverSocket.getLocalPort() == port)
return;
else
closeNoExceptions(serverSocket);
}
serverSocket = SocketConnection.createServerSocket(port);
Runnable r = new Runnable() {
public void run() {
acceptRequests();
}
};
Thread worker = new Thread(r, "ActiveAgentPool" + counter++);
worker.start();
// could synchronize (wait()) with run() here
// if it should be really necessary
}
else {
if (serverSocket != null)
serverSocket.close();
serverSocket = null;
// flush the agents that have already registered
Entry e;
while ((e = entries.next()) != null)
closeNoExceptions(e);
}
}
Entry nextAgent() throws NoAgentException, InterruptedException {
if (!isListening())
throw new NoAgentException("AgentPool not listening");
Entry e = entries.next(timeout);
if (e != null)
return e;
throw new NoAgentException("Timeout waiting for agent to become available");
}
private void acceptRequests() {
ServerSocket ss;
// warning: serverSocket can be mutated by other methods, but we
// don't want to do the accept call in a synchronized block;
// after the accept call, we make sure that serverSocket is still
// what we think it is--if not, this specific thread instance is
// not longer current or required
synchronized (this) {
ss = serverSocket;
// could synchronize (notify()) with setListening() here
// if it should be really necessary
}
try {
int errors = 0;
while (errors < MAX_ERRORS) {
try {
// wait for connection or exception, whichever comes first
Socket s = ss.accept();
// got connection: make sure we still want it,
// and if so, add it to pool and notify interested parties
synchronized (this) {
if (ss == serverSocket)
entries.add(new Entry(s));
else {
closeNoExceptions(s);
return;
}
}
if (errors > 0)
errors--; // let #errors decay with each successful open
}
catch (IOException e) {
synchronized (this) {
if (ss != serverSocket)
return;
}
// perhaps need a better reporting channel here
System.err.println("error opening socket for remote socket pool");
System.err.println(e.getMessage());
errors++;
}
}
// perhaps need a better reporting channel here
System.err.println("too many errors opening socket for remote socket pool");
System.err.println("server thread exiting");
synchronized (this) {
if (serverSocket == ss)
serverSocket = null;
}
}
finally {
closeNoExceptions(ss);
}
}
/**
* Get an enumeration of the entries currently in the active agent pool.
*/
Enumeration elements() {
return entries.elements();
}
/**
* Add an observer to monitor events.
* @param o The observer to be added.
*/
public void addObserver(Observer o) {
entries.addObserver(o);
}
/**
* Remove an observer that had been previously registered to monitor events.
* @param o The observer to be removed..
*/
public void deleteObserver(Observer o) {
entries.deleteObserver(o);
}
private void closeNoExceptions(Entry e) {
try {
e.close();
}
catch (IOException ignore) {
}
}
private void closeNoExceptions(Socket s) {
try {
s.close();
}
catch (IOException ignore) {
}
}
private void closeNoExceptions(ServerSocket ss) {
try {
ss.close();
}
catch (IOException ignore) {
}
}
private Thread worker;
private int counter;
private Entries entries = new Entries();
private ServerSocket serverSocket;
private int timeout = 3*60*1000; // 3 minutes
private int port = Agent.defaultActivePort;
private final int MAX_ERRORS = 10;
private static int entryWatcherCount;
private static boolean debug = Boolean.getBoolean("debug.ActiveAgentPool");
}