1 /*
   2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 package jdk.testlibrary;
  25 
  26 import java.io.BufferedInputStream;
  27 import java.io.ByteArrayOutputStream;
  28 import java.io.OutputStream;
  29 import java.io.InputStream;
  30 import java.io.IOException;
  31 import java.util.HashSet;
  32 import java.util.Set;
  33 import java.util.concurrent.Future;
  34 import java.util.concurrent.FutureTask;
  35 import java.util.concurrent.atomic.AtomicBoolean;
  36 
  37 /**
  38  * @deprecated This class is deprecated. Use the one from
  39  *             {@code <root>/test/lib/jdk/test/lib/process}
  40  */
  41 @Deprecated
  42 public final class StreamPumper implements Runnable {
  43 
  44     private static final int BUF_SIZE = 256;
  45 
  46     /**
  47      * Pump will be called by the StreamPumper to process the incoming data
  48      */
  49     abstract public static class Pump {
  50         abstract void register(StreamPumper d);
  51     }
  52 
  53     /**
  54      * OutputStream -> Pump adapter
  55      */
  56     final public static class StreamPump extends Pump {
  57         private final OutputStream out;
  58         public StreamPump(OutputStream out) {
  59             this.out = out;
  60         }
  61 
  62         @Override
  63         void register(StreamPumper sp) {
  64             sp.addOutputStream(out);
  65         }
  66     }
  67 
  68     /**
  69      * Used to process the incoming data line-by-line
  70      */
  71     abstract public static class LinePump extends Pump {
  72         @Override
  73         final void register(StreamPumper sp) {
  74             sp.addLineProcessor(this);
  75         }
  76 
  77         abstract protected void processLine(String line);
  78     }
  79 
  80     private final InputStream in;
  81     private final Set<OutputStream> outStreams = new HashSet<>();
  82     private final Set<LinePump> linePumps = new HashSet<>();
  83 
  84     private final AtomicBoolean processing = new AtomicBoolean(false);
  85     private final FutureTask<Void> processingTask = new FutureTask<>(this, null);
  86 
  87     public StreamPumper(InputStream in) {
  88         this.in = in;
  89     }
  90 
  91     /**
  92      * Create a StreamPumper that reads from in and writes to out.
  93      *
  94      * @param in
  95      *            The stream to read from.
  96      * @param out
  97      *            The stream to write to.
  98      */
  99     public StreamPumper(InputStream in, OutputStream out) {
 100         this(in);
 101         this.addOutputStream(out);
 102     }
 103 
 104     /**
 105      * Implements Thread.run(). Continuously read from {@code in} and write to
 106      * {@code out} until {@code in} has reached end of stream. Abort on
 107      * interruption. Abort on IOExceptions.
 108      */
 109     @Override
 110     public void run() {
 111         try (BufferedInputStream is = new BufferedInputStream(in)) {
 112             ByteArrayOutputStream lineBos = new ByteArrayOutputStream();
 113             byte[] buf = new byte[BUF_SIZE];
 114             int len = 0;
 115             int linelen = 0;
 116 
 117             while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
 118                 for(OutputStream out : outStreams) {
 119                     out.write(buf, 0, len);
 120                 }
 121                 if (!linePumps.isEmpty()) {
 122                     int i = 0;
 123                     int lastcrlf = -1;
 124                     while (i < len) {
 125                         if (buf[i] == '\n' || buf[i] == '\r') {
 126                             int bufLinelen = i - lastcrlf - 1;
 127                             if (bufLinelen > 0) {
 128                                 lineBos.write(buf, lastcrlf + 1, bufLinelen);
 129                             }
 130                             linelen += bufLinelen;
 131 
 132                             if (linelen > 0) {
 133                                 lineBos.flush();
 134                                 final String line = lineBos.toString();
 135                                 linePumps.stream().forEach((lp) -> {
 136                                     lp.processLine(line);
 137                                 });
 138                                 lineBos.reset();
 139                                 linelen = 0;
 140                             }
 141                             lastcrlf = i;
 142                         }
 143 
 144                         i++;
 145                     }
 146                     if (lastcrlf == -1) {
 147                         lineBos.write(buf, 0, len);
 148                         linelen += len;
 149                     } else if (lastcrlf < len - 1) {
 150                         lineBos.write(buf, lastcrlf + 1, len - lastcrlf - 1);
 151                         linelen += len - lastcrlf - 1;
 152                     }
 153                 }
 154             }
 155 
 156         } catch (IOException e) {
 157             e.printStackTrace();
 158         } finally {
 159             for(OutputStream out : outStreams) {
 160                 try {
 161                     out.flush();
 162                 } catch (IOException e) {}
 163             }
 164             try {
 165                 in.close();
 166             } catch (IOException e) {}
 167         }
 168     }
 169 
 170     final void addOutputStream(OutputStream out) {
 171         outStreams.add(out);
 172     }
 173 
 174     final void addLineProcessor(LinePump lp) {
 175         linePumps.add(lp);
 176     }
 177 
 178     final public StreamPumper addPump(Pump ... pump) {
 179         if (processing.get()) {
 180             throw new IllegalStateException("Can not modify pumper while " +
 181                                             "processing is in progress");
 182         }
 183         for(Pump p : pump) {
 184             p.register(this);
 185         }
 186         return this;
 187     }
 188 
 189     final public Future<Void> process() {
 190         if (!processing.compareAndSet(false, true)) {
 191             throw new IllegalStateException("Can not re-run the processing");
 192         }
 193         Thread t = new Thread(new Runnable() {
 194             @Override
 195             public void run() {
 196                 processingTask.run();
 197             }
 198         });
 199         t.setDaemon(true);
 200         t.start();
 201 
 202         return processingTask;
 203     }
 204 }