< prev index next >

test/lib/jdk/test/lib/process/StreamPumper.java

Print this page
rev 51639 : [mq]: 8210112-1

@@ -39,18 +39,18 @@
     private static final int BUF_SIZE = 256;
 
     /**
      * Pump will be called by the StreamPumper to process the incoming data
      */
-    abstract public static class Pump {
+    public abstract static class Pump {
         abstract void register(StreamPumper d);
     }
 
     /**
      * OutputStream -> Pump adapter
      */
-    final public static class StreamPump extends Pump {
+    public final static class StreamPump extends Pump {
         private final OutputStream out;
         public StreamPump(OutputStream out) {
             this.out = out;
         }
 

@@ -61,25 +61,24 @@
     }
 
     /**
      * Used to process the incoming data line-by-line
      */
-    abstract public static class LinePump extends Pump {
+    public abstract static class LinePump extends Pump {
         @Override
         final void register(StreamPumper sp) {
             sp.addLineProcessor(this);
         }
 
-        abstract protected void processLine(String line);
+        protected abstract void processLine(String line);
     }
 
     private final InputStream in;
     private final Set<OutputStream> outStreams = new HashSet<>();
     private final Set<LinePump> linePumps = new HashSet<>();
 
     private final AtomicBoolean processing = new AtomicBoolean(false);
-    private final FutureTask<Void> processingTask = new FutureTask<>(this, null);
 
     public StreamPumper(InputStream in) {
         this.in = in;
     }
 

@@ -106,11 +105,11 @@
             byte[] buf = new byte[BUF_SIZE];
             int len = 0;
             int linelen = 0;
 
             while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
-                for(OutputStream out : outStreams) {
+                for (OutputStream out : outStreams) {
                     out.write(buf, 0, len);
                 }
                 if (!linePumps.isEmpty()) {
                     int i = 0;
                     int lastcrlf = -1;

@@ -123,13 +122,11 @@
                             linelen += bufLinelen;
 
                             if (linelen > 0) {
                                 lineBos.flush();
                                 final String line = lineBos.toString();
-                                linePumps.stream().forEach((lp) -> {
-                                    lp.processLine(line);
-                                });
+                                linePumps.forEach((lp) -> lp.processLine(line));
                                 lineBos.reset();
                                 linelen = 0;
                             }
                             lastcrlf = i;
                         }

@@ -147,11 +144,11 @@
             }
 
         } catch (IOException e) {
             e.printStackTrace();
         } finally {
-            for(OutputStream out : outStreams) {
+            for (OutputStream out : outStreams) {
                 try {
                     out.flush();
                 } catch (IOException e) {}
             }
             try {

@@ -166,32 +163,28 @@
 
     final void addLineProcessor(LinePump lp) {
         linePumps.add(lp);
     }
 
-    final public StreamPumper addPump(Pump ... pump) {
+    public final StreamPumper addPump(Pump ... pump) {
         if (processing.get()) {
             throw new IllegalStateException("Can not modify pumper while " +
                                             "processing is in progress");
         }
-        for(Pump p : pump) {
+        for (Pump p : pump) {
             p.register(this);
         }
         return this;
     }
 
-    final public Future<Void> process() {
+    public final Future<Void> process() {
         if (!processing.compareAndSet(false, true)) {
             throw new IllegalStateException("Can not re-run the processing");
         }
-        Thread t = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                processingTask.run();
-            }
-        });
+        FutureTask<Void> result = new FutureTask<>(this, null);
+        Thread t = new Thread(result);
         t.setDaemon(true);
         t.start();
 
-        return processingTask;
+        return result;
     }
 }
< prev index next >