1 /* 2 * $Id$ 3 * 4 * Copyright (c) 1996, 2018, Oracle and/or its affiliates. All rights reserved. 5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 6 * 7 * This code is free software; you can redistribute it and/or modify it 8 * under the terms of the GNU General Public License version 2 only, as 9 * published by the Free Software Foundation. Oracle designates this 10 * particular file as subject to the "Classpath" exception as provided 11 * by Oracle in the LICENSE file that accompanied this code. 12 * 13 * This code is distributed in the hope that it will be useful, but WITHOUT 14 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 15 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 16 * version 2 for more details (a copy is included in the LICENSE file that 17 * accompanied this code). 18 * 19 * You should have received a copy of the GNU General Public License version 20 * 2 along with this work; if not, write to the Free Software Foundation, 21 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 22 * 23 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 24 * or visit www.oracle.com if you need additional information or have any 25 * questions. 26 */ 27 package com.sun.javatest.agent; 28 29 import java.io.BufferedOutputStream; 30 import java.io.DataInputStream; 31 import java.io.DataOutputStream; 32 import java.io.IOException; 33 import java.io.InterruptedIOException; 34 import java.io.PrintStream; 35 import java.io.PrintWriter; 36 import java.io.Writer; 37 import java.lang.reflect.Constructor; 38 import java.lang.reflect.InvocationTargetException; 39 import java.lang.reflect.Method; 40 import java.util.Arrays; 41 import java.util.Enumeration; 42 import java.util.MissingResourceException; 43 import java.util.Vector; 44 45 import com.sun.javatest.Command; 46 import com.sun.javatest.Status; 47 import com.sun.javatest.Test; 48 import com.sun.javatest.util.DynamicArray; 49 import com.sun.javatest.util.Timer; 50 import com.sun.javatest.util.WriterStream; 51 52 /** 53 * The means by which the the harness executes requests on other machines. 54 * Agents are typically started by using one of AgentMain, AgentFrame, 55 * or AgentApplet. The requests themselves are made from the harness 56 * via AgentManager, or a library class that uses AgentManager. 57 * 58 * @see AgentManager 59 */ 60 61 public class Agent implements Runnable { 62 /** 63 * An interface for observing activity on an agent. 64 */ 65 public interface Observer { 66 /** 67 * Called when an agent's run method has been entered. 68 * @param agent The agent being started. 69 * @see Agent#run 70 */ 71 void started(Agent agent); 72 73 /** 74 * Called if an agent's run method has trouble accepting a connection. 75 * @param agent The agent trying to open a connection 76 * @param e The exception that occurred 77 * @see Agent#run 78 */ 79 void errorOpeningConnection(Agent agent, Exception e); 80 81 /** 82 * Called when an agent's run method completed. Normally, the method will 83 * run until an error occurs, or until the thread is interrupted or stopped. 84 * @param agent The agent which has completed the work. 85 * @see Agent#run 86 */ 87 void finished(Agent agent); 88 89 90 /** 91 * Called when an agent has successfully opened a connection to service 92 * a request. 93 * @param agent The agent which opened the connection. 94 * @param c The connection which was opened. 95 */ 96 void openedConnection(Agent agent, Connection c); 97 98 /** 99 * Called when an agent is about to execute a request to execute a Test object. 100 * @param agent The agent about to do the work. 101 * @param c The connection to the client requesting the work. 102 * @param tag A tag identifying the work. 103 * @param className The name of the class to be run 104 * @param args Arguments for the class to be run. 105 */ 106 void execTest(Agent agent, Connection c, String tag, String className, String[] args); 107 108 /** 109 * Called when am agent is about to execute a request to execute a Command object. 110 * @param agent The agent about to do the work. 111 * @param c The connection to the client requesting the work. 112 * @param tag A tag identifying the work. 113 * @param className The name of the class to be run 114 * @param args Arguments for the class to be run. 115 */ 116 void execCommand(Agent agent, Connection c, String tag, String className, String[] args); 117 118 /** 119 * Called when the agent is about to execute a request to execute a main program. 120 * @param agent The agent about to do the work. 121 * @param c The connection to the client requesting the work. 122 * @param tag A tag identifying the work. 123 * @param className The name of the class to be run 124 * @param args Arguments for the class to be run. 125 */ 126 void execMain(Agent agent, Connection c, String tag, String className, String[] args); 127 128 /** 129 * Called when the agent has successfully completed a request to execute a class. 130 * @param agent The agent that performed the work. 131 * @param c The connection to the client requesting the work. 132 * @param result The result status of the work 133 */ 134 void result(Agent agent, Connection c, Status result); 135 136 /** 137 * Called when the agent has failed to execute a class, 138 * or has failed to report the results back to the agent requesting the action, 139 * because an exception occurred. 140 * @param agent The agent that performed the work. 141 * @param c The connection to the client requesting the work. 142 * @param e The exception that occurred. 143 */ 144 void exception(Agent agent, Connection c, Throwable e); 145 146 /** 147 * Called when the agent has completed all processing of the request 148 * that arrived on a particular connection. 149 * @param agent The agent that performed the work. 150 * @param c The connection to the client requesting the work. 151 */ 152 void completed(Agent agent, Connection c); 153 } 154 155 /** 156 * Create an agent that connects to clients using a specified connection factory. 157 * @param connectionFactory The factory from which to get connections to clients. 158 * @param concurrency The number of simultaneous requests to be accepted. 159 */ 160 public Agent(ConnectionFactory connectionFactory, int concurrency) { 161 if (!isValidConcurrency(concurrency)) 162 throw new IllegalArgumentException("bad concurrency: " + concurrency); 163 164 this.connectionFactory = connectionFactory; 165 maxThreads = concurrency; 166 } 167 168 /** 169 * Set the delay to wait after failing to open a connection amd before trying again. 170 * @param delay The number of seconds to wait before attempting to open a new connection. 171 * @see #getRetryDelay 172 */ 173 public void setRetryDelay(int delay) { 174 if (delay <= 0) 175 throw new IllegalArgumentException("invalid delay"); 176 177 retryDelay = delay; 178 } 179 180 /** 181 * Get the delay to wait after failing to open a connection and before trying again. 182 * @return the number of seconds to wait before attempting to open a new connection. 183 * @see #setRetryDelay 184 */ 185 public int getRetryDelay() { 186 return retryDelay; 187 } 188 189 /** 190 * Set the translation map to be used to localize incoming requests. 191 * If an incoming request permits it, occurrences of certain substrings 192 * will be replaced with corresponding local variants. This is typically 193 * used to remap file systems which might have different mount points 194 * on different systems, but has been superceded by the environment's 195 * map substitution facility. 196 * @param map The translation map to be used. 197 */ 198 public synchronized void setMap(Map map) { 199 this.map = map; 200 if (tracing) { 201 if (map == null) 202 traceOut.println("set map null"); 203 else { 204 traceOut.println("set map:"); 205 map.setTracing(tracing, traceOut); 206 for (Enumeration<String[]> e = map.enumerate(); e.hasMoreElements(); ) { 207 String[] entry = e.nextElement(); 208 traceOut.println("map-from: " + entry[0]); 209 traceOut.println("map-to: " + entry[1]); 210 } 211 traceOut.println("end of map"); 212 } 213 } 214 } 215 216 217 /** 218 * Enable or disable tracing for agent activities. 219 * It is best to call this as early as possible - objects created by 220 * this class will inherit the setting as they are created/set. 221 * @param state True if tracing output should be provided. 222 */ 223 public void setTracing(boolean state) { 224 tracing = state; 225 } 226 227 //-------------------------------------------------------------------------- 228 229 /** 230 * Add an observer to monitor the progress of the TestFinder. 231 * @param o the observer 232 */ 233 public void addObserver(Observer o) { 234 notifier.addObserver(o); 235 } 236 237 /** 238 * Remove an observer form the set currently monitoring the progress 239 * of the TestFinder. 240 * @param o the observer 241 */ 242 public void removeObserver(Observer o) { 243 notifier.removeObserver(o); 244 } 245 246 //-------------------------------------------------------------------------- 247 248 /** 249 * Run the agent. Since an Agent is {@link Runnable runnable}, this method 250 * will typically be called on a separate thread. 251 */ 252 public synchronized void run() { 253 if (mainThread != null) 254 throw new IllegalStateException("Agent already running"); 255 256 mainThread = Thread.currentThread(); 257 258 timer = new Timer(); 259 closing = false; 260 261 try { 262 if (tracing) 263 traceOut.println("AGENT STARTED, maxThreads=" + maxThreads); 264 265 notifier.started(); 266 267 if (maxThreads <= 0) 268 // self defense: stops infinite wait, but the test should 269 // have already been done in the constructor and an argument 270 // thrown. 271 return; 272 273 274 while (!closing) { 275 while (threads.size() < maxThreads && !closing) { 276 Thread t = new Thread(new Runnable() { 277 public void run() { 278 Thread curr = Thread.currentThread(); 279 if (tracing) 280 traceOut.println("THREAD " + curr.getName() + " STARTED " + getClass().getName()); 281 282 try { 283 handleRequestsUntilClosed(); 284 } catch (InterruptedException e) { 285 } finally { 286 synchronized (Agent.this) { 287 threads.removeElement(curr); 288 Agent.this.notifyAll(); 289 } 290 if (tracing) 291 traceOut.println("THREAD " + curr.getName() + " EXITING"); 292 } 293 } 294 }); 295 t.setName("Agent" + nextThreadNum()); 296 int currPrio = Thread.currentThread().getPriority(); 297 int slvPrio = ((currPrio + Thread.MIN_PRIORITY) / 2); 298 t.setPriority(slvPrio); 299 t.start(); 300 threads.addElement(t); 301 } 302 wait(); 303 } 304 } catch (InterruptedException e) { 305 try { 306 close(); 307 } catch (InterruptedException ignore) { 308 } 309 } finally { 310 timer.finished(); 311 notifier.finished(); 312 if (tracing) 313 traceOut.println("AGENT EXITING"); 314 mainThread = null; 315 } 316 } 317 318 /** 319 * Interrupt this agent. The thread running the {@link #run run} method 320 * will be interrupted. 321 */ 322 public synchronized void interrupt() { 323 if (mainThread != null) 324 mainThread.interrupt(); 325 } 326 327 /** 328 * Close this agent. Any requests in progress will be allowed to complete, 329 * and no new requests will be accepted. 330 * @throws InterruptedException if this thread is interrupted while waiting 331 * for outstanding requests to complete. 332 */ 333 public synchronized void close() throws InterruptedException { 334 closing = true; // will prevent new tasks from being created 335 336 // interrupt any threads that are running 337 for (int i = 0; i < threads.size(); i++) { 338 Thread t = threads.elementAt(i); 339 if (tracing) 340 traceOut.println("INTERRUPTING THREAD " + t.getName()); 341 t.interrupt(); 342 } 343 344 // wait 3s for shutdown 345 traceOut.println("WAITING 3s FOR THREADS TO CLEANUP"); 346 Thread.currentThread().sleep(3000); 347 348 // close any tasks that are running 349 for (int i = 0; i < tasks.size(); i++) { 350 Task t = tasks.elementAt(i); 351 if (tracing) { 352 Connection c = t.connection; // maybe null; if it is, task is already closing 353 traceOut.println("CLOSING TASK " + (c == null ? "[unknown]" : c.getName())); 354 } 355 t.close(); 356 } 357 358 try { 359 if (tracing) 360 traceOut.println("CLOSING CONNECTION FACTORY"); 361 362 connectionFactory.close(); 363 } catch (ConnectionFactory.Fault ignore) { 364 } 365 366 // allow main loop to exit 367 notifyAll(); 368 369 if (tracing) 370 traceOut.println("WAITING FOR TASKS TO EXIT"); 371 372 // wait for tasks to go away 373 while (tasks.size() > 0) { 374 wait(); 375 } 376 377 if (tracing) 378 traceOut.println("CLOSED"); 379 } 380 381 /** 382 * Checks that given concurrency value belong to the certain range 1-256. 383 * The highest value should be in sync with: 384 * Parameters.ConcurrencyParameters.MAX_CONCURRENCY 385 * 386 * @param concurrency value to check 387 * @return true, if concurrency is acceptable 388 */ 389 static boolean isValidConcurrency(int concurrency) { 390 return 1 <= concurrency && concurrency <= 256; 391 } 392 393 //---------------------------------------------------------------------------- 394 395 private void handleRequestsUntilClosed() throws InterruptedException { 396 while (!closing) { 397 try { 398 // The call of nextConnection() will block until a connection is 399 // open; this can be for an indefinite amount of time. 400 // Therefore we must not hold the object lock while calling this routine. 401 Connection connection = connectionFactory.nextConnection(); 402 403 Task t; 404 synchronized (this) { 405 // Having opened a connection, we check that the agent has not been 406 // marked for shutdown before updating connection. 407 if (closing) { 408 closeIgnoreExceptions(connection); 409 return; 410 } 411 412 t = new Task(connection); 413 tasks.addElement(t); 414 } 415 416 try { 417 t.handleRequest(); 418 } finally { 419 synchronized (this) { 420 tasks.removeElement(t); 421 } 422 } 423 } catch (ConnectionFactory.Fault e) { 424 notifier.errorOpeningConnection(e.getException()); 425 if (tracing) 426 traceOut.println("THREAD " + Thread.currentThread().getName() + " " + e); 427 428 if (e.isFatal()) { 429 close(); 430 return; 431 } else { 432 int millis = 1000*min(5, getRetryDelay()); 433 Thread.currentThread().sleep(millis); 434 continue; 435 } 436 } 437 438 } 439 } 440 441 private static void closeIgnoreExceptions(Connection c) { 442 try { 443 c.close(); 444 } catch (IOException e) { 445 } 446 } 447 448 private static final int min(int a, int b) { 449 return (a < b ? a : b); 450 } 451 452 private class Notifier { 453 public synchronized void addObserver(Observer o) { 454 observers = DynamicArray.append(observers, o); 455 } 456 457 public synchronized void removeObserver(Agent.Observer o) { 458 observers = DynamicArray.remove(observers, o); 459 } 460 461 public synchronized void started() { 462 for (int i = 0; i < observers.length; i++) 463 observers[i].started(Agent.this); 464 } 465 466 public synchronized void finished() { 467 for (int i = 0; i < observers.length; i++) 468 observers[i].finished(Agent.this); 469 } 470 471 public synchronized void openedConnection(Connection connection) { 472 for (int i = 0; i < observers.length; i++) 473 observers[i].openedConnection(Agent.this, connection); 474 } 475 476 public synchronized void errorOpeningConnection(Exception e) { 477 for (int i = 0; i < observers.length; i++) 478 observers[i].errorOpeningConnection(Agent.this, e); 479 } 480 481 public synchronized void execTest(Connection cconnection, String tag, String className, String[] args) { 482 for (int i = 0; i < observers.length; i++) 483 observers[i].execTest(Agent.this, cconnection, tag, className, args); 484 } 485 486 public synchronized void execCommand(Connection cconnection, String tag, String className, String[] args) { 487 for (int i = 0; i < observers.length; i++) 488 observers[i].execCommand(Agent.this, cconnection, tag, className, args); 489 } 490 491 public synchronized void execMain(Connection connection, String tag, String className, String[] args) { 492 for (int i = 0; i < observers.length; i++) 493 observers[i].execMain(Agent.this, connection, tag, className, args); 494 } 495 496 public synchronized void result(Connection connection, Status status) { 497 for (int i = 0; i < observers.length; i++) 498 observers[i].result(Agent.this, connection, status); 499 } 500 501 public synchronized void exception(Connection connection, Exception e) { 502 for (int i = 0; i < observers.length; i++) 503 observers[i].exception(Agent.this, connection, e); 504 } 505 506 public synchronized void completed(Connection connection) { 507 for (int i = 0; i < observers.length; i++) 508 observers[i].completed(Agent.this, connection); 509 } 510 511 private Observer[] observers = new Observer[0]; 512 }; 513 514 private synchronized void setSystemStreams(Object owner, PrintStream out, PrintStream err) 515 throws InterruptedException, SecurityException { 516 if (owner == null) 517 throw new NullPointerException(); 518 519 while (currSystemStreamOwner != null) 520 wait(); 521 522 currSystemStreamOwner = owner; 523 saveOut = System.out; 524 saveErr = System.err; 525 System.setOut(out); 526 System.setErr(err); 527 } 528 529 private synchronized void resetSystemStreams(Object owner) throws SecurityException { 530 if (owner == null) 531 throw new NullPointerException(); 532 533 if (owner != currSystemStreamOwner) 534 throw new IllegalStateException("expected: " + owner + " found: " + currSystemStreamOwner); 535 536 currSystemStreamOwner = null; 537 System.setOut(saveOut); 538 System.setErr(saveErr); 539 notifyAll(); 540 } 541 542 private boolean closing; 543 private Thread mainThread; 544 private int maxThreads; 545 private Vector<Thread> threads = new Vector<>(); 546 private Vector<Task> tasks = new Vector<>(); 547 private Notifier notifier = new Notifier(); 548 private Object currSystemStreamOwner = null; 549 private PrintStream saveOut; 550 private PrintStream saveErr; 551 552 /** 553 * A flag to enable debug tracing of the operation of the agent. 554 */ 555 protected boolean tracing = false; 556 protected PrintStream traceOut = System.out; 557 558 /** 559 * The default time to wait after a failed attempt to open a connection, 560 * and before trying again. 561 * @see #setRetryDelay 562 */ 563 public static final int DEFAULT_RETRY_DELAY = 5; 564 private int retryDelay = DEFAULT_RETRY_DELAY; 565 private ConnectionFactory connectionFactory; 566 567 private Map map; 568 private Timer timer; 569 570 /* For autonumbering agent tasks. */ 571 private static int threadInitNumber; 572 573 private static synchronized int nextThreadNum() { 574 return threadInitNumber++; 575 } 576 577 // The following is used to ensure consistency between Agent and AgentManager 578 static final short protocolVersion = 105; 579 580 /** 581 * The default port to which active agents will try and connect on a nominated host. 582 */ 583 public static final int defaultActivePort = 1907; 584 585 /** 586 * The default port on which passive ports will listen for incoming connections. 587 */ 588 public static final int defaultPassivePort = 1908; 589 590 static final byte CLASS = (byte)'C'; 591 static final byte DATA = (byte)'D'; 592 static final byte LOG = (byte)'L'; 593 static final byte LOG_FLUSH = (byte)'l'; 594 static final byte REF = (byte)'R'; 595 static final byte REF_FLUSH = (byte)'r'; 596 static final byte STATUS = (byte)'S'; 597 598 static final String productName = "JT Harness Agent"; 599 static final String productVersion = "JTA_5.0"; 600 static final String productCopyright = "Copyright (c) 1996, 2018, Oracle and/or its affiliates. All rights reserved."; 601 602 603 /** 604 * Tasks handle the individual requests received by Agent. 605 * They read the request from the connection, execute the request, which means 606 * running the test class on behalf of the client, and any output from the 607 * test class is written back to the client via the connection. 608 */ 609 class Task { 610 Task(Connection c) { 611 if (c == null) 612 throw new NullPointerException(); 613 connection = c; 614 } 615 616 public void handleRequest() throws ConnectionFactory.Fault { 617 618 try { 619 notifier.openedConnection(connection); 620 621 if (tracing) 622 traceOut.println("REQUEST FROM " + connection.getName()); 623 624 in = new DataInputStream(connection.getInputStream()); 625 short pVer = in.readShort(); 626 if (pVer != protocolVersion) 627 throw new IOException("protocol mismatch;" + 628 " expected " + protocolVersion + 629 " received " + pVer); 630 631 tag = in.readUTF(); 632 633 if (tracing) 634 traceOut.println("TAG IS `" + tag + "'"); 635 636 request = in.readUTF(); 637 638 if (tracing) 639 traceOut.println("REQUEST IS `" + request + "'"); 640 641 out = new DataOutputStream(new BufferedOutputStream(connection.getOutputStream())); 642 643 Status status; 644 645 if (request.equals("executeTest") || request.equals("executeCommand") || request.equals("executeMain") ) 646 status = execute(); 647 else { 648 if (tracing) 649 traceOut.println("Unrecognized request for agent: `" + request + "'"); 650 status = Status.error("Unrecognized request for agent: `" + request + "'"); 651 } 652 653 if (tracing) 654 traceOut.println("RETURN " + status); 655 656 notifier.result(connection, status); 657 658 if (tracing) 659 traceOut.println("SEND STATUS"); 660 661 sendStatus(status); 662 663 if (tracing) 664 traceOut.println("FLUSH"); 665 666 out.flush(); 667 668 if (tracing) 669 traceOut.println("AWAIT CLOSE"); 670 671 /* 672 final Thread taskThread = Thread.currentThread(); 673 674 Timer.Timeable timeOutHandler = new Timer.Timeable() { 675 public void timeout() { 676 if (tracing) 677 traceOut.println("EOF TIMEOUT"); 678 IOException e = new IOException("timeout communicating with AgentManager"); 679 synchronized (Agent.this) { 680 for (int i = 0; i < observers.length; i++) 681 observers[i].exception(Agent.this, connection, e); 682 } 683 close(); // give up 684 taskThread.interrupt(); 685 traceOut.println("EOF TIMEOUT CLOSED"); 686 } 687 }; 688 689 Timer.Entry te = timer.requestDelayedCallback(timeOutHandler, 5000); 690 while (in.read() != -1) ; 691 692 if (tracing) 693 traceOut.println("RECEIVED EOF"); 694 695 timer.cancel(te); 696 697 notifier.completed(connection); 698 */ 699 700 connection.waitUntilClosed(5000); 701 702 if (Thread.interrupted() && tracing) { 703 traceOut.println("Thread was interrupted - clearing interrupted status!"); 704 } 705 706 if (connection.isClosed()) 707 notifier.completed(connection); 708 else 709 notifier.exception(connection, new IOException("timeout awaiting close from AgentManager")); 710 } catch (InterruptedException e) { 711 if (tracing) { 712 traceOut.println("Interrupted"); 713 } 714 715 notifier.exception(connection, e); 716 } catch (InterruptedIOException e) { 717 if (tracing) { 718 traceOut.println("Interrupted (IO)"); 719 } 720 721 notifier.exception(connection, e); 722 } catch (IOException e) { 723 if (tracing) { 724 traceOut.println("EXCEPTION IS `" + e + "'"); 725 e.printStackTrace(traceOut); 726 } 727 728 notifier.exception(connection, e); 729 } finally { 730 close(); 731 } 732 } 733 734 private Status execute() throws IOException { 735 String className = in.readUTF(); 736 737 if (tracing) 738 traceOut.println("CLASSNAME: " + className); 739 740 int n = in.readShort(); 741 742 if (tracing) 743 traceOut.println("nArgs: " + n); 744 745 String[] args = new String[n]; 746 for (int i = 0; i < args.length; i++) { 747 args[i] = in.readUTF(); 748 if (tracing) 749 traceOut.println("arg[" + i + "]: " + args[i]); 750 } 751 752 boolean mapArgs = in.readBoolean(); 753 754 if (tracing) 755 traceOut.println("mapArgs: " + mapArgs); 756 757 boolean remoteClasses = in.readBoolean(); 758 759 if (tracing) 760 traceOut.println("remoteClasses: " + remoteClasses); 761 762 boolean sharedClassLoader = in.readBoolean(); 763 764 if (tracing) { 765 traceOut.println("sharedClassLoader: " + sharedClassLoader); 766 } 767 768 timeoutValue = in.readInt(); 769 770 if (tracing) 771 traceOut.println("COMMAND TIMEOUT(seconds) IS `" + timeoutValue + "'"); 772 773 byte guard = in.readByte(); 774 if (guard != 0) 775 throw new IOException("data format error"); 776 777 if (map != null && mapArgs) 778 map.map(args); 779 780 PrintWriter testLog = new PrintWriter(new AgentWriter(LOG, this)); 781 PrintWriter testRef = new PrintWriter(new AgentWriter(REF, this)); 782 783 try { 784 Class<?> c; 785 ClassLoader cl = null; 786 if (remoteClasses) { 787 cl = getAgentClassLoader(sharedClassLoader); 788 c = cl.loadClass(className); 789 } else { 790 c = Class.forName(className); 791 } 792 793 if (request.equals("executeTest")) { 794 return executeTest(c, args, testLog, testRef); 795 } else if (request.equals("executeCommand")) { 796 return executeCommand(c, args, testLog, testRef, cl); 797 } else if (request.equals("executeMain")) { 798 return executeMain(c, args, testLog, testRef); 799 } else { 800 return Status.error("Unrecognized request for agent: `" + request + "'"); 801 } 802 } catch (ClassCastException e) { 803 if (tracing) 804 e.printStackTrace(traceOut); 805 return Status.error("Can't execute class `" + className + "': required interface not found"); 806 } catch (ClassNotFoundException ex) { 807 return Status.error("Can't find class `" + className + "'"); 808 } catch (IllegalAccessException ex) { 809 return Status.error("Illegal access to class `" + className + "'"); 810 } catch (InstantiationException ex) { 811 return Status.error("Can't instantiate class`" + className + "'"); 812 } catch (ThreadDeath e) { 813 throw e; 814 } catch (Exception e) { 815 e.printStackTrace(testLog); 816 return Status.error("Unexpected exception: " + e); 817 } catch (Error e) { 818 e.printStackTrace(testLog); 819 return Status.error("Unexpected error: " + e); 820 } catch (Throwable e) { 821 e.printStackTrace(testLog); 822 return Status.error("Unexpected throwable: " + e); 823 } finally { 824 // close the streams used by the test and write the test status back 825 if (tracing) 826 traceOut.println("CLOSE TESTREF"); 827 828 testRef.close(); 829 830 if (tracing) 831 traceOut.println("CLOSE TESTLOG"); 832 833 testLog.close(); 834 } 835 } 836 837 private Status executeTest(Class<?> c, String[] args, 838 PrintWriter testLog, PrintWriter testRef) 839 throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException { 840 notifier.execTest(connection, tag, c.getName(), args); 841 Test t = (Test)(c.newInstance()); 842 return t.run(args, testLog, testRef); 843 } 844 845 private Status executeCommand(Class<?> c, String[] args, 846 PrintWriter testLog, PrintWriter testRef, 847 ClassLoader cl) 848 throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException { 849 notifier.execCommand(connection, tag, c.getName(), args); 850 851 Command tc = (Command)(c.newInstance()); 852 tc.setClassLoader(cl); 853 return new CommandExecutor(tc, args, testLog, testRef, timeoutValue).execute(); 854 } 855 856 private class CommandExecutor { 857 858 private String[] args; 859 private PrintWriter testLog; 860 private PrintWriter testRef; 861 private Command tc; 862 private Status result; 863 864 private final Object LOCK = new Object(); 865 private boolean executed = false; 866 private boolean timeout = false; 867 private int timeoutValue = 0; 868 869 public CommandExecutor(Command tc, String[] args, PrintWriter testLog, 870 PrintWriter testRef, int timeoutValue) { 871 this.args = args; 872 this.testLog = testLog; 873 this.testRef = testRef; 874 this.tc = tc; 875 this.timeoutValue = timeoutValue; 876 } 877 878 public Status execute(){ 879 880 Timer alarmTimer = null; 881 if (timeoutValue != 0) { 882 alarmTimer = new Timer(); 883 alarmTimer.requestDelayedCallback(new Timer.Timeable() { 884 @Override 885 public void timeout() { 886 result = Status.error("Marked as error by timeout after "+timeout+" seconds"); 887 timeout = true; 888 synchronized (LOCK) { 889 LOCK.notifyAll(); 890 } 891 } 892 }, timeoutValue * 1000); 893 } 894 895 Thread executeThread = new Thread(new Runnable() { 896 @Override 897 public void run() { 898 try { 899 result = tc.run(args, testLog, testRef); 900 } 901 catch (Exception e){ 902 result = Status.error("Unhandled " + e.getClass().getName() + " exception " + 903 "for " + tc.getClass().getName() + " command. " + 904 "Exception message: " + e.getMessage()); 905 } 906 finally { 907 executed = true; 908 synchronized (LOCK) { 909 LOCK.notifyAll(); 910 } 911 } 912 } 913 }, "CommandExecutor executeThread for command args: "+ Arrays.toString(args)); 914 executeThread.start(); 915 916 synchronized (LOCK) { 917 while (!executed && !timeout) { 918 try { 919 LOCK.wait(); 920 } 921 catch (InterruptedException e) { 922 // treat interrupt as exit request 923 break; 924 } 925 } 926 } 927 928 executeThread.setPriority(Thread.MIN_PRIORITY); 929 executeThread.interrupt(); 930 if (alarmTimer != null){ 931 alarmTimer.finished(); 932 } 933 934 return result; 935 } 936 937 } 938 939 private Status executeMain(Class<?> c, String[] args, 940 PrintWriter testLog, PrintWriter testRef) 941 throws IOException, ClassNotFoundException, IllegalAccessException { 942 notifier.execMain(connection, tag, c.getName(), args); 943 944 PrintStream out = Deprecated.createPrintStream(new WriterStream(testRef)); 945 PrintStream err = Deprecated.createPrintStream(new WriterStream(testLog)); 946 try { 947 setSystemStreams(this, out, err); 948 Method main = c.getDeclaredMethod("main", new Class<?>[] {String[].class}); 949 main.invoke(null, new Object[] {args}); 950 return Status.passed("OK"); 951 } catch (NoSuchMethodException e) { 952 return Status.error("Can't find `public static void main(String[] args)' for `" + c.getName() + "'"); 953 } catch (InvocationTargetException e) { 954 Throwable t = e.getTargetException(); 955 t.printStackTrace(err); 956 return Status.failed(t.toString()); 957 } catch (InterruptedException e) { 958 return Status.failed("interrupted while waiting for access to system streams"); 959 } finally { 960 resetSystemStreams(this); 961 out.flush(); 962 err.flush(); 963 } 964 } 965 966 /** 967 * Close the task, abandoning any request in progress. 968 */ 969 synchronized void close() { 970 if (!connection.isClosed()) { 971 closeIgnoreExceptions(connection); 972 // don't nullify connections because handleRequest might still be using it 973 } 974 975 if (in != null) { 976 try { 977 //System.err.println("closing in"); 978 in.close(); 979 in = null; 980 } catch (IOException ignore) { 981 } 982 } 983 984 if (out != null) { 985 try { 986 //System.err.println("closing out"); 987 out.close(); 988 out = null; 989 } catch (IOException ignore) { 990 } 991 } 992 } 993 994 /** 995 * Send wrapped data back to the client. 996 */ 997 synchronized void sendChars(byte type, char b[], int off, int len) throws IOException { 998 out.write(type); 999 1000 String message = new String(b, off, len); 1001 int strlen = message.length(); 1002 int utflen = 0; 1003 int c, count = 0; 1004 1005 // shortcut if it's not possible to exceed the limit 1006 if (strlen * 3 < 65535) { 1007 count = strlen; 1008 } 1009 else { 1010 /* count number of chars to meet 65535 bytes boundary */ 1011 for (int i = 0; i < strlen; i++) { 1012 c = message.charAt(i); 1013 if ((c >= 0x0001) && (c <= 0x007F)) { 1014 utflen++; 1015 } else if (c > 0x07FF) { 1016 utflen += 3; 1017 } else { 1018 utflen += 2; 1019 } 1020 1021 // truncate here because we won't be able to encode more 1022 if (utflen > 65535) { 1023 break; 1024 } else { 1025 count = i; 1026 } 1027 } // for 1028 } 1029 1030 out.writeUTF(message.substring(0, count)); 1031 //out.writeUTF(new String(b, off, len)); 1032 switch (type) { 1033 case LOG_FLUSH: 1034 case REF_FLUSH: 1035 out.flush(); 1036 } 1037 } 1038 1039 /** 1040 * Send the final status back to the client. 1041 */ 1042 private synchronized void sendStatus(Status s) throws IOException { 1043 out.write(STATUS); 1044 out.write((byte)s.getType()); 1045 out.writeUTF(s.getReason()); 1046 } 1047 1048 /** 1049 * Get the bytecodes for a class 1050 */ 1051 synchronized AgentRemoteClassData getClassData(String className) throws ClassNotFoundException { 1052 if (tracing) 1053 traceOut.println("REMOTE LOAD " + className); 1054 1055 try { 1056 out.write(CLASS); 1057 out.writeUTF(className); 1058 out.flush(); 1059 1060 AgentRemoteClassData classData = new AgentRemoteClassData(in); 1061 if (tracing) { 1062 traceOut.println("REMOTE LOADED CLASS " + classData.toString()); 1063 } 1064 return classData; 1065 } catch (IOException e) { 1066 throw new ClassNotFoundException(className + ": " + e); 1067 } 1068 } 1069 1070 /** 1071 * Get a resource 1072 */ 1073 synchronized byte[] getResourceData(String resourceName) throws MissingResourceException, IOException { 1074 if (tracing) 1075 traceOut.println("REMOTE LOAD " + resourceName); 1076 1077 out.write(DATA); 1078 out.writeUTF(resourceName); 1079 out.flush(); 1080 1081 int size = in.readInt(); 1082 if (size == -1) 1083 throw new MissingResourceException(resourceName, null, resourceName); 1084 1085 byte[] data = new byte[size]; 1086 int offset = 0; 1087 while (offset < data.length) { 1088 int n = in.read(data, offset, data.length - offset); 1089 if (n == -1) 1090 throw new IOException(resourceName + ": EOF while reading resource data"); 1091 else 1092 offset += n; 1093 } 1094 1095 //System.err.println(data.length); 1096 //for (int i = 0; i < min(10, data.length); i++) { 1097 // System.err.print(data[i] + " "); 1098 //} 1099 //System.err.print(" ... "); 1100 //for (int i = max(0, data.length - 10); i < data.length; i++) { 1101 // System.err.print(data[i] + " "); 1102 //} 1103 //System.err.println(); 1104 1105 return data; 1106 } 1107 1108 private ClassLoader getAgentClassLoader(boolean useSharedClassLoader) 1109 throws InstantiationException, IllegalAccessException { 1110 Class<? extends ClassLoader> classLoaderClass; 1111 try { 1112 String s = getClass().getName(); 1113 String pkg = s.substring(0, s.lastIndexOf('.')); 1114 classLoaderClass = (Class<? extends ClassLoader>) Class.forName(pkg + ".AgentClassLoader2"); 1115 } catch (Throwable t) { 1116 classLoaderClass = AgentClassLoader.class; 1117 } 1118 1119 Class<?>[] argTypes = {Task.class}; 1120 if (useSharedClassLoader && factoryMethod == null) { 1121 try { 1122 factoryMethod = classLoaderClass.getDeclaredMethod("getInstance", argTypes); 1123 } catch (NoSuchMethodException e) { 1124 e.printStackTrace(); 1125 } 1126 } 1127 1128 if (classLoaderConstructor == null) { 1129 try { 1130 classLoaderConstructor = classLoaderClass.getDeclaredConstructor(argTypes); 1131 classLoaderConstructor.setAccessible(true); 1132 } catch (NoSuchMethodException e) { 1133 e.printStackTrace(); 1134 } 1135 } 1136 1137 try { 1138 Object[] args = {this}; 1139 if (useSharedClassLoader && factoryMethod != null) { 1140 return (ClassLoader) factoryMethod.invoke(null, args); 1141 } else { 1142 return classLoaderConstructor.newInstance(args); 1143 } 1144 } catch (InvocationTargetException e) { 1145 Throwable t = e.getTargetException(); 1146 if (t instanceof RuntimeException) { 1147 throw (RuntimeException) t; 1148 } else if (t instanceof Error) { 1149 throw (Error) t; 1150 } else { 1151 throw new Error(e.toString()); 1152 } 1153 } 1154 1155 } 1156 1157 private Connection connection; 1158 private DataInputStream in; 1159 private DataOutputStream out; 1160 private String tag; 1161 private String request; 1162 private Integer timeoutValue; 1163 } 1164 1165 private static Constructor<? extends ClassLoader> classLoaderConstructor; 1166 private static Method factoryMethod = null; 1167 } 1168 1169 1170 1171 1172 1173 /** 1174 * Stream passed to the class that is executed on behalf of the client. 1175 * Data written to the stream is buffered and eventually written back to th 1176 * client via the Task's sendChars method. 1177 */ 1178 class AgentWriter extends Writer { 1179 /** 1180 * Create a stream that sends its data back to the parent Task. 1181 * @arg type A tag to pass back to parent.sendChars(). 1182 * @arg parent The parent object to which to pass the data written to the stream. 1183 */ 1184 AgentWriter(byte type, Agent.Task parent) { 1185 this.type = type; 1186 this.parent = parent; 1187 } 1188 1189 /** 1190 * Writes a character. This method will block until the character 1191 * is actually written. 1192 * @param ch the char 1193 * @exception IOException If an I/O error has occurred. 1194 */ 1195 public synchronized void write(int ch) throws IOException { 1196 buf[count++] = (char)ch; 1197 if (count == buf.length) { 1198 try { 1199 parent.sendChars(type, buf, 0, count); 1200 } 1201 finally { 1202 count = 0; 1203 } 1204 } 1205 } 1206 1207 /** 1208 * Writes an array of characters. This method will block until the 1209 * characters are actually written. 1210 * @param c the data to be written 1211 * @exception IOException If an I/O error has occurred. 1212 */ 1213 public void write(char c[]) throws IOException { 1214 write(c, 0, c.length); 1215 } 1216 1217 /** 1218 * Writes a sub array of characters. 1219 * @param c the data to be written 1220 * @param off the start offset in the data 1221 * @param len the number of bytes that are written 1222 * @exception IOException If an I/O error has occurred. 1223 */ 1224 public synchronized void write(char c[], int off, int len) throws IOException { 1225 if (len < buf.length - count) { 1226 // there is room for the bytes in the current buffer 1227 System.arraycopy(c, off, buf, count, len); 1228 count += len; 1229 } else { 1230 // not room in the current buffer, so flush it 1231 flush(); 1232 if (len < buf.length) { 1233 // there is _now_ enough room in the current buffer, so use it 1234 System.arraycopy(c, off, buf, count, len); 1235 count += len; 1236 } else { 1237 // current buffer not big enough; send data directly 1238 parent.sendChars(type, c, off, len); 1239 } 1240 } 1241 } 1242 1243 /** 1244 * Flushes the stream. This will write any buffered 1245 * output bytes. 1246 * @exception IOException If an I/O error has occurred. 1247 */ 1248 public synchronized void flush() throws IOException { 1249 if (count > 0) { 1250 switch (type) { 1251 case Agent.LOG: type = Agent.LOG_FLUSH; break; 1252 case Agent.REF: type = Agent.REF_FLUSH; break; 1253 } 1254 try { 1255 parent.sendChars(type, buf, 0, count); 1256 } 1257 finally { 1258 count = 0; 1259 } 1260 } 1261 } 1262 1263 /** 1264 * Closes the stream. This method must be called 1265 * to release any resources associated with the 1266 * stream. 1267 * @exception IOException If an I/O error has occurred. 1268 */ 1269 public void close() throws IOException { 1270 flush(); 1271 } 1272 1273 private byte type; 1274 private Agent.Task parent; 1275 private char[] buf = new char[1024]; 1276 private int count = 0; 1277 }