001: /*
002: * JFox - The most lightweight Java EE Application Server!
003: * more details please visit http://www.huihoo.org/jfox or http://www.jfox.org.cn.
004: *
005: * JFox is licensed and re-distributable under GNU LGPL.
006: */
007:
008: /* JFox, the OpenSource J2EE Application Server
009: *
010: * Distributable under GNU LGPL license by gnu.org
011: * more details please visit http://www.huihoo.org/jfox
012: */
013:
014: package org.jfox.jms.destination;
015:
016: import java.io.Serializable;
017: import java.util.Comparator;
018: import java.util.List;
019: import java.util.ArrayList;
020: import java.util.Collection;
021: import java.util.Collections;
022: import java.util.concurrent.PriorityBlockingQueue;
023: import java.util.concurrent.ExecutorService;
024: import java.util.concurrent.Executors;
025: import java.util.concurrent.locks.ReentrantLock;
026: import java.util.concurrent.locks.Condition;
027: import javax.jms.Destination;
028: import javax.jms.Message;
029: import javax.jms.MessageListener;
030: import javax.jms.JMSException;
031:
032: import org.apache.log4j.Logger;
033:
034: /**
035: * @author <a href="mailto:young_yy@hotmail.com">Young Yang</a>
036: */
037:
038: public abstract class JMSDestination implements Destination,
039: Serializable, Runnable {
040:
041: Logger logger = Logger.getLogger(getClass());
042:
043: private static Comparator<Message> MESSAGE_COMPARATOR = new Comparator<Message>() {
044:
045: public int compare(Message msg1, Message msg2) {
046: try {
047: return Integer.valueOf(msg1.getJMSPriority())
048: .compareTo(msg2.getJMSPriority());
049: } catch (JMSException e) {
050: return 0;
051: }
052: }
053: };
054:
055: private final transient PriorityBlockingQueue<Message> queue = new PriorityBlockingQueue<Message>(
056: 1, MESSAGE_COMPARATOR);
057:
058: protected final List<MessageListener> listeners = new ArrayList<MessageListener>(
059: 2);
060:
061: private ExecutorService threadExecutor = Executors
062: .newCachedThreadPool();
063:
064: protected final ReentrantLock lock = new ReentrantLock();
065: protected final Condition notEmptyMessage = lock.newCondition();
066: protected final Condition notEmptyListener = lock.newCondition();
067:
068: private volatile boolean started = false;
069:
070: private String name;
071:
072: private volatile long messageSend = 0;
073:
074: public JMSDestination(String name) {
075: this .name = name;
076: // start thread
077: start();
078: }
079:
080: public String toString() {
081: return "JMSDestination [" + (isTopic() ? "Topic" : "Queue")
082: + "] " + getName();
083: }
084:
085: protected String getName() {
086: return name;
087: }
088:
089: public boolean equals(Object pObject) {
090: if (this == pObject)
091: return true;
092: if (!(pObject instanceof JMSDestination))
093: return false;
094:
095: final JMSDestination jmsDestination = (JMSDestination) pObject;
096:
097: return !(name != null ? !name.equals(jmsDestination.name)
098: : jmsDestination.name != null);
099:
100: }
101:
102: public int hashCode() {
103: return (name != null ? name.hashCode() : 0);
104: }
105:
106: public abstract boolean isTopic();
107:
108: public void putMessage(Message msg) {
109: lock.lock();
110: try {
111: queue.offer(msg);
112: // äº¤ç»™çº¿ç¨‹æ± æ‰§è¡Œæ¶ˆæ?¯åˆ†å?‘工作
113: notEmptyMessage.signalAll();
114: } finally {
115: lock.unlock();
116: }
117: }
118:
119: /**
120: * ��消�,由Queue/Topic实现
121: *
122: * @param message message
123: */
124: public abstract void sendMessage(Message message);
125:
126: public void registerMessageListener(MessageListener listener) {
127: lock.lock();
128: try {
129: listeners.add(listener);
130: notEmptyListener.signalAll();
131: } finally {
132: lock.unlock();
133: }
134: }
135:
136: public void unregisterMessageListener(MessageListener listener) {
137: lock.lock();
138: try {
139: listeners.remove(listener);
140: } finally {
141: lock.unlock();
142: }
143: }
144:
145: public Collection<MessageListener> getMessageListeners() {
146: return Collections.unmodifiableCollection(listeners);
147: }
148:
149: public void start() {
150: started = true;
151: threadExecutor.submit(this );
152: }
153:
154: public void stop() {
155: started = false;
156: // 使用 shutdownNow å?¯ä»¥å¼ºè¡Œä¸æ¢çº¿ç¨‹
157: threadExecutor.shutdownNow();
158: }
159:
160: public void run() {
161: while (started) {
162: lock.lock(); // 获得�
163: try {
164: // 分�消�
165: if (queue.isEmpty()) {
166: notEmptyMessage.await();
167: }
168: if (listeners.isEmpty()) {
169: notEmptyListener.await();
170: }
171: final Message message = queue.take();
172: // 有å?¯èƒ½æ˜¯stop调用,所以需è¦?åˆ¤æ– message == null
173: if (message != null) {
174: // 使用新线程��消�
175: threadExecutor.execute(new Runnable() {
176: public void run() {
177: sendMessage(message);
178: }
179: });
180: messageSend++;
181: }
182: } catch (InterruptedException e) {
183: logger.warn("Dispatcher Thread Interrupted.", e);
184: } finally {
185: lock.unlock();
186: }
187: }
188: }
189:
190: /**
191: * 有多少个 Client
192: */
193: public int countMessageConsumers() {
194: return listeners.size();
195: }
196:
197: /**
198: * Queue/Topicä¸çš„消æ?¯æ•°ç›®
199: */
200: public int countMessageHold() {
201: return queue.size();
202: }
203:
204: /**
205: * Queue/Topic 已���的消�的数目
206: */
207: public long countMessageSend() {
208: return messageSend;
209: }
210:
211: }
|