001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.uil2;
023:
024: import java.lang.reflect.UndeclaredThrowableException;
025: import java.util.Properties;
026:
027: import org.jboss.logging.Logger;
028: import org.jboss.mq.Connection;
029: import org.jboss.mq.ReceiveRequest;
030: import org.jboss.mq.SpyDestination;
031: import org.jboss.mq.il.ClientILService;
032: import org.jboss.mq.il.ClientIL;
033: import org.jboss.mq.il.uil2.msgs.MsgTypes;
034: import org.jboss.mq.il.uil2.msgs.BaseMsg;
035: import org.jboss.mq.il.uil2.msgs.ReceiveRequestMsg;
036: import org.jboss.mq.il.uil2.msgs.DeleteTemporaryDestMsg;
037: import org.jboss.mq.il.uil2.msgs.PingMsg;
038:
039: /** The UILClientILService runs on the client side of a JMS server connection
040: * and acts as a factory for the UILClientIL passed to the server. It also
041: * handles the callbacks from the client side SocketManager.
042: *
043: * @author Scott.Stark@jboss.org
044: * @version $Revision: 57198 $
045: */
046: public class UILClientILService implements ClientILService, MsgTypes,
047: SocketManagerHandler {
048: static Logger log = Logger.getLogger(UILClientILService.class);
049:
050: // Attributes ----------------------------------------------------
051:
052: //the client IL
053: private UILClientIL clientIL;
054: //The thread that is doing the Socket reading work
055: private SocketManager socketMgr;
056: //A link on my connection
057: private Connection connection;
058: // Whether to send receive replies
059: private boolean sendReceiveReplies = true;
060:
061: /**
062: * getClientIL method comment.
063: *
064: * @return The ClientIL value
065: * @exception Exception Description of Exception
066: */
067: public ClientIL getClientIL() throws Exception {
068: return clientIL;
069: }
070:
071: /**
072: * init method comment.
073: *
074: * @param connection Description of Parameter
075: * @param props Description of Parameter
076: * @exception Exception Description of Exception
077: */
078: public void init(Connection connection, Properties props)
079: throws Exception {
080: this .connection = connection;
081: clientIL = new UILClientIL();
082: UILServerIL serverIL = (UILServerIL) connection.getServerIL();
083: socketMgr = serverIL.getSocketMgr();
084: String t = props
085: .getProperty(UILServerILFactory.UIL_BUFFERSIZE_KEY);
086: if (t != null)
087: socketMgr.setBufferSize(Integer.parseInt(t));
088: t = props.getProperty(UILServerILFactory.UIL_CHUNKSIZE_KEY);
089: if (t != null)
090: socketMgr.setChunkSize(Integer.parseInt(t));
091: // If we have this property, it means it is a new server
092: t = props
093: .getProperty(UILServerILFactory.UIL_RECEIVE_REPLIES_KEY);
094: if (t != null)
095: sendReceiveReplies = false;
096: socketMgr.setHandler(this );
097: }
098:
099: /** Callback from the SocketManager
100: */
101: public void handleMsg(BaseMsg msg) throws Exception {
102: boolean trace = log.isTraceEnabled();
103: int msgType = msg.getMsgType();
104: if (trace)
105: log.trace("Begin handleMsg, msgType: " + msgType);
106: switch (msgType) {
107: case m_acknowledge:
108: // We have to ignore NACK replies because of backwards compatibility
109: break;
110: case m_receiveRequest:
111: ReceiveRequestMsg rmsg = (ReceiveRequestMsg) msg;
112: ReceiveRequest[] messages = rmsg.getMessages();
113: connection.asynchDeliver(messages);
114: // It is an old server that needs a reply
115: if (sendReceiveReplies) {
116: rmsg.trimTheMessages();
117: socketMgr.sendReply(msg);
118: }
119: break;
120: case m_deleteTemporaryDestination:
121: DeleteTemporaryDestMsg dmsg = (DeleteTemporaryDestMsg) msg;
122: SpyDestination dest = dmsg.getDest();
123: connection.asynchDeleteTemporaryDestination(dest);
124: socketMgr.sendReply(msg);
125: break;
126: case m_close:
127: connection.asynchClose();
128: socketMgr.sendReply(msg);
129: break;
130: case m_pong:
131: PingMsg pmsg = (PingMsg) msg;
132: long time = pmsg.getTime();
133: connection.asynchPong(time);
134: break;
135: default:
136: connection
137: .asynchFailure(
138: "UILClientILService received bad msg: "
139: + msg, null);
140: }
141: if (trace)
142: log.trace("End handleMsg");
143: }
144:
145: /**
146: *
147: * @exception Exception Description of Exception
148: */
149: public void start() throws Exception {
150: log.debug("Starting");
151: }
152:
153: /**
154: * @exception Exception Description of Exception
155: */
156: public void stop() throws Exception {
157: log.debug("Stopping");
158: socketMgr.stop();
159: }
160:
161: public void onStreamNotification(Object stream, int size) {
162: connection.asynchPong(System.currentTimeMillis());
163: }
164:
165: public void asynchFailure(String error, Throwable e) {
166: if (e instanceof Exception)
167: connection.asynchFailure(error, e);
168: else
169: connection.asynchFailure(error,
170: new UndeclaredThrowableException(e));
171: }
172:
173: public void close() {
174: }
175: }
|