001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.brokers.http;
020:
021: import java.io.BufferedReader;
022: import java.io.ByteArrayOutputStream;
023: import java.io.InputStreamReader;
024: import java.io.OutputStream;
025: import java.io.OutputStreamWriter;
026: import java.net.HttpURLConnection;
027: import java.net.MalformedURLException;
028: import java.net.URL;
029: import java.nio.charset.Charset;
030: import java.util.HashMap;
031: import java.util.Map;
032: import org.mactor.brokers.AbstractMessageBroker;
033: import org.mactor.brokers.Message;
034: import org.mactor.brokers.MessageBroker;
035: import org.mactor.framework.ConfigException;
036: import org.mactor.framework.MactorException;
037: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
038: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
039:
040: /**
041: * A simple message broker that supports posting/receinving XML messages over
042: * HTTP (optionally with a response)
043: *
044: * <p>
045: * Message broker config structure (sample):
046: *
047: * <pre>
048: * <message_broker_config xmlns="http://schemas.mactor.org/framework">
049: * <message_broker name="DemoHttpBroker"
050: * archive_consumed_messages="true" archive_dead_letter_messages="true"
051: * archive_path="/tmp/archive_http"
052: * message_read_interval_seconds="1" message_read_limit="40"
053: * broker_class="org.mactor.brokers.http.HttpMessageBroker">
054: *
055: * <channel name="OutgoingOrder">
056: * <value name="Method">POST</value>
057: * <value name="URL">http://localhost:8877/mactor/deliverOrder</value>
058: * </channel>
059: * <channel name="IncomingOrderStatus">
060: * <value name="Method">POST</value>
061: * <value name="URL">http://localhost:8878/mactor/receiveOrerStatus</value>
062: * </channel>
063: * </message_broker>
064: * </message_broker_config>
065: * </pre>
066: * </p>
067: *
068: * @author Lars Ivar Almli
069: * @see MessageBroker
070: */
071: public class HttpMessageBroker extends AbstractMessageBroker {
072: public HttpMessageBroker(MessageBrokerConfig config) {
073: super (config);
074: }
075:
076: public void publish(String channel, Message message)
077: throws MactorException {
078: ChannelConfig cc = config.getRequieredChannelConfig(channel);
079: sendMessage(cc.getRequieredValue("URL"), cc
080: .getRequieredValue("Method"), message, false);
081: }
082:
083: public Message publishWithResponse(String channel, Message message)
084: throws MactorException {
085: ChannelConfig cc = config.getRequieredChannelConfig(channel);
086: return sendMessage(cc.getRequieredValue("URL"), cc
087: .getRequieredValue("Method"), message, true);
088: }
089:
090: @Override
091: protected void onFirstSubscribe(String channel)
092: throws MactorException {
093: registerListener(channel);
094: }
095:
096: private synchronized UrlListener registerListener(String channel)
097: throws MactorException {
098: ChannelConfig cc = config.getRequieredChannelConfig(channel);
099: try {
100: String endpoint = cc.getRequieredValue("URL");
101: URL url = new URL(endpoint);
102: int port = url.getDefaultPort();
103: if (url.getPort() > 0)
104: port = url.getPort();
105: String real = url.getPath().toLowerCase();
106: String key = port + "-" + real;
107: UrlListener l = endpointListenerMap.get(key);
108: if (l == null) {
109: l = new UrlListener(channel);
110: HttpServerManager.getHttpServer(port)
111: .addRequestListener(real, l);
112: endpointListenerMap.put(key, l);
113: }
114: return l;
115: } catch (MalformedURLException e) {
116: throw new ConfigException(e);
117: }
118: }
119:
120: private Map<String, UrlListener> endpointListenerMap = new HashMap<String, UrlListener>();
121:
122: private class UrlListener implements HttpRequestListener {
123: private String channel;
124:
125: public UrlListener(String channel) {
126: this .channel = channel;
127: }
128:
129: public HttpResponse onRequest(HttpRequest request)
130: throws Exception {
131: Message m = Message.createMessage(request.getData());
132: Message result = raiseOnMessage(channel, m, false);
133: if (result != null) {
134: HttpResponse res = new HttpResponse();
135: res.setData(result.getContentDocument());
136: res.addHeader("Content-type", " text/xml; charset="
137: + Charset.defaultCharset().name());
138: return res;
139: } else {
140: HttpResponse res = new HttpResponse();
141: res.addHeader("Content-type", " text/plain; charset="
142: + Charset.defaultCharset().name());
143: res.setData("");
144: return res;
145: }
146: }
147: }
148:
149: private Message sendMessage(String endPoint, String method,
150: Message message, boolean expectResponse)
151: throws MactorException {
152: try {
153: URL url = new URL(endPoint);
154: HttpURLConnection conn = (HttpURLConnection) url
155: .openConnection();
156: ByteArrayOutputStream bos = new ByteArrayOutputStream();
157: OutputStreamWriter w = new OutputStreamWriter(bos);
158: message.getContentDocument().write(w);
159: w.flush();
160: byte[] contentBuffer = bos.toByteArray();
161: conn.setRequestProperty("Content-Length",
162: contentBuffer.length + "");
163: conn.setRequestProperty("Content-Type",
164: " text/xml; charset="
165: + Charset.defaultCharset().name());
166: conn.setRequestMethod(method);
167: conn.setDoOutput(true);
168: conn.setDoInput(true);
169: OutputStream out = conn.getOutputStream();
170: out.write(contentBuffer);
171: out.flush();
172: out.close();
173: int rc = conn.getResponseCode();
174: if (rc < 200 || rc > 299)
175: throw new MactorException(
176: "Server rejected the message. URL '" + endPoint
177: + " '. Method '" + method
178: + "' HTTP response code: " + rc);
179: BufferedReader in = new BufferedReader(
180: new InputStreamReader(conn.getInputStream()));
181: if (expectResponse) {
182: Message response = Message.createMessage(in);
183: in.close();
184: return response;
185: } else {
186: in.close();
187: return null;
188: }
189: } catch (Exception me) {
190: // TODO more specific error description
191: throw new MactorException("Failed to send HTTP message "
192: + me.getMessage(), me);
193: }
194: }
195: }
|