001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.messaging.channel.plugins.handler;
023:
024: import org.jboss.messaging.interfaces.*;
025: import org.jboss.messaging.interfaces.Consumer;
026: import org.jboss.messaging.interfaces.MessageReference;
027:
028: /**
029: * An abstract channel handler
030: *
031: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
032: * @version $Revision: 57195 $
033: */
034: public abstract class AbstractChannelHandler implements ChannelHandler {
035: // Constants -----------------------------------------------------
036:
037: // Attributes ----------------------------------------------------
038:
039: /** The message set */
040: protected MessageSet messages;
041:
042: // Static --------------------------------------------------------
043:
044: // Constructors --------------------------------------------------
045:
046: /**
047: * Create a new AbstractChannelHandler.
048: *
049: * @param messages the messages
050: */
051: public AbstractChannelHandler(MessageSet messages) {
052: this .messages = messages;
053: messages.setConsumer(this );
054: }
055:
056: // Public --------------------------------------------------------
057:
058: // Consumer implementation ---------------------------------------
059:
060: public boolean accepts(MessageReference reference, boolean active) {
061: // We accept all messages
062: return true;
063: }
064:
065: public void onMessage(MessageReference reference) {
066: Consumer consumer;
067: messages.lock();
068: try {
069: consumer = findConsumer(reference);
070: } finally {
071: messages.unlock();
072: }
073:
074: if (consumer != null)
075: consumer.onMessage(reference);
076: }
077:
078: // ChannelHandler implementation ---------------------------------
079:
080: public void addMessage(MessageReference reference) {
081: Consumer consumer;
082: messages.lock();
083: try {
084: consumer = findConsumer(reference);
085: if (consumer == null)
086: messages.add(reference);
087: } finally {
088: messages.unlock();
089: }
090:
091: if (consumer != null)
092: consumer.onMessage(reference);
093: }
094:
095: public MessageReference removeMessage(Consumer consumer) {
096: messages.lock();
097: try {
098: return messages.remove(consumer);
099: } finally {
100: messages.unlock();
101: }
102: }
103:
104: public void waitMessage(Consumer consumer, long wait) {
105: MessageReference message;
106: messages.lock();
107: try {
108: message = messages.remove(consumer);
109: // Nothing found, wait
110: if (message == null)
111: addConsumer(consumer, wait);
112: } finally {
113: messages.unlock();
114: }
115:
116: // We found a message, deliver it
117: if (message != null)
118: consumer.onMessage(message);
119: }
120:
121: public void stopWaitMessage(Consumer consumer) {
122: messages.lock();
123: try {
124: removeConsumer(consumer);
125: } finally {
126: messages.unlock();
127: }
128: }
129:
130: // Protected -----------------------------------------------------
131:
132: /**
133: * Add a consumer
134: *
135: * @param consumer the consumer to wait for a message
136: * @param wait the length of time to wait
137: */
138: protected abstract void addConsumer(Consumer consumer, long wait);
139:
140: /**
141: * Remove a consumer
142: *
143: * @param consumer the consumer to remove
144: */
145: protected abstract void removeConsumer(Consumer consumer);
146:
147: /**
148: * Find a consumer for a message
149: *
150: * @param reference the message
151: * @return the consumer or null if there are none for the message
152: */
153: protected abstract Consumer findConsumer(MessageReference reference);
154:
155: // Package Private -----------------------------------------------
156:
157: // Private -------------------------------------------------------
158:
159: // Inner Classes -------------------------------------------------
160: }
|