001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.connection;
031:
032: import com.caucho.util.L10N;
033: import com.caucho.lifecycle.Lifecycle;
034: import com.caucho.loader.*;
035: import com.caucho.jms.memory.*;
036:
037: import javax.jms.*;
038: import javax.jms.IllegalStateException;
039: import java.util.ArrayList;
040: import java.util.HashMap;
041: import java.util.logging.Level;
042: import java.util.logging.Logger;
043:
044: /**
045: * A connection.
046: */
047: public class ConnectionImpl implements XAConnection {
048: static final Logger log = Logger.getLogger(ConnectionImpl.class
049: .getName());
050: static final L10N L = new L10N(ConnectionImpl.class);
051:
052: private static int _clientIdGenerator;
053:
054: private ConnectionFactoryImpl _factory;
055: private boolean _isXA;
056:
057: private String _clientId;
058: private boolean _isClientIdSet;
059:
060: private ExceptionListener _exceptionListener;
061:
062: private ArrayList<JmsSession> _sessions = new ArrayList<JmsSession>();
063:
064: private HashMap<String, TopicSubscriber> _durableSubscriberMap = new HashMap<String, TopicSubscriber>();
065:
066: private HashMap<String, Queue> _dynamicQueueMap = new HashMap<String, Queue>();
067:
068: private HashMap<String, Topic> _dynamicTopicMap = new HashMap<String, Topic>();
069:
070: private final Lifecycle _lifecycle = new Lifecycle(log);
071:
072: public ConnectionImpl(ConnectionFactoryImpl factory, boolean isXA) {
073: this (factory);
074:
075: _isXA = isXA;
076: }
077:
078: public ConnectionImpl(ConnectionFactoryImpl factory) {
079: _factory = factory;
080:
081: Environment.addCloseListener(this );
082: }
083:
084: /**
085: * Returns true for an XA connection.
086: */
087: public boolean isXA() {
088: return _isXA;
089: }
090:
091: /**
092: * Returns the connection's client identifier.
093: */
094: public String getClientID() throws JMSException {
095: checkOpen();
096:
097: return _clientId;
098: }
099:
100: /**
101: * Sets the connections client identifier.
102: *
103: * @param the new client identifier.
104: */
105: public void setClientID(String clientId) throws JMSException {
106: checkOpen();
107:
108: if (_isClientIdSet)
109: throw new IllegalStateException(
110: L
111: .l(
112: "Can't set client id '{0}' after the connection has been used.",
113: clientId));
114:
115: ConnectionImpl oldConn = _factory.findByClientID(clientId);
116:
117: if (oldConn != null)
118: throw new InvalidClientIDException(L.l(
119: "'{0}' is a duplicate client id.", clientId));
120:
121: _clientId = clientId;
122: _isClientIdSet = true;
123: _lifecycle.setName(toString());
124: _lifecycle.setLevel(Level.FINER);
125: }
126:
127: /**
128: * Returns the connection factory.
129: */
130: public ConnectionFactoryImpl getConnectionFactory() {
131: return _factory;
132: }
133:
134: /**
135: * Returns the connection's exception listener.
136: */
137: public ExceptionListener getExceptionListener() throws JMSException {
138: checkOpen();
139:
140: return _exceptionListener;
141: }
142:
143: /**
144: * Returns the connection's exception listener.
145: */
146: public void setExceptionListener(ExceptionListener listener)
147: throws JMSException {
148: checkOpen();
149:
150: assignClientID();
151:
152: _exceptionListener = listener;
153: }
154:
155: /**
156: * Returns the connection's metadata.
157: */
158: public ConnectionMetaData getMetaData() throws JMSException {
159: checkOpen();
160:
161: return new ConnectionMetaDataImpl();
162: }
163:
164: /**
165: * Start (or restart) a connection.
166: */
167: public void start() throws JMSException {
168: checkOpen();
169: assignClientID();
170:
171: if (!_lifecycle.toActive())
172: return;
173:
174: synchronized (_sessions) {
175: for (int i = 0; i < _sessions.size(); i++) {
176: _sessions.get(i).start();
177: }
178: }
179: }
180:
181: /**
182: * Stops the connection temporarily.
183: */
184: public void stop() throws JMSException {
185: checkOpen();
186:
187: if (!_lifecycle.toStopping())
188: return;
189:
190: try {
191: assignClientID();
192:
193: synchronized (_sessions) {
194: for (int i = 0; i < _sessions.size(); i++) {
195: try {
196: _sessions.get(i).stop();
197: } catch (Exception e) {
198: log.log(Level.FINE, e.toString(), e);
199: }
200: }
201: }
202: } finally {
203: _lifecycle.toStop();
204: }
205: }
206:
207: /**
208: * Returns true if the connection is started.
209: */
210: boolean isActive() {
211: return _lifecycle.isActive();
212: }
213:
214: /**
215: * Returns true if the connection is stopping.
216: */
217: boolean isStopping() {
218: return _lifecycle.isStopping();
219: }
220:
221: /**
222: * Creates a new connection session.
223: */
224: public Session createSession(boolean transacted, int acknowledgeMode)
225: throws JMSException {
226: checkOpen();
227:
228: assignClientID();
229:
230: return new JmsSession(this , transacted, acknowledgeMode, isXA());
231: }
232:
233: /**
234: * Creates a new connection session.
235: */
236: public XASession createXASession() throws JMSException {
237: checkOpen();
238:
239: assignClientID();
240:
241: return new JmsSession(this , true, 0, true);
242: }
243:
244: /**
245: * Adds a session.
246: */
247: protected void addSession(JmsSession session) {
248: _sessions.add(session);
249:
250: if (_lifecycle.isActive())
251: session.start();
252: }
253:
254: /**
255: * Removes a session.
256: */
257: void removeSession(JmsSession session) {
258: _sessions.remove(session);
259: }
260:
261: /**
262: * Creates a dynamic queue.
263: */
264: Queue createQueue(String name) {
265: Queue queue = _dynamicQueueMap.get(name);
266:
267: if (queue != null)
268: return queue;
269:
270: MemoryQueue memoryQueue = new MemoryQueue();
271: memoryQueue.setName(name);
272: _dynamicQueueMap.put(name, memoryQueue);
273:
274: return memoryQueue;
275: }
276:
277: /**
278: * Creates a dynamic topic.
279: */
280: Topic createTopic(String name) {
281: Topic topic = _dynamicTopicMap.get(name);
282:
283: if (topic != null)
284: return topic;
285:
286: MemoryTopic memoryTopic = new MemoryTopic();
287: memoryTopic.setName(name);
288: _dynamicTopicMap.put(name, memoryTopic);
289:
290: return memoryTopic;
291: }
292:
293: /**
294: * Gets a durable subscriber.
295: */
296: TopicSubscriber getDurableSubscriber(String name) {
297: return _durableSubscriberMap.get(name);
298: }
299:
300: /**
301: * Adds a durable subscriber.
302: */
303: TopicSubscriber putDurableSubscriber(String name,
304: TopicSubscriber subscriber) {
305: return _durableSubscriberMap.put(name, subscriber);
306: }
307:
308: /**
309: * Removes a durable subscriber.
310: */
311: TopicSubscriber removeDurableSubscriber(String name) {
312: return _durableSubscriberMap.remove(name);
313: }
314:
315: /**
316: * Creates a new consumer (optional)
317: */
318: public ConnectionConsumer createConnectionConsumer(
319: Destination destination, String messageSelector,
320: ServerSessionPool sessionPool, int maxMessages)
321: throws JMSException {
322: throw new UnsupportedOperationException();
323: }
324:
325: /**
326: * Creates a new consumer (optional)
327: */
328: public ConnectionConsumer createDurableConnectionConsumer(
329: Topic topic, String name, String messageSelector,
330: ServerSessionPool sessionPool, int maxMessages)
331: throws JMSException {
332: checkOpen();
333:
334: throw new UnsupportedOperationException();
335: }
336:
337: /**
338: * Closes the connection.
339: */
340: public void close() throws JMSException {
341: if (_lifecycle.isDestroyed())
342: return;
343:
344: stop();
345:
346: if (!_lifecycle.toDestroy())
347: return;
348:
349: _factory.removeConnection(this );
350:
351: ArrayList<JmsSession> sessions;
352:
353: synchronized (_sessions) {
354: sessions = new ArrayList<JmsSession>(_sessions);
355: _sessions.clear();
356: }
357:
358: for (int i = 0; i < sessions.size(); i++) {
359: try {
360: sessions.get(i).close();
361: } catch (Throwable e) {
362: log.log(Level.WARNING, e.toString(), e);
363: }
364: }
365: }
366:
367: /**
368: * Checks that the session is open.
369: */
370: protected void checkOpen() throws IllegalStateException {
371: if (_lifecycle.isDestroyed())
372: throw new IllegalStateException(L.l("connection is closed"));
373: }
374:
375: /**
376: * Assigns a random client id.
377: *
378: * XXX: possibly wrong, i.e. shouldn't assign, for durable subscriptions
379: */
380: protected void assignClientID() {
381: if (_clientId == null)
382: _clientId = "resin-temp-" + _clientIdGenerator++;
383: _isClientIdSet = true;
384:
385: _lifecycle.setName(toString());
386: }
387:
388: public String toString() {
389: return "JmsConnection[" + _clientId + "]";
390: }
391: }
|