< prev index next >
test/lib/jdk/test/lib/process/StreamPumper.java
Print this page
rev 51639 : [mq]: 8210112-1
@@ -39,18 +39,18 @@
private static final int BUF_SIZE = 256;
/**
* Pump will be called by the StreamPumper to process the incoming data
*/
- abstract public static class Pump {
+ public abstract static class Pump {
abstract void register(StreamPumper d);
}
/**
* OutputStream -> Pump adapter
*/
- final public static class StreamPump extends Pump {
+ public final static class StreamPump extends Pump {
private final OutputStream out;
public StreamPump(OutputStream out) {
this.out = out;
}
@@ -61,25 +61,24 @@
}
/**
* Used to process the incoming data line-by-line
*/
- abstract public static class LinePump extends Pump {
+ public abstract static class LinePump extends Pump {
@Override
final void register(StreamPumper sp) {
sp.addLineProcessor(this);
}
- abstract protected void processLine(String line);
+ protected abstract void processLine(String line);
}
private final InputStream in;
private final Set<OutputStream> outStreams = new HashSet<>();
private final Set<LinePump> linePumps = new HashSet<>();
private final AtomicBoolean processing = new AtomicBoolean(false);
- private final FutureTask<Void> processingTask = new FutureTask<>(this, null);
public StreamPumper(InputStream in) {
this.in = in;
}
@@ -106,11 +105,11 @@
byte[] buf = new byte[BUF_SIZE];
int len = 0;
int linelen = 0;
while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
- for(OutputStream out : outStreams) {
+ for (OutputStream out : outStreams) {
out.write(buf, 0, len);
}
if (!linePumps.isEmpty()) {
int i = 0;
int lastcrlf = -1;
@@ -123,13 +122,11 @@
linelen += bufLinelen;
if (linelen > 0) {
lineBos.flush();
final String line = lineBos.toString();
- linePumps.stream().forEach((lp) -> {
- lp.processLine(line);
- });
+ linePumps.forEach((lp) -> lp.processLine(line));
lineBos.reset();
linelen = 0;
}
lastcrlf = i;
}
@@ -147,11 +144,11 @@
}
} catch (IOException e) {
e.printStackTrace();
} finally {
- for(OutputStream out : outStreams) {
+ for (OutputStream out : outStreams) {
try {
out.flush();
} catch (IOException e) {}
}
try {
@@ -166,32 +163,28 @@
final void addLineProcessor(LinePump lp) {
linePumps.add(lp);
}
- final public StreamPumper addPump(Pump ... pump) {
+ public final StreamPumper addPump(Pump ... pump) {
if (processing.get()) {
throw new IllegalStateException("Can not modify pumper while " +
"processing is in progress");
}
- for(Pump p : pump) {
+ for (Pump p : pump) {
p.register(this);
}
return this;
}
- final public Future<Void> process() {
+ public final Future<Void> process() {
if (!processing.compareAndSet(false, true)) {
throw new IllegalStateException("Can not re-run the processing");
}
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- processingTask.run();
- }
- });
+ FutureTask<Void> result = new FutureTask<>(this, null);
+ Thread t = new Thread(result);
t.setDaemon(true);
t.start();
- return processingTask;
+ return result;
}
}
< prev index next >