001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.uil2;
023:
024: import java.io.Serializable;
025: import java.io.IOException;
026: import java.net.InetAddress;
027: import java.net.ConnectException;
028: import java.net.Socket;
029: import javax.jms.Destination;
030: import javax.jms.JMSException;
031: import javax.jms.Queue;
032: import javax.jms.TemporaryQueue;
033: import javax.jms.TemporaryTopic;
034: import javax.jms.Topic;
035: import javax.net.SocketFactory;
036: import javax.transaction.xa.Xid;
037:
038: import org.jboss.logging.Logger;
039: import org.jboss.mq.AcknowledgementRequest;
040: import org.jboss.mq.Connection;
041: import org.jboss.mq.ConnectionToken;
042: import org.jboss.mq.DurableSubscriptionID;
043: import org.jboss.mq.Recoverable;
044: import org.jboss.mq.SpyDestination;
045: import org.jboss.mq.SpyMessage;
046: import org.jboss.mq.TransactionRequest;
047: import org.jboss.mq.il.ServerIL;
048: import org.jboss.mq.il.uil2.msgs.MsgTypes;
049: import org.jboss.mq.il.uil2.msgs.ConnectionTokenMsg;
050: import org.jboss.mq.il.uil2.msgs.EnableConnectionMsg;
051: import org.jboss.mq.il.uil2.msgs.GetIDMsg;
052: import org.jboss.mq.il.uil2.msgs.RecoverMsg;
053: import org.jboss.mq.il.uil2.msgs.TemporaryDestMsg;
054: import org.jboss.mq.il.uil2.msgs.AcknowledgementRequestMsg;
055: import org.jboss.mq.il.uil2.msgs.AddMsg;
056: import org.jboss.mq.il.uil2.msgs.BrowseMsg;
057: import org.jboss.mq.il.uil2.msgs.CheckIDMsg;
058: import org.jboss.mq.il.uil2.msgs.CheckUserMsg;
059: import org.jboss.mq.il.uil2.msgs.CloseMsg;
060: import org.jboss.mq.il.uil2.msgs.CreateDestMsg;
061: import org.jboss.mq.il.uil2.msgs.DeleteTemporaryDestMsg;
062: import org.jboss.mq.il.uil2.msgs.DeleteSubscriptionMsg;
063: import org.jboss.mq.il.uil2.msgs.PingMsg;
064: import org.jboss.mq.il.uil2.msgs.ReceiveMsg;
065: import org.jboss.mq.il.uil2.msgs.SubscribeMsg;
066: import org.jboss.mq.il.uil2.msgs.TransactMsg;
067: import org.jboss.mq.il.uil2.msgs.UnsubscribeMsg;
068:
069: /** The UILServerIL is created on the server and copied to the client during
070: * connection factory lookups. It represents the transport interface to the
071: * JMS server.
072: *
073: * @author Scott.Stark@jboss.org
074: * @version $Revision: 57198 $
075: */
076: public class UILServerIL implements Cloneable, MsgTypes, Serializable,
077: ServerIL, Recoverable {
078: /** @since 1.7, at least jboss-3.2.5, jboss-4.0.0 */
079: private static final long serialVersionUID = 853594001646066224L;
080: private static Logger log = Logger.getLogger(UILServerIL.class);
081:
082: /** The org.jboss.mq.il.uil2.useServerHost system property allows a client to
083: * to connect to the host name rather than the ip address
084: */
085: private final static String USE_SERVER_HOST = "org.jboss.mq.il.uil2.useServerHost";
086:
087: /** The org.jboss.mq.il.uil2.localAddr system property allows a client to
088: *define the local interface to which its sockets should be bound
089: */
090: private final static String LOCAL_ADDR = "org.jboss.mq.il.uil2.localAddr";
091: /** The org.jboss.mq.il.uil2.localPort system property allows a client to
092: *define the local port to which its sockets should be bound
093: */
094: private final static String LOCAL_PORT = "org.jboss.mq.il.uil2.localPort";
095: /** The org.jboss.mq.il.uil2.serverAddr system property allows a client to
096: * override the address to which it attempts to connect to. This is useful
097: * for networks where NAT is ocurring between the client and jms server.
098: */
099: private final static String SERVER_ADDR = "org.jboss.mq.il.uil2.serverAddr";
100: /** The org.jboss.mq.il.uil2.serverPort system property allows a client to
101: * override the port to which it attempts to connect. This is useful for
102: * for networks where port forwarding is ocurring between the client and jms
103: * server.
104: */
105: private final static String SERVER_PORT = "org.jboss.mq.il.uil2.serverPort";
106: /** The org.jboss.mq.il.uil2.retryCount controls the number of attempts to
107: * retry connecting to the jms server. Retries are only made for
108: * java.net.ConnectException failures. A value <= 0 means no retry atempts
109: * will be made.
110: */
111: private final static String RETRY_COUNT = "org.jboss.mq.il.uil2.retryCount";
112: /** The org.jboss.mq.il.uil2.retryDelay controls the delay in milliseconds
113: * between retries due to ConnectException failures.
114: */
115: private final static String RETRY_DELAY = "org.jboss.mq.il.uil2.retryDelay";
116:
117: /** The server host name/IP to connect to as defined by the jms server.
118: */
119: private InetAddress addr;
120: /** The server port to connect to as defined by the jms server.
121: */
122: private int port;
123: /** The name of the class implementing the javax.net.SocketFactory to
124: * use for creating the client socket.
125: */
126: private String socketFactoryName;
127:
128: /**
129: * If the TcpNoDelay option should be used on the socket.
130: */
131: private boolean enableTcpNoDelay = false;
132:
133: /**
134: * The client side read timeout
135: */
136: private int soTimeout = 0;
137:
138: /**
139: * The connect address
140: */
141: private String connectAddress;
142:
143: /**
144: * The connect port
145: */
146: private int connectPort = 0;
147:
148: /**
149: * The buffer size.
150: */
151: private int bufferSize;
152:
153: /**
154: * The chunk size.
155: */
156: private int chunkSize;
157:
158: /** The local interface name/IP to use for the client
159: */
160: private transient InetAddress localAddr;
161: /** The local port to use for the client
162: */
163: private transient int localPort;
164:
165: /**
166: * Description of the Field
167: */
168: protected transient Socket socket;
169: /**
170: * Description of the Field
171: */
172: protected transient SocketManager socketMgr;
173:
174: public UILServerIL(InetAddress addr, int port,
175: String socketFactoryName, boolean enableTcpNoDelay,
176: int bufferSize, int chunkSize, int soTimeout,
177: String connectAddress, int connectPort) throws Exception {
178: this .addr = addr;
179: this .port = port;
180: this .socketFactoryName = socketFactoryName;
181: this .enableTcpNoDelay = enableTcpNoDelay;
182: this .bufferSize = bufferSize;
183: this .chunkSize = chunkSize;
184: this .soTimeout = soTimeout;
185: this .connectAddress = connectAddress;
186: this .connectPort = connectPort;
187: }
188:
189: public void setConnectionToken(ConnectionToken dest)
190: throws Exception {
191: ConnectionTokenMsg msg = new ConnectionTokenMsg(dest);
192: getSocketMgr().sendMessage(msg);
193: }
194:
195: public void setEnabled(ConnectionToken dc, boolean enabled)
196: throws JMSException, Exception {
197: EnableConnectionMsg msg = new EnableConnectionMsg(enabled);
198: getSocketMgr().sendMessage(msg);
199: }
200:
201: public String getID() throws Exception {
202: GetIDMsg msg = new GetIDMsg();
203: getSocketMgr().sendMessage(msg);
204: String id = msg.getID();
205: return id;
206: }
207:
208: public TemporaryQueue getTemporaryQueue(ConnectionToken dc)
209: throws JMSException, Exception {
210: TemporaryDestMsg msg = new TemporaryDestMsg(true);
211: getSocketMgr().sendMessage(msg);
212: TemporaryQueue dest = msg.getQueue();
213: return dest;
214: }
215:
216: public TemporaryTopic getTemporaryTopic(ConnectionToken dc)
217: throws JMSException, Exception {
218: TemporaryDestMsg msg = new TemporaryDestMsg(false);
219: getSocketMgr().sendMessage(msg);
220: TemporaryTopic dest = msg.getTopic();
221: return dest;
222: }
223:
224: public void acknowledge(ConnectionToken dc,
225: AcknowledgementRequest item) throws JMSException, Exception {
226: AcknowledgementRequestMsg msg = new AcknowledgementRequestMsg(
227: item);
228: if (item.isAck())
229: getSocketMgr().sendMessage(msg);
230: else
231: getSocketMgr().sendOneWay(msg);
232: }
233:
234: public void addMessage(ConnectionToken dc, SpyMessage val)
235: throws Exception {
236: AddMsg msg = new AddMsg(val);
237: getSocketMgr().sendMessage(msg);
238: }
239:
240: public SpyMessage[] browse(ConnectionToken dc, Destination dest,
241: String selector) throws JMSException, Exception {
242: BrowseMsg msg = new BrowseMsg(dest, selector);
243: getSocketMgr().sendMessage(msg);
244: SpyMessage[] msgs = msg.getMessages();
245: return msgs;
246: }
247:
248: public void checkID(String id) throws JMSException, Exception {
249: CheckIDMsg msg = new CheckIDMsg(id);
250: getSocketMgr().sendMessage(msg);
251: }
252:
253: public String checkUser(String username, String password)
254: throws JMSException, Exception {
255: CheckUserMsg msg = new CheckUserMsg(username, password, false);
256: getSocketMgr().sendMessage(msg);
257: String clientID = msg.getID();
258: return clientID;
259: }
260:
261: public String authenticate(String username, String password)
262: throws JMSException, Exception {
263: CheckUserMsg msg = new CheckUserMsg(username, password, true);
264: getSocketMgr().sendMessage(msg);
265: String sessionID = msg.getID();
266: return sessionID;
267: }
268:
269: public Object clone() throws CloneNotSupportedException {
270: return super .clone();
271: }
272:
273: public ServerIL cloneServerIL() throws Exception {
274: return (ServerIL) clone();
275: }
276:
277: public void connectionClosing(ConnectionToken dc)
278: throws JMSException, Exception {
279: CloseMsg msg = new CloseMsg();
280: try {
281: getSocketMgr().sendMessage(msg);
282: } catch (IOException ignored) {
283: }
284: destroyConnection();
285: }
286:
287: public Queue createQueue(ConnectionToken dc, String destName)
288: throws JMSException, Exception {
289: CreateDestMsg msg = new CreateDestMsg(destName, true);
290: getSocketMgr().sendMessage(msg);
291: Queue dest = msg.getQueue();
292: return dest;
293: }
294:
295: public Topic createTopic(ConnectionToken dc, String destName)
296: throws JMSException, Exception {
297: CreateDestMsg msg = new CreateDestMsg(destName, false);
298: getSocketMgr().sendMessage(msg);
299: Topic dest = msg.getTopic();
300: return dest;
301: }
302:
303: public void deleteTemporaryDestination(ConnectionToken dc,
304: SpyDestination dest) throws JMSException, Exception {
305: DeleteTemporaryDestMsg msg = new DeleteTemporaryDestMsg(dest);
306: getSocketMgr().sendMessage(msg);
307: }
308:
309: public void destroySubscription(ConnectionToken dc,
310: DurableSubscriptionID id) throws JMSException, Exception {
311: DeleteSubscriptionMsg msg = new DeleteSubscriptionMsg(id);
312: getSocketMgr().sendMessage(msg);
313: }
314:
315: public void ping(ConnectionToken dc, long clientTime)
316: throws Exception {
317: PingMsg msg = new PingMsg(clientTime, true);
318: msg.getMsgID();
319: getSocketMgr().sendReply(msg);
320: }
321:
322: public SpyMessage receive(ConnectionToken dc, int subscriberId,
323: long wait) throws Exception, Exception {
324: ReceiveMsg msg = new ReceiveMsg(subscriberId, wait);
325: getSocketMgr().sendMessage(msg);
326: SpyMessage reply = msg.getMessage();
327: return reply;
328: }
329:
330: public void subscribe(ConnectionToken dc,
331: org.jboss.mq.Subscription s) throws JMSException, Exception {
332: SubscribeMsg msg = new SubscribeMsg(s);
333: getSocketMgr().sendMessage(msg);
334: }
335:
336: public void transact(ConnectionToken dc, TransactionRequest t)
337: throws JMSException, Exception {
338: TransactMsg msg = new TransactMsg(t);
339: getSocketMgr().sendMessage(msg);
340: }
341:
342: public Xid[] recover(ConnectionToken dc, int flags)
343: throws Exception {
344: RecoverMsg msg = new RecoverMsg(flags);
345: getSocketMgr().sendMessage(msg);
346: Xid[] reply = msg.getXids();
347: return reply;
348: }
349:
350: public void unsubscribe(ConnectionToken dc, int subscriptionID)
351: throws JMSException, Exception {
352: UnsubscribeMsg msg = new UnsubscribeMsg(subscriptionID);
353: getSocketMgr().sendMessage(msg);
354: }
355:
356: final SocketManager getSocketMgr() throws Exception {
357: if (socketMgr == null)
358: createConnection();
359: return socketMgr;
360: }
361:
362: protected void checkConnection() throws Exception {
363: if (socketMgr == null) {
364: createConnection();
365: }
366: }
367:
368: /**
369: * Used to establish a new connection to the server
370: *
371: * @exception Exception Description of Exception
372: */
373: protected void createConnection() throws Exception {
374: boolean tracing = log.isTraceEnabled();
375:
376: /** Attempt to load the socket factory and if this fails, use the
377: * default socket factory impl.
378: */
379: SocketFactory socketFactory = null;
380: if (socketFactoryName != null) {
381: try {
382: ClassLoader loader = Thread.currentThread()
383: .getContextClassLoader();
384: Class factoryClass = loader
385: .loadClass(socketFactoryName);
386: socketFactory = (SocketFactory) factoryClass
387: .newInstance();
388: } catch (Exception e) {
389: log.debug("Failed to load socket factory: "
390: + socketFactoryName, e);
391: }
392: }
393: // Use the default socket factory
394: if (socketFactory == null) {
395: socketFactory = SocketFactory.getDefault();
396: }
397:
398: // Look for a local address and port as properties
399: String tmp = getProperty(LOCAL_ADDR);
400: if (tmp != null)
401: this .localAddr = InetAddress.getByName(tmp);
402: tmp = getProperty(LOCAL_PORT);
403: if (tmp != null)
404: this .localPort = Integer.parseInt(tmp);
405:
406: // Look for client side overrides of the server address/port
407: InetAddress serverAddr = addr;
408: int serverPort = port;
409: tmp = getProperty(SERVER_ADDR);
410: if (tmp == null)
411: tmp = connectAddress;
412: if (tmp != null)
413: serverAddr = InetAddress.getByName(tmp);
414: tmp = getProperty(SERVER_PORT);
415: if (tmp != null)
416: serverPort = Integer.parseInt(tmp);
417: else if (connectPort != 0)
418: serverPort = connectPort;
419:
420: String useHostNameProp = getProperty(USE_SERVER_HOST);
421: String serverHost = serverAddr.getHostAddress();
422: if (Boolean.valueOf(useHostNameProp).booleanValue())
423: serverHost = serverAddr.getHostName();
424:
425: int retries = 0;
426: // Default to 10 retries, no delay in the absence of user override
427: int maxRetries = 10;
428: tmp = getProperty(RETRY_COUNT);
429: if (tmp != null)
430: maxRetries = Integer.parseInt(tmp);
431: long retryDelay = 0;
432: tmp = getProperty(RETRY_DELAY);
433: if (tmp != null) {
434: retryDelay = Long.parseLong(tmp);
435: if (retryDelay < 0)
436: retryDelay = 0;
437: }
438: if (tracing)
439: log.trace("Begin connect loop, maxRetries=" + maxRetries
440: + ", delay=" + retryDelay);
441:
442: while (true) {
443: try {
444: if (tracing) {
445: log.trace("Connecting with addr=" + serverHost
446: + ", port=" + serverPort + ", localAddr="
447: + localAddr + ", localPort=" + localPort
448: + ", socketFactory=" + socketFactory
449: + ", enableTcpNoDelay=" + enableTcpNoDelay
450: + ", bufferSize=" + bufferSize
451: + ", chunkSize=" + chunkSize);
452: }
453: if (localAddr != null)
454: socket = socketFactory.createSocket(serverHost,
455: serverPort, localAddr, localPort);
456: else
457: socket = socketFactory.createSocket(serverHost,
458: serverPort);
459: break;
460: } catch (ConnectException e) {
461: if (++retries > maxRetries)
462: throw e;
463: if (tracing)
464: log.trace("Failed to connect, retries=" + retries,
465: e);
466: }
467: try {
468: Thread.sleep(retryDelay);
469: } catch (InterruptedException e) {
470: break;
471: }
472: }
473:
474: socket.setTcpNoDelay(enableTcpNoDelay);
475: if (soTimeout != 0)
476: socket.setSoTimeout(soTimeout);
477: socketMgr = new SocketManager(socket);
478: socketMgr.setBufferSize(bufferSize);
479: socketMgr.setChunkSize(chunkSize);
480: socketMgr.start(Connection.getThreadGroup());
481: }
482:
483: /**
484: * Used to close the current connection with the server
485: *
486: */
487: protected void destroyConnection() {
488: try {
489: if (socket != null) {
490: try {
491: socketMgr.stop();
492: } finally {
493: socket.close();
494: }
495: }
496: } catch (IOException ignore) {
497: }
498: }
499:
500: private String getProperty(String name) {
501: String value = null;
502: try {
503: value = System.getProperty(name);
504: } catch (Throwable ignored) {
505: log.trace("Cannot retrieve system property " + name);
506: }
507: return value;
508: }
509: }
|