001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.brokers.soap;
020:
021: import java.io.BufferedReader;
022: import java.io.ByteArrayOutputStream;
023: import java.io.File;
024: import java.io.IOException;
025: import java.io.InputStreamReader;
026: import java.io.OutputStream;
027: import java.io.OutputStreamWriter;
028: import java.net.HttpURLConnection;
029: import java.net.MalformedURLException;
030: import java.net.URL;
031: import java.nio.charset.Charset;
032: import java.util.Collections;
033: import java.util.HashMap;
034: import java.util.Map;
035: import org.dom4j.io.SAXReader;
036: import org.mactor.brokers.AbstractMessageBroker;
037: import org.mactor.brokers.Message;
038: import org.mactor.brokers.MessageBroker;
039: import org.mactor.brokers.http.HttpRequest;
040: import org.mactor.brokers.http.HttpRequestListener;
041: import org.mactor.brokers.http.HttpResponse;
042: import org.mactor.brokers.http.HttpServerManager;
043: import org.mactor.framework.ConfigException;
044: import org.mactor.framework.MactorException;
045: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
046: import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
047:
048: /**
049: * A simple soap message broker
050: *
051: * <p>
052: * Message broker config structure (sample):
053: *
054: * <pre>
055: * <message_broker_config xmlns="http://schemas.mactor.org/framework">
056: * <message_broker name="DemoSoapBroker"
057: * archive_consumed_messages="true" archive_dead_letter_messages="true"
058: * archive_path="/tmp/archive_soap"
059: * message_read_interval_seconds="1" message_read_limit="40"
060: * broker_class="org.mactor.brokers.soap.SoapMessageBroker">
061: *
062: * <channel name="IncomingOrderStatus">
063: * <value name="SoapAction">http://mactor.sourceforge.net/orderstatus/DeliverOrderStatus</value>
064: * <value name="SoapEndpoint">http://localhost:9978/mactor/DeliverOrderStatus</value>
065: * <value name="WsdlFile">samples/soap/templates/OrderStatusService.wsdl</value>
066: * </channel>
067: * <channel name="OutgoingOrder">
068: * <value name="SoapAction">http://mactor.sourceforge.net/order/DeliverOrder</value>
069: * <value name="SoapEndpoint">http://localhost:9977/mactor/DeliverOrder</value>
070: * </channel>
071: * </message_broker>
072: * </message_broker_config>
073: * </pre>
074: * </p>
075: *
076: * @author Lars Ivar Almli
077: * @see MessageBroker
078: */
079: public class SoapMessageBroker extends AbstractMessageBroker {
080: public SoapMessageBroker(MessageBrokerConfig config) {
081: super (config);
082: }
083:
084: public void publish(String channel, Message message)
085: throws MactorException {
086: ChannelConfig cc = config.getRequieredChannelConfig(channel);
087: sendSoapMessage(cc.getRequieredValue("SoapEndpoint"), cc
088: .getRequieredValue("SoapAction"), message);
089: }
090:
091: public Message publishWithResponse(String channel, Message message)
092: throws MactorException {
093: ChannelConfig cc = config.getRequieredChannelConfig(channel);
094: return sendSoapMessage(cc.getRequieredValue("SoapEndpoint"), cc
095: .getRequieredValue("SoapAction"), message);
096: }
097:
098: @Override
099: protected void onFirstSubscribe(String channel)
100: throws MactorException {
101: registerListeners(channel);
102: }
103:
104: private synchronized SoapEndpointListener registerListeners(
105: String channel) throws MactorException {
106: ChannelConfig cc = config.getRequieredChannelConfig(channel);
107: String action = cc.getRequieredValue("SoapAction");
108: try {
109: String endpoint = cc.getRequieredValue("SoapEndpoint");
110: URL url = new URL(endpoint);
111: int port = url.getDefaultPort();
112: if (url.getPort() > 0)
113: port = url.getPort();
114: String real = url.getPath().toLowerCase();
115: String key = port + "-" + real;
116: SoapEndpointListener l = endpointListenerMap.get(key);
117: if (l == null) {
118: l = new SoapEndpointListener();
119: HttpServerManager.getHttpServer(port)
120: .addRequestListener(real, l);
121: String fn = cc.getValue("WsdlFile");
122: if (fn != null && new File(fn).exists()) {
123: WsdlProvider wsdlP = new WsdlProvider(new File(fn));
124: HttpServerManager.getHttpServer(port)
125: .addRequestListener(real + "?wsdl", wsdlP);
126: HttpServerManager.getHttpServer(port)
127: .addRequestListener(real + "?wsdl=", wsdlP);
128: }
129: endpointListenerMap.put(key, l);
130: }
131: l.addActionBinding(action, channel);
132: return l;
133: } catch (MalformedURLException e) {
134: throw new ConfigException(e);
135: }
136: }
137:
138: private class WsdlProvider implements HttpRequestListener {
139: File w;
140:
141: public WsdlProvider(File w) {
142: this .w = w;
143: }
144:
145: public HttpResponse onRequest(HttpRequest request)
146: throws Exception {
147: HttpResponse res = new HttpResponse();
148: res.addHeader("Content-type", "text/xml");
149: res.setData(new SAXReader().read(w));
150: return res;
151: }
152: }
153:
154: private Map<String, SoapEndpointListener> endpointListenerMap = Collections
155: .synchronizedMap(new HashMap<String, SoapEndpointListener>());
156:
157: private class SoapEndpointListener implements HttpRequestListener {
158: private Map<String, String> actionToChannelMap = Collections
159: .synchronizedMap(new HashMap<String, String>());
160:
161: public void addActionBinding(String action, String channel) {
162: actionToChannelMap.put(action.toLowerCase(), channel);
163: }
164:
165: public HttpResponse onRequest(HttpRequest request)
166: throws Exception {
167: Message m = Message.createMessage(request.getData());
168: String action = trimAction(request.getHeader("soapaction"));
169: String channel = actionToChannelMap.get(action);
170: if (channel == null) {
171: log.info("No listeneres registered for action '"
172: + action + "'");
173: return null;
174: }
175: Message resultMessage = raiseOnMessage(channel, m, false);
176: if (resultMessage != null) {
177: HttpResponse res = new HttpResponse();
178: res.setData(resultMessage.getContentDocument());
179: res.addHeader("Content-Type", " text/xml; charset="
180: + Charset.defaultCharset().name());
181: return res;
182: }
183: return null;
184: }
185:
186: private String trimAction(String action) {
187: if (action == null || action.length() == 0)
188: return action;
189: if (action.startsWith("\"") || action.startsWith("'"))
190: action = action.substring(1);
191: if (action.endsWith("\"") || action.endsWith("'"))
192: action = action.substring(0, action.length() - 1);
193: return action.toLowerCase();
194: }
195: }
196:
197: private Message sendSoapMessage(String soapEndpoint,
198: String soapAction, Message soapEnvelopeMessage)
199: throws MactorException {
200: try {
201: URL url = new URL(soapEndpoint);
202: HttpURLConnection conn = (HttpURLConnection) url
203: .openConnection();
204: ByteArrayOutputStream bos = new ByteArrayOutputStream();
205: OutputStreamWriter w = new OutputStreamWriter(bos);
206: soapEnvelopeMessage.getContentDocument().write(w);
207: w.flush();
208: byte[] contentBuffer = bos.toByteArray();
209: conn.setRequestProperty("Content-Length",
210: contentBuffer.length + "");
211: conn.setRequestProperty("Content-Type",
212: " text/xml; charset="
213: + Charset.defaultCharset().name());
214: conn.setRequestProperty("SOAPAction", soapAction);
215: conn.setRequestMethod("POST");
216: conn.setDoOutput(true);
217: conn.setDoInput(true);
218: OutputStream out = conn.getOutputStream();
219: out.write(contentBuffer);
220: out.flush();
221: out.close();
222: int rc = conn.getResponseCode();
223: if (rc != 200)
224: throw new MactorException(
225: "Server rejected the soap message. Endpoint'"
226: + soapEndpoint + " '. Action '"
227: + soapAction + "' HTTP response code: "
228: + rc);
229: BufferedReader in = new BufferedReader(
230: new InputStreamReader(conn.getInputStream()));
231: Message response = Message.createMessage(in);
232: in.close();
233: return response;
234: } catch (IOException ioe) {
235: // TODO more specific error description
236: throw new MactorException("Failed to send soap message "
237: + ioe.getMessage(), ioe);
238: }
239: }
240: }
|