001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.uil2;
023:
024: import java.io.IOException;
025: import java.io.ObjectInputStream;
026: import java.io.ObjectOutputStream;
027: import java.net.InetAddress;
028: import java.net.Socket;
029: import java.util.Iterator;
030:
031: import javax.jms.JMSException;
032:
033: import org.jboss.logging.Logger;
034: import org.jboss.mq.il.uil2.msgs.BaseMsg;
035: import org.jboss.util.stream.NotifyingBufferedInputStream;
036: import org.jboss.util.stream.NotifyingBufferedOutputStream;
037:
038: import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
039: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
040: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
041: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
042: import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
043: import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
044:
045: /** Used to manage the client/server and server/client communication in an
046: * asynchrounous manner.
047: *
048: * @todo verify the pooled executor config
049: *
050: * @author Scott.Stark@jboss.org
051: * @version $Revision: 57198 $
052: */
053: public class SocketManager {
054: private static Logger log = Logger.getLogger(SocketManager.class);
055:
056: private static final int STOPPED = 0;
057: private static final int STARTED = 1;
058: private static final int STOPPING = 2;
059: private static SynchronizedInt taskID = new SynchronizedInt(0);
060:
061: /** The socket created by the IL layer */
062: private Socket socket;
063: /** The input stream used by the read task */
064: private ObjectInputStream in;
065: /** The buffering for output */
066: NotifyingBufferedInputStream bufferedInput;
067: /** The output stream used by the write task */
068: private ObjectOutputStream out;
069: /** The buffering for output */
070: NotifyingBufferedOutputStream bufferedOutput;
071: /** The write task thread */
072: private Thread writeThread;
073: /** The read task thread */
074: private Thread readThread;
075: /** The thread pool used to service incoming requests */
076: PooledExecutor pool;
077: /** The flag used to control the read loop */
078: private int readState = STOPPED;
079: /** The flag used to control the write loop */
080: private int writeState = STOPPED;
081: /** Used for constrolling the state */
082: private SynchronizedBoolean running = new SynchronizedBoolean(false);
083: /** The queue of messages to be processed by the write task */
084: private LinkedQueue sendQueue;
085: /** A HashMap<Integer, BaseMsg> that are awaiting a reply */
086: private ConcurrentHashMap replyMap;
087: /** The callback handler used for msgs that are not replys */
088: private SocketManagerHandler handler;
089: /** The buffer size */
090: private int bufferSize = 1;
091: /** The chunk size for notification of stream activity */
092: private int chunkSize = 0x40000000;
093: /** The logging trace level which is set in the ctor */
094: private boolean trace;
095:
096: public SocketManager(Socket s) throws IOException {
097: socket = s;
098: sendQueue = new LinkedQueue();
099: replyMap = new ConcurrentHashMap();
100: trace = log.isTraceEnabled();
101: }
102:
103: /** Start the read and write threads using the given thread group and
104: * names of "UIL2.SocketManager.ReadTask" and "UIL2.SocketManager.WriteTask".
105: * @param tg the thread group to use for the read and write threads.
106: */
107: public void start(ThreadGroup tg) {
108: if (trace)
109: log.trace("start called",
110: new Exception("Start stack trace"));
111:
112: InetAddress inetAddr = socket.getInetAddress();
113: String ipAddress = (inetAddr != null) ? inetAddr
114: .getHostAddress() : "<unknown>";
115: ipAddress += ":" + socket.getPort();
116: if (pool == null) {
117: // TODO: Check the validity of this config
118: pool = new PooledExecutor(5);
119: pool.setMinimumPoolSize(1);
120: pool.setKeepAliveTime(1000 * 60);
121: pool.runWhenBlocked();
122: String id = "SocketManager.MsgPool@"
123: + Integer
124: .toHexString(System.identityHashCode(this ))
125: + " client=" + ipAddress;
126: pool.setThreadFactory(new UILThreadFactory(id));
127: }
128:
129: ReadTask readTask = new ReadTask();
130: readThread = new Thread(tg, readTask,
131: "UIL2.SocketManager.ReadTask#" + taskID.increment()
132: + " client=" + ipAddress);
133: readThread.setDaemon(true);
134:
135: WriteTask writeTask = new WriteTask();
136: writeThread = new Thread(tg, writeTask,
137: "UIL2.SocketManager.WriteTask#" + taskID.increment()
138: + " client=" + ipAddress);
139: writeThread.setDaemon(true);
140:
141: synchronized (running) {
142: readState = STARTED;
143: writeState = STARTED;
144: running.set(true);
145: }
146:
147: readThread.start();
148: writeThread.start();
149: }
150:
151: /** Stop the read and write threads by interrupting them.
152: */
153: public void stop() {
154: synchronized (running) {
155: if (readState == STARTED) {
156: readState = STOPPING;
157: readThread.interrupt();
158: }
159: if (writeState == STARTED) {
160: writeState = STOPPING;
161: writeThread.interrupt();
162: }
163: running.set(false);
164: if (pool != null) {
165: pool.shutdownNow();
166: pool = null;
167: }
168: }
169: }
170:
171: /** Set the callback handler for msgs that were not originated by the
172: * socket manager. This is any msgs read that was not sent via the
173: * sendMessage method.
174: *
175: * @param handler
176: */
177: public void setHandler(SocketManagerHandler handler) {
178: this .handler = handler;
179: if (bufferedInput != null)
180: bufferedInput.setStreamListener(handler);
181: if (bufferedOutput != null)
182: bufferedOutput.setStreamListener(handler);
183: }
184:
185: /**
186: * Sets the buffer size
187: *
188: * @param size the size of the buffer
189: */
190: public void setBufferSize(int size) {
191: this .bufferSize = size;
192: }
193:
194: /**
195: * Sets the chunk size
196: *
197: * @param size the size of a chunk
198: */
199: public void setChunkSize(int size) {
200: this .chunkSize = size;
201: }
202:
203: /** Send a two-way message and block the calling thread until the
204: * msg reply is received. This enques the msg to the sendQueue, places
205: * the msg in the replyMap and waits on the msg. The msg is notified by the
206: * read task thread when it finds a msg with a msgID that maps to the
207: * msg in the msgReply map.
208: *
209: * @param msg the request msg to send
210: * @throws Exception thrown if the reply message has an error value
211: */
212: public void sendMessage(BaseMsg msg) throws Exception {
213: internalSendMessage(msg, true);
214: if (msg.error != null) {
215: if (trace)
216: log.trace("sendMessage will throw error", msg.error);
217: throw msg.error;
218: }
219: }
220:
221: /**
222: * Send a reply.
223: *
224: * @param msg the message
225: * @throws Exception for any error
226: */
227: public void sendReply(BaseMsg msg) throws Exception {
228: msg.trimReply();
229: internalSendMessage(msg, false);
230: }
231:
232: /**
233: * Send a one-way.
234: *
235: * @param msg the message
236: * @throws Exception for any error
237: */
238: public void sendOneWay(BaseMsg msg) throws Exception {
239: msg.getMsgID();
240: internalSendMessage(msg, false);
241: }
242:
243: /** This places the msg into the sendQueue and returns if waitOnReply
244: * is false, or enques the msg to the sendQueue, places the msg
245: * in the replyMap and waits on the msg.
246: *
247: * @param msg
248: * @param waitOnReply
249: * @throws Exception
250: */
251: private void internalSendMessage(BaseMsg msg, boolean waitOnReply)
252: throws Exception {
253: if (running.get() == false)
254: throw new IOException("Client is not connected");
255:
256: if (waitOnReply) { // Send a request msg and wait for the reply
257: synchronized (msg) {
258: // Create the request msgID
259: msg.getMsgID();
260: if (trace)
261: log
262: .trace("Begin internalSendMessage, round-trip msg="
263: + msg);
264: // Place the msg into the write queue and reply map
265: replyMap.put(msg, msg);
266: sendQueue.put(msg);
267: // Wait for the msg reply
268: msg.wait();
269: }
270: } else { // Send an asynchronous msg, typically a reply
271: if (trace)
272: log.trace("Begin internalSendMessage, one-way msg="
273: + msg);
274: sendQueue.put(msg);
275: }
276: if (trace)
277: log.trace("End internalSendMessage, msg=" + msg);
278: }
279:
280: /** The task managing the socket read thread
281: *
282: */
283: public class ReadTask implements Runnable {
284: public void run() {
285: int msgType = 0;
286: log.debug("Begin ReadTask.run");
287: try {
288: bufferedInput = new NotifyingBufferedInputStream(socket
289: .getInputStream(), bufferSize, chunkSize,
290: handler);
291: in = new ObjectInputStream(bufferedInput);
292: log.debug("Created ObjectInputStream");
293: } catch (IOException e) {
294: handleStop("Failed to create ObjectInputStream", e);
295: return;
296: }
297:
298: while (true) {
299: try {
300: msgType = in.readByte();
301: int msgID = in.readInt();
302: if (trace)
303: log.trace("Read msgType: "
304: + BaseMsg.toString(msgType)
305: + ", msgID: " + msgID);
306: // See if there is a msg awaiting a reply
307: BaseMsg key = new BaseMsg(msgType, msgID);
308: BaseMsg msg = (BaseMsg) replyMap.remove(key);
309: if (msg == null) {
310: msg = BaseMsg.createMsg(msgType);
311: msg.setMsgID(msgID);
312: msg.read(in);
313: if (trace)
314: log.trace("Read new msg: " + msg);
315:
316: // Handle the message
317: if (pool == null)
318: break;
319: msg.setHandler(this );
320: pool.execute(msg);
321: } else {
322: if (trace)
323: log.trace("Found replyMap msg: " + msg);
324: msg.setMsgID(msgID);
325: try {
326: msg.read(in);
327: if (trace)
328: log.trace("Read msg reply: " + msg);
329: } catch (Throwable e) {
330: // Forward the error to the waiting message
331: msg.setError(e);
332: throw e;
333: }
334: // Always notify the waiting message
335: finally {
336: synchronized (msg) {
337: msg.notify();
338: }
339: }
340: }
341: } catch (ClassNotFoundException e) {
342: handleStop("Failed to read msgType:" + msgType, e);
343: break;
344: } catch (IOException e) {
345: handleStop("Exiting on IOE", e);
346: break;
347: } catch (InterruptedException e) {
348: handleStop("Exiting on interrupt", e);
349: break;
350: } catch (Throwable e) {
351: handleStop(
352: "Exiting on unexpected error in read task",
353: e);
354: break;
355: }
356: }
357: log.debug("End ReadTask.run");
358: }
359:
360: /**
361: * Handle the message or respond with an error
362: */
363: public void handleMsg(BaseMsg msg) {
364: try {
365: handler.handleMsg(msg);
366: } catch (Throwable e) {
367: if (e instanceof JMSException)
368: log.trace("Failed to handle: " + msg.toString(), e);
369: else if (e instanceof RuntimeException
370: || e instanceof Error)
371: log.error("Failed to handle: " + msg.toString(), e);
372: else
373: log.debug("Failed to handle: " + msg.toString(), e);
374: msg.setError(e);
375: try {
376: internalSendMessage(msg, false);
377: } catch (Exception ie) {
378: log.debug("Failed to send error reply", ie);
379: }
380: }
381: }
382:
383: /**
384: * Stop the read thread
385: */
386: private void handleStop(String error, Throwable e) {
387: synchronized (running) {
388: readState = STOPPING;
389: running.set(false);
390: }
391:
392: if (e instanceof IOException
393: || e instanceof InterruptedException) {
394: if (trace)
395: log.trace(error, e);
396: } else
397: log.debug(error, e);
398:
399: replyAll(e);
400: if (handler != null) {
401: handler.asynchFailure(error, e);
402: handler.close();
403: }
404:
405: synchronized (running) {
406: readState = STOPPED;
407: if (writeState == STARTED) {
408: writeState = STOPPING;
409: writeThread.interrupt();
410: }
411: }
412:
413: try {
414: in.close();
415: } catch (Exception ignored) {
416: if (trace)
417: log.trace(ignored.getMessage(), ignored);
418: }
419:
420: try {
421: socket.close();
422: } catch (Exception ignored) {
423: if (trace)
424: log.trace(ignored.getMessage(), ignored);
425: }
426: }
427:
428: private void replyAll(Throwable e) {
429: // Clear the interrupted state of the thread
430: Thread.interrupted();
431:
432: for (Iterator iterator = replyMap.keySet().iterator(); iterator
433: .hasNext();) {
434: BaseMsg msg = (BaseMsg) iterator.next();
435: msg.setError(e);
436: synchronized (msg) {
437: msg.notify();
438: }
439: iterator.remove();
440: }
441: }
442: }
443:
444: /** The task managing the socket write thread
445: *
446: */
447: public class WriteTask implements Runnable {
448: public void run() {
449: log.debug("Begin WriteTask.run");
450: try {
451: bufferedOutput = new NotifyingBufferedOutputStream(
452: socket.getOutputStream(), bufferSize,
453: chunkSize, handler);
454: out = new ObjectOutputStream(bufferedOutput);
455: log.debug("Created ObjectOutputStream");
456: } catch (IOException e) {
457: handleStop(null, "Failed to create ObjectOutputStream",
458: e);
459: return;
460: }
461:
462: while (true) {
463: BaseMsg msg = null;
464: try {
465: msg = (BaseMsg) sendQueue.take();
466: if (trace)
467: log.trace("Write msg: " + msg);
468: msg.write(out);
469: out.reset();
470: out.flush();
471: } catch (InterruptedException e) {
472: handleStop(msg, "WriteTask was interrupted", e);
473: break;
474: } catch (IOException e) {
475: handleStop(msg, "Exiting on IOE", e);
476: break;
477: } catch (Throwable e) {
478: handleStop(msg, "Failed to write msgType:" + msg, e);
479: break;
480: }
481: }
482: log.debug("End WriteTask.run");
483: }
484:
485: /**
486: * Stop the write thread
487: */
488: private void handleStop(BaseMsg msg, String error, Throwable e) {
489: synchronized (running) {
490: writeState = STOPPING;
491: running.set(false);
492: }
493:
494: if (e instanceof InterruptedException
495: || e instanceof IOException) {
496: if (trace)
497: log.trace(error, e);
498: } else
499: log.debug(error, e);
500:
501: if (msg != null) {
502: msg.setError(e);
503: synchronized (msg) {
504: msg.notify();
505: }
506: }
507:
508: synchronized (running) {
509: writeState = STOPPED;
510: if (readState == STARTED) {
511: readState = STOPPING;
512: readThread.interrupt();
513: }
514: }
515:
516: try {
517: out.close();
518: } catch (Exception ignored) {
519: if (trace)
520: log.trace(ignored.getMessage(), ignored);
521: }
522:
523: try {
524: socket.close();
525: } catch (Exception ignored) {
526: if (trace)
527: log.trace(ignored.getMessage(), ignored);
528: }
529: }
530: }
531:
532: static class UILThreadFactory implements ThreadFactory {
533: private String id;
534: private int count;
535:
536: UILThreadFactory(String id) {
537: this .id = id;
538: }
539:
540: public Thread newThread(Runnable command) {
541: synchronized (this ) {
542: count++;
543: }
544: Thread t = new Thread(command, "UIL2(" + id + ")#" + count);
545: return t;
546: }
547: }
548: }
|