001: package org.objectweb.celtix.bindings;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.util.List;
006: import java.util.concurrent.Executor;
007: import java.util.concurrent.RejectedExecutionException;
008: import java.util.logging.Level;
009: import java.util.logging.Logger;
010:
011: import javax.wsdl.Port;
012: import javax.wsdl.WSDLException;
013: import javax.wsdl.extensions.ExtensibilityElement;
014: import javax.xml.namespace.QName;
015: import javax.xml.ws.handler.MessageContext;
016:
017: import org.objectweb.celtix.Bus;
018: import org.objectweb.celtix.BusException;
019: import org.objectweb.celtix.common.injection.ResourceInjector;
020: import org.objectweb.celtix.common.logging.LogUtils;
021: import org.objectweb.celtix.context.InputStreamMessageContext;
022: import org.objectweb.celtix.context.ObjectMessageContext;
023: import org.objectweb.celtix.context.OutputStreamMessageContext;
024: import org.objectweb.celtix.resource.DefaultResourceManager;
025: import org.objectweb.celtix.resource.ResourceManager;
026: import org.objectweb.celtix.resource.ResourceResolver;
027: import org.objectweb.celtix.transports.ServerTransport;
028: import org.objectweb.celtix.transports.ServerTransportCallback;
029: import org.objectweb.celtix.transports.TransportFactory;
030: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
031: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
032:
033: import static org.objectweb.celtix.bindings.JAXWSConstants.BUS_PROPERTY;
034: import static org.objectweb.celtix.bindings.JAXWSConstants.SERVER_BINDING_PROPERTY;
035: import static org.objectweb.celtix.bindings.JAXWSConstants.SERVER_TRANSPORT_PROPERTY;
036:
037: public abstract class AbstractServerBinding extends AbstractBindingBase
038: implements ServerBinding {
039:
040: private static final Logger LOG = LogUtils
041: .getL7dLogger(AbstractServerBinding.class);
042:
043: protected ServerBindingEndpointCallback sbeCallback;
044:
045: public AbstractServerBinding(Bus b, EndpointReferenceType ref,
046: ServerBindingEndpointCallback sbcb) {
047: super (b, ref);
048: sbeCallback = sbcb;
049: }
050:
051: // --- ServerBinding interface ---
052:
053: public void activate() throws WSDLException, IOException {
054: transport = createTransport(reference);
055:
056: ServerTransportCallback tc = new ServerTransportCallback() {
057:
058: public void dispatch(InputStreamMessageContext ctx,
059: ServerTransport t) {
060: AbstractServerBinding.this .dispatch(ctx, t);
061: }
062:
063: public Executor getExecutor() {
064: return sbeCallback.getExecutor();
065: }
066: };
067: serverTransport().activate(tc);
068:
069: injectSystemHandlers();
070: }
071:
072: public void deactivate() throws IOException {
073: serverTransport().deactivate();
074: }
075:
076: /**
077: * Make an initial partial response to an incoming request. The partial
078: * response may only contain 'header' information, and not a 'body'.
079: *
080: * @param context object message context
081: * @param callback callback for data binding
082: */
083: public void partialResponse(
084: OutputStreamMessageContext outputContext,
085: DataBindingCallback callback) throws IOException {
086: ObjectMessageContext objectMessageContext = createObjectContext();
087: objectMessageContext.putAll(outputContext);
088: BindingContextUtils.storeDataBindingCallback(
089: objectMessageContext, callback);
090:
091: if (callback != null) {
092: Request request = new Request(this , transport,
093: objectMessageContext);
094: request.setOneway(true);
095:
096: try {
097: request.process(outputContext);
098: terminateOutputContext(outputContext);
099: } finally {
100: request.complete();
101: }
102: } else {
103: transport.finalPrepareOutputStreamContext(outputContext);
104: terminateOutputContext(outputContext);
105: }
106: }
107:
108: // --- ServerBinding interface ---
109:
110: // --- Methods to be implemented by concrete server bindings ---
111:
112: public abstract AbstractBindingImpl getBindingImpl();
113:
114: public abstract QName getOperationName(MessageContext ctx);
115:
116: // --- Methods to be implemented by concrete server bindings ---
117:
118: protected void finalPrepareOutputStreamContext(ServerTransport t,
119: MessageContext bindingContext,
120: OutputStreamMessageContext ostreamContext)
121: throws IOException {
122: t.finalPrepareOutputStreamContext(ostreamContext);
123: }
124:
125: protected boolean isFault(ObjectMessageContext objCtx,
126: MessageContext bindingCtx) {
127: if (getBindingImpl().hasFault(bindingCtx)) {
128: return true;
129: }
130: return objCtx.getException() != null;
131: }
132:
133: protected void dispatch(InputStreamMessageContext istreamCtx,
134: final ServerTransport t) {
135: LOG.info("Dispatched to binding on thread : "
136: + Thread.currentThread());
137: // storeSource(istreamCtx, t);
138: BindingContextUtils.storeServerBindingEndpointCallback(
139: istreamCtx, sbeCallback);
140:
141: final ServerRequest inMsg = new ServerRequest(this , istreamCtx);
142:
143: Exception inboundException = null;
144:
145: try {
146: inMsg.processInbound();
147: if (!inMsg.doDispatch()) {
148: LOG
149: .log(
150: Level.INFO,
151: "handlers have halted inbound message processing or specifically prevent dispatch");
152: }
153: } catch (Exception ex) {
154: inboundException = ex;
155: LOG
156: .log(
157: Level.INFO,
158: "inbound message processing resulted in exception: ",
159: ex);
160: }
161:
162: // if an error occured during processing of the inbound request
163: // or if the processing direction was halted by one of the handlers
164: // or if this is a one-way operation: send response (but traverse
165: // system handlers only if operation is one-way).
166:
167: boolean doDispatch = null == inboundException
168: && inMsg.doDispatch();
169:
170: if (!doDispatch || inMsg.isOneway()) {
171:
172: inMsg.processOutbound(t, inboundException);
173:
174: if (!doDispatch) {
175: return;
176: }
177: }
178:
179: // everything was OK: dispatch to implementor
180:
181: Runnable invoker = new Runnable() {
182: public void run() {
183: LOG.log(Level.INFO, "Before invoking on implementor");
184: assert null != inMsg.getObjectCtx();
185: inMsg.doInvocation();
186: LOG.log(Level.INFO, "After invoking on implementor");
187: if (!inMsg.isOneway()) {
188: // process response
189: inMsg.processOutbound(t, null);
190: }
191: }
192: };
193:
194: // the dispatch must be async if the request is decoupled or oneway and the
195: // transport is unable to proceed to the next request until this thread
196: // is freed up
197: if ((BindingContextUtils.retrieveDecoupledResponse(inMsg
198: .getObjectCtx()) || inMsg.isOneway())
199: && BindingContextUtils
200: .retrieveAsyncOnewayDispatch(istreamCtx)) {
201: // invoke implementor asynchronously
202: executeAsync(invoker);
203: } else {
204: // invoke implementor directly
205: invoker.run();
206: }
207: }
208:
209: protected ServerTransport createTransport(EndpointReferenceType ref)
210: throws WSDLException, IOException {
211:
212: try {
213: Port port = EndpointReferenceUtils.getPort(bus
214: .getWSDLManager(), ref);
215: List<?> exts = port.getExtensibilityElements();
216: if (exts.size() > 0) {
217: ExtensibilityElement el = (ExtensibilityElement) exts
218: .get(0);
219: TransportFactory tf = bus.getTransportFactoryManager()
220: .getTransportFactory(
221: el.getElementType().getNamespaceURI());
222: return tf.createServerTransport(ref);
223: }
224: } catch (BusException ex) {
225: LOG.severe("TRANSPORT_FACTORY_RETRIEVAL_FAILURE_MSG");
226: }
227: return null;
228: }
229:
230: protected ServerTransport serverTransport() {
231: return (ServerTransport) transport;
232: }
233:
234: /*
235: protected void storeSource(MessageContext context, ServerTransport st) {
236: BindingContextUtils.storeBinding(context, this);
237: BindingContextUtils.storeTransport(context, st);
238: BindingContextUtils.storeBus(context, bus);
239: }
240: */
241:
242: private void injectSystemHandlers() {
243: ResourceManager rm = new DefaultResourceManager();
244: rm.addResourceResolver(new ResourceResolver() {
245: @SuppressWarnings("unchecked")
246: public <T> T resolve(String resourceName,
247: Class<T> resourceType) {
248: if (BUS_PROPERTY.equals(resourceName)) {
249: return (T) AbstractServerBinding.this .getBus();
250: } else if (SERVER_BINDING_PROPERTY.equals(resourceName)) {
251: return (T) AbstractServerBinding.this ;
252: } else if (SERVER_TRANSPORT_PROPERTY
253: .equals(resourceName)) {
254: return (T) transport;
255: }
256: return null;
257: }
258:
259: public InputStream getAsStream(String name) {
260: return null;
261: }
262: });
263: ResourceInjector injector = new ResourceInjector(rm);
264:
265: getBindingImpl().injectSystemHandlers(injector);
266: }
267:
268: private void terminateOutputContext(
269: OutputStreamMessageContext outputContext)
270: throws IOException {
271: outputContext.getOutputStream().flush();
272: outputContext.getOutputStream().close();
273: }
274:
275: private void executeAsync(Runnable command) {
276: Executor executor = sbeCallback.getExecutor() != null ? sbeCallback
277: .getExecutor()
278: : getBus().getWorkQueueManager()
279: .getAutomaticWorkQueue();
280: try {
281: executor.execute(command);
282: } catch (RejectedExecutionException ree) {
283: LOG
284: .log(Level.WARNING,
285: "ONEWAY_FALLBACK_TO_DIRECT_MSG", ree);
286: command.run();
287: }
288: }
289: }
|