001: /*
002: * JOnAS: Java(TM) Open Application Server
003: * Copyright (C) 1999 Bull S.A.
004: * Contact: jonas-team@objectweb.org
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
019: * USA
020: *
021: * --------------------------------------------------------------------------
022: * $Id: JConnection.java 4663 2004-04-28 15:31:21Z durieuxp $
023: * --------------------------------------------------------------------------
024: */
025:
026: package org.objectweb.jonas_jms;
027:
028: import java.util.LinkedList;
029:
030: import javax.jms.Connection;
031: import javax.jms.ConnectionConsumer;
032: import javax.jms.ConnectionMetaData;
033: import javax.jms.Destination;
034: import javax.jms.ExceptionListener;
035: import javax.jms.JMSException;
036: import javax.jms.ServerSessionPool;
037: import javax.jms.Session;
038: import javax.jms.Topic;
039: import javax.jms.XAConnection;
040: import javax.jms.XAConnectionFactory;
041:
042: import org.objectweb.transaction.jta.TransactionManager;
043: import org.objectweb.util.monolog.api.BasicLevel;
044:
045: /**
046: * Common methods used in JQueueConnection and JTopicConnection.
047: *
048: * @author Laurent Chauvirey, Frederic Maistre, Nicolas Tachker
049: * Contributor(s):
050: * Philippe Durieux
051: * Jeff Mesnil connection anonymous
052: * Philippe Coq JMS 1.1 integration
053: */
054:
055: public class JConnection implements Connection {
056:
057: // The XAConnection used in the MOM
058: protected XAConnection xac;
059:
060: protected boolean closed;
061: protected String user;
062: protected boolean globaltx = false;
063: protected static TransactionManager tm;
064: protected JConnectionFactory jcf;
065: protected LinkedList sessionlist = new LinkedList();
066:
067: // This constant is used to determine connection
068: // with an anonymous user in the pool of JConnection.
069: protected static final String INTERNAL_USER_NAME = "anInternalNameUsedOnlyByJOnAS";
070:
071: /**
072: * Prepares the construction of a JConnection.
073: */
074: protected JConnection(JConnectionFactory jcf, String user)
075: throws JMSException {
076: this .user = user;
077: this .jcf = jcf;
078: closed = false;
079: if (tm == null) {
080: tm = JmsManagerImpl.getTransactionManager();
081: }
082: // Remember if we are inside a global transaction
083: // This is to handle a case that is not expected in JMS specs.
084: try {
085: globaltx = (tm.getTransaction() != null);
086: } catch (Exception e) {
087: }
088: }
089:
090: /**
091: * Constructor of a JConnection for a specified user.
092: *
093: * @param user user's name
094: * @param passwd user's password
095: */
096: public JConnection(JConnectionFactory jcf,
097: XAConnectionFactory xacf, String user, String passwd)
098: throws JMSException {
099: this (jcf, user);
100: // Create the underlaying XAConnection
101: xac = xacf.createXAConnection(user, passwd);
102: }
103:
104: /**
105: * Constructor of a JConnection for an anonymous user.
106: */
107: public JConnection(JConnectionFactory jcf, XAConnectionFactory xacf)
108: throws JMSException {
109: this (jcf, INTERNAL_USER_NAME);
110: // Create the underlaying XAConnection
111: xac = xacf.createXAConnection();
112: }
113:
114: // -----------------------------------------------------------------------
115: // internal methods
116: // -----------------------------------------------------------------------
117:
118: /**
119: * A new non transacted session has been opened
120: */
121: protected synchronized boolean sessionOpen(Session s) {
122: if (!closed) {
123: sessionlist.add(s);
124: return true;
125: } else {
126: return false;
127: }
128: }
129:
130: /**
131: * A non transacted session has beem closed
132: */
133: protected synchronized void sessionClose(Session s) {
134: sessionlist.remove(s);
135: if (sessionlist.size() == 0 && closed) {
136: notify();
137: }
138: }
139:
140: /**
141: * Return the user associated to this connection
142: */
143: public String getUser() {
144: return user;
145: }
146:
147: // -----------------------------------------------------------------------
148: // Connection implementation
149: // -----------------------------------------------------------------------
150:
151: /**
152: * When this method is invoked it should not return until message processing
153: * has been orderly shut down. This means that all message listeners that may
154: * have been running have returned and that all pending receives have returned.
155: * A close terminates all pending message receives on the connection's sessions'
156: * consumers.
157: * @throws JMSException - if JMS implementation fails to return the client ID for this
158: * Connection due to some internal
159: */
160: public void close() throws JMSException {
161: TraceJms.logger.log(BasicLevel.DEBUG, "");
162: if (globaltx) {
163: // Connection that was open inside a global transaction.
164: // Don't wait (to avoid deadlocks) and don't close it now
165: // Since this situation is not expected by the specs, we just
166: // pool this connection for now, waiting better...
167: jcf.freeJConnection(this );
168: } else {
169: // Wait for all NON transacted sessions to be finished.
170: // LATER: Should rollback first all transacted sessions still running.
171: synchronized (this ) {
172: while (sessionlist.size() > 0) {
173: try {
174: wait();
175: } catch (InterruptedException e) {
176: TraceJms.logger.log(BasicLevel.ERROR,
177: "interrupted");
178: }
179: }
180: }
181: closed = true;
182: xac.close();
183: }
184: }
185:
186: public void finalClose() throws JMSException {
187: if (!closed) {
188: xac.close();
189: }
190: }
191:
192: /**
193: * Creates a connection consumer for this connection (optional operation)
194: * @param destination - the destination to access
195: * @param messageSelector - only messages with properties matching
196: * the message selector expression are delivered.
197: * A value of null or an empty string indicates that
198: * there is no message selector for the message consumer.
199: * @param sessionPool - the server session pool to associate with this connection consumer
200: * @param maxMessages - the maximum number of messages that can be assigned to a server
201: * session at one time
202: * @return the connection consumer
203: */
204:
205: public ConnectionConsumer createConnectionConsumer(
206: Destination destination, java.lang.String messageSelector,
207: ServerSessionPool sessionPool, int maxMessages)
208: throws JMSException {
209: TraceJms.logger.log(BasicLevel.DEBUG, "");
210: return xac.createConnectionConsumer(destination,
211: messageSelector, sessionPool, maxMessages);
212: }
213:
214: /**
215: * Creates a connection consumer for this connection (optional operation)
216: * @param topic - the topic to access
217: * @param subscriptionName - durable subscription name
218: * @param messageSelector - only messages with properties matching
219: * the message selector expression are delivered.
220: * A value of null or an empty string indicates that
221: * there is no message selector for the message consumer.
222: * @param sessionPool - the server session pool to associate with this connection consumer
223: * @param maxMessages - the maximum number of messages that can be assigned to a server
224: * session at one time
225: * @return the durable connection consumer
226: */
227: public ConnectionConsumer createDurableConnectionConsumer(
228: Topic topic, java.lang.String subscriptionName,
229: java.lang.String messageSelector,
230: ServerSessionPool sessionPool, int maxMessages)
231: throws JMSException {
232: TraceJms.logger.log(BasicLevel.DEBUG, "");
233: return xac.createDurableConnectionConsumer(topic,
234: subscriptionName, messageSelector, sessionPool,
235: maxMessages);
236: }
237:
238: /**
239: * Creates a Session object.
240: * @param transacted - indicates whether the session is transacted
241: * @param acknowledgeMode indicates whether the consumer or the client
242: * will acknowledge any messages it receives;
243: * ignored if the session is transacted.
244: * Legal values are Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE,
245: * and Session.DUPS_OK_ACKNOWLEDGE.
246: */
247:
248: public Session createSession(boolean transacted, int acknowledgeMode)
249: throws JMSException {
250: TraceJms.logger.log(BasicLevel.DEBUG, "");
251: return new JSession(this , xac);
252: }
253:
254: /**
255: * Get the client identifier for this connection. This value is JMS Provider specific.
256: * Either pre-configured by an administrator in a ConnectionFactory or assigned dynamically
257: * by the application by calling setClientID method.
258: * @return the unique client identifier.
259: * @throws JMSException - if JMS implementation fails to return the client ID for this
260: * Connection due to some internal
261: */
262: public String getClientID() throws JMSException {
263: TraceJms.logger.log(BasicLevel.DEBUG, "");
264: return xac.getClientID();
265: }
266:
267: /**
268: * Set the client identifier for this connection.
269: * If another connection with clientID is already running when this method is called,
270: * the JMS Provider should detect the duplicate id and throw InvalidClientIDException.
271: * @param clientID - the unique client identifier
272: * @throws JMSException - general exception if JMS implementation fails to set the client
273: * ID for this Connection due to some internal error.
274: * @throws InvalidClientIDException - if JMS client specifies an invalid or duplicate client id.
275: * @throws IllegalStateException - if attempting to set a connection's client identifier at
276: * the wrong time or when it has been administratively configured.
277: */
278: public void setClientID(String clientID) throws JMSException {
279: TraceJms.logger.log(BasicLevel.DEBUG, "");
280: xac.setClientID(clientID);
281: }
282:
283: /**
284: * Get the meta data for this connection.
285: * @return the connection meta data.
286: * @throws JMSException - general exception if JMS implementation fails to get the Connection
287: * meta-data for this Connection.
288: */
289: public ConnectionMetaData getMetaData() throws JMSException {
290: TraceJms.logger.log(BasicLevel.DEBUG, "");
291: return xac.getMetaData();
292: }
293:
294: /**
295: * Get the ExceptionListener for this Connection.
296: * @return the ExceptionListener for this Connection.
297: * @throws JMSException - general exception if JMS implementation fails to get
298: * the Exception listener for this Connection.
299: */
300: public ExceptionListener getExceptionListener() throws JMSException {
301: TraceJms.logger.log(BasicLevel.DEBUG, "");
302: return xac.getExceptionListener();
303: }
304:
305: /**
306: * Set an exception listener for this connection.
307: * @param listener - the exception listener.
308: * @throws JMSException - general exception if JMS implementation fails to set
309: * the Exception listener for this Connection.
310: */
311: public void setExceptionListener(ExceptionListener listener)
312: throws JMSException {
313: TraceJms.logger.log(BasicLevel.DEBUG, "");
314: xac.setExceptionListener(listener);
315: }
316:
317: /**
318: * Start (or restart) a Connection's delivery of incoming messages.
319: * @throws JMSException - if JMS implementation fails to start the message
320: * delivery due to some internal error.
321: */
322: public void start() throws JMSException {
323: TraceJms.logger.log(BasicLevel.DEBUG, "");
324: xac.start();
325: }
326:
327: /**
328: * Used to temporarily stop a Connection's delivery of incoming messages.
329: * It can be restarted using its start method.
330: * When stopped, delivery to all the Connection's message consumers is inhibited:
331: * synchronous receive's block and messages are not delivered to message listeners.
332: * @throws JMSException - if JMS implementation fails to start the message
333: * delivery due to some internal error.
334: */
335: public void stop() throws JMSException {
336: TraceJms.logger.log(BasicLevel.DEBUG, "");
337: xac.stop();
338: }
339: }
|