< prev index next >

src/jdk.jshell/share/classes/jdk/jshell/execution/Util.java

Print this page
rev 3613 : imported patch 8131023

@@ -23,20 +23,25 @@
  * questions.
  */
 package jdk.jshell.execution;
 
 import jdk.jshell.spi.ExecutionEnv;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInput;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
+
 import com.sun.jdi.VirtualMachine;
 import jdk.jshell.spi.ExecutionControl;
 
 
 /**

@@ -97,45 +102,87 @@
     /**
      * Forward commands from the input to the specified {@link ExecutionControl}
      * instance, then responses back on the output.
      * @param ec the direct instance of {@link ExecutionControl} to process commands
      * @param inStream the stream from which to create the command input
-     * @param outStream the stream that will carry {@code System.out},
-     * {@code System.err}, any specified auxiliary channels, and the
-     * command response output.
-     * @param streamMap a map between names of additional streams to carry and setters
-     * for the stream
+     * @param outStream the stream that will carry any specified auxiliary channels (like
+     *                  {@code System.out} and {@code System.err}), and the command response output.
+     * @param outputStreamMap a map between names of additional streams to carry and setters
+     *                        for the stream. Names starting with '$' are reserved for internal use.
+     * @param inputStreamMap a map between names of additional streams to carry and setters
+     *                       for the stream. Names starting with '$' are reserved for internal use.
      * @throws IOException if there are errors using the passed streams
      */
     public static void forwardExecutionControlAndIO(ExecutionControl ec,
             InputStream inStream, OutputStream outStream,
-            Map<String, Consumer<OutputStream>> streamMap) throws IOException {
-        ObjectInputStream cmdIn = new ObjectInputStream(inStream);
-        for (Entry<String, Consumer<OutputStream>> e : streamMap.entrySet()) {
+            Map<String, Consumer<OutputStream>> outputStreamMap,
+            Map<String, Consumer<InputStream>> inputStreamMap) throws IOException {
+        for (Entry<String, Consumer<OutputStream>> e : outputStreamMap.entrySet()) {
             e.getValue().accept(multiplexingOutputStream(e.getKey(), outStream));
         }
-        ObjectOutputStream cmdOut = new ObjectOutputStream(multiplexingOutputStream("command", outStream));
+
+        ObjectOutputStream cmdOut = new ObjectOutputStream(multiplexingOutputStream("$command", outStream));
+        PipeInputStream cmdInPipe = new PipeInputStream();
+        Map<String, OutputStream> inputs = new HashMap<>();
+        inputs.put("$command", cmdInPipe.createOutput());
+        for (Entry<String, Consumer<InputStream>> e : inputStreamMap.entrySet()) {
+            OutputStream inputSignal = multiplexingOutputStream("$" + e.getKey() + "-input-requested", outStream);
+            PipeInputStream inputPipe = new PipeInputStream() {
+                @Override protected void inputNeeded() throws IOException {
+                    inputSignal.write('1');
+                    inputSignal.flush();
+                }
+            };
+            inputs.put(e.getKey(), inputPipe.createOutput());
+            e.getValue().accept(inputPipe);
+        }
+        new DemultiplexInput(inStream, inputs, inputs.values()).start();
+        ObjectInputStream cmdIn = new ObjectInputStream(cmdInPipe);
+
         forwardExecutionControl(ec, cmdIn, cmdOut);
     }
 
     static OutputStream multiplexingOutputStream(String label, OutputStream outputStream) {
         return new MultiplexingOutputStream(label, outputStream);
     }
 
     /**
-     * Reads from an InputStream which has been packetized and write its contents
-     * to the out and err OutputStreams; Copies the command stream.
+     * Creates an ExecutionControl for given packetized input and output. The given InputStream
+     * is de-packetized, and content forwarded to ObjectInput and given OutputStreams. The ObjectOutput
+     * and values read from the given InputStream are packetized and sent to the given OutputStream.
+     *
      * @param input the packetized input stream
-     * @param streamMap a map between stream names and the output streams to forward
-     * @return the command stream
+     * @param output the packetized output stream
+     * @param outputStreamMap a map between stream names and the output streams to forward.
+     *                        Names starting with '$' are reserved for internal use.
+     * @param inputStreamMap a map between stream names and the input streams to forward.
+     *                       Names starting with '$' are reserved for internal use.
+     * @param factory to create the ExecutionControl from ObjectInput and ObjectOutput.
+     * @return the created ExecutionControl
      * @throws IOException if setting up the streams raised an exception
      */
-    public static ObjectInput remoteInput(InputStream input,
-            Map<String, OutputStream> streamMap) throws IOException {
+    public static ExecutionControl remoteInputOutput(InputStream input, OutputStream output,
+            Map<String, OutputStream> outputStreamMap, Map<String, InputStream> inputStreamMap,
+            BiFunction<ObjectInput, ObjectOutput, ExecutionControl> factory) throws IOException {
+        Map<String, OutputStream> augmentedStreamMap = new HashMap<>(outputStreamMap);
+        ObjectOutput commandOut = new ObjectOutputStream(Util.multiplexingOutputStream("$command", output));
+        for (Entry<String, InputStream> e : inputStreamMap.entrySet()) {
+            InputStream  in = e.getValue();
+            OutputStream inTarget = Util.multiplexingOutputStream(e.getKey(), output);
+            augmentedStreamMap.put("$" + e.getKey() + "-input-requested", new OutputStream() {
+                @Override
+                public void write(int b) throws IOException {
+                    //value ignored, just a trigger to read from the input
+                    inTarget.write(in.read());
+                }
+            });
+        }
         PipeInputStream commandIn = new PipeInputStream();
-        new DemultiplexInput(input, commandIn, streamMap).start();
-        return new ObjectInputStream(commandIn);
+        OutputStream commandInTarget = commandIn.createOutput();
+        augmentedStreamMap.put("$command", commandInTarget);
+        new DemultiplexInput(input, augmentedStreamMap, Arrays.asList(commandInTarget)).start();
+        return factory.apply(new ObjectInputStream(commandIn), commandOut);
     }
 
     /**
      * Monitor the JDI event stream for {@link com.sun.jdi.event.VMDeathEvent}
      * and {@link com.sun.jdi.event.VMDisconnectEvent}. If encountered, invokes

@@ -149,34 +196,6 @@
         if (vm.canBeModified()) {
             new JDIEventHandler(vm, unbiddenExitHandler).start();
         }
     }
 
-    /**
-     * Creates a Thread that will ship all input to the remote agent.
-     *
-     * @param inputStream the user input
-     * @param outStream the input to the remote agent
-     * @param handler a failure handler
-     */
-    public static void forwardInputToRemote(final InputStream inputStream,
-            final OutputStream outStream, final Consumer<Exception> handler) {
-        Thread thr = new Thread("input reader") {
-            @Override
-            public void run() {
-                try {
-                    byte[] buf = new byte[256];
-                    int cnt;
-                    while ((cnt = inputStream.read(buf)) != -1) {
-                        outStream.write(buf, 0, cnt);
-                        outStream.flush();
-                    }
-                } catch (Exception ex) {
-                    handler.accept(ex);
-                }
-            }
-        };
-        thr.setPriority(Thread.MAX_PRIORITY - 1);
-        thr.start();
-    }
-
 }
< prev index next >