56
/**
 * Registers this pump with the given pumper by adding this pump's
 * {@code out} stream to the pumper's set of output streams.
 *
 * @param sp
 *            The pumper to attach to.
 */
57 @Override
58 void register(StreamPumper sp) {
// NOTE(review): out is declared outside this view - presumably the OutputStream this pump wraps; confirm
59 sp.addOutputStream(out);
60 }
61 }
62
63 /**
64 * Used to process the incoming data line-by-line
 *
 * Subclasses supply {@code processLine(String)}. An instance attaches
 * itself to a StreamPumper as a line processor through {@code register}.
65 */
66 abstract public static class LinePump extends Pump {
// final: every LinePump attaches the same way, as a line processor of sp
67 @Override
68 final void register(StreamPumper sp) {
69 sp.addLineProcessor(this);
70 }
71
// Callback implemented by subclasses.
// NOTE(review): presumably invoked once per line read by the pumper; the call site is not visible in this view - confirm
72 abstract protected void processLine(String line);
73 }
74
// Source stream that run() reads from and closes when it finishes.
75 private final InputStream in;
// Sinks that receive every chunk read from in, unmodified.
76 private final Set<OutputStream> outStreams = new HashSet<>();
// Line-oriented consumers; run() only scans for line breaks when this set is non-empty.
77 private final Set<LinePump> linePumps = new HashSet<>();
78
// Checked by addPump to reject modifications while processing is in progress.
79 private final AtomicBoolean processing = new AtomicBoolean(false);
// Wraps this pumper as a task with a null result; the diamond avoids a raw FutureTask and its unchecked warning.
80 private final FutureTask<Void> processingTask = new FutureTask<>(this, null);
81
/**
 * Create a StreamPumper that reads from in. This constructor attaches no
 * output streams or line processors.
 *
 * @param in
 *            The stream to read from.
 */
82 public StreamPumper(InputStream in) {
83 this.in = in;
84 }
85
86 /**
87 * Create a StreamPumper that reads from in and writes to out.
88 *
89 * @param in
90 * The stream to read from.
91 * @param out
92 * The stream to write to.
93 */
94 public StreamPumper(InputStream in, OutputStream out) {
// Delegate to the input-only constructor, then attach out as the first output stream.
95 this(in);
96 this.addOutputStream(out);
97 }
98
99 /**
100 * Implements Thread.run(). Continuously read from {@code in} and write to
101 * {@code out} until {@code in} has reached end of stream. Abort on
102 * interruption. Abort on IOExceptions.
 *
 * Every chunk read is written to each registered output stream. When at
 * least one LinePump is registered, the chunk is additionally scanned for
 * CR/LF bytes and the bytes between terminators are collected in a line
 * buffer. On exit the output streams are flushed (they are not closed
 * here) and {@code in} is closed; IOExceptions raised while flushing or
 * closing are ignored.
103 */
104 @Override
105 public void run() {
// try-with-resources closes is on exit, which also closes the wrapped in
106 try (BufferedInputStream is = new BufferedInputStream(in)) {
// Accumulates the bytes of the line currently being assembled; a line may span several chunks.
107 ByteArrayOutputStream lineBos = new ByteArrayOutputStream();
// BUF_SIZE is defined outside this view.
108 byte[] buf = new byte[BUF_SIZE];
109 int len = 0;
// Number of bytes appended to lineBos by the visible code (presumably reset in the part of the loop not shown here).
110 int linelen = 0;
111
// read() runs before the interrupt check, so a chunk read just before an
// interruption is noticed is not forwarded. Thread.interrupted() also
// clears the thread's interrupt flag.
112 while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
// Forward the raw chunk to every output stream.
113 for(OutputStream out : outStreams) {
114 out.write(buf, 0, len);
115 }
// Line splitting is skipped entirely when nobody consumes lines.
116 if (!linePumps.isEmpty()) {
117 int i = 0;
// Presumably the index in buf of the last terminator seen in this chunk; -1 means none yet.
118 int lastcrlf = -1;
119 while (i < len) {
120 if (buf[i] == '\n' || buf[i] == '\r') {
// Bytes between the previous terminator and this one.
121 int bufLinelen = i - lastcrlf - 1;
122 if (bufLinelen > 0) {
123 lineBos.write(buf, lastcrlf + 1, bufLinelen);
124 }
125 linelen += bufLinelen;
126
// NOTE(review): listing lines 127-136 are not present in this view; presumably
// the completed line is handed to the line pumps and lastcrlf is advanced
// there - confirm against the full file.
137 }
138
139 i++;
140 }
// Keep whatever follows the last terminator (or the whole chunk when it
// contained none) for the next iteration.
141 if (lastcrlf == -1) {
142 lineBos.write(buf, 0, len);
143 linelen += len;
144 } else if (lastcrlf < len - 1) {
145 lineBos.write(buf, lastcrlf + 1, len - lastcrlf - 1);
146 linelen += len - lastcrlf - 1;
147 }
148 }
149 }
150
151 } catch (IOException e) {
// A read or write failure ends the pump; the error is only printed to stderr.
152 e.printStackTrace();
153 } finally {
// Best-effort flush of every sink; a failing flush does not stop the others.
154 for(OutputStream out : outStreams) {
155 try {
156 out.flush();
157 } catch (IOException e) {}
158 }
// Explicit close of in; failures are ignored.
159 try {
160 in.close();
161 } catch (IOException e) {}
162 }
163 }
164
/**
 * Adds out to the set of streams that run() writes each chunk to. The
 * backing collection is a Set, so adding an equal stream again has no
 * effect.
 *
 * @param out
 *            The stream to add.
 */
165 final void addOutputStream(OutputStream out) {
166 outStreams.add(out);
167 }
168
/**
 * Adds lp to the set of line pumps. run() only scans the data for line
 * terminators when this set is non-empty.
 *
 * @param lp
 *            The line pump to add.
 */
169 final void addLineProcessor(LinePump lp) {
170 linePumps.add(lp);
171 }
172
173 final public StreamPumper addPump(Pump ... pump) {
174 if (processing.get()) {
175 throw new IllegalStateException("Can not modify pumper while " +
176 "processing is in progress");
|
56
/**
 * Registers this pump with the given pumper by adding this pump's
 * {@code out} stream to the pumper's set of output streams.
 *
 * @param sp
 *            The pumper to attach to.
 */
57 @Override
58 void register(StreamPumper sp) {
// NOTE(review): out is declared outside this view - presumably the OutputStream this pump wraps; confirm
59 sp.addOutputStream(out);
60 }
61 }
62
63 /**
64 * Used to process the incoming data line-by-line
 *
 * Subclasses supply {@code processLine(String)}. An instance attaches
 * itself to a StreamPumper as a line processor through {@code register}.
65 */
66 abstract public static class LinePump extends Pump {
// final: every LinePump attaches the same way, as a line processor of sp
67 @Override
68 final void register(StreamPumper sp) {
69 sp.addLineProcessor(this);
70 }
71
// Callback implemented by subclasses.
// NOTE(review): presumably invoked once per line read by the pumper; the call site is not visible in this view - confirm
72 abstract protected void processLine(String line);
73 }
74
// Source stream that run() reads from and closes when it finishes.
75 private final InputStream in;
// Sinks that receive every chunk read from in, unmodified.
76 private final Set<OutputStream> outStreams = new HashSet<OutputStream>();
// Line-oriented consumers; run() only scans for line breaks when this set is non-empty.
77 private final Set<LinePump> linePumps = new HashSet<LinePump>();
78
// Checked by addPump to reject modifications while processing is in progress.
79 private final AtomicBoolean processing = new AtomicBoolean(false);
// Wraps this pumper as a task whose result is always null.
80 private final FutureTask<Void> processingTask = new FutureTask<Void>(this, null);
81
/**
 * Create a StreamPumper that reads from in. This constructor attaches no
 * output streams or line processors.
 *
 * @param in
 *            The stream to read from.
 */
82 public StreamPumper(InputStream in) {
83 this.in = in;
84 }
85
86 /**
87 * Create a StreamPumper that reads from in and writes to out.
88 *
89 * @param in
90 * The stream to read from.
91 * @param out
92 * The stream to write to.
93 */
94 public StreamPumper(InputStream in, OutputStream out) {
// Delegate to the input-only constructor, then attach out as the first output stream.
95 this(in);
96 this.addOutputStream(out);
97 }
98
99 /**
100 * Implements Thread.run(). Continuously read from {@code in} and write to
101 * {@code out} until {@code in} has reached end of stream. Abort on
102 * interruption. Abort on IOExceptions.
 *
 * Every chunk read is written to each registered output stream. When at
 * least one LinePump is registered, the chunk is additionally scanned for
 * CR/LF bytes and the bytes between terminators are collected in a line
 * buffer. On exit the output streams are flushed (they are not closed
 * here), then the buffered wrapper and {@code in} are closed;
 * IOExceptions raised while flushing or closing are ignored.
103 */
104 @Override
105 public void run() {
// Declared outside the try so the finally block can close it.
106 BufferedInputStream is = null;
107 try {
108 is = new BufferedInputStream(in);
// Accumulates the bytes of the line currently being assembled; a line may span several chunks.
109 ByteArrayOutputStream lineBos = new ByteArrayOutputStream();
// BUF_SIZE is defined outside this view.
110 byte[] buf = new byte[BUF_SIZE];
111 int len = 0;
// Number of bytes appended to lineBos by the visible code (presumably reset in the part of the loop not shown here).
112 int linelen = 0;
113
// read() runs before the interrupt check, so a chunk read just before an
// interruption is noticed is not forwarded. Thread.interrupted() also
// clears the thread's interrupt flag.
114 while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
// Forward the raw chunk to every output stream.
115 for(OutputStream out : outStreams) {
116 out.write(buf, 0, len);
117 }
// Line splitting is skipped entirely when nobody consumes lines.
118 if (!linePumps.isEmpty()) {
119 int i = 0;
// Presumably the index in buf of the last terminator seen in this chunk; -1 means none yet.
120 int lastcrlf = -1;
121 while (i < len) {
122 if (buf[i] == '\n' || buf[i] == '\r') {
// Bytes between the previous terminator and this one.
123 int bufLinelen = i - lastcrlf - 1;
124 if (bufLinelen > 0) {
125 lineBos.write(buf, lastcrlf + 1, bufLinelen);
126 }
127 linelen += bufLinelen;
128
// NOTE(review): listing lines 129-138 are not present in this view; presumably
// the completed line is handed to the line pumps and lastcrlf is advanced
// there - confirm against the full file.
139 }
140
141 i++;
142 }
// Keep whatever follows the last terminator (or the whole chunk when it
// contained none) for the next iteration.
143 if (lastcrlf == -1) {
144 lineBos.write(buf, 0, len);
145 linelen += len;
146 } else if (lastcrlf < len - 1) {
147 lineBos.write(buf, lastcrlf + 1, len - lastcrlf - 1);
148 linelen += len - lastcrlf - 1;
149 }
150 }
151 }
152
153 } catch (IOException e) {
// A read or write failure ends the pump; the error is only printed to stderr.
154 e.printStackTrace();
155 } finally {
// Best-effort flush of every sink; a failing flush does not stop the others.
156 for(OutputStream out : outStreams) {
157 try {
158 out.flush();
159 } catch (IOException e) {}
160 }
// Close the buffered wrapper if it was created; failures are ignored.
161 if (is != null) {
162 try {
163 is.close();
164 } catch (IOException e) {}
165 }
// Close in directly as well, which also covers the case where the wrapper was never created.
166 try {
167 in.close();
168 } catch (IOException e) {}
169 }
170 }
171
/**
 * Adds out to the set of streams that run() writes each chunk to. The
 * backing collection is a Set, so adding an equal stream again has no
 * effect.
 *
 * @param out
 *            The stream to add.
 */
172 final void addOutputStream(OutputStream out) {
173 outStreams.add(out);
174 }
175
/**
 * Adds lp to the set of line pumps. run() only scans the data for line
 * terminators when this set is non-empty.
 *
 * @param lp
 *            The line pump to add.
 */
176 final void addLineProcessor(LinePump lp) {
177 linePumps.add(lp);
178 }
179
180 final public StreamPumper addPump(Pump ... pump) {
181 if (processing.get()) {
182 throw new IllegalStateException("Can not modify pumper while " +
183 "processing is in progress");
|