001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.brokers;
020:
021: import java.lang.reflect.Constructor;
022: import java.lang.reflect.InvocationTargetException;
023: import java.util.HashMap;
024: import java.util.Map;
025: import org.mactor.framework.MactorException;
026: import org.mactor.framework.extensioninterface.MessageSelectorCommand;
027: import org.mactor.framework.spec.MessageBrokersConfig;
028: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
029: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
030:
031: /**
032: * Singleton that loads and maintains all message brokers. And works as a facade to
033: * the message brokers (hiding the broker-channel association).
034: *
035: * @author Lars Ivar Almli
036: */
037: public class MessageBrokerManager {
038: private static MessageBrokerManager manager;
039: private Map<String, MessageBrokerHolder> channelToBrokerMap = new HashMap<String, MessageBrokerHolder>();
040:
041: public static synchronized MessageBrokerManager getManager(
042: MessageBrokersConfig config) throws MactorException {
043: if (manager == null)
044: manager = new MessageBrokerManager(config);
045: return manager;
046: }
047:
048: private MessageBrokerManager(MessageBrokersConfig config)
049: throws MactorException {
050: for (MessageBrokerConfig mbc : config.getMessageBrokers()) {
051: MessageBroker broker = createMessageBroker(mbc);
052: boolean archiveMessages = mbc.isArchiveConsumedMessages()
053: && mbc.getArchivePath() != null
054: && mbc.getArchivePath().length() > 0;
055: broker = new MessageBrokerDecorator(archiveMessages, mbc
056: .getArchivePath(), broker);
057: addBroker(mbc, broker);
058: }
059: }
060:
061: private synchronized void addBroker(MessageBrokerConfig mbc,
062: MessageBroker broker) throws MactorException {
063: for (ChannelConfig cc : mbc.getChannels().values()) {
064: if (channelToBrokerMap.containsKey(cc.getName()))
065: throw new MactorException(
066: "Duplicate channel definition. Channel '"
067: + cc.getName()
068: + "' is defined in both '"
069: + mbc.getName()
070: + "' and '"
071: + channelToBrokerMap.get(cc.getName()).config
072: .getName() + "'");
073: channelToBrokerMap.put(cc.getName(),
074: new MessageBrokerHolder(broker, mbc));
075: }
076: }
077:
078: private MessageBroker getBroker(String channel)
079: throws MactorException {
080: MessageBrokerHolder brokerHolder = channelToBrokerMap
081: .get(channel);
082: if (brokerHolder == null)
083: throw new MactorException(
084: "No MessageBroker is registered for channel '"
085: + channel + "'");
086: return brokerHolder.broker;
087: }
088:
089: public void publish(String channel, Message message)
090: throws MactorException {
091: getBroker(channel).publish(channel, message);
092: }
093:
094: public Message publishWithResponse(String channel, Message message)
095: throws MactorException {
096: return getBroker(channel).publishWithResponse(channel, message);
097: }
098:
099: public void subscribe(String channel, MessageSubscriber subscriber,
100: MessageSelectorCommand messageSelector)
101: throws MactorException {
102: getBroker(channel).subscribe(channel, subscriber,
103: messageSelector);
104: }
105:
106: public void unsubscribe(String channel, MessageSubscriber subscriber)
107: throws MactorException {
108: getBroker(channel).unsubscribe(channel, subscriber);
109: }
110:
111: private class MessageBrokerHolder {
112: MessageBroker broker;
113: MessageBrokerConfig config;
114:
115: public MessageBrokerHolder(MessageBroker broker,
116: MessageBrokerConfig config) {
117: this .broker = broker;
118: this .config = config;
119: }
120: }
121:
122: private static MessageBroker createMessageBroker(
123: MessageBrokerConfig config) throws MactorException {
124: String className = config.getBrokerClass();
125: try {
126: Class c = Class.forName(className);
127: Constructor ct = c
128: .getConstructor(new Class[] { MessageBrokerConfig.class });
129: Object mb = ct.newInstance(new Object[] { config });
130: if (!(mb instanceof MessageBroker))
131: throw new MactorException(
132: "The specified message broker class does not implement '"
133: + MessageBroker.class.getName() + "'");
134: return (MessageBroker) mb;
135: } catch (ClassNotFoundException cnf) {
136: throw new MactorException(
137: "The specified MessageBroker implementation was not found in the classpath. Class '"
138: + className + "'", cnf);
139: } catch (NoSuchMethodException nm) {
140: throw new MactorException(
141: "The specified MessageBroker implementation class '"
142: + className
143: + "' does not have the requiered contructor (that takes a single argument of type '"
144: + MessageBrokerConfig.class.getName()
145: + "')", nm);
146: } catch (IllegalAccessException iae) {
147: throw new MactorException(
148: "Unable to instantiate the MessageBroker '"
149: + className + "'. Illegal access", iae);
150: } catch (InvocationTargetException it) {
151: throw new MactorException(
152: "Unable to instantiate the MessageBroker '"
153: + className
154: + "'. InvocationTargetException", it);
155: } catch (InstantiationException ie) {
156: throw new MactorException(
157: "Unable to instantiate the MessageBroker '"
158: + className + "'. InstantiationException",
159: ie);
160: }
161: }
162: }
|