001: package org.objectweb.celtix.bindings;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.lang.ref.WeakReference;
006: import java.util.List;
007: import java.util.concurrent.Executor;
008: import java.util.concurrent.Future;
009: import java.util.logging.Level;
010: import java.util.logging.Logger;
011:
012: import javax.wsdl.Port;
013: import javax.wsdl.WSDLException;
014: import javax.wsdl.extensions.ExtensibilityElement;
015: import javax.xml.ws.handler.MessageContext;
016:
017: import org.objectweb.celtix.Bus;
018: import org.objectweb.celtix.BusException;
019: import org.objectweb.celtix.buslifecycle.BusLifeCycleListener;
020: import org.objectweb.celtix.common.i18n.Message;
021: import org.objectweb.celtix.common.injection.ResourceInjector;
022: import org.objectweb.celtix.common.logging.LogUtils;
023: import org.objectweb.celtix.configuration.Configuration;
024: import org.objectweb.celtix.context.InputStreamMessageContext;
025: import org.objectweb.celtix.context.ObjectMessageContext;
026: import org.objectweb.celtix.context.OutputStreamMessageContext;
027: import org.objectweb.celtix.handlers.HandlerInvoker;
028: import org.objectweb.celtix.resource.DefaultResourceManager;
029: import org.objectweb.celtix.resource.ResourceManager;
030: import org.objectweb.celtix.resource.ResourceResolver;
031: import org.objectweb.celtix.transports.ClientTransport;
032: import org.objectweb.celtix.transports.TransportFactory;
033: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
034: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
035:
036: import static org.objectweb.celtix.bindings.JAXWSConstants.BUS_PROPERTY;
037: import static org.objectweb.celtix.bindings.JAXWSConstants.CLIENT_BINDING_PROPERTY;
038: import static org.objectweb.celtix.bindings.JAXWSConstants.CLIENT_TRANSPORT_PROPERTY;
039:
040: public abstract class AbstractClientBinding extends AbstractBindingBase
041: implements ClientBinding {
042: private static final Logger LOG = LogUtils
043: .getL7dLogger(AbstractClientBinding.class);
044:
045: protected Port port;
046: private ResponseCorrelator responseCorrelator;
047:
048: public AbstractClientBinding(Bus b, EndpointReferenceType ref)
049: throws WSDLException, IOException {
050: super (b, ref);
051: bus.getLifeCycleManager().registerLifeCycleListener(
052: new ShutdownListener(this ));
053: transport = null;
054: }
055:
056: private class ShutdownListener extends
057: WeakReference<AbstractClientBinding> implements
058: BusLifeCycleListener {
059:
060: ShutdownListener(AbstractClientBinding c) {
061: super (c);
062: }
063:
064: public void initComplete() {
065: // nothing
066: }
067:
068: public void preShutdown() {
069: if (get() != null) {
070: get().shutdown();
071: clear();
072: }
073: }
074:
075: public void postShutdown() {
076: clearResponseCorrelator();
077: }
078: }
079:
080: public void clearResponseCorrelator() {
081: responseCorrelator = null;
082: }
083:
084: // --- Methods to be implemented by concrete client bindings ---
085:
086: public abstract AbstractBindingImpl getBindingImpl();
087:
088: // --- Methods to be implemented by concrete client bindings ---
089:
090: // --- BindingBase interface ---
091:
092: public void configureSystemHandlers(
093: Configuration endpointConfiguration) {
094: super .configureSystemHandlers(endpointConfiguration);
095:
096: ResourceManager rm = new DefaultResourceManager();
097: rm.addResourceResolver(new ResourceResolver() {
098: @SuppressWarnings("unchecked")
099: public <T> T resolve(String resourceName,
100: Class<T> resourceType) {
101: if (BUS_PROPERTY.equals(resourceName)) {
102: return (T) AbstractClientBinding.this .getBus();
103: } else if (CLIENT_BINDING_PROPERTY.equals(resourceName)) {
104: return (T) AbstractClientBinding.this ;
105: } else if (CLIENT_TRANSPORT_PROPERTY
106: .equals(resourceName)) {
107: try {
108: return (T) AbstractClientBinding.this
109: .getTransport();
110: } catch (IOException ex) {
111: Message msg = new Message(
112: "SYSTEM_HANDLER_RESOURCE_INJECTION_FAILURE_MSG",
113: LOG, resourceName);
114: LOG.log(Level.WARNING, msg.toString(), ex);
115: }
116: }
117: return null;
118: }
119:
120: public InputStream getAsStream(String name) {
121: return null;
122: }
123: });
124: ResourceInjector injector = new ResourceInjector(rm);
125: getBindingImpl().injectSystemHandlers(injector);
126:
127: }
128:
129: // --- BindingBase interface ---
130:
131: // --- ClientBinding interface ---
132:
133: public ObjectMessageContext invoke(ObjectMessageContext objectCtx,
134: DataBindingCallback callback) throws IOException {
135:
136: // storeSource(objectCtx);
137: BindingContextUtils.storeDataBindingCallback(objectCtx,
138: callback);
139:
140: Request request = new Request(this , getTransport(), objectCtx);
141:
142: try {
143: OutputStreamMessageContext ostreamCtx = request
144: .process(null);
145:
146: if (null != ostreamCtx) {
147:
148: InputStreamMessageContext responseContext = clientTransport()
149: .invoke(ostreamCtx);
150: Response fullResponse = null;
151: if (BindingContextUtils
152: .retrieveDecoupledResponse(responseContext)) {
153: // partial response traverses complete handler chain first
154: Response partialResponse = new Response(request);
155: partialResponse.processProtocol(responseContext);
156: partialResponse.processLogical(callback);
157:
158: if (BindingContextUtils.isOnewayMethod(objectCtx)) {
159: // no full response
160: objectCtx = partialResponse
161: .getObjectMessageContext();
162: } else {
163: // wait for decoupled full response and tarverse logical chain
164: // (protocol chain already traversed by ResponseCorrelator)
165: fullResponse = getResponseCorrelator()
166: .getResponse(request);
167: fullResponse.setObjectMessageContext(objectCtx);
168: fullResponse.setHandlerInvoker(request
169: .getHandlerInvoker());
170: fullResponse.processLogical(callback);
171: objectCtx = fullResponse
172: .getObjectMessageContext();
173: }
174: } else {
175: // synchronous full response
176: fullResponse = new Response(request);
177: fullResponse.processProtocol(responseContext);
178: fullResponse.processLogical(callback);
179: objectCtx = fullResponse.getObjectMessageContext();
180: }
181:
182: }
183:
184: } finally {
185: request.complete();
186: }
187:
188: return objectCtx;
189: }
190:
191: public void invokeOneWay(ObjectMessageContext objectCtx,
192: DataBindingCallback callback) throws IOException {
193: // storeSource(objectCtx);
194: BindingContextUtils.storeDataBindingCallback(objectCtx,
195: callback);
196:
197: Request request = new Request(this , getTransport(), objectCtx);
198: request.setOneway(true);
199:
200: try {
201: OutputStreamMessageContext ostreamCtx = request
202: .process(null);
203:
204: if (null != ostreamCtx) {
205: // one of the (system handlers) may have indicated that it expects
206: // headers to be piggybacked in the response
207: // if this is the case, use the transports invoke rather than invokeOneway
208: // to give the handlers a chance to process these headers
209:
210: if (BindingContextUtils.isOnewayTransport(ostreamCtx)) {
211: clientTransport().invokeOneway(ostreamCtx);
212: } else {
213: LOG
214: .fine("Sending message as a twoway request as required by system handlers.");
215: InputStreamMessageContext istreamCtx = clientTransport()
216: .invoke(ostreamCtx);
217: Response response = new Response(request);
218: response.processProtocol(istreamCtx);
219: response.processLogical(null);
220: }
221: }
222:
223: } finally {
224: request.complete();
225: }
226: }
227:
228: public Future<ObjectMessageContext> invokeAsync(
229: ObjectMessageContext objectCtx,
230: DataBindingCallback callback, Executor executor)
231: throws IOException {
232:
233: // storeSource(objectCtx);
234: BindingContextUtils.storeDataBindingCallback(objectCtx,
235: callback);
236:
237: Request request = new Request(this , getTransport(), objectCtx);
238: AsyncFuture asyncFuture = null;
239:
240: try {
241: OutputStreamMessageContext ostreamCtx = request
242: .process(null);
243:
244: if (null != ostreamCtx) {
245:
246: Future<InputStreamMessageContext> ins = clientTransport()
247: .invokeAsync(ostreamCtx, executor);
248: asyncFuture = new AsyncFuture(ins, this , callback,
249: request.getHandlerInvoker(), objectCtx);
250: }
251:
252: } finally {
253: request.complete();
254: }
255:
256: return asyncFuture;
257: }
258:
259: public synchronized ResponseCallback createResponseCallback() {
260: responseCorrelator = new ResponseCorrelator(this );
261: return responseCorrelator;
262: }
263:
264: // --- ClientBinding interface ---
265:
266: // --- helpers ---
267:
268: protected synchronized void shutdown() {
269: if (transport != null) {
270: transport.shutdown();
271: transport = null;
272: }
273: }
274:
275: public synchronized ClientTransport getTransport()
276: throws IOException {
277: if (transport == null) {
278: try {
279: transport = createTransport(reference);
280: } catch (WSDLException e) {
281: throw (IOException) (new IOException(e.getMessage())
282: .initCause(e));
283: }
284: }
285: assert transport != null : "transport is null";
286: return clientTransport();
287: }
288:
289: protected ClientTransport createTransport(EndpointReferenceType ref)
290: throws WSDLException, IOException {
291: ClientTransport ret = null;
292: try {
293: LOG.info("creating client transport for " + ref);
294:
295: port = EndpointReferenceUtils.getPort(bus.getWSDLManager(),
296: ref);
297: List<?> exts = port.getExtensibilityElements();
298: if (exts.size() > 0) {
299: ExtensibilityElement el = (ExtensibilityElement) exts
300: .get(0);
301:
302: TransportFactory factory = bus
303: .getTransportFactoryManager()
304: .getTransportFactory(
305: el.getElementType().getNamespaceURI());
306: ret = factory.createClientTransport(ref, this );
307: }
308: } catch (BusException ex) {
309: LOG.severe("TRANSPORT_FACTORY_RETREIVAL_FAILURE_MSG");
310: }
311: assert ret != null;
312: return ret;
313: }
314:
315: protected ClientTransport clientTransport() {
316: return (ClientTransport) transport;
317: }
318:
319: public synchronized ResponseCorrelator getResponseCorrelator() {
320: if (responseCorrelator == null) {
321: responseCorrelator = (ResponseCorrelator) clientTransport()
322: .getResponseCallback();
323: }
324: return responseCorrelator;
325: }
326:
327: protected void finalPrepareOutputStreamContext(
328: MessageContext bindingContext,
329: OutputStreamMessageContext ostreamContext)
330: throws IOException {
331: transport.finalPrepareOutputStreamContext(ostreamContext);
332: }
333:
334: public ObjectMessageContext getObjectMessageContextAsync(
335: InputStreamMessageContext ins,
336: HandlerInvoker handlerInvoker,
337: DataBindingCallback callback, ObjectMessageContext objectCtx) {
338: Response response = new Response(this, handlerInvoker);
339: try {
340: response.setObjectMessageContext(objectCtx);
341: response.processProtocol(ins);
342: response.processLogical(callback);
343: } finally {
344: handlerInvoker.mepComplete(objectCtx);
345: }
346:
347: return response.getObjectMessageContext();
348: }
349: }
|