001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.uil2.msgs;
023:
024: import java.io.ObjectOutputStream;
025: import java.io.IOException;
026: import java.io.ObjectInputStream;
027: import java.lang.reflect.UndeclaredThrowableException;
028:
029: import org.jboss.mq.il.uil2.SocketManager.ReadTask;
030:
031: /** The base msg class for all msgs used by the UIL2 invoker. Msgs consist
032: * of a msg type, id and exception and can operate as two way items that
033: * are sent with the request content and received with the reply content for
034: * the request. Such round-trip behavior is based on matching the request
035: * msgID with the reply msgID. The msgID parameter is segmented into value
036: * 1 to 2147483647 for client originated msgs and -1 to -2147483647 for server
037: * originated msgs.
038: *
039: * <p>The message is a Runnable to avoid constructing a Runnable object
040: * when asynchronously handling the message from the ReadTask.
041: *
042: * @author Scott.Stark@jboss.org
043: * @author Adrian.Brock@HappeningTimes.com
044: * @version $Revision: 57198 $
045: */
046: public class BaseMsg implements Runnable {
047: /** A flag indicating if the msgIDs are used by the JMS server */
048: private static boolean useJMSServerMsgIDs = false;
049: /** The next base msgID */
050: private static int nextMsgID = 0;
051: /** The lock for the next message id */
052: private static Object nextMsgIDLock = new Object();
053: /** 2^31+1 */
054: private static final int SERVER_MSG_ID_MASK = 0x80000000;
055:
056: /** The handler of this message */
057: private ReadTask handler;
058:
059: /** The MsgTypes constant representing the type of the msg */
060: public int msgType;
061: /** A msg id used to associated a reply with its request */
062: public int msgID;
063: /** Any error thrown by the remote side */
064: public Exception error;
065:
066: public BaseMsg(int msgType) {
067: this (msgType, 0);
068: }
069:
070: public BaseMsg(int msgType, int msgID) {
071: this .msgType = msgType;
072: this .msgID = msgID;
073: }
074:
075: /** Set the msgID parameter range. If false, the msgID is segmented into the
076: * range 1 to 2147483647 and if true, the rangs is -1 to -2147483647. The
077: * JMS server sets this to true and clients default to false.
078: * @param flag
079: */
080: public static void setUseJMSServerMsgIDs(boolean flag) {
081: useJMSServerMsgIDs = flag;
082: }
083:
084: /** Create a BaseMsg subclass based on the msgType.
085: *
086: * @param msgType A MsgTypes.m_xxx constant
087: * @return the derived BaseMsg
088: * @throws IllegalArgumentException thrown for a msgType that does not
089: * match any MsgTypes.m_xxx constant
090: */
091: public static BaseMsg createMsg(int msgType)
092: throws IllegalArgumentException {
093: BaseMsg msg = null;
094: switch (msgType) {
095: case MsgTypes.m_acknowledge:
096: msg = new AcknowledgementRequestMsg();
097: break;
098: case MsgTypes.m_addMessage:
099: msg = new AddMsg();
100: break;
101: case MsgTypes.m_browse:
102: msg = new BrowseMsg();
103: break;
104: case MsgTypes.m_checkID:
105: msg = new CheckIDMsg();
106: break;
107: case MsgTypes.m_connectionClosing:
108: msg = new CloseMsg();
109: break;
110: case MsgTypes.m_createQueue:
111: msg = new CreateDestMsg(true);
112: break;
113: case MsgTypes.m_createTopic:
114: msg = new CreateDestMsg(false);
115: break;
116: case MsgTypes.m_deleteTemporaryDestination:
117: msg = new DeleteTemporaryDestMsg();
118: break;
119: case MsgTypes.m_getID:
120: msg = new GetIDMsg();
121: break;
122: case MsgTypes.m_getTemporaryQueue:
123: msg = new TemporaryDestMsg(true);
124: break;
125: case MsgTypes.m_getTemporaryTopic:
126: msg = new TemporaryDestMsg(false);
127: break;
128: case MsgTypes.m_receive:
129: msg = new ReceiveMsg();
130: break;
131: case MsgTypes.m_setEnabled:
132: msg = new EnableConnectionMsg();
133: break;
134: case MsgTypes.m_setSpyDistributedConnection:
135: msg = new ConnectionTokenMsg();
136: break;
137: case MsgTypes.m_subscribe:
138: msg = new SubscribeMsg();
139: break;
140: case MsgTypes.m_transact:
141: msg = new TransactMsg();
142: break;
143: case MsgTypes.m_recover:
144: msg = new RecoverMsg();
145: break;
146: case MsgTypes.m_unsubscribe:
147: msg = new UnsubscribeMsg();
148: break;
149: case MsgTypes.m_destroySubscription:
150: msg = new DeleteSubscriptionMsg();
151: break;
152: case MsgTypes.m_checkUser:
153: msg = new CheckUserMsg(false);
154: break;
155: case MsgTypes.m_ping:
156: msg = new PingMsg(true);
157: break;
158: case MsgTypes.m_authenticate:
159: msg = new CheckUserMsg(true);
160: break;
161: case MsgTypes.m_close:
162: // This is never sent
163: break;
164: case MsgTypes.m_pong:
165: msg = new PingMsg(false);
166: break;
167: case MsgTypes.m_receiveRequest:
168: msg = new ReceiveRequestMsg();
169: break;
170: default:
171: throw new IllegalArgumentException("Invalid msgType: "
172: + msgType);
173: }
174: return msg;
175: }
176:
177: /** Translate a msgType into its string menmonic.
178: * @param msgType A MsgTypes.m_xxx constant
179: * @return the string form of the MsgTypes.m_xxx constant
180: */
181: public static String toString(int msgType) {
182: String msgTypeString = null;
183: switch (msgType) {
184: case MsgTypes.m_acknowledge:
185: msgTypeString = "m_acknowledge";
186: break;
187: case MsgTypes.m_addMessage:
188: msgTypeString = "m_addMessage";
189: break;
190: case MsgTypes.m_browse:
191: msgTypeString = "m_browse";
192: break;
193: case MsgTypes.m_checkID:
194: msgTypeString = "m_checkID";
195: break;
196: case MsgTypes.m_connectionClosing:
197: msgTypeString = "m_connectionClosing";
198: break;
199: case MsgTypes.m_createQueue:
200: msgTypeString = "m_createQueue";
201: break;
202: case MsgTypes.m_createTopic:
203: msgTypeString = "m_createTopic";
204: break;
205: case MsgTypes.m_deleteTemporaryDestination:
206: msgTypeString = "m_deleteTemporaryDestination";
207: break;
208: case MsgTypes.m_getID:
209: msgTypeString = "m_getID";
210: break;
211: case MsgTypes.m_getTemporaryQueue:
212: msgTypeString = "m_getTemporaryQueue";
213: break;
214: case MsgTypes.m_getTemporaryTopic:
215: msgTypeString = "m_getTemporaryTopic";
216: break;
217: case MsgTypes.m_receive:
218: msgTypeString = "m_receive";
219: break;
220: case MsgTypes.m_setEnabled:
221: msgTypeString = "m_setEnabled";
222: break;
223: case MsgTypes.m_setSpyDistributedConnection:
224: msgTypeString = "m_setSpyDistributedConnection";
225: break;
226: case MsgTypes.m_subscribe:
227: msgTypeString = "m_subscribe";
228: break;
229: case MsgTypes.m_transact:
230: msgTypeString = "m_transact";
231: break;
232: case MsgTypes.m_recover:
233: msgTypeString = "m_recover";
234: break;
235: case MsgTypes.m_unsubscribe:
236: msgTypeString = "m_unsubscribe";
237: break;
238: case MsgTypes.m_destroySubscription:
239: msgTypeString = "m_destroySubscription";
240: break;
241: case MsgTypes.m_checkUser:
242: msgTypeString = "m_checkUser";
243: break;
244: case MsgTypes.m_ping:
245: msgTypeString = "m_ping";
246: break;
247: case MsgTypes.m_authenticate:
248: msgTypeString = "m_authenticate";
249: break;
250: case MsgTypes.m_close:
251: msgTypeString = "m_close";
252: break;
253: case MsgTypes.m_pong:
254: msgTypeString = "m_pong";
255: break;
256: case MsgTypes.m_receiveRequest:
257: msgTypeString = "m_receiveRequest";
258: break;
259: default:
260: msgTypeString = "unknown message type " + msgType;
261: }
262: return msgTypeString;
263: }
264:
265: public int getMsgType() {
266: return msgType;
267: }
268:
269: /** Access the msgID, initializing it if it has not been set yet. This
270: * is used by the SocketManager.internalSendMessage to setup the unique
271: * msgID for a request msg.
272: *
273: * @return the msgID value
274: */
275: public synchronized int getMsgID() {
276: if (msgID == 0) {
277: synchronized (nextMsgIDLock) {
278: msgID = ++nextMsgID;
279: }
280: if (useJMSServerMsgIDs)
281: msgID += SERVER_MSG_ID_MASK;
282: else if (msgID >= SERVER_MSG_ID_MASK)
283: msgID = msgID % SERVER_MSG_ID_MASK;
284: }
285: return msgID;
286: }
287:
288: /** Set the msgID. This is used by the SocketManager read task to populate
289: * a msg with its request ID.
290: * @param msgID the msgID read off the socket
291: */
292: public void setMsgID(int msgID) {
293: this .msgID = msgID;
294: }
295:
296: /** Access any exception associated with the msg
297: * @return
298: */
299: public Exception getError() {
300: return error;
301: }
302:
303: /** Set an exception that should be used as the msg return value.
304: *
305: * @param e
306: */
307: public void setError(Throwable e) {
308: if (e instanceof Exception)
309: error = (Exception) e;
310: else
311: error = new UndeclaredThrowableException(e);
312: }
313:
314: /** Equality is based on BaseMsg.msgID
315: * @param o a BaseMsg
316: * @return true if o.msgID == msgID
317: */
318: public boolean equals(Object o) {
319: BaseMsg msg = (BaseMsg) o;
320: return msg.msgID == msgID;
321: }
322:
323: /** Hash code is simply the msgID
324: * @return
325: */
326: public int hashCode() {
327: return msgID;
328: }
329:
330: public String toString() {
331: StringBuffer tmp = new StringBuffer(this .getClass().getName());
332: tmp.append(System.identityHashCode(this ));
333: tmp.append("[msgType: ");
334: tmp.append(toString(msgType));
335: tmp.append(", msgID: ");
336: tmp.append(msgID);
337: tmp.append(", error: ");
338: tmp.append(error);
339: tmp.append("]");
340: return tmp.toString();
341: }
342:
343: /** Trim the message when replying
344: */
345: public void trimReply() {
346: }
347:
348: /** Write the msgType, msgID, hasError flag and optionally the error
349: * @param out
350: * @throws IOException
351: */
352: public void write(ObjectOutputStream out) throws IOException {
353: out.writeByte(msgType);
354: out.writeInt(msgID);
355: int hasError = error != null ? 1 : 0;
356: out.writeByte(hasError);
357: if (hasError == 1)
358: out.writeObject(error);
359: }
360:
361: /** Read the hasError flag and optionally the error. This method is not
362: * a complete analog of write because the SocketManager read task reads
363: * the msgType and msgID off of the socket.
364: *
365: * @param in
366: * @throws IOException
367: * @throws ClassNotFoundException
368: */
369: public void read(ObjectInputStream in) throws IOException,
370: ClassNotFoundException {
371: int hasError = in.readByte();
372: if (hasError == 1)
373: error = (Exception) in.readObject();
374: }
375:
376: public void setHandler(ReadTask handler) {
377: this .handler = handler;
378: }
379:
380: public void run() {
381: handler.handleMsg(this);
382: handler = null;
383: }
384: }
|