88 private final Thread thread;
89 private volatile ServerSocketChannel ssc;
90 private volatile InetSocketAddress address;
91
92 public DummyWebSocketServer() {
93 this(defaultMapping());
94 }
95
96 public DummyWebSocketServer(Function<List<String>, List<String>> mapping) {
97 requireNonNull(mapping);
98 thread = new Thread(() -> {
99 try {
100 while (!Thread.currentThread().isInterrupted()) {
101 log.log(INFO, "Accepting next connection at: " + ssc);
102 SocketChannel channel = ssc.accept();
103 log.log(INFO, "Accepted: " + channel);
104 try {
105 channel.configureBlocking(true);
106 StringBuilder request = new StringBuilder();
107 if (!readRequest(channel, request)) {
108 throw new IOException("Bad request");
109 }
110 List<String> strings = asList(request.toString().split("\r\n"));
111 List<String> response = mapping.apply(strings);
112 writeResponse(channel, response);
113 // Read until the thread is interrupted or an error occurred
114 // or the input is shutdown
115 ByteBuffer b = ByteBuffer.allocate(1024);
116 while (channel.read(b) != -1) {
117 b.clear();
118 }
119 } catch (IOException e) {
120 log.log(TRACE, () -> "Error in connection: " + channel, e);
121 } finally {
122 log.log(INFO, "Closed: " + channel);
123 close(channel);
124 }
125 }
126 } catch (ClosedByInterruptException ignored) {
127 } catch (IOException e) {
128 log.log(ERROR, e);
139 log.log(INFO, "Starting");
140 if (!started.compareAndSet(false, true)) {
141 throw new IllegalStateException("Already started");
142 }
143 ssc = ServerSocketChannel.open();
144 try {
145 ssc.configureBlocking(true);
146 ssc.bind(new InetSocketAddress("localhost", 0));
147 address = (InetSocketAddress) ssc.getLocalAddress();
148 thread.start();
149 } catch (IOException e) {
150 close(ssc);
151 }
152 log.log(INFO, "Started at: " + getURI());
153 }
154
155 @Override
156 public void close() {
157 log.log(INFO, "Stopping: " + getURI());
158 thread.interrupt();
159 }
160
161 URI getURI() {
162 if (!started.get()) {
163 throw new IllegalStateException("Not yet started");
164 }
165 return URI.create("ws://" + address.getHostName() + ":" + address.getPort());
166 }
167
168 private boolean readRequest(SocketChannel channel, StringBuilder request)
169 throws IOException
170 {
171 ByteBuffer buffer = ByteBuffer.allocate(512);
172 int num = channel.read(buffer);
173 if (num == -1) {
174 return false;
175 }
176 CharBuffer decoded;
177 buffer.flip();
178 try {
179 decoded = ISO_8859_1.newDecoder().decode(buffer);
180 } catch (CharacterCodingException e) {
181 throw new UncheckedIOException(e);
182 }
183 request.append(decoded);
184 return Pattern.compile("\r\n\r\n").matcher(request).find();
185 }
186
187 private void writeResponse(SocketChannel channel, List<String> response)
188 throws IOException
189 {
190 String s = response.stream().collect(Collectors.joining("\r\n"))
191 + "\r\n\r\n";
192 ByteBuffer encoded;
193 try {
194 encoded = ISO_8859_1.newEncoder().encode(CharBuffer.wrap(s));
195 } catch (CharacterCodingException e) {
196 throw new UncheckedIOException(e);
197 }
198 while (encoded.hasRemaining()) {
199 channel.write(encoded);
200 }
201 }
202
203 private static Function<List<String>, List<String>> defaultMapping() {
204 return request -> {
|
88 private final Thread thread;
89 private volatile ServerSocketChannel ssc;
90 private volatile InetSocketAddress address;
91
92 public DummyWebSocketServer() {
93 this(defaultMapping());
94 }
95
96 public DummyWebSocketServer(Function<List<String>, List<String>> mapping) {
97 requireNonNull(mapping);
98 thread = new Thread(() -> {
99 try {
100 while (!Thread.currentThread().isInterrupted()) {
101 log.log(INFO, "Accepting next connection at: " + ssc);
102 SocketChannel channel = ssc.accept();
103 log.log(INFO, "Accepted: " + channel);
104 try {
105 channel.configureBlocking(true);
106 StringBuilder request = new StringBuilder();
107 if (!readRequest(channel, request)) {
108 throw new IOException("Bad request:" + request);
109 }
110 List<String> strings = asList(request.toString().split("\r\n"));
111 List<String> response = mapping.apply(strings);
112 writeResponse(channel, response);
113 // Read until the thread is interrupted or an error occurred
114 // or the input is shutdown
115 ByteBuffer b = ByteBuffer.allocate(1024);
116 while (channel.read(b) != -1) {
117 b.clear();
118 }
119 } catch (IOException e) {
120 log.log(TRACE, () -> "Error in connection: " + channel, e);
121 } finally {
122 log.log(INFO, "Closed: " + channel);
123 close(channel);
124 }
125 }
126 } catch (ClosedByInterruptException ignored) {
127 } catch (IOException e) {
128 log.log(ERROR, e);
139 log.log(INFO, "Starting");
140 if (!started.compareAndSet(false, true)) {
141 throw new IllegalStateException("Already started");
142 }
143 ssc = ServerSocketChannel.open();
144 try {
145 ssc.configureBlocking(true);
146 ssc.bind(new InetSocketAddress("localhost", 0));
147 address = (InetSocketAddress) ssc.getLocalAddress();
148 thread.start();
149 } catch (IOException e) {
150 close(ssc);
151 }
152 log.log(INFO, "Started at: " + getURI());
153 }
154
155 @Override
156 public void close() {
157 log.log(INFO, "Stopping: " + getURI());
158 thread.interrupt();
159 close(ssc);
160 }
161
162 URI getURI() {
163 if (!started.get()) {
164 throw new IllegalStateException("Not yet started");
165 }
166 return URI.create("ws://" + address.getHostName() + ":" + address.getPort());
167 }
168
169 private boolean readRequest(SocketChannel channel, StringBuilder request)
170 throws IOException
171 {
172 ByteBuffer buffer = ByteBuffer.allocate(512);
173 while (channel.read(buffer) != -1) {
174 // read the complete HTTP request headers, there should be no body
175 CharBuffer decoded;
176 buffer.flip();
177 try {
178 decoded = ISO_8859_1.newDecoder().decode(buffer);
179 } catch (CharacterCodingException e) {
180 throw new UncheckedIOException(e);
181 }
182 request.append(decoded);
183 if (Pattern.compile("\r\n\r\n").matcher(request).find())
184 return true;
185 buffer.clear();
186 }
187 return false;
188 }
189
190 private void writeResponse(SocketChannel channel, List<String> response)
191 throws IOException
192 {
193 String s = response.stream().collect(Collectors.joining("\r\n"))
194 + "\r\n\r\n";
195 ByteBuffer encoded;
196 try {
197 encoded = ISO_8859_1.newEncoder().encode(CharBuffer.wrap(s));
198 } catch (CharacterCodingException e) {
199 throw new UncheckedIOException(e);
200 }
201 while (encoded.hasRemaining()) {
202 channel.write(encoded);
203 }
204 }
205
206 private static Function<List<String>, List<String>> defaultMapping() {
207 return request -> {
|