001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.brokers.file;
020:
021: import java.io.File;
022: import java.io.FileWriter;
023: import java.io.IOException;
024: import java.util.LinkedList;
025: import java.util.List;
026: import org.mactor.brokers.Message;
027: import org.mactor.brokers.MessageBroker;
028: import org.mactor.brokers.PollingMessageBrokerTemplate;
029: import org.mactor.framework.MactorException;
030: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
031: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
032:
033: /**
034: * A message broker for communicating via file shares.
035: *
036: * <p>
037: * Message broker config structure (sample):
038: *
039: * <pre>
040: * <message_broker_config xmlns="http://schemas.mactor.org/framework">
041: * <message_broker name="MactorDemoFileBroker"
042: * archive_consumed_messages="true" archive_dead_letter_messages="true"
043: * archive_path="/tmp/archive_fileshare"
044: * message_read_interval_seconds="1" message_read_limit="40"
045: * broker_class="org.mactor.brokers.file.FileMessageBroker">
046: *
047: * <value name="PostAction">RENAME</value> <!--RENAME|DELETE-->
048: * <channel name="OutgoingOrder">
049: * <value name="dir">/tmp/mactor/file_channels/OutgoingOrder</value>
050: * <value name="suffix">.xml</value>
051: * </channel>
052: * <channel name="IncomingOrderStatus">
053: * <value name="dir">/tmp/mactor/file_channels/IncomingOrderStatus</value>
054: * <value name="filter_pattern">.xml</value>
055: * </channel>
056: * </message_broker>
057: * </message_broker_config>
058: * </pre>
059: * </p>
060: *
061: * @author Lars Ivar Almli
062: * @see MessageBroker
063: */
064: public class FileMessageBroker extends PollingMessageBrokerTemplate {
065: private MessageBrokerConfig config;
066:
067: private final boolean postActionDelete;
068:
069: public FileMessageBroker(MessageBrokerConfig config)
070: throws MactorException {
071: super (config);
072: this .config = config;
073: this .postActionDelete = "DELETE".equalsIgnoreCase(config
074: .getValue("PostAction"));
075: }
076:
077: @Override
078: protected synchronized List<Message> doGetMessages(String channel,
079: int maxMessageCount) throws MactorException {
080: ChannelConfig cf = config.getRequieredChannelConfig(channel);
081: String filterPattern = cf.getValue("filter_patter", ".xml");
082: File dir = new File(getDir(cf));
083: List<File> candidates = new LinkedList<File>();
084: for (File f : dir.listFiles()) {
085: if (f.isFile() && f.getName().endsWith(filterPattern)) {
086: File target = new File(f.getAbsolutePath() + "_seen");
087: if (f.renameTo(target))
088: candidates.add(target);
089: else
090: log
091: .error("Failed to parse message received on channel '"
092: + channel
093: + "' to '"
094: + target.getAbsolutePath() + "'");
095: }
096: if (candidates.size() >= maxMessageCount)
097: break;
098: }
099: List<Message> messages = new LinkedList<Message>();
100: for (File file : candidates) {
101: try {
102: messages.add(Message.createMessage(file));
103: if (postActionDelete)
104: if (!file.delete())
105: log.error("Failed to delete file '"
106: + file.getAbsolutePath()
107: + "' received on channel '" + channel
108: + "'");
109: } catch (MactorException me) {
110: log.error(
111: "Failed to parse message received on channel '"
112: + channel + "'", me);
113: }
114: }
115: return messages;
116: }
117:
118: @Override
119: protected synchronized void doPublishMessage(String channel,
120: Message message) throws MactorException {
121: ChannelConfig cf = config.getRequieredChannelConfig(channel);
122: String dir = getDir(cf);
123: String fn = dir + getNext() + cf.getValue("suffix", ".xml")
124: + "__";
125: try {
126: File f = new File(fn);
127: FileWriter fw = new FileWriter(fn);
128: fw.write(message.getContent() + "");
129: fw.close();
130: f.renameTo(new File(fn.substring(0, fn.length() - 2)));
131: } catch (IOException ioe) {
132: throw new MactorException(
133: "Failed to write message to dir '" + dir
134: + "'. File name: ''" + fn + "' . Error:"
135: + ioe.getMessage());
136: }
137: }
138:
139: private String getDir(ChannelConfig cf) throws MactorException {
140: String dir = cf.getRequieredValue("dir");
141: File f = new File(dir);
142: if (!f.exists())
143: if (!f.mkdirs())
144: throw new MactorException(
145: "Unable to create the dir '"
146: + dir
147: + "' specified in the channel config for channel: "
148: + cf.getName());
149: if (!dir.endsWith("\\") && !dir.endsWith("/"))
150: dir = dir + "/";
151: return dir;
152: }
153:
154: private static long counter = System.currentTimeMillis();
155:
156: private static synchronized long getNext() {
157: return counter++;
158: }
159: }
|