68 /** object to notify for new connections from remote endpoint */
69 private TCPChannel channel;
70
71 /** input stream for underlying single connection */
72 private InputStream in;
73
74 /** output stream for underlying single connection */
75 private OutputStream out;
76
77 /** true if underlying connection originated from this endpoint
78 (used for generating unique connection IDs) */
79 private boolean orig;
80
81 /** layered stream for reading formatted data from underlying connection */
82 private DataInputStream dataIn;
83
84 /** layered stream for writing formatted data to underlying connection */
85 private DataOutputStream dataOut;
86
87 /** table holding currently open connection IDs and related info */
88 private Hashtable connectionTable = new Hashtable(7);
89
90 /** number of currently open connections */
91 private int numConnections = 0;
92
93 /** maximum allowed open connections */
94 private final static int maxConnections = 256;
95
96 /** ID of last connection opened */
97 private int lastID = 0x1001;
98
99 /** true if this mechanism is still alive */
100 private boolean alive = true;
101
102 /**
103 * Create a new ConnectionMultiplexer using the given underlying
104 * input/output stream pair. The run method must be called
105 * (possibly on a new thread) to handle the demultiplexing.
106 * @param channel object to notify when new connection is received
107 * @param in input stream of underlying connection
108 * @param out output stream of underlying connection
114 InputStream in,
115 OutputStream out,
116 boolean orig)
117 {
118 this.channel = channel;
119 this.in = in;
120 this.out = out;
121 this.orig = orig;
122
123 dataIn = new DataInputStream(in);
124 dataOut = new DataOutputStream(out);
125 }
126
127 /**
128 * Process multiplexing protocol received from underlying connection.
129 */
130 public void run() throws IOException
131 {
132 try {
133 int op, id, length;
134 Integer idObj;
135 MultiplexConnectionInfo info;
136
137 while (true) {
138
139 // read next op code from remote endpoint
140 op = dataIn.readUnsignedByte();
141 switch (op) {
142
143 // remote endpoint initiating new connection
144 case OPEN:
145 id = dataIn.readUnsignedShort();
146
147 if (multiplexLog.isLoggable(Log.VERBOSE)) {
148 multiplexLog.log(Log.VERBOSE, "operation OPEN " + id);
149 }
150
151 idObj = new Integer(id);
152 info =
153 (MultiplexConnectionInfo) connectionTable.get(idObj);
154 if (info != null)
155 throw new IOException(
156 "OPEN: Connection ID already exists");
157 info = new MultiplexConnectionInfo(id);
158 info.in = new MultiplexInputStream(this, info, 2048);
159 info.out = new MultiplexOutputStream(this, info, 2048);
160 synchronized (connectionTable) {
161 connectionTable.put(idObj, info);
162 ++ numConnections;
163 }
164 sun.rmi.transport.Connection conn;
165 conn = new TCPConnection(channel, info.in, info.out);
166 channel.acceptMultiplexConnection(conn);
167 break;
168
169 // remote endpoint closing connection
170 case CLOSE:
171 id = dataIn.readUnsignedShort();
172
173 if (multiplexLog.isLoggable(Log.VERBOSE)) {
174 multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id);
175 }
176
177 idObj = new Integer(id);
178 info =
179 (MultiplexConnectionInfo) connectionTable.get(idObj);
180 if (info == null)
181 throw new IOException(
182 "CLOSE: Invalid connection ID");
183 info.in.disconnect();
184 info.out.disconnect();
185 if (!info.closed)
186 sendCloseAck(info);
187 synchronized (connectionTable) {
188 connectionTable.remove(idObj);
189 -- numConnections;
190 }
191 break;
192
193 // remote endpoint acknowledging close of connection
194 case CLOSEACK:
195 id = dataIn.readUnsignedShort();
196
197 if (multiplexLog.isLoggable(Log.VERBOSE)) {
198 multiplexLog.log(Log.VERBOSE,
199 "operation CLOSEACK " + id);
200 }
201
202 idObj = new Integer(id);
203 info =
204 (MultiplexConnectionInfo) connectionTable.get(idObj);
205 if (info == null)
206 throw new IOException(
207 "CLOSEACK: Invalid connection ID");
208 if (!info.closed)
209 throw new IOException(
210 "CLOSEACK: Connection not closed");
211 info.in.disconnect();
212 info.out.disconnect();
213 synchronized (connectionTable) {
214 connectionTable.remove(idObj);
215 -- numConnections;
216 }
217 break;
218
219 // remote endpoint declaring additional bytes receivable
220 case REQUEST:
221 id = dataIn.readUnsignedShort();
222 idObj = new Integer(id);
223 info =
224 (MultiplexConnectionInfo) connectionTable.get(idObj);
225 if (info == null)
226 throw new IOException(
227 "REQUEST: Invalid connection ID");
228 length = dataIn.readInt();
229
230 if (multiplexLog.isLoggable(Log.VERBOSE)) {
231 multiplexLog.log(Log.VERBOSE,
232 "operation REQUEST " + id + ": " + length);
233 }
234
235 info.out.request(length);
236 break;
237
238 // remote endpoint transmitting data packet
239 case TRANSMIT:
240 id = dataIn.readUnsignedShort();
241 idObj = new Integer(id);
242 info =
243 (MultiplexConnectionInfo) connectionTable.get(idObj);
244 if (info == null)
245 throw new IOException("SEND: Invalid connection ID");
246 length = dataIn.readInt();
247
248 if (multiplexLog.isLoggable(Log.VERBOSE)) {
249 multiplexLog.log(Log.VERBOSE,
250 "operation TRANSMIT " + id + ": " + length);
251 }
252
253 info.in.receive(length, dataIn);
254 break;
255
256 default:
257 throw new IOException("Invalid operation: " +
258 Integer.toHexString(op));
259 }
260 }
261 } finally {
262 shutDown();
263 }
264 }
265
266 /**
267 * Initiate a new multiplexed connection through the underlying
268 * connection.
269 */
270 public synchronized TCPConnection openConnection() throws IOException
271 {
272 // generate ID that should not be already used
273 // If all possible 32768 IDs are used,
274 // this method will block searching for a new ID forever.
275 int id;
276 Integer idObj;
277 do {
278 lastID = (++ lastID) & 0x7FFF;
279 id = lastID;
280
281 // The orig flag (copied to the high bit of the ID) is used
282 // to have two distinct ranges to choose IDs from for the
283 // two endpoints.
284 if (orig)
285 id |= 0x8000;
286 idObj = new Integer(id);
287 } while (connectionTable.get(idObj) != null);
288
289 // create multiplexing streams and bookkeeping information
290 MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
291 info.in = new MultiplexInputStream(this, info, 2048);
292 info.out = new MultiplexOutputStream(this, info, 2048);
293
294 // add to connection table if multiplexer has not died
295 synchronized (connectionTable) {
296 if (!alive)
297 throw new IOException("Multiplexer connection dead");
298 if (numConnections >= maxConnections)
299 throw new IOException("Cannot exceed " + maxConnections +
300 " simultaneous multiplexed connections");
301 connectionTable.put(idObj, info);
302 ++ numConnections;
303 }
304
305 // inform remote endpoint of new connection
306 synchronized (dataOut) {
307 try {
308 dataOut.writeByte(OPEN);
309 dataOut.writeShort(id);
310 dataOut.flush();
311 } catch (IOException e) {
312 multiplexLog.log(Log.BRIEF, "exception: ", e);
313
314 shutDown();
315 throw e;
316 }
317 }
318
319 return new TCPConnection(channel, info.in, info.out);
320 }
321
322 /**
323 * Shut down all connections and clean up.
324 */
325 public void shutDown()
326 {
327 // inform all associated streams
328 synchronized (connectionTable) {
329 // return if multiplexer already officially dead
330 if (!alive)
331 return;
332 alive = false;
333
334 Enumeration enum_ = connectionTable.elements();
335 while (enum_.hasMoreElements()) {
336 MultiplexConnectionInfo info =
337 (MultiplexConnectionInfo) enum_.nextElement();
338 info.in.disconnect();
339 info.out.disconnect();
340 }
341 connectionTable.clear();
342 numConnections = 0;
343 }
344
345 // close underlying connection, if possible (and not already done)
346 try {
347 in.close();
348 } catch (IOException e) {
349 }
350 try {
351 out.close();
352 } catch (IOException e) {
353 }
354 }
355
356 /**
357 * Send request for more data on connection to remote endpoint.
|
68 /** object to notify for new connections from remote endpoint */
69 private TCPChannel channel;
70
71 /** input stream for underlying single connection */
72 private InputStream in;
73
74 /** output stream for underlying single connection */
75 private OutputStream out;
76
77 /** true if underlying connection originated from this endpoint
78 (used for generating unique connection IDs) */
79 private boolean orig;
80
81 /** layered stream for reading formatted data from underlying connection */
82 private DataInputStream dataIn;
83
84 /** layered stream for writing formatted data to underlying connection */
85 private DataOutputStream dataOut;
86
87 /** table holding currently open connection IDs and related info */
88 private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
89
90 /** number of currently open connections */
91 private int numConnections = 0;
92
93 /** maximum allowed open connections */
94 private final static int maxConnections = 256;
95
96 /** ID of last connection opened */
97 private int lastID = 0x1001;
98
99 /** true if this mechanism is still alive */
100 private boolean alive = true;
101
102 /**
103 * Create a new ConnectionMultiplexer using the given underlying
104 * input/output stream pair. The run method must be called
105 * (possibly on a new thread) to handle the demultiplexing.
106 * @param channel object to notify when new connection is received
107 * @param in input stream of underlying connection
108 * @param out output stream of underlying connection
114 InputStream in,
115 OutputStream out,
116 boolean orig)
117 {
118 this.channel = channel;
119 this.in = in;
120 this.out = out;
121 this.orig = orig;
122
123 dataIn = new DataInputStream(in);
124 dataOut = new DataOutputStream(out);
125 }
126
127 /**
128 * Process multiplexing protocol received from underlying connection.
129 */
130 public void run() throws IOException
131 {
132 try {
133 int op, id, length;
134 MultiplexConnectionInfo info;
135
136 while (true) {
137
138 // read next op code from remote endpoint
139 op = dataIn.readUnsignedByte();
140 switch (op) {
141
142 // remote endpoint initiating new connection
143 case OPEN:
144 id = dataIn.readUnsignedShort();
145
146 if (multiplexLog.isLoggable(Log.VERBOSE)) {
147 multiplexLog.log(Log.VERBOSE, "operation OPEN " + id);
148 }
149
150 info = connectionTable.get(id);
151 if (info != null)
152 throw new IOException(
153 "OPEN: Connection ID already exists");
154 info = new MultiplexConnectionInfo(id);
155 info.in = new MultiplexInputStream(this, info, 2048);
156 info.out = new MultiplexOutputStream(this, info, 2048);
157 synchronized (connectionTable) {
158 connectionTable.put(id, info);
159 ++ numConnections;
160 }
161 sun.rmi.transport.Connection conn;
162 conn = new TCPConnection(channel, info.in, info.out);
163 channel.acceptMultiplexConnection(conn);
164 break;
165
166 // remote endpoint closing connection
167 case CLOSE:
168 id = dataIn.readUnsignedShort();
169
170 if (multiplexLog.isLoggable(Log.VERBOSE)) {
171 multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id);
172 }
173
174 info = connectionTable.get(id);
175 if (info == null)
176 throw new IOException(
177 "CLOSE: Invalid connection ID");
178 info.in.disconnect();
179 info.out.disconnect();
180 if (!info.closed)
181 sendCloseAck(info);
182 synchronized (connectionTable) {
183 connectionTable.remove(id);
184 -- numConnections;
185 }
186 break;
187
188 // remote endpoint acknowledging close of connection
189 case CLOSEACK:
190 id = dataIn.readUnsignedShort();
191
192 if (multiplexLog.isLoggable(Log.VERBOSE)) {
193 multiplexLog.log(Log.VERBOSE,
194 "operation CLOSEACK " + id);
195 }
196
197 info = connectionTable.get(id);
198 if (info == null)
199 throw new IOException(
200 "CLOSEACK: Invalid connection ID");
201 if (!info.closed)
202 throw new IOException(
203 "CLOSEACK: Connection not closed");
204 info.in.disconnect();
205 info.out.disconnect();
206 synchronized (connectionTable) {
207 connectionTable.remove(id);
208 -- numConnections;
209 }
210 break;
211
212 // remote endpoint declaring additional bytes receivable
213 case REQUEST:
214 id = dataIn.readUnsignedShort();
215 info = connectionTable.get(id);
216 if (info == null)
217 throw new IOException(
218 "REQUEST: Invalid connection ID");
219 length = dataIn.readInt();
220
221 if (multiplexLog.isLoggable(Log.VERBOSE)) {
222 multiplexLog.log(Log.VERBOSE,
223 "operation REQUEST " + id + ": " + length);
224 }
225
226 info.out.request(length);
227 break;
228
229 // remote endpoint transmitting data packet
230 case TRANSMIT:
231 id = dataIn.readUnsignedShort();
232 info = connectionTable.get(id);
233 if (info == null)
234 throw new IOException("SEND: Invalid connection ID");
235 length = dataIn.readInt();
236
237 if (multiplexLog.isLoggable(Log.VERBOSE)) {
238 multiplexLog.log(Log.VERBOSE,
239 "operation TRANSMIT " + id + ": " + length);
240 }
241
242 info.in.receive(length, dataIn);
243 break;
244
245 default:
246 throw new IOException("Invalid operation: " +
247 Integer.toHexString(op));
248 }
249 }
250 } finally {
251 shutDown();
252 }
253 }
254
255 /**
256 * Initiate a new multiplexed connection through the underlying
257 * connection.
258 */
259 public synchronized TCPConnection openConnection() throws IOException
260 {
261 // generate ID that should not be already used
262 // If all possible 32768 IDs are used,
263 // this method will block searching for a new ID forever.
264 int id;
265 do {
266 lastID = (++ lastID) & 0x7FFF;
267 id = lastID;
268
269 // The orig flag (copied to the high bit of the ID) is used
270 // to have two distinct ranges to choose IDs from for the
271 // two endpoints.
272 if (orig)
273 id |= 0x8000;
274 } while (connectionTable.get(id) != null);
275
276 // create multiplexing streams and bookkeeping information
277 MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
278 info.in = new MultiplexInputStream(this, info, 2048);
279 info.out = new MultiplexOutputStream(this, info, 2048);
280
281 // add to connection table if multiplexer has not died
282 synchronized (connectionTable) {
283 if (!alive)
284 throw new IOException("Multiplexer connection dead");
285 if (numConnections >= maxConnections)
286 throw new IOException("Cannot exceed " + maxConnections +
287 " simultaneous multiplexed connections");
288 connectionTable.put(id, info);
289 ++ numConnections;
290 }
291
292 // inform remote endpoint of new connection
293 synchronized (dataOut) {
294 try {
295 dataOut.writeByte(OPEN);
296 dataOut.writeShort(id);
297 dataOut.flush();
298 } catch (IOException e) {
299 multiplexLog.log(Log.BRIEF, "exception: ", e);
300
301 shutDown();
302 throw e;
303 }
304 }
305
306 return new TCPConnection(channel, info.in, info.out);
307 }
308
309 /**
310 * Shut down all connections and clean up.
311 */
312 public void shutDown()
313 {
314 // inform all associated streams
315 synchronized (connectionTable) {
316 // return if multiplexer already officially dead
317 if (!alive)
318 return;
319 alive = false;
320
321 Enumeration<MultiplexConnectionInfo> enum_ =
322 connectionTable.elements();
323 while (enum_.hasMoreElements()) {
324 MultiplexConnectionInfo info = enum_.nextElement();
325 info.in.disconnect();
326 info.out.disconnect();
327 }
328 connectionTable.clear();
329 numConnections = 0;
330 }
331
332 // close underlying connection, if possible (and not already done)
333 try {
334 in.close();
335 } catch (IOException e) {
336 }
337 try {
338 out.close();
339 } catch (IOException e) {
340 }
341 }
342
343 /**
344 * Send request for more data on connection to remote endpoint.
|