001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.queue;
031:
032: import java.util.logging.*;
033:
034: import javax.jms.*;
035:
036: import java.io.Serializable;
037: import java.util.Collection;
038: import java.util.Iterator;
039: import java.util.concurrent.BlockingQueue;
040: import java.util.concurrent.TimeUnit;
041:
042: import com.caucho.jms.JmsRuntimeException;
043: import com.caucho.jms.message.*;
044: import com.caucho.jms.connection.*;
045:
046: import com.caucho.util.*;
047: import com.caucho.webbeans.component.*;
048:
049: /**
050: * Implements an abstract queue.
051: */
052: abstract public class AbstractDestination extends
053: java.util.AbstractQueue implements javax.jms.Destination,
054: BlockingQueue, java.io.Serializable, HandleAware {
055: private static final L10N L = new L10N(AbstractDestination.class);
056: private static final Logger log = Logger
057: .getLogger(AbstractDestination.class.getName());
058:
059: private static long _idRandom;
060: private static long _idCount;
061:
062: private String _name = "default";
063:
064: protected MessageFactory _messageFactory = new MessageFactory();
065:
066: // queue api
067: private ConnectionFactoryImpl _connectionFactory;
068: private Connection _conn;
069:
070: private Object _readLock = new Object();
071: private Object _writeLock = new Object();
072:
073: private JmsSession _writeSession;
074: private JmsSession _readSession;
075: private MessageConsumerImpl _consumer;
076:
077: // serialization
078: private Object _serializationHandle;
079:
080: protected AbstractDestination() {
081: synchronized (AbstractDestination.class) {
082: if (_idRandom == 0 || Alarm.isTest()) {
083: _idRandom = RandomUtil.getRandomLong();
084: _idCount = Alarm.getCurrentTime() << 16;
085: }
086: }
087: }
088:
089: public void setName(String name) {
090: _name = name;
091: }
092:
093: public String getName() {
094: return _name;
095: }
096:
097: public String getQueueName() {
098: return getName();
099: }
100:
101: public String getTopicName() {
102: return getName();
103: }
104:
105: /**
106: * Serialization callback to set the handle
107: */
108: public void setSerializationHandle(Object handle) {
109: _serializationHandle = handle;
110: }
111:
112: //
113: // JMX configuration data
114: //
115:
116: /**
117: * Returns a descriptive URL for the queue.
118: */
119: public String getUrl() {
120: return getClass().getSimpleName() + ":";
121: }
122:
123: //
124: // runtime methods
125: //
126:
127: public void addConsumer(MessageConsumerImpl consumer) {
128: throw new UnsupportedOperationException(getClass().getName());
129: }
130:
131: public void removeConsumer(MessageConsumerImpl consumer) {
132: throw new UnsupportedOperationException(getClass().getName());
133: }
134:
135: abstract public void send(JmsSession session, MessageImpl msg,
136: long timeout) throws JMSException;
137:
138: /**
139: * Polls the next message from the store. Returns null if no message
140: * is available.
141: *
142: * @param isAutoAcknowledge if true, automatically acknowledge the message
143: */
144: public MessageImpl receive(boolean isAutoAcknowledge)
145: throws JMSException {
146: return null;
147: }
148:
149: public boolean hasMessage() {
150: return false;
151: }
152:
153: /**
154: * Acknowledge receipt of the message.
155: */
156: public void acknowledge(String msgId) {
157: }
158:
159: /**
160: * Rollback the message read.
161: */
162: public void rollback(String msgId) {
163: }
164:
165: public final String generateMessageID() {
166: StringBuilder cb = new StringBuilder();
167:
168: cb.append("ID:");
169: generateMessageID(cb);
170:
171: return cb.toString();
172: }
173:
174: protected void generateMessageID(StringBuilder cb) {
175: long id;
176:
177: synchronized (AbstractDestination.class) {
178: id = _idCount++;
179: }
180:
181: Base64.encode(cb, _idRandom);
182: Base64.encode(cb, id);
183: }
184:
185: public Destination getJMSDestination() {
186: return new DestinationHandle(toString());
187: }
188:
189: //
190: // BlockingQueue api
191: //
192:
193: public int size() {
194: return 0;
195: }
196:
197: public Iterator iterator() {
198: throw new UnsupportedOperationException(getClass().getName());
199: }
200:
201: /**
202: * Adds the item to the queue, waiting if necessary
203: */
204: public boolean offer(Object value, long timeout, TimeUnit unit) {
205: try {
206: synchronized (_writeLock) {
207: JmsSession session = getWriteSession();
208:
209: Message msg = session
210: .createObjectMessage((Serializable) value);
211:
212: session.send(this , msg, 0, 0, Integer.MAX_VALUE);
213:
214: return true;
215: }
216: } catch (RuntimeException e) {
217: throw e;
218: } catch (Exception e) {
219: throw new JmsRuntimeException(e);
220: }
221: }
222:
223: public Object poll(long timeout, TimeUnit unit) {
224: try {
225: synchronized (_readLock) {
226: MessageConsumerImpl consumer = getReadConsumer();
227:
228: long msTimeout = unit.toMillis(timeout);
229:
230: Message msg = consumer.receive(msTimeout);
231:
232: if (msg instanceof ObjectMessage) {
233: return ((ObjectMessage) msg).getObject();
234: } else if (msg instanceof TextMessage) {
235: return ((TextMessage) msg).getText();
236: } else if (msg == null)
237: return null;
238: else
239: throw new JmsRuntimeException(
240: L
241: .l(
242: "'{0}' is an unsupported message for the BlockingQueue API.",
243: msg));
244: }
245: } catch (RuntimeException e) {
246: throw e;
247: } catch (Exception e) {
248: throw new JmsRuntimeException(e);
249: }
250: }
251:
252: public boolean offer(Object value) {
253: return offer(value, 0, TimeUnit.SECONDS);
254: }
255:
256: public void put(Object value) {
257: offer(value, Integer.MAX_VALUE, TimeUnit.SECONDS);
258: }
259:
260: public int remainingCapacity() {
261: return Integer.MAX_VALUE;
262: }
263:
264: public Object peek() {
265: throw new UnsupportedOperationException(getClass().getName());
266: }
267:
268: public Object poll() {
269: return poll(0, TimeUnit.MILLISECONDS);
270: }
271:
272: public Object take() {
273: return poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
274: }
275:
276: public int drainTo(Collection c) {
277: throw new UnsupportedOperationException();
278: }
279:
280: public int drainTo(Collection c, int max) {
281: throw new UnsupportedOperationException();
282: }
283:
284: protected JmsSession getWriteSession() throws JMSException {
285: if (_conn == null) {
286: _connectionFactory = new ConnectionFactoryImpl();
287: _conn = _connectionFactory.createConnection();
288: _conn.start();
289: }
290:
291: if (_writeSession == null) {
292: _writeSession = (JmsSession) _conn.createSession(false,
293: Session.AUTO_ACKNOWLEDGE);
294: }
295:
296: return _writeSession;
297: }
298:
299: protected MessageConsumerImpl getReadConsumer() throws JMSException {
300: if (_conn == null) {
301: _connectionFactory = new ConnectionFactoryImpl();
302: _conn = _connectionFactory.createConnection();
303: _conn.start();
304: }
305:
306: if (_readSession == null) {
307: _readSession = (JmsSession) _conn.createSession(false,
308: Session.AUTO_ACKNOWLEDGE);
309: }
310:
311: if (_consumer == null) {
312: _consumer = (MessageConsumerImpl) _readSession
313: .createConsumer(this );
314: }
315:
316: return _consumer;
317: }
318:
319: /**
320: * Serialization handle
321: */
322: private Object writeReplace() {
323: return _serializationHandle;
324: }
325:
326: public void close() {
327: }
328:
329: public String toString() {
330: return getClass().getSimpleName() + "[" + getName() + "]";
331: }
332: }
|