< prev index next >

test/lib/testlibrary/jdk/testlibrary/StreamPumper.java

Print this page
rev 1531 : 8171415: Remove Java 7 features from testlibrary
Reviewed-by: omajid


  56 
  57         @Override
  58         void register(StreamPumper sp) {
                 // Attach this pump to sp by handing it the stream to copy into.
                 // NOTE(review): out is declared outside the visible hunk; presumably
                 // it is the enclosing pump's OutputStream field -- confirm.
  59             sp.addOutputStream(out);
  60         }
  61     }
  62 
  63     /**
  64      * Used to process the incoming data line-by-line
          * <p>
          * Subclasses implement {@link #processLine(String)}. Registration is
          * fixed: the final {@code register} adds this pump to the pumper's
          * line processors and cannot be overridden.
  65      */
  66     abstract public static class LinePump extends Pump {
  67         @Override
  68         final void register(StreamPumper sp) {
                 // Line pumps always register as line processors, never as raw output streams.
  69             sp.addLineProcessor(this);
  70         }
  71 
             // Callback implemented by subclasses to handle a single line of input.
  72         abstract protected void processLine(String line);
  73     }
  74 
  75     private final InputStream in; // source stream; run() reads it through a BufferedInputStream
  76     private final Set<OutputStream> outStreams = new HashSet<>(); // run() writes every chunk it reads to each of these
  77     private final Set<LinePump> linePumps = new HashSet<>(); // run() scans for line breaks only when this is non-empty
  78 
  79     private final AtomicBoolean processing = new AtomicBoolean(false); // addPump() throws IllegalStateException while this is true
  80     private final FutureTask<Void> processingTask = new FutureTask(this, null); // wraps this pumper as a task with a null result; raw FutureTask type here
  81 
  82     public StreamPumper(InputStream in) {
             // Only records the source stream; no output stream or line pump is registered here.
  83         this.in = in;
  84     }
  85 
  86     /**
  87      * Create a StreamPumper that reads from in and writes to out.
          * Equivalent to constructing the pumper with {@code in} alone and then
          * registering {@code out} as an output stream.
  88      *
  89      * @param in
  90      *            The stream to read from.
  91      * @param out
  92      *            The stream to write to.
  93      */
  94     public StreamPumper(InputStream in, OutputStream out) {
  95         this(in);
             // Register out so that run() copies everything it reads into it.
  96         this.addOutputStream(out);
  97     }
  98 
  99     /**
 100      * Implements Thread.run(). Continuously read from {@code in} and write to
 101      * every registered output stream until {@code in} has reached end of stream.
 102      * Abort on interruption. Abort on IOExceptions.
 103      */
 104     @Override
 105     public void run() {
 106         try (BufferedInputStream is = new BufferedInputStream(in)) {


 107             ByteArrayOutputStream lineBos = new ByteArrayOutputStream();
 108             byte[] buf = new byte[BUF_SIZE];
 109             int len = 0;
 110             int linelen = 0;
 111 
 112             while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
 113                 for(OutputStream out : outStreams) {
 114                     out.write(buf, 0, len);
 115                 }
 116                 if (!linePumps.isEmpty()) {
 117                     int i = 0;
 118                     int lastcrlf = -1;
 119                     while (i < len) {
 120                         if (buf[i] == '\n' || buf[i] == '\r') {
 121                             int bufLinelen = i - lastcrlf - 1;
 122                             if (bufLinelen > 0) {
 123                                 lineBos.write(buf, lastcrlf + 1, bufLinelen);
 124                             }
 125                             linelen += bufLinelen;
 126 


 137                         }
 138 
 139                         i++;
 140                     }
 141                     if (lastcrlf == -1) {
 142                         lineBos.write(buf, 0, len);
 143                         linelen += len;
 144                     } else if (lastcrlf < len - 1) {
 145                         lineBos.write(buf, lastcrlf + 1, len - lastcrlf - 1);
 146                         linelen += len - lastcrlf - 1;
 147                     }
 148                 }
 149             }
 150 
 151         } catch (IOException e) {
 152             e.printStackTrace();
 153         } finally {
 154             for(OutputStream out : outStreams) {
 155                 try {
 156                     out.flush();





 157                 } catch (IOException e) {}
 158             }
 159             try {
 160                 in.close();
 161             } catch (IOException e) {}
 162         }
 163     }
 164 
 165     final void addOutputStream(OutputStream out) {
             // Adds out to the set of streams that run() writes each chunk to.
             // outStreams is a Set, so a stream already present is not added twice.
 166         outStreams.add(out);
 167     }
 168 
 169     final void addLineProcessor(LinePump lp) {
             // Adds lp to the set of line processors; run() scans its buffer for
             // line breaks only when this set is non-empty.
 170         linePumps.add(lp);
 171     }
 172 
 173     final public StreamPumper addPump(Pump ... pump) {
 174         if (processing.get()) {
 175             throw new IllegalStateException("Can not modify pumper while " +
 176                                             "processing is in progress");




  56 
  57         @Override
  58         void register(StreamPumper sp) {
                 // Attach this pump to sp by handing it the stream to copy into.
                 // NOTE(review): out is declared outside the visible hunk; presumably
                 // it is the enclosing pump's OutputStream field -- confirm.
  59             sp.addOutputStream(out);
  60         }
  61     }
  62 
  63     /**
  64      * Used to process the incoming data line-by-line
          * <p>
          * Subclasses implement {@link #processLine(String)}. Registration is
          * fixed: the final {@code register} adds this pump to the pumper's
          * line processors and cannot be overridden.
  65      */
  66     abstract public static class LinePump extends Pump {
  67         @Override
  68         final void register(StreamPumper sp) {
                 // Line pumps always register as line processors, never as raw output streams.
  69             sp.addLineProcessor(this);
  70         }
  71 
             // Callback implemented by subclasses to handle a single line of input.
  72         abstract protected void processLine(String line);
  73     }
  74 
  75     private final InputStream in; // source stream; run() reads it through a BufferedInputStream
  76     private final Set<OutputStream> outStreams = new HashSet<OutputStream>(); // run() writes every chunk it reads to each of these
  77     private final Set<LinePump> linePumps = new HashSet<LinePump>(); // run() scans for line breaks only when this is non-empty
  78 
  79     private final AtomicBoolean processing = new AtomicBoolean(false); // addPump() throws IllegalStateException while this is true
  80     private final FutureTask<Void> processingTask = new FutureTask<Void>(this, null); // wraps this pumper as a task with a null result
  81 
  82     public StreamPumper(InputStream in) {
             // Only records the source stream; no output stream or line pump is registered here.
  83         this.in = in;
  84     }
  85 
  86     /**
  87      * Create a StreamPumper that reads from in and writes to out.
          * Equivalent to constructing the pumper with {@code in} alone and then
          * registering {@code out} as an output stream.
  88      *
  89      * @param in
  90      *            The stream to read from.
  91      * @param out
  92      *            The stream to write to.
  93      */
  94     public StreamPumper(InputStream in, OutputStream out) {
  95         this(in);
             // Register out so that run() copies everything it reads into it.
  96         this.addOutputStream(out);
  97     }
  98 
  99     /**
 100      * Implements Thread.run(). Continuously read from {@code in} and write to
 101      * every registered output stream until {@code in} has reached end of stream.
 102      * Abort on interruption. Abort on IOExceptions.
 103      */
 104     @Override
 105     public void run() {
 106         BufferedInputStream is = null;
 107         try {
 108             is = new BufferedInputStream(in);
 109             ByteArrayOutputStream lineBos = new ByteArrayOutputStream();
 110             byte[] buf = new byte[BUF_SIZE];
 111             int len = 0;
 112             int linelen = 0;
 113 
 114             while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
 115                 for(OutputStream out : outStreams) {
 116                     out.write(buf, 0, len);
 117                 }
 118                 if (!linePumps.isEmpty()) {
 119                     int i = 0;
 120                     int lastcrlf = -1;
 121                     while (i < len) {
 122                         if (buf[i] == '\n' || buf[i] == '\r') {
 123                             int bufLinelen = i - lastcrlf - 1;
 124                             if (bufLinelen > 0) {
 125                                 lineBos.write(buf, lastcrlf + 1, bufLinelen);
 126                             }
 127                             linelen += bufLinelen;
 128 


 139                         }
 140 
 141                         i++;
 142                     }
 143                     if (lastcrlf == -1) {
 144                         lineBos.write(buf, 0, len);
 145                         linelen += len;
 146                     } else if (lastcrlf < len - 1) {
 147                         lineBos.write(buf, lastcrlf + 1, len - lastcrlf - 1);
 148                         linelen += len - lastcrlf - 1;
 149                     }
 150                 }
 151             }
 152 
 153         } catch (IOException e) {
 154             e.printStackTrace();
 155         } finally {
 156             for(OutputStream out : outStreams) {
 157                 try {
 158                     out.flush();
 159                 } catch (IOException e) {}
 160             }
 161             if (is != null) {
 162                 try {
 163                     is.close();
 164                 } catch (IOException e) {}
 165             }
 166             try {
 167                 in.close();
 168             } catch (IOException e) {}
 169         }
 170     }
 171 
 172     final void addOutputStream(OutputStream out) {
             // Adds out to the set of streams that run() writes each chunk to.
             // outStreams is a Set, so a stream already present is not added twice.
 173         outStreams.add(out);
 174     }
 175 
 176     final void addLineProcessor(LinePump lp) {
             // Adds lp to the set of line processors; run() scans its buffer for
             // line breaks only when this set is non-empty.
 177         linePumps.add(lp);
 178     }
 179 
 180     final public StreamPumper addPump(Pump ... pump) {
 181         if (processing.get()) {
 182             throw new IllegalStateException("Can not modify pumper while " +
 183                                             "processing is in progress");


< prev index next >