001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.brokers;
020:
021: import java.util.HashMap;
022: import java.util.HashSet;
023: import java.util.LinkedList;
024: import java.util.Map;
025: import java.util.Set;
026: import org.apache.log4j.Logger;
027: import org.mactor.framework.MactorException;
028: import org.mactor.framework.extensioninterface.MessageSelectorCommand;
029: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
030:
031: /**
032: * Abstract class that contains functinally common to most message broker
033: * implementations (such as maintaining subscribers).
034: *
035: * @author Lars Ivar Almli
036: */
037: public abstract class AbstractMessageBroker implements MessageBroker {
038: protected static Logger log = Logger
039: .getLogger(AbstractMessageBroker.class);
040: protected MessageBrokerConfig config;
041: private Map<String, ChannelListener> channelListenerMap = new HashMap<String, ChannelListener>();
042: private Object channelLock = new Object();
043:
044: public AbstractMessageBroker(MessageBrokerConfig config) {
045: this .config = config;
046: }
047:
048: Set<String> initializedChannels = new HashSet<String>();
049:
050: public void subscribe(String channel, MessageSubscriber subscriber,
051: MessageSelectorCommand messageSelector)
052: throws MactorException {
053: ChannelListener cl = null;
054: synchronized (channelLock) {
055: cl = channelListenerMap.get(channel);
056: if (cl == null) {
057: cl = new ChannelListener(channel);
058: channelListenerMap.put(channel, cl);
059: if (!initializedChannels.contains(channel)) {
060: initializedChannels.add(channel);
061: onFirstSubscribe(channel);
062: }
063: }
064: }
065: cl.addSubscriber(subscriber, messageSelector);
066: }
067:
068: public void unsubscribe(String channel, MessageSubscriber subscriber) {
069: ChannelListener cl = null;
070: synchronized (channelLock) {
071: cl = channelListenerMap.get(channel);
072: }
073: if (cl != null)
074: cl.removeSubscriber(subscriber);
075: }
076:
077: /**
078: * This method shold be invoked message broker implementations when message
079: * is received. The method delivers the message to the subscribers of the
080: * channel
081: *
082: * @param channel
083: * the channel the message was recevied
084: * @param message
085: * the message
086: * @param broadcast
087: * a flag that indicates if the message should be distributed to
088: * all subscribers that accepts the message, or just the first
089: * (random) subscriber that accepets the message.
090: * @return a result message that might be used by message broker
091: * implementation to return a synchrounous result to the source of
092: * the incoming messages.
093: * @throws MactorException
094: */
095: protected Message raiseOnMessage(String channel, Message message,
096: boolean broadcast) throws MactorException {
097: ChannelListener cl = null;
098: synchronized (channelLock) {
099: cl = channelListenerMap.get(channel);
100: }
101: if (cl == null)
102: throw new MactorException("Unknown channel '" + channel
103: + "'");
104: Message resultMessage = null;
105: LinkedList<SC> subs = cl.getSnapshotCopyOfSubscribersList();
106: for (SC s : subs) {
107: if (s.messageSelector.isAcceptableMessage(message)) {
108: resultMessage = s.subscriber.onMessage(message);
109: if (!broadcast && message.isConsumed()) {
110: break;
111: }
112: }
113: }
114: return resultMessage;
115: }
116:
117: protected abstract void onFirstSubscribe(String channel)
118: throws MactorException;
119:
120: private static class ChannelListener {
121: private HashMap<MessageSubscriber, SC> subscribers = new HashMap<MessageSubscriber, SC>();
122: private Object lock = new Object();
123: private String channel;
124:
125: public ChannelListener(String channel) {
126: this .channel = channel;
127: }
128:
129: public void addSubscriber(MessageSubscriber subscriber,
130: MessageSelectorCommand messageSelector) {
131: synchronized (lock) {
132: subscribers.put(subscriber, new SC(subscriber,
133: messageSelector));
134: }
135: }
136:
137: public boolean removeSubscriber(MessageSubscriber subscriber) {
138: synchronized (lock) {
139: return subscribers.remove(subscriber) != null;
140: }
141: }
142:
143: public LinkedList<SC> getSnapshotCopyOfSubscribersList() {
144: synchronized (lock) {
145: return new LinkedList<SC>(subscribers.values());
146: }
147: }
148: }
149:
150: private static class SC {
151: MessageSubscriber subscriber;
152: MessageSelectorCommand messageSelector;
153:
154: public SC(MessageSubscriber subscriber,
155: MessageSelectorCommand messageSelector) {
156: super();
157: this.subscriber = subscriber;
158: this.messageSelector = messageSelector;
159: }
160: }
161: }
|