001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.synapse.transport.nhttp;
020:
021: import org.apache.axis2.addressing.EndpointReference;
022: import org.apache.axis2.context.MessageContext;
023: import org.apache.axis2.AxisFault;
024: import org.apache.axis2.Constants;
025: import org.apache.synapse.transport.nhttp.util.PipeImpl;
026: import org.apache.synapse.transport.nhttp.util.RESTUtil;
027: import org.apache.axis2.transport.http.HTTPConstants;
028: import org.apache.axis2.transport.MessageFormatter;
029: import org.apache.axis2.transport.TransportUtils;
030: import org.apache.http.*;
031: import org.apache.http.message.BasicHttpEntityEnclosingRequest;
032: import org.apache.http.message.BasicHttpRequest;
033: import org.apache.http.protocol.HTTP;
034: import org.apache.http.entity.BasicHttpEntity;
035: import org.apache.axiom.om.OMOutputFormat;
036: import org.apache.commons.logging.Log;
037: import org.apache.commons.logging.LogFactory;
038:
039: import java.io.IOException;
040: import java.io.OutputStream;
041: import java.io.ByteArrayOutputStream;
042: import java.nio.channels.Channels;
043: import java.nio.channels.ReadableByteChannel;
044: import java.util.Map;
045: import java.util.Iterator;
046:
047: /**
048: * Represents an outgoing Axis2 HTTP/s request. It holds the EPR of the destination, the
049: * Axis2 MessageContext to be sent, an HttpHost object which captures information about the
050: * destination, and a Pipe used to write the message stream to the destination
051: */
052: public class Axis2HttpRequest {
053:
054: private static final Log log = LogFactory
055: .getLog(Axis2HttpRequest.class);
056:
057: /** the EPR of the destination */
058: private EndpointReference epr = null;
059: /** the HttpHost that contains the HTTP connection information */
060: private HttpHost httpHost = null;
061: /** the message context being sent */
062: private MessageContext msgContext = null;
063: /** the Pipe which facilitates the serialization output to be written to the channel */
064: private PipeImpl pipe = null;
065: /** The Axis2 MessageFormatter that will ensure proper serialization as per Axis2 semantics */
066: MessageFormatter messageFormatter = null;
067: /** The OM Output format holder */
068: OMOutputFormat format = null;
069: protected boolean completed = false; //added for request complete checking
070:
071: public Axis2HttpRequest(EndpointReference epr, HttpHost httpHost,
072: MessageContext msgContext) {
073: this .epr = epr;
074: this .httpHost = httpHost;
075: this .msgContext = msgContext;
076: this .format = NhttpUtils.getOMOutputFormat(msgContext);
077: try {
078: messageFormatter = TransportUtils
079: .getMessageFormatter(msgContext);
080: } catch (AxisFault axisFault) {
081: log.error("Cannot find a suitable MessageFormatter : "
082: + axisFault.getMessage());
083: }
084: try {
085: this .pipe = new PipeImpl();
086: } catch (IOException e) {
087: log.error("Error creating pipe to write message body", e);
088: }
089: }
090:
091: public EndpointReference getEpr() {
092: return epr;
093: }
094:
095: public HttpHost getHttpHost() {
096: return httpHost;
097: }
098:
099: public MessageContext getMsgContext() {
100: return msgContext;
101: }
102:
103: /**
104: * Create and return a new HttpPost request to the destination EPR
105: * @return the HttpRequest to be sent out
106: */
107: public HttpRequest getRequest() throws IOException {
108:
109: boolean doingGET = Constants.Configuration.HTTP_METHOD_GET
110: .equals(msgContext
111: .getProperty(Constants.Configuration.HTTP_METHOD));
112: HttpRequest httpRequest = null;
113: if (msgContext.isPropertyTrue(NhttpConstants.FORCE_HTTP_1_0)) {
114:
115: if (doingGET) {
116:
117: httpRequest = new BasicHttpRequest("GET", RESTUtil
118: .getURI(msgContext, epr.getAddress()),
119: HttpVersion.HTTP_1_0);
120:
121: } else {
122:
123: httpRequest = new BasicHttpEntityEnclosingRequest(
124: "POST", epr.getAddress(), HttpVersion.HTTP_1_0);
125:
126: ByteArrayOutputStream baos = new ByteArrayOutputStream();
127: messageFormatter
128: .writeTo(msgContext, format, baos, true);
129: BasicHttpEntity entity = new BasicHttpEntity();
130: entity.setContentLength(baos.toByteArray().length);
131: ((BasicHttpEntityEnclosingRequest) httpRequest)
132: .setEntity(entity);
133: }
134:
135: } else {
136:
137: if (doingGET) {
138:
139: httpRequest = new BasicHttpRequest("GET", RESTUtil
140: .getURI(msgContext, epr.getAddress()));
141:
142: } else {
143: httpRequest = new BasicHttpEntityEnclosingRequest(
144: "POST", epr.getAddress());
145: ((BasicHttpEntityEnclosingRequest) httpRequest)
146: .setEntity(new BasicHttpEntity());
147: }
148: }
149:
150: // set any transport headers
151: Object o = msgContext
152: .getProperty(MessageContext.TRANSPORT_HEADERS);
153: if (o != null && o instanceof Map) {
154: Map headers = (Map) o;
155: Iterator iter = headers.keySet().iterator();
156: while (iter.hasNext()) {
157: Object header = iter.next();
158: Object value = headers.get(header);
159: if (header instanceof String && value != null
160: && value instanceof String) {
161: httpRequest.setHeader((String) header,
162: (String) value);
163: }
164: }
165: }
166:
167: // if the message is SOAP 11 (for which a SOAPAction is *required*), and
168: // the msg context has a SOAPAction or a WSA-Action (give pref to SOAPAction)
169: // use that over any transport header that may be available
170: String soapAction = msgContext.getSoapAction();
171: if (soapAction == null) {
172: soapAction = msgContext.getWSAAction();
173: }
174: if (soapAction == null) {
175: msgContext.getAxisOperation().getInputAction();
176: }
177:
178: if (msgContext.isSOAP11() && soapAction != null
179: && soapAction.length() > 0) {
180: Header existingHeader = httpRequest
181: .getFirstHeader(HTTPConstants.HEADER_SOAP_ACTION);
182: if (existingHeader != null) {
183: httpRequest.removeHeader(existingHeader);
184: }
185: httpRequest.setHeader(HTTPConstants.HEADER_SOAP_ACTION,
186: soapAction);
187: }
188:
189: httpRequest.setHeader(HTTP.CONTENT_TYPE, messageFormatter
190: .getContentType(msgContext, format, msgContext
191: .getSoapAction()));
192:
193: return httpRequest;
194: }
195:
196: /**
197: * Return the source channel of the pipe that bridges the serialized output to the socket
198: * @return source channel to read serialized message contents
199: */
200: public ReadableByteChannel getSourceChannel() {
201: if (log.isDebugEnabled()) {
202: log
203: .debug("get source channel of the pipe on which the outgoing response is written");
204: }
205: return pipe.source();
206: }
207:
208: /**
209: * Start streaming the message into the Pipe, so that the contents could be read off the source
210: * channel returned by getSourceChannel()
211: * @throws AxisFault on error
212: */
213: public void streamMessageContents() throws AxisFault {
214:
215: if (log.isDebugEnabled()) {
216: log.debug("start streaming outgoing http request");
217: }
218: OutputStream out = Channels.newOutputStream(pipe.sink());
219: try {
220: messageFormatter.writeTo(msgContext, format, out, true);
221: } catch (Exception e) {
222: /* close PipeImpl will manually raise exception
223: while streaming, so blocking status will be released */
224: if (e instanceof AxisFault) {
225: throw (AxisFault) e;
226: } else {
227: handleException("Error streaming message context", e);
228: }
229: } finally {
230: try {
231: out.flush();
232: out.close();
233: } catch (IOException e) {
234: handleException(
235: "Error closing outgoing message stream", e);
236: }
237: }
238: }
239:
240: // -------------- utility methods -------------
241: private void handleException(String msg, Exception e)
242: throws AxisFault {
243: log.error(msg, e);
244: throw new AxisFault(msg, e);
245: }
246:
247: public boolean isCompleted() {
248: return completed;
249: }
250:
251: public void setCompleted(boolean completed) {
252: if (completed && !isCompleted()) {
253: this.pipe.close();
254: }
255: this.completed = completed;
256: }
257: }
|