0001: /*
0002: * JBoss, Home of Professional Open Source.
0003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004: * as indicated by the @author tags. See the copyright.txt file in the
0005: * distribution for a full listing of individual contributors.
0006: *
0007: * This is free software; you can redistribute it and/or modify it
0008: * under the terms of the GNU Lesser General Public License as
0009: * published by the Free Software Foundation; either version 2.1 of
0010: * the License, or (at your option) any later version.
0011: *
0012: * This software is distributed in the hope that it will be useful,
0013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015: * Lesser General Public License for more details.
0016: *
0017: * You should have received a copy of the GNU Lesser General Public
0018: * License along with this software; if not, write to the Free
0019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021: */
0022: package org.jboss.mq;
0023:
0024: import java.io.IOException;
0025: import java.io.Serializable;
0026: import java.util.Arrays;
0027: import java.util.HashMap;
0028: import java.util.HashSet;
0029: import java.util.LinkedList;
0030:
0031: import javax.jms.ConnectionMetaData;
0032: import javax.jms.Destination;
0033: import javax.jms.ExceptionListener;
0034: import javax.jms.IllegalStateException;
0035: import javax.jms.JMSException;
0036: import javax.jms.JMSSecurityException;
0037: import javax.jms.Queue;
0038: import javax.jms.TemporaryQueue;
0039: import javax.jms.TemporaryTopic;
0040: import javax.transaction.xa.Xid;
0041:
0042: import org.jboss.logging.Logger;
0043: import org.jboss.mq.il.ClientILService;
0044: import org.jboss.mq.il.ServerIL;
0045: import org.jboss.util.UnreachableStatementException;
0046:
0047: import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
0048: import EDU.oswego.cs.dl.util.concurrent.Semaphore;
0049: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
0050: import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
0051:
0052: /**
0053: * This class implements javax.jms.Connection.
0054: *
0055: * <p>
0056: * It is also the gateway through wich all calls to the JMS server is done. To
0057: * do its work it needs a ServerIL to invoke (@see
0058: * org.jboss.mq.server.ServerIL).
0059: * </p>
0060: *
0061: * <p>
0062: * The (new from february 2002) logic for clientID is the following: if logging
0063: * in with a user and passwork a preconfigured clientID may be automatically
0064: * delivered from the server.
0065: * </p>
0066: *
0067: * <p>
0068: * If the client wants to set it's own clientID it must do so on a connection
0069: * wich does not have a prefonfigured clientID and it must do so before it
0070: * calls any other methods on the connection (even getClientID()). It is not
0071: * allowable to use a clientID that either looks like JBossMQ internal one
0072: * (beginning with ID) or a clientID that is allready in use by someone, or a
0073: * clientID that is already preconfigured in the server.
0074: * </p>
0075: *
0076: * <p>
0077: * If a preconfigured ID is not get, or a valid one is not set, the server will
0078: * set an internal ID. This ID is NEVER possible to use for durable
0079: * subscriptions. If a prefconfigured ID or one manually set is possible to use
0080: * to create a durable subscriptions is governed by the security configuration
0081: * of JBossMQ. In the default setup, only preconfigured clientID's are possible
0082: * to use. If using a SecurityManager, permissions to create a surable
0083: * subscriptions is * the resiult of a combination of the following:
0084: * </p>
0085: * <p>- The clientID is not one of JBossMQ's internal.
0086: * </p>
0087: * <p>- The user is authenticated and has a role that has create set to true
0088: * in the security config of the destination.
0089: * </p>
0090: *
0091: * <p>
0092: * Notes for JBossMQ developers: All calls, except close(), that is possible to
0093: * do on a connection must call checkClientID()
0094: * </p>
0095: *
0096: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
0097: * @author Hiram Chirino (Cojonudo14@hotmail.com)
0098: * @author <a href="pra@tim.se">Peter Antman</a>
0099: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
0100: * @version $Revision: 61739 $
0101: */
0102: public abstract class Connection implements Serializable,
0103: javax.jms.Connection {
0104: /** The serialVersionUID */
0105: private static final long serialVersionUID = 87938199839407082L;
0106:
0107: /** The threadGroup */
0108: private static ThreadGroup threadGroup = new ThreadGroup(
0109: "JBossMQ Client Threads");
0110:
0111: /** The log */
0112: static Logger log = Logger.getLogger(Connection.class);
0113:
0114: /** Whether trace is enabled */
0115: static boolean trace = log.isTraceEnabled();
0116:
0117: /** Manages the thread that pings the connection to see if it is 'alive' */
0118: static protected ClockDaemon clockDaemon = new ClockDaemon();
0119:
0120: /** Maps a destination to a LinkedList of Subscriptions */
0121: public HashMap destinationSubscriptions = new HashMap();
0122:
0123: /** Maps a subscription id to a Subscription */
0124: public HashMap subscriptions = new HashMap();
0125:
0126: /** Is the connection stopped ? */
0127: public boolean modeStop;
0128:
0129: /** This is our connection to the JMS server */
0130: protected ServerIL serverIL;
0131:
0132: /** This is the clientID */
0133: protected String clientID;
0134:
0135: /** The connection token is used to identify our connection to the server. */
0136: protected ConnectionToken connectionToken;
0137:
0138: /** The object that sets up the client IL */
0139: protected ClientILService clientILService;
0140:
0141: /** How often to ping the connection */
0142: protected long pingPeriod = 1000 * 60;
0143:
0144: /** This field is reset when a ping is sent, set when ponged. */
0145: protected boolean ponged = true;
0146:
0147: /** This is used to know when the PingTask is running */
0148: Semaphore pingTaskSemaphore = new Semaphore(1);
0149:
0150: /** Identifies the PinkTask in the ClockDaemon */
0151: Object pingTaskId;
0152:
0153: /** Set a soon as close() is called on the connection. */
0154: private SynchronizedBoolean closing = new SynchronizedBoolean(false);
0155:
0156: /** Whether setClientId is Allowed */
0157: private volatile boolean setClientIdAllowed = true;
0158:
0159: /** LinkedList of all created sessions by this connection */
0160: HashSet createdSessions;
0161:
0162: /** Numbers subscriptions */
0163: int subscriptionCounter = Integer.MIN_VALUE;
0164:
0165: /** The lock for subscriptionCounter */
0166: Object subCountLock = new Object();
0167:
0168: /** Is the connection closed */
0169: private SynchronizedBoolean closed = new SynchronizedBoolean(false);
0170:
0171: /** Used to control tranactions */
0172: SpyXAResourceManager spyXAResourceManager;
0173:
0174: /** The class that created this connection */
0175: GenericConnectionFactory genericConnectionFactory;
0176:
0177: /** Last message ID returned */
0178: private int lastMessageID;
0179:
0180: /** the exceptionListener */
0181: private ExceptionListener exceptionListener;
0182:
0183: /** The exception listener lock */
0184: private Object elLock = new Object();
0185:
0186: /** The exception listener invocation thread */
0187: private Thread elThread;
0188:
0189: /** Used in message id generation */
0190: private StringBuffer sb = new StringBuffer();
0191:
0192: /** Used in message id generation */
0193: private char[] charStack = new char[22];
0194:
0195: /** The next session id */
0196: String sessionId;
0197:
0198: /** Temporary destinations created by this connection */
0199: protected HashSet temps = new HashSet();
0200:
0201: static {
0202: log.debug("Setting the clockDaemon's thread factory");
0203: clockDaemon.setThreadFactory(new ThreadFactory() {
0204: public Thread newThread(Runnable r) {
0205: Thread t = new Thread(getThreadGroup(), r,
0206: "Connection Monitor Thread");
0207: t.setDaemon(true);
0208: return t;
0209: }
0210: });
0211: }
0212:
0213: public static ThreadGroup getThreadGroup() {
0214: if (threadGroup.isDestroyed())
0215: threadGroup = new ThreadGroup("JBossMQ Client Threads");
0216: return threadGroup;
0217: }
0218:
0219: /**
0220: * Create a new Connection
0221: *
0222: * @param userName the username
0223: * @param password the password
0224: * @param genericConnectionFactory the constructing class
0225: * @throws JMSException for any error
0226: */
0227: Connection(String userName, String password,
0228: GenericConnectionFactory genericConnectionFactory)
0229: throws JMSException {
0230: //Set the attributes
0231: createdSessions = new HashSet();
0232: connectionToken = null;
0233: lastMessageID = 0;
0234: modeStop = true;
0235:
0236: if (trace)
0237: log.trace("Connection Initializing userName=" + userName
0238: + " " + this );
0239: this .genericConnectionFactory = genericConnectionFactory;
0240: genericConnectionFactory.initialise(this );
0241:
0242: // Connect to the server
0243: if (trace)
0244: log.trace("Getting the serverIL " + this );
0245: serverIL = genericConnectionFactory.createServerIL();
0246: if (trace)
0247: log.trace("serverIL=" + serverIL + " " + this );
0248:
0249: // Register ourselves as a client
0250: try {
0251: authenticate(userName, password);
0252:
0253: if (userName != null)
0254: askForAnID(userName, password);
0255:
0256: startILService();
0257: } catch (Throwable t) {
0258: // Client registeration failed, close the connection
0259: try {
0260: serverIL.connectionClosing(null);
0261: } catch (Throwable t2) {
0262: log.debug("Error closing the connection", t2);
0263: }
0264:
0265: SpyJMSException.rethrowAsJMSException(
0266: "Failed to create connection", t);
0267: }
0268:
0269: // Finish constructing the connection
0270: try {
0271: if (trace)
0272: log.trace("Creating XAResourceManager " + this );
0273:
0274: // Setup the XA Resource manager,
0275: spyXAResourceManager = new SpyXAResourceManager(this );
0276:
0277: if (trace)
0278: log.trace("Starting the ping thread " + this );
0279: startPingThread();
0280:
0281: if (trace)
0282: log
0283: .trace("Connection establishment successful "
0284: + this );
0285: } catch (Throwable t) {
0286: // Could not complete the connection, tidy up
0287: // the server and client ILs.
0288: try {
0289: serverIL.connectionClosing(connectionToken);
0290: } catch (Throwable t2) {
0291: log.debug("Error closing the connection", t2);
0292: }
0293: try {
0294: stopILService();
0295: } catch (Throwable t2) {
0296: log.debug("Error stopping the client IL", t2);
0297: }
0298:
0299: SpyJMSException.rethrowAsJMSException(
0300: "Failed to create connection", t);
0301: }
0302: }
0303:
0304: /**
0305: * Create a new Connection
0306: *
0307: * @param genericConnectionFactory the constructing class
0308: * @throws JMSException for any error
0309: */
0310: Connection(GenericConnectionFactory genericConnectionFactory)
0311: throws JMSException {
0312: this (null, null, genericConnectionFactory);
0313: }
0314:
0315: /**
0316: * Gets the ServerIL attribute of the Connection object
0317: *
0318: * @return The ServerIL value
0319: */
0320: public ServerIL getServerIL() {
0321: return serverIL;
0322: }
0323:
0324: /**
0325: * Notification from the server that the connection is closed
0326: */
0327: public void asynchClose() {
0328: // If we receive a close and we did not initiate it, then fire the exception listener
0329: if (closing.get() == false)
0330: asynchFailure(
0331: "Asynchronous close from server.",
0332: new IOException(
0333: "Close request from the server or transport layer."));
0334: }
0335:
0336: /**
0337: * Called by a TemporaryDestination which is going to be deleted()
0338: *
0339: * @param dest the temporary destination
0340: */
0341: public void asynchDeleteTemporaryDestination(SpyDestination dest) {
0342: if (trace)
0343: log.trace("Deleting temporary destination " + dest);
0344: try {
0345: deleteTemporaryDestination(dest);
0346: } catch (Throwable t) {
0347: asynchFailure("Error deleting temporary destination "
0348: + dest, t);
0349: }
0350: }
0351:
0352: /**
0353: * Gets the first consumer that is listening to a destination.
0354: *
0355: * @param requests the receive requests
0356: */
0357: public void asynchDeliver(ReceiveRequest requests[]) {
0358: // If we are closing the connection, the server will nack the messages
0359: if (closing.get())
0360: return;
0361:
0362: if (trace)
0363: log.trace("Async deliver requests="
0364: + Arrays.asList(requests) + " " + this );
0365:
0366: try {
0367: for (int i = 0; i < requests.length; i++) {
0368: ReceiveRequest r = requests[i];
0369: if (trace)
0370: log.trace("Processing request=" + r + " " + this );
0371:
0372: SpyConsumer consumer = (SpyConsumer) subscriptions
0373: .get(r.subscriptionId);
0374: r.message.createAcknowledgementRequest(r.subscriptionId
0375: .intValue());
0376:
0377: if (consumer == null) {
0378: send(r.message.getAcknowledgementRequest(false));
0379: log
0380: .debug("WARNING: NACK issued due to non existent subscription "
0381: + r.message.header.messageId);
0382: continue;
0383: }
0384:
0385: if (trace)
0386: log.trace("Delivering messageid="
0387: + r.message.header.messageId
0388: + " to consumer=" + consumer);
0389:
0390: consumer.addMessage(r.message);
0391: }
0392: } catch (Throwable t) {
0393: asynchFailure("Error during async delivery", t);
0394: }
0395: }
0396:
0397: /**
0398: * Notification of a failure on this connection
0399: *
0400: * @param reason the reason for the failure
0401: * @param t the throwable
0402: */
0403: public void asynchFailure(String reason, Throwable t) {
0404: if (trace)
0405: log.trace("Notified of failure reason=" + reason + " "
0406: + this , t);
0407:
0408: // Exceptions due to closing will be ignored.
0409: if (closing.get())
0410: return;
0411:
0412: JMSException excep = SpyJMSException.getAsJMSException(reason,
0413: t);
0414:
0415: synchronized (elLock) {
0416: ExceptionListener el = exceptionListener;
0417: if (el != null && elThread == null) {
0418: try {
0419: Runnable run = new ExceptionListenerRunnable(el,
0420: excep);
0421: elThread = new Thread(getThreadGroup(), run,
0422: "ExceptionListener " + this );
0423: elThread.setDaemon(false);
0424: elThread.start();
0425: } catch (Throwable t1) {
0426: log.warn("Connection failure: ", excep);
0427: log
0428: .warn(
0429: "Unable to start exception listener thread: ",
0430: t1);
0431: }
0432: } else if (elThread != null)
0433: log
0434: .warn(
0435: "Connection failure, already in the exception listener",
0436: excep);
0437: else
0438: log
0439: .warn(
0440: "Connection failure, use javax.jms.Connection.setExceptionListener() to handle this error and reconnect",
0441: excep);
0442: }
0443: }
0444:
0445: /**
0446: * Invoked when the server pong us
0447: *
0448: * @param serverTime the server time
0449: */
0450: public void asynchPong(long serverTime) {
0451: if (trace)
0452: log.trace("PONG serverTime=" + serverTime + " " + this );
0453: ponged = true;
0454: }
0455:
0456: /**
0457: * Called by a TemporaryDestination which is going to be deleted
0458: *
0459: * @param dest the temporary destination
0460: * @exception JMSException for any error
0461: */
0462: public void deleteTemporaryDestination(SpyDestination dest)
0463: throws JMSException {
0464: checkClosed();
0465: if (trace)
0466: log.trace("DeleteDestination dest=" + dest + " " + this );
0467: try {
0468: //Ask the broker to delete() this TemporaryDestination
0469: serverIL.deleteTemporaryDestination(connectionToken, dest);
0470:
0471: //Remove it from the destinations list
0472: synchronized (subscriptions) {
0473: destinationSubscriptions.remove(dest);
0474: }
0475:
0476: // Remove it from the temps list
0477: synchronized (temps) {
0478: temps.remove(dest);
0479: }
0480: } catch (Throwable t) {
0481:
0482: SpyJMSException.rethrowAsJMSException(
0483: "Cannot delete the TemporaryDestination", t);
0484: }
0485: }
0486:
0487: public void setClientID(String cID) throws JMSException {
0488: checkClosed();
0489: if (clientID != null)
0490: throw new IllegalStateException(
0491: "The connection has already a clientID");
0492: if (setClientIdAllowed == false)
0493: throw new IllegalStateException(
0494: "SetClientID was not called emediately after creation of connection");
0495:
0496: if (trace)
0497: log.trace("SetClientID clientID=" + clientID + " " + this );
0498:
0499: try {
0500: serverIL.checkID(cID);
0501: } catch (Throwable t) {
0502: SpyJMSException.rethrowAsJMSException(
0503: "Cannot connect to the JMSServer", t);
0504: }
0505:
0506: clientID = cID;
0507: connectionToken.setClientID(clientID);
0508: }
0509:
0510: public String getClientID() throws JMSException {
0511: checkClosed();
0512: return clientID;
0513: }
0514:
0515: public ExceptionListener getExceptionListener() throws JMSException {
0516: checkClosed();
0517: checkClientID();
0518: return exceptionListener;
0519: }
0520:
0521: public void setExceptionListener(ExceptionListener listener)
0522: throws JMSException {
0523: checkClosed();
0524: checkClientID();
0525:
0526: exceptionListener = listener;
0527: }
0528:
0529: public ConnectionMetaData getMetaData() throws JMSException {
0530: checkClosed();
0531: checkClientID();
0532:
0533: return new SpyConnectionMetaData();
0534: }
0535:
0536: public synchronized void close() throws JMSException {
0537: if (closed.get())
0538: return;
0539: if (trace)
0540: log.trace("Closing connection " + this );
0541:
0542: closing.set(true);
0543:
0544: // We don't want to notify the exception listener
0545: exceptionListener = null;
0546:
0547: // The first exception
0548: JMSException exception = null;
0549:
0550: try {
0551: doStop();
0552: } catch (Throwable t) {
0553: log.trace("Error during stop", t);
0554: }
0555:
0556: if (trace)
0557: log.trace("Closing sessions " + this );
0558: Object[] vect = null;
0559: synchronized (createdSessions) {
0560: vect = createdSessions.toArray();
0561: }
0562: for (int i = 0; i < vect.length; i++) {
0563: SpySession session = (SpySession) vect[i];
0564: try {
0565: session.close();
0566: } catch (Throwable t) {
0567: if (trace)
0568: log.trace("Error closing session " + session, t);
0569: }
0570: }
0571: if (trace)
0572: log.trace("Closed sessions " + this );
0573:
0574: if (trace)
0575: log.trace("Notifying the server of close " + this );
0576: try {
0577: serverIL.connectionClosing(connectionToken);
0578: } catch (Throwable t) {
0579: log.trace("Cannot close properly the connection", t);
0580: }
0581:
0582: if (trace)
0583: log.trace("Stopping ping thread " + this );
0584: try {
0585: stopPingThread();
0586: } catch (Throwable t) {
0587: if (exception == null)
0588: exception = SpyJMSException.getAsJMSException(
0589: "Cannot stop the ping thread", t);
0590: }
0591:
0592: if (trace)
0593: log.trace("Stopping the ClientIL service " + this );
0594: try {
0595: stopILService();
0596: } catch (Throwable t) {
0597: log.trace("Cannot stop the client il service", t);
0598: }
0599:
0600: // Only set the closed flag after all the objects that depend
0601: // on this connection have been closed.
0602: closed.set(true);
0603:
0604: if (trace)
0605: log.trace("Disconnected from server " + this );
0606:
0607: // Throw the first exception
0608: if (exception != null)
0609: throw exception;
0610: }
0611:
0612: public void start() throws JMSException {
0613: checkClosed();
0614: checkClientID();
0615:
0616: if (modeStop == false)
0617: return;
0618: modeStop = false;
0619:
0620: if (trace)
0621: log.trace("Starting connection " + this );
0622:
0623: try {
0624: serverIL.setEnabled(connectionToken, true);
0625: } catch (Throwable t) {
0626: SpyJMSException.rethrowAsJMSException(
0627: "Cannot enable the connection with the JMS server",
0628: t);
0629: }
0630: }
0631:
0632: public void stop() throws JMSException {
0633: checkClosed();
0634: checkClientID();
0635: doStop();
0636: }
0637:
0638: public String toString() {
0639: StringBuffer buffer = new StringBuffer();
0640: buffer.append("Connection@").append(
0641: System.identityHashCode(this ));
0642: buffer.append('[');
0643: if (connectionToken != null)
0644: buffer.append("token=").append(connectionToken);
0645: else
0646: buffer.append("clientID=").append(clientID);
0647: if (closed.get())
0648: buffer.append(" CLOSED");
0649: else if (closing.get())
0650: buffer.append(" CLOSING");
0651: buffer.append(" rcvstate=");
0652: if (modeStop)
0653: buffer.append("STOPPED");
0654: else
0655: buffer.append("STARTED");
0656: buffer.append(']');
0657: return buffer.toString();
0658: }
0659:
0660: /**
0661: * Get the next message id
0662: * <p>
0663: *
0664: * All longs are less than 22 digits long
0665: * <p>
0666: *
0667: * Note that in this routine we assume that System.currentTimeMillis() is
0668: * non-negative always be non-negative (so don't set lastMessageID to a
0669: * positive for a start).
0670: *
0671: * @return the next message id
0672: * @throws JMSException for any error
0673: */
0674: String getNewMessageID() throws JMSException {
0675: checkClosed();
0676: synchronized (sb) {
0677: sb.setLength(0);
0678: sb.append(clientID);
0679: sb.append('-');
0680: long time = System.currentTimeMillis();
0681: int count = 0;
0682: do {
0683: charStack[count] = (char) ('0' + (time % 10));
0684: time = time / 10;
0685: ++count;
0686: } while (time != 0);
0687: --count;
0688: for (; count >= 0; --count) {
0689: sb.append(charStack[count]);
0690: }
0691: ++lastMessageID;
0692: //avoid having to deal with negative numbers.
0693: if (lastMessageID < 0) {
0694: lastMessageID = 0;
0695: }
0696: int id = lastMessageID;
0697: count = 0;
0698: do {
0699: charStack[count] = (char) ('0' + (id % 10));
0700: id = id / 10;
0701: ++count;
0702: } while (id != 0);
0703: --count;
0704: for (; count >= 0; --count) {
0705: sb.append(charStack[count]);
0706: }
0707: return sb.toString();
0708: }
0709: }
0710:
0711: /**
0712: * A new Consumer has been created.
0713: * <p>
0714: * We have to handle security issues, a consumer may actually not be allowed
0715: * to be created
0716: *
0717: * @param consumer the consumer added
0718: * @throws JMSException for any error
0719: */
0720: void addConsumer(SpyConsumer consumer) throws JMSException {
0721: checkClosed();
0722: Subscription req = consumer.getSubscription();
0723: synchronized (subCountLock) {
0724: req.subscriptionId = subscriptionCounter++;
0725: }
0726: req.connectionToken = connectionToken;
0727: if (trace)
0728: log.trace("addConsumer sub=" + req);
0729:
0730: try {
0731: synchronized (subscriptions) {
0732: subscriptions.put(new Integer(req.subscriptionId),
0733: consumer);
0734:
0735: LinkedList ll = (LinkedList) destinationSubscriptions
0736: .get(req.destination);
0737: if (ll == null) {
0738: ll = new LinkedList();
0739: destinationSubscriptions.put(req.destination, ll);
0740: }
0741:
0742: ll.add(consumer);
0743: }
0744:
0745: serverIL.subscribe(connectionToken, req);
0746: } catch (JMSSecurityException ex) {
0747: removeConsumerInternal(consumer);
0748: throw ex;
0749: } catch (Throwable t) {
0750: SpyJMSException.rethrowAsJMSException(
0751: "Cannot subscribe to this Destination: ", t);
0752: }
0753: }
0754:
0755: /**
0756: * Browse a queue
0757: *
0758: * @param queue the queue
0759: * @param selector the selector
0760: * @return an array of messages
0761: * @exception JMSException for any error
0762: */
0763: SpyMessage[] browse(Queue queue, String selector)
0764: throws JMSException {
0765: checkClosed();
0766: if (trace)
0767: log.trace("Browsing queue=" + queue + " selector="
0768: + selector + " " + this );
0769:
0770: try {
0771: return serverIL.browse(connectionToken, queue, selector);
0772: } catch (Throwable t) {
0773: SpyJMSException.rethrowAsJMSException(
0774: "Cannot browse the Queue", t);
0775: throw new UnreachableStatementException();
0776: }
0777: }
0778:
0779: /**
0780: * Ping the server
0781: *
0782: * @param clientTime the start of the ping
0783: * @throws JMSException for any error
0784: */
0785: void pingServer(long clientTime) throws JMSException {
0786: checkClosed();
0787: trace = log.isTraceEnabled();
0788: if (trace)
0789: log.trace("PING " + clientTime + " " + this );
0790:
0791: try {
0792: serverIL.ping(connectionToken, clientTime);
0793: } catch (Throwable t) {
0794: SpyJMSException.rethrowAsJMSException(
0795: "Cannot ping the JMS server", t);
0796: }
0797: }
0798:
0799: /**
0800: * Receive a message
0801: *
0802: * @param sub the subscription
0803: * @param wait the wait time
0804: * @return the message or null if there isn't one
0805: * @throws JMSException for any error
0806: */
0807: SpyMessage receive(Subscription sub, long wait) throws JMSException {
0808: checkClosed();
0809: if (trace)
0810: log.trace("Receive subscription=" + sub + " wait=" + wait);
0811:
0812: try {
0813: SpyMessage message = serverIL.receive(connectionToken,
0814: sub.subscriptionId, wait);
0815: if (message != null)
0816: message
0817: .createAcknowledgementRequest(sub.subscriptionId);
0818: return message;
0819: } catch (Throwable t) {
0820: SpyJMSException.rethrowAsJMSException("Cannot receive ", t);
0821: throw new UnreachableStatementException();
0822: }
0823: }
0824:
0825: /**
0826: * Remove a consumer
0827: *
0828: * @param consumer the consumer
0829: * @throws JMSException for any error
0830: */
0831: void removeConsumer(SpyConsumer consumer) throws JMSException {
0832: checkClosed();
0833: Subscription req = consumer.getSubscription();
0834: if (trace)
0835: log.trace("removeConsumer req=" + req);
0836:
0837: try {
0838: serverIL.unsubscribe(connectionToken, req.subscriptionId);
0839:
0840: removeConsumerInternal(consumer);
0841: } catch (Throwable t) {
0842: SpyJMSException.rethrowAsJMSException(
0843: "Cannot unsubscribe to this destination", t);
0844: }
0845:
0846: }
0847:
0848: /**
0849: * Send a message to the server
0850: *
0851: * @param mes the message
0852: * @throws JMSException for any error
0853: */
0854: void sendToServer(SpyMessage mes) throws JMSException {
0855: checkClosed();
0856: if (trace)
0857: log.trace("SendToServer message=" + mes.header.jmsMessageID
0858: + " " + this );
0859:
0860: try {
0861: serverIL.addMessage(connectionToken, mes);
0862: } catch (Throwable t) {
0863: SpyJMSException.rethrowAsJMSException(
0864: "Cannot send a message to the JMS server", t);
0865: }
0866: }
0867:
0868: /**
0869: * Closing a session
0870: *
0871: * @param who the session
0872: */
0873: void sessionClosing(SpySession who) {
0874: if (trace)
0875: log.trace("Closing session " + who);
0876:
0877: synchronized (createdSessions) {
0878: createdSessions.remove(who);
0879: }
0880:
0881: //This session should not be in the "destinations" object anymore.
0882: //We could check this, though
0883: }
0884:
0885: void unsubscribe(DurableSubscriptionID id) throws JMSException {
0886: if (trace)
0887: log.trace("Unsubscribe id=" + id + " " + this );
0888:
0889: try {
0890: serverIL.destroySubscription(connectionToken, id);
0891: } catch (Throwable t) {
0892: SpyJMSException.rethrowAsJMSException(
0893: "Cannot destroy durable subscription " + id, t);
0894: }
0895: }
0896:
0897: /**
0898: * Check a tempoary destination
0899: *
0900: * @param destination the destination
0901: */
0902: void checkTemporary(Destination destination) throws JMSException {
0903: if (destination instanceof TemporaryQueue
0904: || destination instanceof TemporaryTopic) {
0905: synchronized (temps) {
0906: if (temps.contains(destination) == false)
0907: throw new JMSException(
0908: "Cannot create a consumer for a temporary destination from a different session. "
0909: + destination);
0910: }
0911: }
0912: }
0913:
0914: /**
0915: * Check that a clientID exists. If not get one from server.
0916: *
0917: * Also sets the setClientIdAllowed to false.
0918: *
0919: * Check clientId, must be called by all public methods on the
0920: * jacax.jmx.Connection interface and its children.
0921: *
0922: * @exception JMSException if clientID is null as post condition
0923: */
0924: synchronized protected void checkClientID() throws JMSException {
0925: if (setClientIdAllowed == false)
0926: return;
0927:
0928: setClientIdAllowed = false;
0929: if (trace)
0930: log.trace("Checking clientID=" + clientID + " " + this );
0931: if (clientID == null) {
0932: askForAnID();//Request a random one
0933: if (clientID == null)
0934: throw new JMSException("Could not get a clientID");
0935: connectionToken.setClientID(clientID);
0936:
0937: if (trace)
0938: log.trace("ClientID established " + this );
0939: }
0940: }
0941:
0942: /**
0943: * Ask the server for an id
0944: *
0945: * @exception JMSException for any error
0946: */
0947: protected void askForAnID() throws JMSException {
0948: if (trace)
0949: log.trace("Ask for an id " + this );
0950:
0951: try {
0952: if (clientID == null)
0953: clientID = serverIL.getID();
0954: } catch (Throwable t) {
0955: SpyJMSException.rethrowAsJMSException(
0956: "Cannot get a client ID", t);
0957: }
0958: }
0959:
0960: /**
0961: * Ask the server for an id
0962: *
0963: * @param userName the user
0964: * @param password the password
0965: * @exception JMSException for any error
0966: */
0967: protected void askForAnID(String userName, String password)
0968: throws JMSException {
0969: if (trace)
0970: log.trace("Ask for an id user=" + userName + " " + this );
0971:
0972: try {
0973: String configuredClientID = serverIL.checkUser(userName,
0974: password);
0975: if (configuredClientID != null)
0976: clientID = configuredClientID;
0977: } catch (Throwable t) {
0978: SpyJMSException.rethrowAsJMSException(
0979: "Cannot get a client ID", t);
0980: }
0981: }
0982:
0983: /**
0984: * Authenticate a user
0985: *
0986: * @param userName the user
0987: * @param password the password
0988: * @throws JMSException for any error
0989: */
0990: protected void authenticate(String userName, String password)
0991: throws JMSException {
0992: if (trace)
0993: log.trace("Authenticating user " + userName + " " + this );
0994: try {
0995: sessionId = serverIL.authenticate(userName, password);
0996: } catch (Throwable t) {
0997: SpyJMSException.rethrowAsJMSException(
0998: "Cannot authenticate user", t);
0999: }
1000: }
1001:
1002: // used to acknowledge a message
1003: /**
1004: * Acknowledge/Nack a message
1005: *
1006: * @param item the acknowledgement
1007: * @exception JMSException for any error
1008: */
1009: protected void send(AcknowledgementRequest item)
1010: throws JMSException {
1011: checkClosed();
1012: if (trace)
1013: log.trace("Acknowledge item=" + item + " " + this );
1014:
1015: try {
1016: serverIL.acknowledge(connectionToken, item);
1017: } catch (Throwable t) {
1018: SpyJMSException.rethrowAsJMSException(
1019: "Cannot acknowlege a message", t);
1020: }
1021: }
1022:
1023: /**
1024: * Commit/rollback
1025: *
1026: * @param transaction the transaction request
1027: * @exception JMSException for any error
1028: */
1029: protected void send(TransactionRequest transaction)
1030: throws JMSException {
1031: checkClosed();
1032: if (trace)
1033: log.trace("Transact request=" + transaction + " " + this );
1034:
1035: try {
1036: serverIL.transact(connectionToken, transaction);
1037: } catch (Throwable t) {
1038: SpyJMSException.rethrowAsJMSException(
1039: "Cannot process a transaction", t);
1040: }
1041: }
1042:
1043: /**
1044: * Recover
1045: *
1046: * @param flags the flags
1047: * @throws JMSException for any error
1048: */
1049: protected Xid[] recover(int flags) throws JMSException {
1050: checkClosed();
1051: if (trace)
1052: log.trace("Recover flags=" + flags + " " + this );
1053:
1054: try {
1055: if (serverIL instanceof Recoverable) {
1056: Recoverable recoverableIL = (Recoverable) serverIL;
1057: return recoverableIL.recover(connectionToken, flags);
1058: }
1059: } catch (Throwable t) {
1060: SpyJMSException.rethrowAsJMSException("Cannot recover", t);
1061: }
1062:
1063: log.warn(serverIL + " does not implement "
1064: + Recoverable.class.getName());
1065: return new Xid[0];
1066: }
1067:
1068: /**
1069: * Start the il
1070: *
1071: * @exception JMSException for any error
1072: */
1073: protected void startILService() throws JMSException {
1074: if (trace)
1075: log.trace("Starting the client il " + this );
1076: try {
1077: clientILService = genericConnectionFactory
1078: .createClientILService(this );
1079: clientILService.start();
1080: if (trace)
1081: log.trace("Using client id " + clientILService + " "
1082: + this );
1083: connectionToken = new ConnectionToken(clientID,
1084: clientILService.getClientIL(), sessionId);
1085: serverIL.setConnectionToken(connectionToken);
1086: } catch (Throwable t) {
1087: SpyJMSException.rethrowAsJMSException(
1088: "Cannot start a the client IL service", t);
1089: }
1090: }
1091:
1092: /**
1093: * Stop the il
1094: *
1095: * @exception JMSException for any error
1096: */
1097: protected void stopILService() throws JMSException {
1098: try {
1099: clientILService.stop();
1100: } catch (Throwable t) {
1101: SpyJMSException.rethrowAsJMSException(
1102: "Cannot stop a the client IL service", t);
1103: }
1104: }
1105:
1106: /**
1107: * Stop delivery
1108: *
1109: * @param consumer the consumer
1110: */
1111: public void doStop() throws JMSException {
1112: if (modeStop)
1113: return;
1114: modeStop = true;
1115:
1116: if (trace)
1117: log.trace("Stopping connection " + this );
1118:
1119: try {
1120: serverIL.setEnabled(connectionToken, false);
1121: } catch (Throwable t) {
1122: SpyJMSException
1123: .rethrowAsJMSException(
1124: "Cannot disable the connection with the JMS server",
1125: t);
1126: }
1127: }
1128:
1129: /**
1130: * Remove a consumer
1131: *
1132: * @param consumer the consumer
1133: */
1134: private void removeConsumerInternal(SpyConsumer consumer) {
1135: synchronized (subscriptions) {
1136: Subscription req = consumer.getSubscription();
1137: subscriptions.remove(new Integer(req.subscriptionId));
1138:
1139: LinkedList ll = (LinkedList) destinationSubscriptions
1140: .get(req.destination);
1141: if (ll != null) {
1142: ll.remove(consumer);
1143: if (ll.size() == 0) {
1144: destinationSubscriptions.remove(req.destination);
1145: }
1146: }
1147: }
1148: }
1149:
1150: /**
1151: * Check whether we are closed
1152: *
1153: * @throws IllegalStateException when the session is closed
1154: */
1155: protected void checkClosed() throws IllegalStateException {
1156: if (closed.get())
1157: throw new IllegalStateException("The connection is closed");
1158: }
1159:
1160: /**
1161: * Start the ping thread
1162: */
1163: private void startPingThread() {
1164: // Ping thread does not need to be running if the ping period is 0.
1165: if (pingPeriod == 0)
1166: return;
1167: pingTaskId = clockDaemon.executePeriodically(pingPeriod,
1168: new PingTask(), true);
1169: }
1170:
1171: /**
1172: * Stop the ping thread
1173: */
1174: private void stopPingThread() {
1175: // Ping thread was not running if ping period is 0.
1176: if (pingPeriod == 0)
1177: return;
1178:
1179: ClockDaemon.cancel(pingTaskId);
1180:
1181: //Aquire the Semaphore to make sure the ping task is not running.
1182: try {
1183: pingTaskSemaphore.attempt(1000 * 10);
1184: } catch (InterruptedException e) {
1185: Thread.currentThread().interrupt();
1186: }
1187: }
1188:
1189: /**
1190: * The ping task
1191: */
1192: class PingTask implements Runnable {
1193: public void run() {
1194: // Don't bother if we are closing
1195: if (closing.get())
1196: return;
1197:
1198: try {
1199: // If we can't aquire the semaphore then it
1200: // almost certainly means the close has got it
1201: // Try for 10 seconds to make sure the problem
1202: // is not just a long garbage collection that has suspended threads
1203: if (pingTaskSemaphore.attempt(1000 * 10) == false)
1204: return;
1205: } catch (InterruptedException e) {
1206: log.debug("Interrupted requesting ping semaphore");
1207: return;
1208: }
1209: try {
1210: if (ponged == false) {
1211: // Server did not pong use with in the timeout
1212: // period.. Assuming the connection is dead.
1213: throw new SpyJMSException("No pong received",
1214: new IOException("ping timeout."));
1215: }
1216:
1217: ponged = false;
1218: pingServer(System.currentTimeMillis());
1219: } catch (Throwable t) {
1220: asynchFailure("Unexpected ping failure", t);
1221: } finally {
1222: pingTaskSemaphore.release();
1223: }
1224: }
1225: }
1226:
1227: /**
1228: * The Exception listener runnable
1229: */
1230: class ExceptionListenerRunnable implements Runnable {
1231: ExceptionListener el;
1232: JMSException excep;
1233:
1234: /**
1235: * Create a new ExceptionListener runnable
1236: *
1237: * @param el the exception exception
1238: * @param excep the jms exception
1239: */
1240: public ExceptionListenerRunnable(ExceptionListener el,
1241: JMSException excep) {
1242: this .el = el;
1243: this .excep = excep;
1244: }
1245:
1246: public void run() {
1247: try {
1248: synchronized (elLock) {
1249: el.onException(excep);
1250: }
1251: } catch (Throwable t) {
1252: log.warn("Connection failure: ", excep);
1253: log.warn("Exception listener ended abnormally: ", t);
1254: }
1255:
1256: synchronized (elLock) {
1257: elThread = null;
1258: }
1259: }
1260: }
1261: }
|