001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.uil2;
023:
024: import java.rmi.RemoteException;
025: import javax.jms.Destination;
026: import javax.transaction.xa.Xid;
027:
028: import org.jboss.logging.Logger;
029: import org.jboss.mq.ConnectionToken;
030: import org.jboss.mq.AcknowledgementRequest;
031: import org.jboss.mq.Recoverable;
032: import org.jboss.mq.SpyMessage;
033: import org.jboss.mq.SpyDestination;
034: import org.jboss.mq.TransactionRequest;
035: import org.jboss.mq.DurableSubscriptionID;
036: import org.jboss.mq.il.uil2.msgs.BaseMsg;
037: import org.jboss.mq.il.uil2.msgs.ConnectionTokenMsg;
038: import org.jboss.mq.il.uil2.msgs.AcknowledgementRequestMsg;
039: import org.jboss.mq.il.uil2.msgs.MsgTypes;
040: import org.jboss.mq.il.uil2.msgs.AddMsg;
041: import org.jboss.mq.il.uil2.msgs.BrowseMsg;
042: import org.jboss.mq.il.uil2.msgs.CheckIDMsg;
043: import org.jboss.mq.il.uil2.msgs.CreateDestMsg;
044: import org.jboss.mq.il.uil2.msgs.DeleteTemporaryDestMsg;
045: import org.jboss.mq.il.uil2.msgs.GetIDMsg;
046: import org.jboss.mq.il.uil2.msgs.RecoverMsg;
047: import org.jboss.mq.il.uil2.msgs.TemporaryDestMsg;
048: import org.jboss.mq.il.uil2.msgs.ReceiveMsg;
049: import org.jboss.mq.il.uil2.msgs.EnableConnectionMsg;
050: import org.jboss.mq.il.uil2.msgs.SubscribeMsg;
051: import org.jboss.mq.il.uil2.msgs.TransactMsg;
052: import org.jboss.mq.il.uil2.msgs.UnsubscribeMsg;
053: import org.jboss.mq.il.uil2.msgs.DeleteSubscriptionMsg;
054: import org.jboss.mq.il.uil2.msgs.CheckUserMsg;
055: import org.jboss.mq.il.uil2.msgs.PingMsg;
056: import org.jboss.mq.il.Invoker;
057:
058: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
059:
060: /** This is the SocketManager callback handler for the UIL2 server side
061: * socket. This handles messages that are requests from clients.
062: *
063: * @author Scott.Stark@jboss.org
064: * @version $Revision: 61545 $
065: */
066: public class ServerSocketManagerHandler implements MsgTypes,
067: SocketManagerHandler {
068: private static Logger log = Logger
069: .getLogger(ServerSocketManagerHandler.class);
070:
071: private ConnectionToken connectionToken;
072: private Invoker server;
073: private SocketManager socketMgr;
074: private UILServerILService uilServerILService;
075: private SynchronizedBoolean closed = new SynchronizedBoolean(false);
076:
077: public ServerSocketManagerHandler(Invoker server,
078: SocketManager socketMgr,
079: UILServerILService uilServerILService) {
080: this .server = server;
081: this .socketMgr = socketMgr;
082: this .uilServerILService = uilServerILService;
083: }
084:
085: /** The callback from the SocketManager
086: * @param msg
087: */
088: public void handleMsg(BaseMsg msg) throws Exception {
089: boolean trace = log.isTraceEnabled();
090: int msgType = msg.getMsgType();
091: if (trace)
092: log.trace("Begin handleMsg, msgType: " + msgType);
093:
094: switch (msgType) {
095: case m_setSpyDistributedConnection:
096: log.debug("Setting up the UILClientIL Connection");
097: ConnectionTokenMsg cmsg = (ConnectionTokenMsg) msg;
098: connectionToken = cmsg.getToken();
099: UILClientIL clientIL = (UILClientIL) connectionToken.clientIL;
100: clientIL.setSocketMgr(socketMgr);
101: socketMgr.sendReply(msg);
102: log.debug("The UILClientIL Connection is set up");
103: break;
104: case m_acknowledge:
105: AcknowledgementRequestMsg ackmsg = (AcknowledgementRequestMsg) msg;
106: AcknowledgementRequest ack = ackmsg.getAck();
107: server.acknowledge(connectionToken, ack);
108: // We send the reply although on newer clients it is ignored.
109: socketMgr.sendReply(msg);
110: break;
111: case m_addMessage:
112: AddMsg amsg = (AddMsg) msg;
113: server.addMessage(connectionToken, amsg.getMsg());
114: socketMgr.sendReply(msg);
115: break;
116: case m_browse:
117: BrowseMsg bmsg = (BrowseMsg) msg;
118: SpyMessage[] msgs = server.browse(connectionToken, bmsg
119: .getDest(), bmsg.getSelector());
120: bmsg.setMessages(msgs);
121: socketMgr.sendReply(msg);
122: break;
123: case m_checkID:
124: CheckIDMsg idmsg = (CheckIDMsg) msg;
125: String ID = idmsg.getID();
126: server.checkID(ID);
127: if (connectionToken != null)
128: connectionToken.setClientID(ID);
129: socketMgr.sendReply(msg);
130: break;
131: case m_connectionClosing:
132: server.connectionClosing(connectionToken);
133: closed.set(true);
134: socketMgr.sendReply(msg);
135: socketMgr.stop();
136: break;
137: case m_createQueue:
138: CreateDestMsg cqmsg = (CreateDestMsg) msg;
139: Destination queue = server.createQueue(connectionToken,
140: cqmsg.getName());
141: cqmsg.setDest(queue);
142: socketMgr.sendReply(msg);
143: break;
144: case m_createTopic:
145: CreateDestMsg ctmsg = (CreateDestMsg) msg;
146: Destination topic = server.createTopic(connectionToken,
147: ctmsg.getName());
148: ctmsg.setDest(topic);
149: socketMgr.sendReply(msg);
150: break;
151: case m_deleteTemporaryDestination:
152: DeleteTemporaryDestMsg dtdmsg = (DeleteTemporaryDestMsg) msg;
153: SpyDestination tmpdest = dtdmsg.getDest();
154: server.deleteTemporaryDestination(connectionToken, tmpdest);
155: socketMgr.sendReply(msg);
156: break;
157: case m_getID:
158: GetIDMsg gidmsg = (GetIDMsg) msg;
159: String gid = server.getID();
160: if (connectionToken != null)
161: connectionToken.setClientID(gid);
162: gidmsg.setID(gid);
163: socketMgr.sendReply(msg);
164: break;
165: case m_getTemporaryQueue:
166: TemporaryDestMsg tqmsg = (TemporaryDestMsg) msg;
167: Destination tmpQueue = server
168: .getTemporaryQueue(connectionToken);
169: tqmsg.setDest(tmpQueue);
170: socketMgr.sendReply(msg);
171: break;
172: case m_getTemporaryTopic:
173: TemporaryDestMsg ttmsg = (TemporaryDestMsg) msg;
174: Destination tmpTopic = server
175: .getTemporaryTopic(connectionToken);
176: ttmsg.setDest(tmpTopic);
177: socketMgr.sendReply(msg);
178: break;
179: case m_receive:
180: ReceiveMsg rmsg = (ReceiveMsg) msg;
181: SpyMessage reply = server.receive(connectionToken, rmsg
182: .getSubscriberID(), rmsg.getWait());
183: rmsg.setMessage(reply);
184: socketMgr.sendReply(msg);
185: break;
186: case m_setEnabled:
187: EnableConnectionMsg ecmsg = (EnableConnectionMsg) msg;
188: server.setEnabled(connectionToken, ecmsg.isEnabled());
189: socketMgr.sendReply(msg);
190: break;
191: case m_subscribe:
192: SubscribeMsg smsg = (SubscribeMsg) msg;
193: server.subscribe(connectionToken, smsg.getSubscription());
194: socketMgr.sendReply(msg);
195: break;
196: case m_transact:
197: TransactMsg tmsg = (TransactMsg) msg;
198: TransactionRequest trans = tmsg.getRequest();
199: server.transact(connectionToken, trans);
200: socketMgr.sendReply(msg);
201: break;
202: case m_recover:
203: RecoverMsg recmsg = (RecoverMsg) msg;
204: int flags = recmsg.getFlags();
205: if (server instanceof Recoverable) {
206: Recoverable recoverable = (Recoverable) server;
207: Xid[] xids = recoverable
208: .recover(connectionToken, flags);
209: recmsg.setXids(xids);
210: socketMgr.sendReply(msg);
211: break;
212: }
213: throw new IllegalStateException(
214: "Invoker does not implement recoverable " + server);
215: case m_unsubscribe:
216: UnsubscribeMsg umsg = (UnsubscribeMsg) msg;
217: server.unsubscribe(connectionToken, umsg
218: .getSubscriptionID());
219: socketMgr.sendReply(msg);
220: break;
221: case m_destroySubscription:
222: DeleteSubscriptionMsg dsmsg = (DeleteSubscriptionMsg) msg;
223: DurableSubscriptionID dsub = dsmsg.getSubscriptionID();
224: server.destroySubscription(connectionToken, dsub);
225: socketMgr.sendReply(msg);
226: break;
227: case m_checkUser:
228: CheckUserMsg cumsg = (CheckUserMsg) msg;
229: String uid = server.checkUser(cumsg.getUsername(), cumsg
230: .getPassword());
231: cumsg.setID(uid);
232: cumsg.clearPassword();
233: socketMgr.sendReply(msg);
234: break;
235: case m_ping:
236: PingMsg ping = (PingMsg) msg;
237: server.ping(connectionToken, ping.getTime());
238: break;
239: case m_pong:
240: break;
241: // Ignore, this is an old client, that still wants to send us replies
242: case m_receiveRequest:
243: break;
244: case m_authenticate:
245: CheckUserMsg cumsg2 = (CheckUserMsg) msg;
246: String sessionID = server.authenticate(
247: cumsg2.getUsername(), cumsg2.getPassword());
248: cumsg2.setID(sessionID);
249: cumsg2.clearPassword();
250: socketMgr.sendReply(msg);
251: break;
252: default:
253: throw new RemoteException("Unknown msgType: " + msgType);
254: }
255: if (trace)
256: log.trace("End handleMsg, msgType: " + msgType);
257: }
258:
259: public void onStreamNotification(Object stream, int size) {
260: }
261:
262: public void asynchFailure(String error, Throwable e) {
263: log.debug(error, e);
264: }
265:
266: public void close() {
267: try {
268: uilServerILService.removeHandler(this );
269: if (closed.get() == false)
270: server.connectionClosing(connectionToken);
271: } catch (Exception e) {
272: log.debug("Error closing connection: ", e);
273: }
274: }
275: }
|