001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.brokers;
020:
021: import java.util.HashMap;
022: import java.util.LinkedList;
023: import java.util.List;
024: import java.util.Map;
025: import org.apache.log4j.Logger;
026: import org.mactor.brokers.file.FileMessageBroker;
027: import org.mactor.framework.MactorException;
028: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
029:
030: /**
031: * A template for implementing message brokers for polling-based protocols (e.g. file system polling)
032: *
033: * @see FileMessageBroker
034: * @author Lars Ivar Almli
035: */
036: public abstract class PollingMessageBrokerTemplate extends
037: AbstractMessageBroker {
038: protected static Logger log = Logger
039: .getLogger(PollingMessageBrokerTemplate.class);
040: private final int messageReadLimit;
041: private final int messageReadIntervalSeconds;
042:
043: public PollingMessageBrokerTemplate(MessageBrokerConfig config) {
044: super (config);
045: this .messageReadLimit = config.getMessageReadLimit();
046: this .messageReadIntervalSeconds = config
047: .getMessageReadIntervalSeconds();
048: }
049:
050: public Message publishWithResponse(String channel, Message message)
051: throws MactorException {
052: throw new UnsupportedOperationException(
053: "publishMessageWithResponse is not supported by this polling based message brokers");
054: }
055:
056: public void publish(String channel, Message message)
057: throws MactorException {
058: doPublishMessage(channel, message);
059: }
060:
061: /**
062: * Template method called to check for incoming messages (the polling
063: * interval is defined by the message broker config)
064: *
065: * @param channel
066: * the channel
067: * @param maxMessageCount
068: * the max number of messages to return (as defined in the
069: * message broker config).
070: * @return list of messages
071: * @throws MactorException
072: * if a problem occured (this will not cause the test to fail)
073: */
074: protected abstract List<Message> doGetMessages(String channel,
075: int maxMessageCount) throws MactorException;
076:
077: /**
078: * Template method called to publish a message.
079: *
080: * @param channel
081: * the channel
082: * @param message
083: * the message
084: * @throws MactorException
085: * if a problem occured (this will cause the test to fail)
086: */
087: protected abstract void doPublishMessage(String channel,
088: Message message) throws MactorException;
089:
090: Map<String, PollingWorker> workerMap = new HashMap<String, PollingWorker>();
091:
092: @Override
093: protected void onFirstSubscribe(String channel) {
094: workerMap.put(channel, new PollingWorker(channel));
095: }
096:
097: private class PollingWorker {
098: private String channel;
099:
100: public PollingWorker(String channel) {
101: this .channel = channel;
102: new Thread(new Runnable() {
103: public void run() {
104: work();
105: };
106: }).start();
107: }
108:
109: private Object waitLock = new Object();
110:
111: private void work() {
112: try {
113: while (true) {
114: synchronized (waitLock) {
115: waitLock
116: .wait(messageReadIntervalSeconds * 1000);
117: }
118: List<Message> candidates = null;
119: try {
120: log.debug("Polling for messages on channel '"
121: + channel + "'");
122: candidates = doGetMessages(this .channel,
123: messageReadLimit);
124: } catch (Exception me) {
125: log.error(
126: "Error while polling for messages on channel '"
127: + channel + "'", me);
128: }
129: if (candidates == null || candidates.size() == 0)
130: continue;
131: candidates = new LinkedList<Message>(candidates);
132: for (Message message : candidates) {
133: try {
134: PollingMessageBrokerTemplate.this
135: .raiseOnMessage(channel, message,
136: true);
137: } catch (Exception e) {
138: log.info(
139: "Exception while delivering message to subscribers. Message: "
140: + message, e);
141: }
142: }
143: }
144: } catch (InterruptedException ie) {
145: } finally {
146: log.info("Channel polling worker for channel '"
147: + channel + "' terminated");
148: }
149: }
150: }
151: }
|