test/lib/testlibrary/jdk/testlibrary/StreamPumper.java

Print this page




   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 package jdk.testlibrary;
  25 


  26 import java.io.OutputStream;
  27 import java.io.InputStream;
  28 import java.io.IOException;





  29 
  30 public final class StreamPumper implements Runnable {
  31 
  32     private static final int BUF_SIZE = 256;
  33 











  34     private final OutputStream out;






















  35     private final InputStream in;


  36 







  37     /**
  38      * Create a StreamPumper that reads from in and writes to out.
  39      *
  40      * @param in
  41      *            The stream to read from.
  42      * @param out
  43      *            The stream to write to.
  44      */
  45     public StreamPumper(InputStream in, OutputStream out) {
  46         this.in = in;
  47         this.out = out;
  48     }
  49 
  50     /**
  51      * Implements Thread.run(). Continuously read from {@code in} and write to
  52      * {@code out} until {@code in} has reached end of stream. Abort on
  53      * interruption. Abort on IOExceptions.
  54      */
  55     @Override
  56     public void run() {
  57         int length;
  58         InputStream localIn = in;
  59         OutputStream localOut = out;
  60         byte[] buffer = new byte[BUF_SIZE];

  61 
  62         try {
  63             while ((length = localIn.read(buffer)) > 0 && !Thread.interrupted()) {
  64                 localOut.write(buffer, 0, length);
  65             }






































  66         } catch (IOException e) {
  67             // Just abort if something like this happens.
  68             e.printStackTrace();
  69         } finally {

  70             try {
  71                 localOut.flush();



  72                 in.close();
  73             } catch (IOException e) {
  74                 e.printStackTrace();
  75             }
  76         }



  77     }































  78 }


   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 package jdk.testlibrary;
  25 
  26 import java.io.BufferedInputStream;
  27 import java.io.ByteArrayOutputStream;
  28 import java.io.OutputStream;
  29 import java.io.InputStream;
  30 import java.io.IOException;
  31 import java.util.HashSet;
  32 import java.util.Set;
  33 import java.util.concurrent.Future;
  34 import java.util.concurrent.FutureTask;
  35 import java.util.concurrent.atomic.AtomicBoolean;
  36 
  37 public final class StreamPumper implements Runnable {
  38 
  39     private static final int BUF_SIZE = 256;
  40 
  41     /**
  42      * Pump will be called by the StreamPumper to process the incoming data
  43      */
  44     abstract public static class Pump {
  45         abstract void register(StreamPumper d);
  46     }
  47 
  48     /**
  49      * OutputStream -> Pump adapter
  50      */
  51     final public static class StreamPump extends Pump {
  52         private final OutputStream out;
  53         public StreamPump(OutputStream out) {
  54             this.out = out;
  55         }
  56 
  57         @Override
  58         void register(StreamPumper sp) {
  59             sp.addOutputStream(out);
  60         }
  61     }
  62 
  63     /**
  64      * Used to process the incoming data line-by-line
  65      */
  66     abstract public static class LinePump extends Pump {
  67         @Override
  68         final void register(StreamPumper sp) {
  69             sp.addLineProcessor(this);
  70         }
  71 
  72         abstract protected void processLine(String line);
  73     }
  74 
  75     private final InputStream in;
  76     private final Set<OutputStream> outStreams = new HashSet<>();
  77     private final Set<LinePump> linePumps = new HashSet<>();
  78 
  79     private final AtomicBoolean processing = new AtomicBoolean(false);
  80     private final FutureTask<Void> processingTask = new FutureTask(this, null);
  81 
  82     public StreamPumper(InputStream in) {
  83         this.in = in;
  84     }
  85 
  86     /**
  87      * Create a StreamPumper that reads from in and writes to out.
  88      *
  89      * @param in
  90      *            The stream to read from.
  91      * @param out
  92      *            The stream to write to.
  93      */
  94     public StreamPumper(InputStream in, OutputStream out) {
  95         this(in);
  96         this.addOutputStream(out);
  97     }
  98 
  99     /**
 100      * Implements Thread.run(). Continuously read from {@code in} and write to
 101      * {@code out} until {@code in} has reached end of stream. Abort on
 102      * interruption. Abort on IOExceptions.
 103      */
 104     @Override
 105     public void run() {
 106         try (BufferedInputStream is = new BufferedInputStream(in)) {
 107             ByteArrayOutputStream lineBos = new ByteArrayOutputStream();
 108             byte[] buf = new byte[BUF_SIZE];
 109             int len = 0;
 110             int linelen = 0;
 111 
 112             while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
 113                 for(OutputStream out : outStreams) {
 114                     out.write(buf, 0, len);
 115                 }
 116                 if (!linePumps.isEmpty()) {
 117                     int i = 0;
 118                     int lastcrlf = -1;
 119                     while (i < len) {
 120                         if (buf[i] == '\n' || buf[i] == '\r') {
 121                             int bufLinelen = i - lastcrlf - 1;
 122                             if (bufLinelen > 0) {
 123                                 lineBos.write(buf, lastcrlf + 1, bufLinelen);
 124                             }
 125                             linelen += bufLinelen;
 126 
 127                             if (linelen > 0) {
 128                                 lineBos.flush();
 129                                 final String line = lineBos.toString();
 130                                 for (LinePump lp : linePumps) {
 131                                     lp.processLine(line);
 132                                 };
 133 //                                linePumps.stream().forEach((lp) -> {
 134 //                                    lp.processLine(line);
 135 //                                });
 136                                 lineBos.reset();
 137                                 linelen = 0;
 138                             }
 139                             lastcrlf = i;
 140                         }
 141 
 142                         i++;
 143                     }
 144                     if (lastcrlf == -1) {
 145                         lineBos.write(buf, 0, len);
 146                         linelen += len;
 147                     } else if (lastcrlf < len - 1) {
 148                         lineBos.write(buf, lastcrlf + 1, len - lastcrlf - 1);
 149                         linelen += len - lastcrlf - 1;
 150                     }
 151                 }
 152             }
 153 
 154         } catch (IOException e) {

 155             e.printStackTrace();
 156         } finally {
 157             for(OutputStream out : outStreams) {
 158                 try {
 159                     out.flush();
 160                 } catch (IOException e) {}
 161             }
 162             try {
 163                 in.close();
 164             } catch (IOException e) {}

 165         }
 166     }
 167 
 168     final void addOutputStream(OutputStream out) {
 169         outStreams.add(out);
 170     }
 171 
 172     final void addLineProcessor(LinePump lp) {
 173         linePumps.add(lp);
 174     }
 175 
 176     final public StreamPumper addPump(Pump ... pump) {
 177         if (processing.get()) {
 178             throw new IllegalStateException("Can not modify pumper while " +
 179                                             "processing is in progress");
 180         }
 181         for(Pump p : pump) {
 182             p.register(this);
 183         }
 184         return this;
 185     }
 186 
 187     final public Future<Void> process() {
 188         if (!processing.compareAndSet(false, true)) {
 189             throw new IllegalStateException("Can not re-run the processing");
 190         }
 191         Thread t = new Thread(new Runnable() {
 192             @Override
 193             public void run() {
 194                 processingTask.run();
 195             }
 196         });
 197         t.setDaemon(true);
 198         t.start();
 199 
 200         return processingTask;
 201     }
 202 }