< prev index next >

test/lib/jdk/test/lib/process/StreamPumper.java

Print this page
rev 51639 : [mq]: 8210112-1

*** 39,56 **** private static final int BUF_SIZE = 256; /** * Pump will be called by the StreamPumper to process the incoming data */ ! abstract public static class Pump { abstract void register(StreamPumper d); } /** * OutputStream -> Pump adapter */ ! final public static class StreamPump extends Pump { private final OutputStream out; public StreamPump(OutputStream out) { this.out = out; } --- 39,56 ---- private static final int BUF_SIZE = 256; /** * Pump will be called by the StreamPumper to process the incoming data */ ! public abstract static class Pump { abstract void register(StreamPumper d); } /** * OutputStream -> Pump adapter */ ! public final static class StreamPump extends Pump { private final OutputStream out; public StreamPump(OutputStream out) { this.out = out; }
*** 61,85 **** } /** * Used to process the incoming data line-by-line */ ! abstract public static class LinePump extends Pump { @Override final void register(StreamPumper sp) { sp.addLineProcessor(this); } ! abstract protected void processLine(String line); } private final InputStream in; private final Set<OutputStream> outStreams = new HashSet<>(); private final Set<LinePump> linePumps = new HashSet<>(); private final AtomicBoolean processing = new AtomicBoolean(false); - private final FutureTask<Void> processingTask = new FutureTask<>(this, null); public StreamPumper(InputStream in) { this.in = in; } --- 61,84 ---- } /** * Used to process the incoming data line-by-line */ ! public abstract static class LinePump extends Pump { @Override final void register(StreamPumper sp) { sp.addLineProcessor(this); } ! protected abstract void processLine(String line); } private final InputStream in; private final Set<OutputStream> outStreams = new HashSet<>(); private final Set<LinePump> linePumps = new HashSet<>(); private final AtomicBoolean processing = new AtomicBoolean(false); public StreamPumper(InputStream in) { this.in = in; }
*** 106,116 **** byte[] buf = new byte[BUF_SIZE]; int len = 0; int linelen = 0; while ((len = is.read(buf)) > 0 && !Thread.interrupted()) { ! for(OutputStream out : outStreams) { out.write(buf, 0, len); } if (!linePumps.isEmpty()) { int i = 0; int lastcrlf = -1; --- 105,115 ---- byte[] buf = new byte[BUF_SIZE]; int len = 0; int linelen = 0; while ((len = is.read(buf)) > 0 && !Thread.interrupted()) { ! for (OutputStream out : outStreams) { out.write(buf, 0, len); } if (!linePumps.isEmpty()) { int i = 0; int lastcrlf = -1;
*** 123,135 **** linelen += bufLinelen; if (linelen > 0) { lineBos.flush(); final String line = lineBos.toString(); ! linePumps.stream().forEach((lp) -> { ! lp.processLine(line); ! }); lineBos.reset(); linelen = 0; } lastcrlf = i; } --- 122,132 ---- linelen += bufLinelen; if (linelen > 0) { lineBos.flush(); final String line = lineBos.toString(); ! linePumps.forEach((lp) -> lp.processLine(line)); lineBos.reset(); linelen = 0; } lastcrlf = i; }
*** 147,157 **** } } catch (IOException e) { e.printStackTrace(); } finally { ! for(OutputStream out : outStreams) { try { out.flush(); } catch (IOException e) {} } try { --- 144,154 ---- } } catch (IOException e) { e.printStackTrace(); } finally { ! for (OutputStream out : outStreams) { try { out.flush(); } catch (IOException e) {} } try {
*** 166,197 **** final void addLineProcessor(LinePump lp) { linePumps.add(lp); } ! final public StreamPumper addPump(Pump ... pump) { if (processing.get()) { throw new IllegalStateException("Can not modify pumper while " + "processing is in progress"); } ! for(Pump p : pump) { p.register(this); } return this; } ! final public Future<Void> process() { if (!processing.compareAndSet(false, true)) { throw new IllegalStateException("Can not re-run the processing"); } ! Thread t = new Thread(new Runnable() { ! @Override ! public void run() { ! processingTask.run(); ! } ! }); t.setDaemon(true); t.start(); ! return processingTask; } } --- 163,190 ---- final void addLineProcessor(LinePump lp) { linePumps.add(lp); } ! public final StreamPumper addPump(Pump ... pump) { if (processing.get()) { throw new IllegalStateException("Can not modify pumper while " + "processing is in progress"); } ! for (Pump p : pump) { p.register(this); } return this; } ! public final Future<Void> process() { if (!processing.compareAndSet(false, true)) { throw new IllegalStateException("Can not re-run the processing"); } ! FutureTask<Void> result = new FutureTask<>(this, null); ! Thread t = new Thread(result); t.setDaemon(true); t.start(); ! return result; } }
< prev index next >