001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.queue;
031:
032: import java.util.*;
033: import java.util.logging.*;
034: import java.io.Serializable;
035:
036: import javax.annotation.*;
037: import javax.jms.*;
038:
039: import com.caucho.jms.JmsRuntimeException;
040: import com.caucho.jms.message.*;
041: import com.caucho.jms.connection.*;
042:
043: import com.caucho.util.*;
044:
045: /**
046: * Implements an abstract queue.
047: */
048: abstract public class AbstractQueue extends AbstractDestination
049: implements javax.jms.Queue {
050: private static final L10N L = new L10N(AbstractQueue.class);
051: private static final Logger log = Logger
052: .getLogger(AbstractQueue.class.getName());
053:
054: private QueueAdmin _admin;
055:
056: private ArrayList<MessageConsumerImpl> _messageConsumerList = new ArrayList<MessageConsumerImpl>();
057:
058: private int _roundRobin;
059:
060: private int _enqueueCount;
061:
062: // stats
063: private long _listenerFailCount;
064: private long _listenerFailLastTime;
065:
066: protected AbstractQueue() {
067: }
068:
069: public void setQueueName(String name) {
070: setName(name);
071: }
072:
073: //
074: // JMX statistics
075: //
076:
077: /**
078: * Returns the number of active message consumers
079: */
080: public int getConsumerCount() {
081: return _messageConsumerList.size();
082: }
083:
084: /**
085: * Returns the queue size
086: */
087: public int getQueueSize() {
088: return -1;
089: }
090:
091: /**
092: * Returns the number of listener failures.
093: */
094: public long getListenerFailCountTotal() {
095: return _listenerFailCount;
096: }
097:
098: /**
099: * Returns the number of listener failures.
100: */
101: public long getListenerFailLastTime() {
102: return _listenerFailLastTime;
103: }
104:
105: public void init() {
106: }
107:
108: @PostConstruct
109: public void postConstruct() {
110: init();
111:
112: _admin = new QueueAdmin(this );
113: _admin.register();
114: }
115:
116: public void addConsumer(MessageConsumerImpl consumer) {
117: synchronized (_messageConsumerList) {
118: if (!_messageConsumerList.contains(consumer))
119: _messageConsumerList.add(consumer);
120:
121: startPoll();
122: }
123: }
124:
125: public void removeConsumer(MessageConsumerImpl consumer) {
126: synchronized (_messageConsumerList) {
127: _messageConsumerList.remove(consumer);
128:
129: // force a poll to avoid missing messages
130: for (int i = 0; i < _messageConsumerList.size(); i++) {
131: _messageConsumerList.get(i).notifyMessageAvailable();
132: }
133:
134: if (_messageConsumerList.size() == 0)
135: stopPoll();
136: }
137: }
138:
139: protected void notifyMessageAvailable() {
140: synchronized (_messageConsumerList) {
141: if (_messageConsumerList.size() > 0) {
142: MessageConsumerImpl consumer;
143: int count = _messageConsumerList.size();
144:
145: // notify until one of the consumers signals readiness to read
146: do {
147: int roundRobin = _roundRobin++
148: % _messageConsumerList.size();
149:
150: consumer = _messageConsumerList.get(roundRobin);
151: } while (!consumer.notifyMessageAvailable()
152: && count-- > 0);
153: }
154: }
155: }
156:
157: public ArrayList<MessageImpl> getBrowserList() {
158: return new ArrayList<MessageImpl>();
159: }
160:
161: protected void startPoll() {
162: }
163:
164: protected void stopPoll() {
165: }
166:
167: /**
168: * Called when a listener throws an excepton
169: */
170: public void addListenerException(Exception e) {
171: synchronized (this ) {
172: _listenerFailCount++;
173: _listenerFailLastTime = Alarm.getCurrentTime();
174: }
175: }
176:
177: @PreDestroy
178: public void close() {
179: stopPoll();
180:
181: super .close();
182: }
183:
184: public String toString() {
185: String className = getClass().getName();
186:
187: int p = className.lastIndexOf('.');
188:
189: return className.substring(p + 1) + "[" + getName() + "]";
190: }
191: }
|