001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.local;
019:
020: import java.io.IOException;
021: import java.io.InputStream;
022: import java.io.OutputStream;
023: import java.io.PipedInputStream;
024: import java.io.PipedOutputStream;
025: import java.util.Map;
026: import java.util.Set;
027: import java.util.logging.Logger;
028:
029: import org.apache.cxf.common.logging.LogUtils;
030: import org.apache.cxf.helpers.CastUtils;
031: import org.apache.cxf.message.Exchange;
032: import org.apache.cxf.message.ExchangeImpl;
033: import org.apache.cxf.message.Message;
034: import org.apache.cxf.message.MessageImpl;
035: import org.apache.cxf.transport.AbstractConduit;
036:
037: public class LocalConduit extends AbstractConduit {
038:
039: public static final String IN_CONDUIT = LocalConduit.class
040: .getName()
041: + ".inConduit";
042: public static final String RESPONSE_CONDUIT = LocalConduit.class
043: .getName()
044: + ".inConduit";
045: public static final String IN_EXCHANGE = LocalConduit.class
046: .getName()
047: + ".inExchange";
048: public static final String DIRECT_DISPATCH = LocalConduit.class
049: .getName()
050: + ".directDispatch";
051: public static final String MESSAGE_FILTER_PROPERTIES = LocalConduit.class
052: .getName()
053: + ".filterProperties";
054:
055: private static final Logger LOG = LogUtils
056: .getL7dLogger(LocalConduit.class);
057:
058: private LocalDestination destination;
059: private LocalTransportFactory transportFactory;
060:
061: public LocalConduit(LocalTransportFactory transportFactory,
062: LocalDestination destination) {
063: super (destination.getAddress());
064: this .destination = destination;
065: this .transportFactory = transportFactory;
066: }
067:
068: public void prepare(final Message message) throws IOException {
069: if (!Boolean.TRUE.equals(message.get(DIRECT_DISPATCH))) {
070: dispatchViaPipe(message);
071: }
072: }
073:
074: @Override
075: public void close(Message message) throws IOException {
076: if (Boolean.TRUE.equals(message.get(DIRECT_DISPATCH))
077: && !Boolean.TRUE.equals(message
078: .get(Message.INBOUND_MESSAGE))) {
079: dispatchDirect(message);
080: }
081:
082: super .close(message);
083: }
084:
085: private void dispatchDirect(Message message) {
086: if (destination.getMessageObserver() == null) {
087: throw new IllegalStateException(
088: "Local destination does not have a MessageObserver on address "
089: + destination.getAddress().getAddress()
090: .getValue());
091: }
092:
093: MessageImpl copy = new MessageImpl();
094: copy.put(IN_CONDUIT, this );
095: copy.setDestination(destination);
096:
097: copy(message, copy, transportFactory
098: .getMessageFilterProperties());
099:
100: // Create a new incoming exchange and store the original exchange for the response
101: ExchangeImpl ex = new ExchangeImpl();
102: ex.setInMessage(copy);
103: ex.put(IN_EXCHANGE, message.getExchange());
104: ex.setDestination(destination);
105:
106: destination.getMessageObserver().onMessage(copy);
107: }
108:
109: public static void copy(Message message, MessageImpl copy,
110: Set<String> defaultFilter) {
111: Set<String> filter = CastUtils.cast((Set) message
112: .get(MESSAGE_FILTER_PROPERTIES));
113: if (filter == null) {
114: filter = defaultFilter;
115: }
116:
117: // copy all the contents
118: for (Map.Entry<String, Object> e : message.entrySet()) {
119: if (!filter.contains(e.getKey())) {
120: copy.put(e.getKey(), e.getValue());
121: }
122: }
123:
124: MessageImpl.copyContent(message, copy);
125: }
126:
127: private void dispatchViaPipe(final Message message)
128: throws IOException {
129: final PipedInputStream stream = new PipedInputStream();
130: final LocalConduit conduit = this ;
131: final Exchange exchange = message.getExchange();
132:
133: if (destination.getMessageObserver() == null) {
134: throw new IllegalStateException(
135: "Local destination does not have a MessageObserver on address "
136: + destination.getAddress().getAddress()
137: .getValue());
138: }
139:
140: final Runnable receiver = new Runnable() {
141: public void run() {
142: MessageImpl inMsg = new MessageImpl();
143: inMsg.setContent(InputStream.class, stream);
144: inMsg.setDestination(destination);
145: inMsg.put(IN_CONDUIT, conduit);
146:
147: ExchangeImpl ex = new ExchangeImpl();
148: ex.setInMessage(inMsg);
149: ex.put(IN_EXCHANGE, exchange);
150: destination.getMessageObserver().onMessage(inMsg);
151: }
152: };
153:
154: message.setContent(OutputStream.class, new PipedOutputStream(
155: stream));
156:
157: // TODO: put on executor
158: new Thread(receiver).start();
159: }
160:
161: protected Logger getLogger() {
162: return LOG;
163: }
164: }
|