< prev index next >

test/lib/jdk/test/lib/process/StreamPumper.java

Print this page
rev 51639 : [mq]: 8210112-1


  24 package jdk.test.lib.process;
  25 
  26 import java.io.BufferedInputStream;
  27 import java.io.ByteArrayOutputStream;
  28 import java.io.OutputStream;
  29 import java.io.InputStream;
  30 import java.io.IOException;
  31 import java.util.HashSet;
  32 import java.util.Set;
  33 import java.util.concurrent.Future;
  34 import java.util.concurrent.FutureTask;
  35 import java.util.concurrent.atomic.AtomicBoolean;
  36 
  /**
   * Reads an {@link InputStream} to its end and forwards the data to the
   * consumers registered on this pumper: raw bytes are copied to every
   * registered {@link OutputStream}, and complete lines are handed to every
   * registered {@link LinePump}.
   *
   * <p>Consumers are registered through the two-argument constructor or
   * {@link #addPump(Pump...)} before processing starts. Processing is started
   * either asynchronously with {@link #process()} or synchronously by calling
   * {@link #run()} directly.
   */
  public final class StreamPumper implements Runnable {

      /** Size, in bytes, of the chunk buffer used for each read from the input. */
      private static final int BUF_SIZE = 256;

      /**
       * A consumer that can be attached to a {@link StreamPumper}. The
       * registration hook is package-private; use {@link StreamPump} or extend
       * {@link LinePump}.
       */
      public abstract static class Pump {
          /**
           * Attaches this consumer to the given pumper.
           *
           * @param d the pumper to register with
           */
          abstract void register(StreamPumper d);
      }

      /**
       * OutputStream -> Pump adapter: every chunk read by the pumper is written
       * to the wrapped stream.
       */
      public static final class StreamPump extends Pump {
          private final OutputStream out;

          /**
           * @param out the stream that receives a copy of the pumped bytes
           */
          public StreamPump(OutputStream out) {
              this.out = out;
          }

          @Override
          void register(StreamPumper sp) {
              sp.addOutputStream(out);
          }
      }

      /**
       * Used to process the incoming data line-by-line.
       */
      public abstract static class LinePump extends Pump {
          @Override
          final void register(StreamPumper sp) {
              sp.addLineProcessor(this);
          }

          /**
           * Called once for every non-empty line found in the input.
           *
           * @param line the line content without its terminator
           */
          protected abstract void processLine(String line);
      }

      private final InputStream in;
      private final Set<OutputStream> outStreams = new HashSet<>();
      private final Set<LinePump> linePumps = new HashSet<>();

      /** Set once by {@link #process()}; guards against re-runs and late registration. */
      private final AtomicBoolean processing = new AtomicBoolean(false);
      /** The task returned by {@link #process()}; it executes {@link #run()}. */
      private final FutureTask<Void> processingTask = new FutureTask<>(this, null);

      /**
       * Create a StreamPumper that reads from in. Consumers are attached
       * afterwards with {@link #addPump(Pump...)}.
       *
       * @param in The stream to read from.
       */
      public StreamPumper(InputStream in) {
          this.in = in;
      }

      /**
       * Create a StreamPumper that reads from in and writes to out.
       *
       * @param in The stream to read from.
       * @param out The stream to write to.
       */
      public StreamPumper(InputStream in, OutputStream out) {
          this(in);
          this.addOutputStream(out);
      }

      /**
       * Implements {@link Runnable#run()}. Continuously reads from {@code in},
       * copies each chunk to all registered output streams and splits it into
       * lines for all registered line pumps, until {@code in} has reached end
       * of stream. Aborts on interruption and on IOExceptions (the latter are
       * printed to standard error).
       *
       * <p>Both {@code '\n'} and {@code '\r'} terminate a line and empty lines
       * are not reported. A final line that is not followed by a terminator is
       * reported when end of stream is reached. The output streams are flushed
       * and {@code in} is closed before this method returns.
       */
      @Override
      public void run() {
          try (BufferedInputStream is = new BufferedInputStream(in)) {
              // Collects the bytes of the current, not yet terminated line; a
              // line may span any number of read chunks.
              ByteArrayOutputStream lineBos = new ByteArrayOutputStream();
              byte[] buf = new byte[BUF_SIZE];
              int len;

              while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
                  for (OutputStream out : outStreams) {
                      out.write(buf, 0, len);
                  }
                  if (!linePumps.isEmpty()) {
                      int lineStart = 0;
                      for (int i = 0; i < len; i++) {
                          if (buf[i] == '\n' || buf[i] == '\r') {
                              if (i > lineStart) {
                                  lineBos.write(buf, lineStart, i - lineStart);
                              }
                              deliverPendingLine(lineBos);
                              lineStart = i + 1;
                          }
                      }
                      // Keep the unterminated tail of this chunk for the next read.
                      if (len > lineStart) {
                          lineBos.write(buf, lineStart, len - lineStart);
                      }
                  }
              }

              // len > 0 here means the loop was left because of an interrupt;
              // in that case the buffered text may be a truncated line and is
              // dropped. On a regular end of stream the last line is complete
              // even without a trailing terminator, so report it.
              if (len <= 0) {
                  deliverPendingLine(lineBos);
              }
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              for (OutputStream out : outStreams) {
                  try {
                      out.flush();
                  } catch (IOException ignored) {
                      // best effort: keep flushing the remaining streams
                  }
              }
              try {
                  in.close();
              } catch (IOException ignored) {
                  // best effort: nothing more can be done with the input
              }
          }
      }

      /**
       * Hands the buffered line, if it is not empty, to every registered line
       * pump and clears the buffer.
       */
      private void deliverPendingLine(ByteArrayOutputStream lineBos) {
          if (lineBos.size() == 0) {
              return;
          }
          // toString() without arguments decodes with the platform default charset.
          final String line = lineBos.toString();
          lineBos.reset();
          for (LinePump lp : linePumps) {
              lp.processLine(line);
          }
      }

      final void addOutputStream(OutputStream out) {
          outStreams.add(out);
      }

      final void addLineProcessor(LinePump lp) {
          linePumps.add(lp);
      }

      /**
       * Registers the given consumers with this pumper.
       *
       * @param pump the consumers to attach
       * @return this pumper, to allow call chaining
       * @throws IllegalStateException if {@link #process()} has already been called
       */
      public final StreamPumper addPump(Pump ... pump) {
          if (processing.get()) {
              throw new IllegalStateException("Can not modify pumper while " +
                                              "processing is in progress");
          }
          for (Pump p : pump) {
              p.register(this);
          }
          return this;
      }

      /**
       * Starts pumping on a new daemon thread.
       *
       * @return a future that completes (with {@code null}) when {@link #run()}
       *         has finished
       * @throws IllegalStateException if processing has already been started
       */
      public final Future<Void> process() {
          if (!processing.compareAndSet(false, true)) {
              throw new IllegalStateException("Can not re-run the processing");
          }
          // FutureTask is itself a Runnable, so no wrapper is needed.
          Thread t = new Thread(processingTask);
          t.setDaemon(true);
          t.start();

          return processingTask;
      }
  }


  24 package jdk.test.lib.process;
  25 
  26 import java.io.BufferedInputStream;
  27 import java.io.ByteArrayOutputStream;
  28 import java.io.OutputStream;
  29 import java.io.InputStream;
  30 import java.io.IOException;
  31 import java.util.HashSet;
  32 import java.util.Set;
  33 import java.util.concurrent.Future;
  34 import java.util.concurrent.FutureTask;
  35 import java.util.concurrent.atomic.AtomicBoolean;
  36 
  /**
   * Pumps the content of an {@link InputStream} to a set of consumers. Each
   * chunk that is read is written verbatim to all registered
   * {@link OutputStream}s and, in addition, split into lines that are passed
   * to all registered {@link LinePump}s.
   *
   * <p>Attach consumers with the two-argument constructor or
   * {@link #addPump(Pump...)}, then start the work with {@link #process()}
   * (background daemon thread) or {@link #run()} (calling thread).
   */
  public final class StreamPumper implements Runnable {

      /** Number of bytes requested from the input per read. */
      private static final int BUF_SIZE = 256;

      /**
       * Pump will be called by the StreamPumper to process the incoming data.
       * Its registration method is package-private; the usable variants are
       * {@link StreamPump} and subclasses of {@link LinePump}.
       */
      public abstract static class Pump {
          /**
           * Registers this pump with the given pumper.
           *
           * @param d the pumper this pump should be attached to
           */
          abstract void register(StreamPumper d);
      }

      /**
       * OutputStream -> Pump adapter. The wrapped stream receives every chunk
       * the pumper reads.
       */
      public static final class StreamPump extends Pump {
          private final OutputStream out;

          /**
           * @param out the stream the pumped bytes are written to
           */
          public StreamPump(OutputStream out) {
              this.out = out;
          }

          @Override
          void register(StreamPumper sp) {
              sp.addOutputStream(out);
          }
      }

      /**
       * Used to process the incoming data line-by-line.
       */
      public abstract static class LinePump extends Pump {
          @Override
          final void register(StreamPumper sp) {
              sp.addLineProcessor(this);
          }

          /**
           * Receives one non-empty line of input, stripped of its terminator.
           *
           * @param line the text of the line
           */
          protected abstract void processLine(String line);
      }

      private final InputStream in;
      private final Set<OutputStream> outStreams = new HashSet<>();
      private final Set<LinePump> linePumps = new HashSet<>();

      /** Flipped to {@code true} by {@link #process()}; never reset. */
      private final AtomicBoolean processing = new AtomicBoolean(false);

      /**
       * Create a StreamPumper that reads from in. Use
       * {@link #addPump(Pump...)} to attach consumers.
       *
       * @param in The stream to read from.
       */
      public StreamPumper(InputStream in) {
          this.in = in;
      }

      /**
       * Create a StreamPumper that reads from in and writes to out.
       *
       * @param in The stream to read from.
       * @param out The stream to write to.
       */
      public StreamPumper(InputStream in, OutputStream out) {
          this(in);
          this.addOutputStream(out);
      }

      /**
       * Implements {@link Runnable#run()}. Continuously read from {@code in},
       * write to the registered output streams and feed the registered line
       * pumps until {@code in} has reached end of stream. Abort on
       * interruption. Abort on IOExceptions, which are printed to standard
       * error.
       *
       * <p>A line ends at {@code '\n'} or {@code '\r'}; empty lines are
       * skipped. When end of stream is reached, a last line without a
       * terminator is still passed to the line pumps. Before returning, the
       * output streams are flushed and {@code in} is closed.
       */
      @Override
      public void run() {
          try (BufferedInputStream is = new BufferedInputStream(in)) {
              // Holds the bytes of the line currently being assembled, which
              // can be spread over several reads.
              ByteArrayOutputStream lineBos = new ByteArrayOutputStream();
              byte[] buf = new byte[BUF_SIZE];
              int len;

              while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
                  for (OutputStream out : outStreams) {
                      out.write(buf, 0, len);
                  }
                  if (linePumps.isEmpty()) {
                      continue;
                  }
                  int lineStart = 0;
                  for (int i = 0; i < len; i++) {
                      if (buf[i] != '\n' && buf[i] != '\r') {
                          continue;
                      }
                      if (i > lineStart) {
                          lineBos.write(buf, lineStart, i - lineStart);
                      }
                      flushLine(lineBos);
                      lineStart = i + 1;
                  }
                  // Whatever follows the last terminator belongs to a line
                  // that continues in the next chunk.
                  if (lineStart < len) {
                      lineBos.write(buf, lineStart, len - lineStart);
                  }
              }

              // Only a regular end of stream (read returned no data) makes the
              // buffered remainder a complete line. After an interrupt len is
              // still positive and the remainder is discarded.
              if (len <= 0) {
                  flushLine(lineBos);
              }
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              for (OutputStream out : outStreams) {
                  try {
                      out.flush();
                  } catch (IOException ignored) {
                      // best effort: continue with the other streams
                  }
              }
              try {
                  in.close();
              } catch (IOException ignored) {
                  // best effort: the input is not used any further
              }
          }
      }

      /**
       * Passes the buffered line to all line pumps and empties the buffer.
       * Does nothing when the buffer is empty, so empty lines are not reported.
       */
      private void flushLine(ByteArrayOutputStream lineBos) {
          if (lineBos.size() > 0) {
              // Decoded with the platform default charset (no-arg toString()).
              final String line = lineBos.toString();
              lineBos.reset();
              linePumps.forEach((lp) -> lp.processLine(line));
          }
      }

      final void addOutputStream(OutputStream out) {
          outStreams.add(out);
      }

      final void addLineProcessor(LinePump lp) {
          linePumps.add(lp);
      }

      /**
       * Attaches the given pumps to this pumper.
       *
       * @param pump the pumps to register
       * @return this pumper
       * @throws IllegalStateException if {@link #process()} has already been called
       */
      public final StreamPumper addPump(Pump ... pump) {
          if (processing.get()) {
              throw new IllegalStateException("Can not modify pumper while " +
                                              "processing is in progress");
          }
          for (Pump p : pump) {
              p.register(this);
          }
          return this;
      }

      /**
       * Runs this pumper on a newly started daemon thread.
       *
       * @return a future that completes with {@code null} once {@link #run()}
       *         has returned
       * @throws IllegalStateException if this method has been called before
       */
      public final Future<Void> process() {
          if (!processing.compareAndSet(false, true)) {
              throw new IllegalStateException("Can not re-run the processing");
          }
          FutureTask<Void> result = new FutureTask<>(this, null);
          Thread t = new Thread(result);
          t.setDaemon(true);
          t.start();

          return result;
      }
  }
< prev index next >