< prev index next >
test/lib/jdk/test/lib/process/StreamPumper.java
Print this page
rev 51639 : [mq]: 8210112-1
*** 39,56 ****
private static final int BUF_SIZE = 256;
/**
* Pump will be called by the StreamPumper to process the incoming data
*/
! abstract public static class Pump {
abstract void register(StreamPumper d);
}
/**
* OutputStream -> Pump adapter
*/
! final public static class StreamPump extends Pump {
private final OutputStream out;
public StreamPump(OutputStream out) {
this.out = out;
}
--- 39,56 ----
private static final int BUF_SIZE = 256;
/**
* Pump will be called by the StreamPumper to process the incoming data
*/
! public abstract static class Pump {
abstract void register(StreamPumper d);
}
/**
* OutputStream -> Pump adapter
*/
! public final static class StreamPump extends Pump {
private final OutputStream out;
public StreamPump(OutputStream out) {
this.out = out;
}
*** 61,85 ****
}
/**
* Used to process the incoming data line-by-line
*/
! abstract public static class LinePump extends Pump {
@Override
final void register(StreamPumper sp) {
sp.addLineProcessor(this);
}
! abstract protected void processLine(String line);
}
private final InputStream in;
private final Set<OutputStream> outStreams = new HashSet<>();
private final Set<LinePump> linePumps = new HashSet<>();
private final AtomicBoolean processing = new AtomicBoolean(false);
- private final FutureTask<Void> processingTask = new FutureTask<>(this, null);
public StreamPumper(InputStream in) {
this.in = in;
}
--- 61,84 ----
}
/**
* Used to process the incoming data line-by-line
*/
! public abstract static class LinePump extends Pump {
@Override
final void register(StreamPumper sp) {
sp.addLineProcessor(this);
}
! protected abstract void processLine(String line);
}
private final InputStream in;
private final Set<OutputStream> outStreams = new HashSet<>();
private final Set<LinePump> linePumps = new HashSet<>();
private final AtomicBoolean processing = new AtomicBoolean(false);
public StreamPumper(InputStream in) {
this.in = in;
}
*** 106,116 ****
byte[] buf = new byte[BUF_SIZE];
int len = 0;
int linelen = 0;
while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
! for(OutputStream out : outStreams) {
out.write(buf, 0, len);
}
if (!linePumps.isEmpty()) {
int i = 0;
int lastcrlf = -1;
--- 105,115 ----
byte[] buf = new byte[BUF_SIZE];
int len = 0;
int linelen = 0;
while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
! for (OutputStream out : outStreams) {
out.write(buf, 0, len);
}
if (!linePumps.isEmpty()) {
int i = 0;
int lastcrlf = -1;
*** 123,135 ****
linelen += bufLinelen;
if (linelen > 0) {
lineBos.flush();
final String line = lineBos.toString();
! linePumps.stream().forEach((lp) -> {
! lp.processLine(line);
! });
lineBos.reset();
linelen = 0;
}
lastcrlf = i;
}
--- 122,132 ----
linelen += bufLinelen;
if (linelen > 0) {
lineBos.flush();
final String line = lineBos.toString();
! linePumps.forEach((lp) -> lp.processLine(line));
lineBos.reset();
linelen = 0;
}
lastcrlf = i;
}
*** 147,157 ****
}
} catch (IOException e) {
e.printStackTrace();
} finally {
! for(OutputStream out : outStreams) {
try {
out.flush();
} catch (IOException e) {}
}
try {
--- 144,154 ----
}
} catch (IOException e) {
e.printStackTrace();
} finally {
! for (OutputStream out : outStreams) {
try {
out.flush();
} catch (IOException e) {}
}
try {
*** 166,197 ****
final void addLineProcessor(LinePump lp) {
linePumps.add(lp);
}
! final public StreamPumper addPump(Pump ... pump) {
if (processing.get()) {
throw new IllegalStateException("Can not modify pumper while " +
"processing is in progress");
}
! for(Pump p : pump) {
p.register(this);
}
return this;
}
! final public Future<Void> process() {
if (!processing.compareAndSet(false, true)) {
throw new IllegalStateException("Can not re-run the processing");
}
! Thread t = new Thread(new Runnable() {
! @Override
! public void run() {
! processingTask.run();
! }
! });
t.setDaemon(true);
t.start();
! return processingTask;
}
}
--- 163,190 ----
final void addLineProcessor(LinePump lp) {
linePumps.add(lp);
}
! public final StreamPumper addPump(Pump ... pump) {
if (processing.get()) {
throw new IllegalStateException("Can not modify pumper while " +
"processing is in progress");
}
! for (Pump p : pump) {
p.register(this);
}
return this;
}
! public final Future<Void> process() {
if (!processing.compareAndSet(false, true)) {
throw new IllegalStateException("Can not re-run the processing");
}
! FutureTask<Void> result = new FutureTask<>(this, null);
! Thread t = new Thread(result);
t.setDaemon(true);
t.start();
! return result;
}
}
< prev index next >