001: package org.objectweb.celtix.bus.ws.rm;
002:
003: import java.io.IOException;
004: import java.math.BigInteger;
005: import java.util.Collection;
006: import java.util.HashMap;
007: import java.util.Map;
008: import java.util.Timer;
009: import java.util.logging.Level;
010: import java.util.logging.Logger;
011:
012: import javax.annotation.PostConstruct;
013: import javax.annotation.PreDestroy;
014: import javax.annotation.Resource;
015: import javax.xml.ws.handler.LogicalHandler;
016: import javax.xml.ws.handler.LogicalMessageContext;
017: import javax.xml.ws.handler.MessageContext;
018:
019: import org.objectweb.celtix.Bus;
020: import org.objectweb.celtix.bindings.AbstractBindingBase;
021: import org.objectweb.celtix.bindings.BindingBase;
022: import org.objectweb.celtix.bindings.BindingContextUtils;
023: import org.objectweb.celtix.bindings.ClientBinding;
024: import org.objectweb.celtix.bindings.JAXWSConstants;
025: import org.objectweb.celtix.bindings.ServerBinding;
026:
027: import org.objectweb.celtix.bus.jaxws.EndpointImpl;
028: import org.objectweb.celtix.bus.jaxws.ServiceImpl;
029: import org.objectweb.celtix.bus.ws.addressing.AddressingPropertiesImpl;
030: import org.objectweb.celtix.bus.ws.addressing.ContextUtils;
031: import org.objectweb.celtix.bus.ws.addressing.VersionTransformer;
032: import org.objectweb.celtix.bus.ws.rm.persistence.RMStoreFactory;
033: import org.objectweb.celtix.common.logging.LogUtils;
034: import org.objectweb.celtix.configuration.Configuration;
035: import org.objectweb.celtix.configuration.ConfigurationBuilder;
036: import org.objectweb.celtix.configuration.ConfigurationBuilderFactory;
037: import org.objectweb.celtix.configuration.ConfigurationProvider;
038: import org.objectweb.celtix.context.ObjectMessageContext;
039: import org.objectweb.celtix.context.OutputStreamMessageContext;
040: import org.objectweb.celtix.handlers.SystemHandler;
041: import org.objectweb.celtix.transports.ClientTransport;
042: import org.objectweb.celtix.transports.ServerTransport;
043: import org.objectweb.celtix.transports.Transport;
044: import org.objectweb.celtix.ws.addressing.AddressingProperties;
045: import org.objectweb.celtix.ws.addressing.RelatesToType;
046: import org.objectweb.celtix.ws.addressing.v200408.AttributedURI;
047: import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType;
048: import org.objectweb.celtix.ws.rm.AckRequestedType;
049: import org.objectweb.celtix.ws.rm.CreateSequenceResponseType;
050: import org.objectweb.celtix.ws.rm.CreateSequenceType;
051: import org.objectweb.celtix.ws.rm.Identifier;
052: import org.objectweb.celtix.ws.rm.RMProperties;
053: import org.objectweb.celtix.ws.rm.SequenceAcknowledgement;
054: import org.objectweb.celtix.ws.rm.SequenceType;
055: import org.objectweb.celtix.ws.rm.TerminateSequenceType;
056: import org.objectweb.celtix.ws.rm.persistence.RMStore;
057: import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
058: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
059:
060: public class RMHandler implements
061: LogicalHandler<LogicalMessageContext>, SystemHandler {
062:
063: public static final String RM_CONFIGURATION_URI = "http://celtix.objectweb.org/bus/ws/rm/rm-config";
064: public static final String RM_CONFIGURATION_ID = "rm-handler";
065:
066: private static final Logger LOG = LogUtils
067: .getL7dLogger(RMHandler.class);
068: private static Map<BindingBase, RMHandler> handlers;
069:
070: private RMSource source;
071: private RMDestination destination;
072: private RMProxy proxy;
073: private RMServant servant;
074: private Configuration configuration;
075: private RMStore store;
076: private Timer timer;
077: private boolean busLifeCycleListenerRegistered;
078:
079: @Resource(name=JAXWSConstants.BUS_PROPERTY)
080: private Bus bus;
081: @Resource(name=JAXWSConstants.CLIENT_BINDING_PROPERTY)
082: private ClientBinding clientBinding;
083: @Resource(name=JAXWSConstants.SERVER_BINDING_PROPERTY)
084: private ServerBinding serverBinding;
085: @Resource(name=JAXWSConstants.CLIENT_TRANSPORT_PROPERTY)
086: private ClientTransport clientTransport;
087: @Resource(name=JAXWSConstants.SERVER_TRANSPORT_PROPERTY)
088: private ServerTransport serverTransport;
089:
/**
 * Creates the handler together with its proxy and servant. The remaining
 * collaborators are set up by {@link #initialise()}.
 */
public RMHandler() {
    this.proxy = new RMProxy(this);
    this.servant = new RMServant();
}
094:
095: @PostConstruct
096: protected synchronized void initialise() {
097: if (null == handlers) {
098: handlers = new HashMap<BindingBase, RMHandler>();
099: }
100: handlers.put(getBinding(), this );
101:
102: if (null == configuration) {
103: configuration = createConfiguration();
104: }
105:
106: if (null == store) {
107: store = new RMStoreFactory().getStore(configuration);
108: }
109:
110: if (null == getSource()) {
111: source = new RMSource(this );
112: source.restore();
113: }
114: if (null == destination) {
115: destination = new RMDestination(this );
116: destination.restore();
117: }
118:
119: if (null == timer) {
120: timer = new Timer();
121: }
122:
123: if (!busLifeCycleListenerRegistered) {
124: getBinding().getBus().getLifeCycleManager()
125: .registerLifeCycleListener(
126: new RMBusLifeCycleListener(getSource()));
127: busLifeCycleListenerRegistered = true;
128: }
129: }
130:
131: public static RMHandler getHandler(BindingBase binding) {
132: return handlers.get(binding);
133: }
134:
135: public void close(MessageContext context) {
136: // TODO commit transaction
137: }
138:
139: public boolean handleFault(LogicalMessageContext context) {
140:
141: open(context);
142: return false;
143: }
144:
145: public boolean handleMessage(LogicalMessageContext context) {
146:
147: open(context);
148:
149: try {
150: if (ContextUtils.isOutbound(context)) {
151: handleOutbound(context);
152: } else {
153: handleInbound(context);
154: }
155: } catch (SequenceFault sf) {
156: sf.printStackTrace();
157: LOG.log(Level.SEVERE, "SequenceFault", sf);
158: }
159: return true;
160: }
161:
162: @PreDestroy
163: public void shutdown() {
164: if (null != getSource()) {
165: getSource().shutdown();
166: }
167: }
168:
169: public Configuration getConfiguration() {
170: return configuration;
171: }
172:
173: public RMStore getStore() {
174: return store;
175: }
176:
177: public Timer getTimer() {
178: return timer;
179: }
180:
181: public Bus getBus() {
182: return bus;
183: }
184:
185: public Transport getTransport() {
186: return null == clientTransport ? serverTransport
187: : clientTransport;
188: }
189:
190: public ClientTransport getClientTransport() {
191: return clientTransport;
192: }
193:
194: public ServerTransport getServerTransport() {
195: return serverTransport;
196: }
197:
198: public ClientBinding getClientBinding() {
199: return clientBinding;
200: }
201:
202: public ServerBinding getServerBinding() {
203: return serverBinding;
204: }
205:
206: public boolean isServerSide() {
207: return null != serverBinding;
208: }
209:
210: public AbstractBindingBase getBinding() {
211: if (null != clientBinding) {
212: return (AbstractBindingBase) clientBinding;
213: }
214: return (AbstractBindingBase) serverBinding;
215: }
216:
217: public RMProxy getProxy() {
218: return proxy;
219: }
220:
221: public RMServant getServant() {
222: return servant;
223: }
224:
225: protected RMSource getSource() {
226: return source;
227: }
228:
229: protected RMDestination getDestination() {
230: return destination;
231: }
232:
233: protected void open(LogicalMessageContext context) {
234: // TODO begin transaction
235: getSource().getRetransmissionQueue().start(
236: getBus().getWorkQueueManager().getAutomaticWorkQueue());
237: }
238:
239: protected Configuration createConfiguration() {
240:
241: Configuration busCfg = getBinding().getBus().getConfiguration();
242: ConfigurationBuilder builder = ConfigurationBuilderFactory
243: .getBuilder();
244: Configuration parent;
245: org.objectweb.celtix.ws.addressing.EndpointReferenceType ref = getBinding()
246: .getEndpointReference();
247:
248: if (null != clientBinding) {
249: String id = EndpointReferenceUtils.getServiceName(ref)
250: .toString()
251: + "/" + EndpointReferenceUtils.getPortName(ref);
252: parent = builder.getConfiguration(
253: ServiceImpl.PORT_CONFIGURATION_URI, id, busCfg);
254: } else {
255: parent = builder.getConfiguration(
256: EndpointImpl.ENDPOINT_CONFIGURATION_URI,
257: EndpointReferenceUtils.getServiceName(ref)
258: .toString(), busCfg);
259: }
260:
261: Configuration cfg = builder.getConfiguration(
262: RM_CONFIGURATION_URI, RM_CONFIGURATION_ID, parent);
263: if (null == cfg) {
264: cfg = builder.buildConfiguration(RM_CONFIGURATION_URI,
265: RM_CONFIGURATION_ID, parent);
266:
267: }
268: boolean policyProviderRegistered = false;
269: for (ConfigurationProvider p : cfg.getProviders()) {
270: if (p instanceof RMPolicyProvider) {
271: policyProviderRegistered = true;
272: break;
273: }
274: }
275: if (!policyProviderRegistered) {
276: cfg.getProviders().add(
277: new RMPolicyProvider(getBinding().getBus(),
278: getBinding().getEndpointReference()));
279: }
280:
281: return cfg;
282:
283: }
284:
285: protected void handleOutbound(LogicalMessageContext context)
286: throws SequenceFault {
287: LOG.entering(getClass().getName(), "handleOutbound");
288: AddressingPropertiesImpl maps = ContextUtils.retrieveMAPs(
289: context, false, true);
290:
291: // ensure the appropriate version of WS-Addressing is used
292: maps
293: .exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
294:
295: String action = null;
296: if (maps != null && null != maps.getAction()) {
297: action = maps.getAction().getValue();
298: }
299:
300: // nothing to do if this is a CreateSequence, TerminateSequence or
301: // SequenceInfo request
302:
303: if (LOG.isLoggable(Level.FINE)) {
304: LOG.fine("Action: " + action);
305: }
306:
307: boolean isApplicationMessage = true;
308:
309: if (RMUtils.getRMConstants().getCreateSequenceAction().equals(
310: action)
311: || RMUtils.getRMConstants()
312: .getCreateSequenceResponseAction().equals(
313: action)
314: || RMUtils.getRMConstants()
315: .getTerminateSequenceAction().equals(action)
316: || RMUtils.getRMConstants().getLastMessageAction()
317: .equals(action)
318: || RMUtils.getRMConstants()
319: .getSequenceAcknowledgmentAction().equals(
320: action)
321: || RMUtils.getRMConstants().getSequenceInfoAction()
322: .equals(action)) {
323: isApplicationMessage = false;
324: }
325:
326: RMPropertiesImpl rmpsOut = (RMPropertiesImpl) RMContextUtils
327: .retrieveRMProperties(context, true);
328: if (null == rmpsOut) {
329: rmpsOut = new RMPropertiesImpl();
330: RMContextUtils.storeRMProperties(context, rmpsOut, true);
331: }
332:
333: RMPropertiesImpl rmpsIn = null;
334: Identifier inSeqId = null;
335: BigInteger inMessageNumber = null;
336:
337: if (isApplicationMessage) {
338:
339: rmpsIn = (RMPropertiesImpl) RMContextUtils
340: .retrieveRMProperties(context, false);
341:
342: if (null != rmpsIn && null != rmpsIn.getSequence()) {
343: inSeqId = rmpsIn.getSequence().getIdentifier();
344: inMessageNumber = rmpsIn.getSequence()
345: .getMessageNumber();
346: }
347: LOG.fine("inbound sequence: "
348: + (null == inSeqId ? "null" : inSeqId.getValue()));
349:
350: // not for partial responses to oneway requests
351:
352: if (!(isServerSide() && BindingContextUtils
353: .isOnewayTransport(context))) {
354:
355: if (!ContextUtils.isRequestor(context)) {
356: assert null != inSeqId;
357: }
358:
359: // get the current sequence, requesting the creation of a new one if necessary
360:
361: SourceSequence seq = getSequence(inSeqId, context, maps);
362: assert null != seq;
363:
364: // increase message number and store a sequence type object in
365: // context
366:
367: seq.nextMessageNumber(inSeqId, inMessageNumber);
368: rmpsOut.setSequence(seq);
369:
370: // if this was the last message in the sequence, reset the
371: // current sequence so that a new one will be created next
372: // time the handler is invoked
373:
374: if (seq.isLastMessage()) {
375: source.setCurrent(null);
376: }
377: }
378: }
379:
380: // add Acknowledgements (to application messages or explicitly
381: // created Acknowledgement messages only)
382:
383: if (isApplicationMessage
384: || RMUtils.getRMConstants()
385: .getSequenceAcknowledgmentAction().equals(
386: action)) {
387: AttributedURI to = VersionTransformer.convert(maps.getTo());
388: assert null != to;
389: addAcknowledgements(rmpsOut, inSeqId, to);
390: }
391:
392: // indicate to the binding that a response is expected from the transport although
393: // the web method is a oneway method
394:
395: if (BindingContextUtils.isOnewayMethod(context)
396: || RMUtils.getRMConstants().getLastMessageAction()
397: .equals(action)) {
398: context.put(OutputStreamMessageContext.ONEWAY_MESSAGE_TF,
399: Boolean.FALSE);
400: }
401: }
402:
403: protected void handleInbound(LogicalMessageContext context)
404: throws SequenceFault {
405:
406: LOG.entering(getClass().getName(), "handleInbound");
407: RMProperties rmps = RMContextUtils.retrieveRMProperties(
408: context, false);
409:
410: final AddressingPropertiesImpl maps = ContextUtils
411: .retrieveMAPs(context, false, false);
412: assert null != maps;
413:
414: String action = null;
415: if (null != maps.getAction()) {
416: action = maps.getAction().getValue();
417: }
418:
419: if (LOG.isLoggable(Level.FINE)) {
420: LOG.fine("Action: " + action);
421: }
422:
423: if (RMUtils.getRMConstants().getCreateSequenceResponseAction()
424: .equals(action)) {
425: Object[] parameters = (Object[]) context
426: .get(ObjectMessageContext.METHOD_PARAMETERS);
427: CreateSequenceResponseType csr = (CreateSequenceResponseType) parameters[0];
428: getServant().createSequenceResponse(getSource(), csr,
429: getProxy().getOfferedIdentifier());
430:
431: return;
432: } else if (RMUtils.getRMConstants().getCreateSequenceAction()
433: .equals(action)) {
434: Object[] parameters = (Object[]) context
435: .get(ObjectMessageContext.METHOD_PARAMETERS);
436: CreateSequenceType cs = (CreateSequenceType) parameters[0];
437:
438: final CreateSequenceResponseType csr = getServant()
439: .createSequence(getDestination(), cs, maps);
440:
441: Runnable response = new Runnable() {
442: public void run() {
443: try {
444: getProxy().createSequenceResponse(maps, csr);
445: } catch (IOException ex) {
446: ex.printStackTrace();
447: } catch (SequenceFault sf) {
448: sf.printStackTrace();
449: }
450: }
451: };
452: getBinding().getBus().getWorkQueueManager()
453: .getAutomaticWorkQueue().execute(response);
454:
455: return;
456: } else if (RMUtils.getRMConstants()
457: .getTerminateSequenceAction().equals(action)) {
458: Object[] parameters = (Object[]) context
459: .get(ObjectMessageContext.METHOD_PARAMETERS);
460: TerminateSequenceType cs = (TerminateSequenceType) parameters[0];
461:
462: getServant().terminateSequence(getDestination(),
463: cs.getIdentifier());
464: }
465:
466: // for application AND out of band messages
467:
468: if (null != rmps) {
469:
470: processAcknowledgments(rmps);
471:
472: processAcknowledgmentRequests(rmps);
473:
474: processSequence(rmps, maps);
475: }
476: }
477:
478: private void processAcknowledgments(RMProperties rmps) {
479: Collection<SequenceAcknowledgement> acks = rmps.getAcks();
480: if (null != acks) {
481: for (SequenceAcknowledgement ack : acks) {
482: getSource().setAcknowledged(ack);
483: }
484: }
485: }
486:
487: private void processSequence(RMProperties rmps,
488: AddressingProperties maps) throws SequenceFault {
489: SequenceType s = rmps.getSequence();
490: if (null == s) {
491: return;
492: }
493: getDestination().acknowledge(
494: s,
495: null == maps.getReplyTo() ? null : maps.getReplyTo()
496: .getAddress().getValue());
497: }
498:
499: private void processAcknowledgmentRequests(RMProperties rmps) {
500: Collection<AckRequestedType> requested = rmps
501: .getAcksRequested();
502: if (null != requested) {
503: for (AckRequestedType ar : requested) {
504: DestinationSequence seq = getDestination().getSequence(
505: ar.getIdentifier());
506: if (null != seq) {
507: seq.scheduleImmediateAcknowledgement();
508: } else {
509: LOG.severe("No such sequence.");
510: }
511: }
512: }
513: }
514:
515: private void addAcknowledgements(RMPropertiesImpl rmpsOut,
516: Identifier inSeqId, AttributedURI to) {
517:
518: for (DestinationSequence seq : getDestination()
519: .getAllSequences()) {
520: if (seq.sendAcknowledgement()
521: && ((seq.getAcksTo().getAddress().getValue()
522: .equals(
523: RMUtils.getAddressingConstants()
524: .getAnonymousURI()) && AbstractSequenceImpl
525: .identifierEquals(seq.getIdentifier(),
526: inSeqId)) || to.getValue().equals(
527: seq.getAcksTo().getAddress().getValue()))) {
528: rmpsOut.addAck(seq);
529: } else if (LOG.isLoggable(Level.FINE)) {
530: if (!seq.sendAcknowledgement()) {
531: LOG
532: .fine("no need to add an acknowledgements for sequence "
533: + seq.getIdentifier().getValue());
534: } else {
535: LOG.fine("sequences acksTo ("
536: + seq.getAcksTo().getAddress().getValue()
537: + ") does not match to (" + to.getValue()
538: + ")");
539: }
540: }
541: }
542:
543: if (LOG.isLoggable(Level.FINE)) {
544: Collection<SequenceAcknowledgement> acks = rmpsOut
545: .getAcks();
546: if (null == acks) {
547: LOG.fine("No acknowledgements added");
548: } else {
549: LOG.fine("Added " + acks.size() + " acknowledgements.");
550: }
551: }
552: }
553:
554: private SourceSequence getSequence(Identifier inSeqId,
555: LogicalMessageContext context, AddressingPropertiesImpl maps)
556: throws SequenceFault {
557: SourceSequence seq = getSource().getCurrent(inSeqId);
558:
559: if (null == seq) {
560: // TODO: better error handling
561: org.objectweb.celtix.ws.addressing.EndpointReferenceType to = null;
562: try {
563: EndpointReferenceType acksTo = null;
564: RelatesToType relatesTo = null;
565: if (isServerSide()) {
566: AddressingPropertiesImpl inMaps = ContextUtils
567: .retrieveMAPs(context, false, false);
568: inMaps
569: .exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
570: acksTo = RMUtils.createReference(inMaps.getTo()
571: .getValue());
572: to = inMaps.getReplyTo();
573: getServant().setUnattachedIdentifier(inSeqId);
574: relatesTo = ContextUtils.WSA_OBJECT_FACTORY
575: .createRelatesToType();
576: DestinationSequence inSeq = getDestination()
577: .getSequence(inSeqId);
578: relatesTo.setValue(inSeq != null ? inSeq
579: .getCorrelationID() : null);
580: } else {
581: acksTo = VersionTransformer.convert(maps
582: .getReplyTo());
583: // for oneways
584: if (Names.WSA_NONE_ADDRESS.equals(acksTo
585: .getAddress().getValue())) {
586: acksTo = RMUtils
587: .createReference(Names.WSA_ANONYMOUS_ADDRESS);
588: }
589: }
590:
591: getProxy().createSequence(getSource(), to, acksTo,
592: relatesTo);
593: } catch (IOException ex) {
594: ex.printStackTrace();
595: }
596:
597: seq = getSource().awaitCurrent(inSeqId);
598: seq.setTarget(to);
599: }
600:
601: return seq;
602: }
603:
604: public void destroy() {
605: getSource().getRetransmissionQueue().stop();
606: }
607:
608: }
|