001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common Development
008: * and Distribution License("CDDL") (collectively, the "License"). You
009: * may not use this file except in compliance with the License. You can obtain
010: * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
011: * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
012: * language governing permissions and limitations under the License.
013: *
014: * When distributing the software, include this License Header Notice in each
015: * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
016: * Sun designates this particular file as subject to the "Classpath" exception
017: * as provided by Sun in the GPL Version 2 section of the License file that
018: * accompanied this code. If applicable, add the following below the License
019: * Header, with the fields enclosed by brackets [] replaced by your own
020: * identifying information: "Portions Copyrighted [year]
021: * [name of copyright owner]"
022: *
023: * Contributor(s):
024: *
025: * If you wish your version of this file to be governed by only the CDDL or
026: * only the GPL Version 2, indicate your decision by adding "[Contributor]
027: * elects to include this software in this distribution under the [CDDL or GPL
028: * Version 2] license." If you don't indicate a single choice of license, a
029: * recipient has the option to distribute your version of this file under
030: * either the CDDL, the GPL Version 2 or to extend the choice of license to
031: * its licensees as provided above. However, if you add GPL Version 2 code
032: * and therefore, elected the GPL Version 2 license, then the option applies
033: * only if the new code is made subject to such option by the copyright
034: * holder.
035: */
036:
037: // ClientOutboundSequence.java
038: //
039: //
040: // @author Mike Grogan
041: // Created on October 15, 2005, 3:13 PM
042: //
043: package com.sun.xml.ws.rm.jaxws.runtime.client;
044:
045: import com.sun.xml.ws.api.SOAPVersion;
046: import com.sun.xml.ws.api.addressing.AddressingVersion;
047: import com.sun.xml.ws.api.addressing.WSEndpointReference;
048: import com.sun.xml.ws.api.rm.AcknowledgementListener;
049: import com.sun.xml.ws.api.rm.SequenceSettings;
050: import com.sun.xml.ws.api.rm.client.ClientSequence;
051: import com.sun.xml.ws.rm.InvalidMessageNumberException;
052: import com.sun.xml.ws.rm.Message;
053: import com.sun.xml.ws.rm.RMException;
054: import com.sun.xml.ws.rm.RMVersion;
055: import com.sun.xml.ws.rm.jaxws.runtime.InboundSequence;
056: import com.sun.xml.ws.rm.jaxws.runtime.OutboundSequence;
057: import com.sun.xml.ws.rm.jaxws.runtime.SequenceConfig;
058: import com.sun.xml.ws.rm.jaxws.util.LoggingHelper;
059: import com.sun.xml.ws.rm.protocol.*;
060: import com.sun.xml.ws.rm.v200502.Identifier;
061: import com.sun.xml.ws.security.secext10.SecurityTokenReferenceType;
062:
063: import javax.xml.bind.JAXBElement;
064: import javax.xml.transform.Source;
065: import javax.xml.ws.Service;
066: import javax.xml.ws.wsaddressing.W3CEndpointReference;
067: import java.net.URI;
068: import java.util.UUID;
069: import java.util.logging.Level;
070: import java.util.logging.Logger;
071:
072: /**
073: * ClientOutboundSequence represents the set of all messages from a single BindingProvider instance.
074: * It includes methods that connect and disconnect to a remote RMDestination using
075: * a client for a WebService that uses CreateSequence and TerminateSequence as its request messages.
076: */
077:
078: public class ClientOutboundSequence extends OutboundSequence implements
079: ClientSequence {
080:
081: private static final Logger logger = Logger.getLogger(LoggingHelper
082: .getLoggerName(ClientOutboundSequence.class));
083:
084: /**
085: * Current value of receive buffer read from incoming SequenceAcknowledgement
086: * messages if RM Destination implements properietary Indigo Flow Control feature.
087: */
088: protected int receiveBufferSize;
089:
090: /**
091: * The helper class used to send protocol messages
092: * <code>CreateSequenceElement</code>
093: * <code>CreateSequenceResponseElement</code>
094: * <code>LastMessage</code>
095: * <code>AckRequestedElement</code>
096: *
097: */
098: protected ProtocolMessageSender protocolMessageSender;
099:
100: private SOAPVersion version;
101:
102: /**
103: * Flag to indicate if secureReliableMessaging is on
104: */
105: private boolean secureReliableMessaging;
106:
107: /**
108: * The SecurityTokenReference to pass to CreateSequence
109: */
110: private JAXBElement<SecurityTokenReferenceType> str = null;
111:
112: /**
113: * Indicates whether the sequence uses anonymous acksTo
114: */
115: private boolean isAnonymous = false;
116:
117: /*
118: * Flag which indicates whether sequence is active (disconnect() has not
119: * been called.
120: */
121: private boolean isActive = true;
122:
123: /**
124: * Time after which resend of messages in sequences is attempted at
125: * next opportunity.
126: */
127: private long resendDeadline;
128:
129: /**
130: * Time after which Ack is requested at next opportunity.
131: */
132: private long ackRequestDeadline;
133:
134: /**
135: * Can be registered to listen for sequence acknowledgements.
136: */
137: private AcknowledgementListener ackListener;
138:
139: /**
140: * Service using this sequence (if known)
141: */
142: private Service service;
143:
144: /**
145: * This field is used only as a hack to test Server-side
146: * timeout functionality. It is not intended to be used
147: * for any other purpose.
148: */
149: private static boolean sendHeartbeats = true;
150:
151: public ClientOutboundSequence(SequenceConfig config) {
152: this .config = config;
153:
154: //for now
155: this .version = config.getSoapVersion();
156: this .ackHandler = new AcknowledgementHandler(config);
157: this .rmConstants = config.getRMConstants();
158: this .bufferRemaining = config.getBufferSize();
159:
160: }
161:
162: /**
163: * Accessor for the sequenceConfig field
164: *
165: * @return The value of the field.
166: */
167: public SequenceConfig getSequenceConfig() {
168: return config;
169: }
170:
171: /**
172: * Mutator for the <code>receiveBufferSize</code> field.
173: *
174: * @param receiveBufferSize The new value for the field.
175: */
176: public void setReceiveBufferSize(int receiveBufferSize) {
177: this .receiveBufferSize = receiveBufferSize;
178: }
179:
180: /**
181: * Accessor for the <code>receiveBufferSize</code> field.
182: *
183: * @return The value for the field.
184: */
185: public int getReceiveBufferSize() {
186: return receiveBufferSize;
187: }
188:
189: public boolean isSecureReliableMessaging() {
190: return secureReliableMessaging;
191: }
192:
193: /**
194: * Return the hoped-for limit to number of stored messages. Currently
195: * the limit is not enforced, but as the number of stored messages approaches
196: * the limit, resends and ackRequests occur more frequently.
197: */
198: public int getTransferWindowSize() {
199: //Use server size receive buffer size for now. Might
200: //want to make this configurable.
201: return config.getBufferSize();
202: }
203:
204: /**
205: * Registers a <code>AcknowledgementListener</code> for this
206: * sequence
207: *
208: * @param listener The <code>AcknowledgementListener</code>
209: */
210:
211: public void setAcknowledgementListener(
212: AcknowledgementListener listener) {
213: this .ackListener = listener;
214: }
215:
216: /**
217: * Implementation of the getSequenceSettings method in
218: * com.sun.xml.ws.rm.api.client.ClientSequence. Need
219: * to populate the sequence ids in the returned SequenceSettings
220: * object, since in general, they will not be set in the underlying
221: * SequenceConfig object.
222: */
223: public SequenceSettings getSequenceSettings() {
224:
225: SequenceSettings settings = getSequenceConfig();
226: settings.sequenceId = getId();
227:
228: InboundSequence iseq = getInboundSequence();
229:
230: settings.companionSequenceId = (iseq != null) ? iseq.getId()
231: : null;
232: return settings;
233: }
234:
235: /**
236: * Accessor for the AcknowledgementListener field.
237: *
238: * @return The AcknowledgementListener.
239: */
240: public AcknowledgementListener getAcknowledgementListener() {
241: return ackListener;
242: }
243:
244: public void setSecureReliableMessaging(
245: boolean secureReliableMessaging) {
246: this .secureReliableMessaging = secureReliableMessaging;
247: }
248:
249: /**
250: * Accessor for the service field.
251: *
252: * @returns The value of the service field. May be null if not known.
253: */
254: public Service getService() {
255: return service;
256: }
257:
258: /**
259: * Sets the value of the service field.
260: *
261: * @param service The service using the sequence.
262: */
263: public void setService(Service service) {
264: this .service = service;
265: }
266:
267: /**
268: * Connects to remote RM Destination by sending request through the proxy
269: * stored in the <code>port</code> field.
270: *
271: * @param destination Destination URI for RM Destination
272: * @param acksTo reply to EPR for protocol responses. The null value indicates
273: * use of the WS-Addressing anonymous EPR
274: * @throws RMException wrapper for all exceptions thrown during execution of method.
275: */
276: public void connect(URI destination, URI acksTo, boolean twoWay)
277: throws RMException {
278: try {
279:
280: this .destination = destination;
281: this .acksTo = acksTo;
282:
283: String anonymous = rmConstants.getAnonymousURI().toString();
284: String acksToString;
285:
286: if (acksTo == null) {
287: acksToString = anonymous;
288: } else {
289: acksToString = acksTo.toString();
290:
291: }
292:
293: this .isAnonymous = acksToString.equals(anonymous);
294:
295: AbstractCreateSequence cs = null;
296: if (config.getRMVersion() == RMVersion.WSRM10) {
297: cs = new com.sun.xml.ws.rm.v200502.CreateSequenceElement();
298: } else {
299: cs = new com.sun.xml.ws.rm.v200702.CreateSequenceElement();
300: }
301:
302: // CreateSequenceElement cs = new CreateSequenceElement();
303:
304: /**
305: * ADDRESSING_FIXME
306: * This needs to be fixed commenting temporarily to get the compilation
307: * problems fixed
308: */
309: /*if (RMConstants.getAddressingVersion() == AddressingVersion.W3C){
310: cs.setAcksTo(new W3CAcksToImpl(new URI(acksToString)));
311: } else {
312: cs.setAcksTo(new MemberSubmissionAcksToImpl(new URI(acksToString)));
313:
314: }*/
315: W3CEndpointReference endpointReference = null;
316: AddressingVersion addressingVersion = rmConstants
317: .getAddressingVersion();
318: if (addressingVersion == AddressingVersion.W3C) {
319: //WSEndpointReference wsepr = new WSEndpointReference(getClass().getResourceAsStream("w3c-anonymous-acksTo.xml"), addressingVersion);
320: WSEndpointReference epr = AddressingVersion.W3C.anonymousEpr;
321: Source s = epr.asSource("AcksTo");
322: endpointReference = new W3CEndpointReference(s);
323: }/*else {
324: WSEndpointReference wsepr = new WSEndpointReference(getClass().getResourceAsStream("member-anonymous-acksTo.xml"), addressingVersion);
325: Source s = wsepr.asSource("AcksTo");
326: endpointReference = new MemberSubmissionEndpointReference(s);
327: }*/
328: cs.setAcksTo(endpointReference);
329:
330: String incomingID = "uuid:" + UUID.randomUUID();
331:
332: if (twoWay) {
333:
334: if (config.getRMVersion() == RMVersion.WSRM10) {
335: com.sun.xml.ws.rm.v200502.Identifier id = new com.sun.xml.ws.rm.v200502.Identifier();
336: com.sun.xml.ws.rm.v200502.OfferType offer = new com.sun.xml.ws.rm.v200502.OfferType();
337: id.setValue(incomingID);
338: offer.setIdentifier(id);
339: ((com.sun.xml.ws.rm.v200502.CreateSequenceElement) cs)
340: .setOffer(offer);
341: } else {
342: com.sun.xml.ws.rm.v200702.Identifier id = new com.sun.xml.ws.rm.v200702.Identifier();
343: com.sun.xml.ws.rm.v200702.OfferType offer = new com.sun.xml.ws.rm.v200702.OfferType();
344: id.setValue(incomingID);
345: offer.setIdentifier(id);
346: ((com.sun.xml.ws.rm.v200702.CreateSequenceElement) cs)
347: .setOffer(offer);
348: }
349:
350: }
351:
352: if (secureReliableMessaging) {
353: JAXBElement<SecurityTokenReferenceType> str = getSecurityTokenReference();
354: if (str != null) {
355: cs.setSecurityTokenReference(str.getValue());
356: } else {
357: throw new RMException(
358: "SecurityTokenReference is null");
359: }
360: }
361:
362: AbstractCreateSequenceResponse csr = protocolMessageSender
363: .sendCreateSequence(cs, destination, acksTo,
364: version);
365:
366: AbstractAcceptType accept = null;
367: if (csr != null) {
368: if (csr instanceof com.sun.xml.ws.rm.v200502.CreateSequenceResponseElement) {
369: Identifier idOutbound = ((com.sun.xml.ws.rm.v200502.CreateSequenceResponseElement) csr)
370: .getIdentifier();
371: this .id = idOutbound.getValue();
372:
373: accept = ((com.sun.xml.ws.rm.v200502.CreateSequenceResponseElement) csr)
374: .getAccept();
375: } else {
376: com.sun.xml.ws.rm.v200702.Identifier idOutbound = ((com.sun.xml.ws.rm.v200702.CreateSequenceResponseElement) csr)
377: .getIdentifier();
378: this .id = idOutbound.getValue();
379:
380: accept = ((com.sun.xml.ws.rm.v200702.CreateSequenceResponseElement) csr)
381: .getAccept();
382:
383: }
384:
385: if (accept != null) {
386: /**
387: * ADDRESSING_FIXME Needs to be fixes once
388: * AcksTO issue is resolved
389: */
390: /* URI uriAccept = accept.getAcksTo();*/
391: URI uriAccept = null;
392:
393: inboundSequence = new ClientInboundSequence(this ,
394: incomingID, uriAccept, config);
395: } else {
396: inboundSequence = new ClientInboundSequence(this ,
397: incomingID, null, config);
398: }
399:
400: //start the inactivity clock
401: resetLastActivityTime();
402:
403: } else {
404: //maybe a non-anonymous AcksTo
405: //Handle CreateSequenceRefused fault
406: }
407: } catch (Exception e) {
408: throw new RMException(e);
409: }
410: }
411:
412: /**
413: * Disconnect from the RMDestination by invoking <code>TerminateSequence</code> on
414: * the proxy stored in the <code>port</code> field. State of
415: * sequence is set to inactive.
416: *
417: * @throws RMException wrapper for all exceptions thrown during execution of method.
418: */
419: public void disconnect() throws RMException {
420: disconnect(false);
421: }
422:
423: /**
424: * Disconnect from the RMDestination by invoking <code>TerminateSequence</code> on
425: * the proxy stored in the <code>port</code> field.
426: *
427: * @param keepAlive If true, state of sequence is kept in
428: * active atate allowing the reuse of the sequence.
429: *
430: * @throws RMException wrapper for all exceptions thrown during execution of method.
431: */
432: public void disconnect(boolean keepAlive) throws RMException {
433:
434: //FIXME - find another check for connectiveness.. want to get rid of
435: //unnecessary InboundSequences.
436: if (inboundSequence == null) {
437: throw new IllegalStateException("Not connected.");
438: }
439:
440: isActive = keepAlive;
441:
442: //TODO
443: //Move this after waitForAcks to obviate problems caused by
444: //the LastMessage Protocol message being processed concurrently with
445: //application messages. At the moment, this may cause problems in
446: //Glassfish container with ordered delivery configured. This will
447: //probably no longer be the case when the Tube/Fibre architecture
448: //is used.
449: if (config.getRMVersion() == RMVersion.WSRM10) {
450: sendLast();
451: } else {
452: sendCloseSequence();
453: }
454:
455: //this will block until all messages are complete
456: waitForAcks();
457: AbstractTerminateSequence ts = null;
458: if (config.getRMVersion() == RMVersion.WSRM10) {
459: ts = new com.sun.xml.ws.rm.v200502.TerminateSequenceElement();
460: com.sun.xml.ws.rm.v200502.Identifier idTerminate = new com.sun.xml.ws.rm.v200502.Identifier();
461: idTerminate.setValue(id);
462: ((com.sun.xml.ws.rm.v200502.TerminateSequenceElement) ts)
463: .setIdentifier(idTerminate);
464: } else {
465: ts = new com.sun.xml.ws.rm.v200702.TerminateSequenceElement();
466: com.sun.xml.ws.rm.v200702.Identifier idTerminate = new com.sun.xml.ws.rm.v200702.Identifier();
467: idTerminate.setValue(id);
468: ((com.sun.xml.ws.rm.v200702.TerminateSequenceElement) ts)
469: .setIdentifier(idTerminate);
470:
471: }
472:
473: protocolMessageSender.sendTerminateSequence(ts, this , version);
474:
475: }
476:
477: private void sendLast() throws RMException {
478: protocolMessageSender.sendLast(this , version);
479: }
480:
481: private void sendCloseSequence() throws RMException {
482: protocolMessageSender.sendCloseSequence(this , version);
483: }
484:
485: /**
486: * Causes the specified message number to be resent.
487: *
488: * @param messageNumber The message number to resend
489: */
490: public void resend(int messageNumber) throws RMException {
491: Message mess = get(messageNumber);
492: mess.resume();
493: }
494:
495: /**
496: * Forces an ack request on next message
497: */
498: public synchronized void requestAck() {
499: ackRequestDeadline = System.currentTimeMillis();
500: }
501:
502: /**
503: * Checks whether an ack should be requested. Currently checks whether the
504: * The algorithm checks whether the ackRequest deadline has elapsed.
505: * The ackRequestDeadline is determined by the ackRequestInterval in the
506: * SequenceConfig member for this sequence.
507: *
508: */
509: protected synchronized boolean isAckRequested() {
510:
511: long time = System.currentTimeMillis();
512: if (time > ackRequestDeadline) {
513: //reset the clock
514: ackRequestDeadline = time + getAckRequestInterval();
515: return true;
516: } else {
517: return false;
518: }
519: }
520:
521: /**
522: * Checks whether a resend should happen. The algorithm checks whether
523: * the resendDeadline has elapsed.
524: * The resendDeadline is determined by the resendInterval in the
525: * SequenceConfig member for this sequence.
526: *
527: */
528: public synchronized boolean isResendDue() {
529: long time = System.currentTimeMillis();
530: if (time > resendDeadline) {
531: //reset the clock
532: resendDeadline = time + getResendInterval();
533: return true;
534: } else {
535: return false;
536: }
537: }
538:
539: private long getResendInterval() {
540:
541: //do a resend at every opportunity under these conditions
542: //1. Sequence has been terminated
543: //2. Number of stored messages exceeds 1/2 available space.
544:
545: if (!isActive || storedMessages > (getTransferWindowSize() / 2)) {
546: return 0;
547: }
548: return config.getResendInterval();
549: }
550:
551: /**
552: * Returns true if TransferWindow is full. In this case, we
553: * hold off on sending messages.
554: */
555: public boolean isTransferWindowFull() {
556: return getTransferWindowSize() == storedMessages;
557: }
558:
559: private long getAckRequestInterval() {
560: //send an ackRequest at every opportunity under these conditions
561: //1. Sequence has been terminated
562: //2. Number of stored messages exceeds 1/2 available space.
563: //3. Number of stored messages at endpoint exceeds 1/2
564: // available space.
565: if (!isActive
566: || storedMessages > (getTransferWindowSize() / 2)
567: || getReceiveBufferSize() > (config.getBufferSize() / 2)) {
568: return 0;
569: }
570: return config.getAckRequestInterval();
571: }
572:
573: /**
574: * Implementation of acknowledge defers discarding stored messages when
575: * the AcksTo endpoint is anonymous and the message is a two-way request.
576: * In this case, the actual work usually done by acknowledge() needs to
577: * wait until the response is received. The RMClientPipe invokes
578: * <code>acknowledgeResponse</code> at that time.
579: *
580: * @param i The index to acknowledge
581: * @throws InvalidMessageNumberException
582: */
583: public synchronized void acknowledge(int i)
584: throws InvalidMessageNumberException {
585:
586: Message mess = get(i);
587: if (isAnonymous() && mess.isTwoWayRequest) {
588: return;
589: } else {
590: super .acknowledge(i);
591:
592: if (ackListener != null) {
593: ackListener.notify(this , i);
594: }
595: //if this acknowledgement is not on the protocol
596: //response for the one-way message (endpoint behaved
597: //unkindly, or possibly dropped the request), the sending
598: //thread is waiting in the resend loop in RMClientPipe.
599: mess.resume();
600: }
601: }
602:
603: /**
604: * Acknowledges that a response to a two-way operation has been
605: * received. See Javadoc for <code>acknowledge</code>
606: *
607: * @param i The index to acknowledge
608: * @throws InvalidMessageNumberException
609: */
610: public synchronized void acknowledgeResponse(int i)
611: throws InvalidMessageNumberException {
612:
613: super .acknowledge(i);
614: if (ackListener != null) {
615: ackListener.notify(this , i);
616: }
617: }
618:
619: /**
620: * Return value is determined by whether the destination endpoint is the
621: * anonymous URI.
622: *
623: * @return <code>true</code> if the destination is the anonymous URI.
624: * <code>false</code> otherwise.
625: */
626: public boolean isAnonymous() {
627: return isAnonymous;
628: }
629:
630: public void registerProtocolMessageSender(ProtocolMessageSender pms) {
631: this .protocolMessageSender = pms;
632:
633: }
634:
635: public JAXBElement<SecurityTokenReferenceType> getSecurityTokenReference() {
636: return str;
637: }
638:
639: public void setSecurityTokenReference(
640: JAXBElement<SecurityTokenReferenceType> str) {
641: this .str = str;
642: }
643:
644: /**
645: * Handler periodically invoked by RMSource.MaintenanceThread.
646: * Has two duties:<p>
647: * <ul><li>Resend incomplete messages.</li>
648: * <li>Send AckRequested message down the pipeline if Inactivity
649: * timeout is approaching.</li>
650: * </ul>
651: *
652: * @throws RMException
653: */
654: public synchronized void doMaintenanceTasks() throws RMException {
655:
656: if (storedMessages > 0 && isResendDue()) {
657: int top = getNextIndex();
658: for (int i = 1; i < top; i++) {
659: Message mess = get(i);
660: if (mess != null && !mess.isComplete()) {
661: logger.fine("resending " + getId() + ":" + i);
662: resend(i);
663: }
664: }
665: } else {
666: //check whether we need to prime the pump
667: if (isGettingClose(System.currentTimeMillis()
668: - getLastActivityTime(), config
669: .getInactivityTimeout())) {
670: //send an AckRequested down the pipe. Need to use a background
671: //Thread. This is being called by the RMSource maintenance thread
672: //whose health we have to be very careful with. If the heartbeat
673: //message takes inordinately long to process, the maintenance thread
674: //could miss many assignments.
675: new AckRequestedSender(this ).start();
676: }
677: }
678: }
679:
680: private class AckRequestedSender extends Thread {
681:
682: private ClientOutboundSequence sequence;
683:
684: AckRequestedSender(ClientOutboundSequence sequence) {
685: this .sequence = sequence;
686: }
687:
688: public void run() {
689: try {
690:
691: if (sendHeartbeats) {
692:
693: logger.fine(Messages.HEARTBEAT_MESSAGE_MESSAGE
694: .format(sequence.getId(), System
695: .currentTimeMillis()));
696:
697: protocolMessageSender.sendAckRequested(sequence,
698: version);
699: }
700:
701: } catch (Exception e) {
702: //We get here in at least two cases.
703: //1. Client running in Webapp that is undeployed,
704: //2. SequenceFault from AckRequested message.
705: //
706: //In both cases the sequence is of no further use. We
707: //will assume for now that this is already the case.
708: logger.log(Level.FINE,
709: Messages.HEARTBEAT_MESSAGE_EXCEPTION.format()
710: + " " + sequence.getId(), e);
711: try {
712: RMSource.getRMSource().removeOutboundSequence(
713: sequence);
714: } catch (Exception ex) {
715: }
716: }
717: }
718: }
719:
720: }
|