001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.binding.coloc;
019:
020: import java.util.List;
021: import java.util.ResourceBundle;
022: import java.util.SortedSet;
023: import java.util.TreeSet;
024: import java.util.logging.Level;
025: import java.util.logging.Logger;
026:
027: import org.apache.cxf.Bus;
028: import org.apache.cxf.BusFactory;
029: import org.apache.cxf.binding.Binding;
030: import org.apache.cxf.common.i18n.BundleUtils;
031: import org.apache.cxf.common.logging.LogUtils;
032: import org.apache.cxf.endpoint.ClientImpl;
033: import org.apache.cxf.endpoint.Endpoint;
034: import org.apache.cxf.endpoint.Server;
035: import org.apache.cxf.endpoint.ServerRegistry;
036: import org.apache.cxf.interceptor.Fault;
037: import org.apache.cxf.interceptor.InterceptorChain;
038: import org.apache.cxf.message.Exchange;
039: import org.apache.cxf.message.Message;
040: import org.apache.cxf.message.MessageImpl;
041: import org.apache.cxf.phase.AbstractPhaseInterceptor;
042: import org.apache.cxf.phase.Phase; //import org.apache.cxf.phase.PhaseInterceptorChain;
043: import org.apache.cxf.phase.PhaseManager;
044: import org.apache.cxf.service.Service;
045: import org.apache.cxf.service.model.BindingOperationInfo;
046: import org.apache.cxf.service.model.EndpointInfo; //import org.apache.cxf.service.model.OperationInfo;
047: import org.apache.cxf.transport.MessageObserver;
048:
049: public class ColocOutInterceptor extends
050: AbstractPhaseInterceptor<Message> {
051: private static final ResourceBundle BUNDLE = BundleUtils
052: .getBundle(ColocOutInterceptor.class);
053: private static final Logger LOG = LogUtils
054: .getL7dLogger(ClientImpl.class);
055: private static final String COLOCATED = Message.class.getName()
056: + ".COLOCATED";
057: private MessageObserver colocObserver;
058: private Bus bus;
059:
060: public ColocOutInterceptor() {
061: super (Phase.POST_LOGICAL);
062: }
063:
064: public void setBus(Bus bus) {
065: this .bus = bus;
066: }
067:
068: public void handleMessage(Message message) throws Fault {
069: if (bus == null) {
070: bus = message.getExchange().get(Bus.class);
071: if (bus == null) {
072: bus = BusFactory.getDefaultBus(false);
073: }
074: if (bus == null) {
075: throw new Fault(new org.apache.cxf.common.i18n.Message(
076: "BUS_NOT_FOUND", BUNDLE));
077: }
078: }
079:
080: ServerRegistry registry = bus
081: .getExtension(ServerRegistry.class);
082:
083: if (registry == null) {
084: throw new Fault(new org.apache.cxf.common.i18n.Message(
085: "SERVER_REGISTRY_NOT_FOUND", BUNDLE));
086: }
087:
088: Exchange exchange = message.getExchange();
089: Endpoint senderEndpoint = exchange.get(Endpoint.class);
090:
091: if (senderEndpoint == null) {
092: throw new Fault(new org.apache.cxf.common.i18n.Message(
093: "ENDPOINT_NOT_FOUND", BUNDLE));
094: }
095:
096: BindingOperationInfo boi = exchange
097: .get(BindingOperationInfo.class);
098:
099: if (boi == null) {
100: throw new Fault(new org.apache.cxf.common.i18n.Message(
101: "OPERATIONINFO_NOT_FOUND", BUNDLE));
102: }
103:
104: Server srv = isColocated(registry.getServers(), senderEndpoint,
105: boi);
106:
107: if (srv != null) {
108: if (LOG.isLoggable(Level.FINE)) {
109: LOG.fine("Operation:" + boi.getName()
110: + " dispatched as colocated call.");
111: }
112:
113: InterceptorChain outChain = message.getInterceptorChain();
114: outChain.abort();
115: exchange.put(Bus.class, bus);
116: message.put(COLOCATED, Boolean.TRUE);
117: message.put(Message.WSDL_OPERATION, boi.getName());
118: message.put(Message.WSDL_INTERFACE, boi.getBinding()
119: .getInterface().getName());
120: invokeColocObserver(message, srv.getEndpoint());
121: if (!exchange.isOneWay()) {
122: invokeInboundChain(exchange, senderEndpoint);
123: }
124: } else {
125: if (LOG.isLoggable(Level.FINE)) {
126: LOG.fine("Operation:" + boi.getName()
127: + " dispatched as remote call.");
128: }
129:
130: message.put(COLOCATED, Boolean.FALSE);
131: }
132: }
133:
134: protected void invokeColocObserver(Message outMsg,
135: Endpoint inboundEndpoint) {
136: if (colocObserver == null) {
137: colocObserver = new ColocMessageObserver(inboundEndpoint,
138: bus);
139: }
140: if (LOG.isLoggable(Level.FINE)) {
141: LOG.fine("Invoke on Coloc Observer.");
142: }
143:
144: colocObserver.onMessage(outMsg);
145: }
146:
147: protected void invokeInboundChain(Exchange ex, Endpoint ep) {
148: Message m = getInBoundMessage(ex);
149: Message inMsg = ep.getBinding().createMessage();
150: MessageImpl.copyContent(m, inMsg);
151:
152: //Copy Response Context to Client inBound Message
153: //TODO a Context Filter Strategy required.
154: inMsg.putAll(m);
155:
156: inMsg.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
157: inMsg.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
158: inMsg.setExchange(ex);
159:
160: Exception exc = inMsg.getContent(Exception.class);
161: if (exc != null) {
162: ex.setInFaultMessage(inMsg);
163: ColocInFaultObserver observer = new ColocInFaultObserver(
164: bus);
165: observer.onMessage(inMsg);
166: } else {
167: //Handle Response
168: ex.setInMessage(inMsg);
169: PhaseManager pm = bus.getExtension(PhaseManager.class);
170: SortedSet<Phase> phases = new TreeSet<Phase>(pm
171: .getInPhases());
172: ColocUtil.setPhases(phases, Phase.USER_LOGICAL,
173: Phase.PRE_INVOKE);
174:
175: InterceptorChain chain = ColocUtil.getInInterceptorChain(
176: ex, phases);
177: inMsg.setInterceptorChain(chain);
178: chain.doIntercept(inMsg);
179: }
180: ex.put(ClientImpl.FINISHED, Boolean.TRUE);
181: }
182:
183: protected Message getInBoundMessage(Exchange ex) {
184: return (ex.getInFaultMessage() != null) ? ex
185: .getInFaultMessage() : ex.getInMessage();
186: }
187:
188: protected void setMessageObserver(MessageObserver observer) {
189: colocObserver = observer;
190: }
191:
192: protected Server isColocated(List<Server> servers,
193: Endpoint endpoint, BindingOperationInfo boi) {
194: if (servers != null) {
195: Service senderService = endpoint.getService();
196: EndpointInfo senderEI = endpoint.getEndpointInfo();
197: for (Server s : servers) {
198: Endpoint receiverEndpoint = s.getEndpoint();
199: Service receiverService = receiverEndpoint.getService();
200: EndpointInfo receiverEI = receiverEndpoint
201: .getEndpointInfo();
202: if (receiverService.getName().equals(
203: senderService.getName())
204: && receiverEI.getName().equals(
205: senderEI.getName())) {
206: //Check For Operation Match.
207: BindingOperationInfo receiverOI = receiverEI
208: .getBinding().getOperation(boi.getName());
209: if (receiverOI != null
210: && isSameOperationInfo(boi, receiverOI)) {
211: return s;
212: }
213: }
214: }
215: }
216:
217: return null;
218: }
219:
220: protected boolean isSameOperationInfo(BindingOperationInfo sender,
221: BindingOperationInfo receiver) {
222: return ColocUtil.isSameOperationInfo(sender.getOperationInfo(),
223: receiver.getOperationInfo());
224: }
225:
226: public void setExchangeProperties(Exchange exchange, Endpoint ep) {
227: exchange.put(Endpoint.class, ep);
228: exchange.put(Service.class, ep.getService());
229: exchange.put(Binding.class, ep.getBinding());
230: exchange.put(Bus.class, bus == null ? BusFactory
231: .getDefaultBus(false) : bus);
232: }
233: }
|