001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common Development
008: * and Distribution License("CDDL") (collectively, the "License"). You
009: * may not use this file except in compliance with the License. You can obtain
010: * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
011: * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
012: * language governing permissions and limitations under the License.
013: *
014: * When distributing the software, include this License Header Notice in each
015: * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
016: * Sun designates this particular file as subject to the "Classpath" exception
017: * as provided by Sun in the GPL Version 2 section of the License file that
018: * accompanied this code. If applicable, add the following below the License
019: * Header, with the fields enclosed by brackets [] replaced by your own
020: * identifying information: "Portions Copyrighted [year]
021: * [name of copyright owner]"
022: *
023: * Contributor(s):
024: *
025: * If you wish your version of this file to be governed by only the CDDL or
026: * only the GPL Version 2, indicate your decision by adding "[Contributor]
027: * elects to include this software in this distribution under the [CDDL or GPL
028: * Version 2] license." If you don't indicate a single choice of license, a
029: * recipient has the option to distribute your version of this file under
030: * either the CDDL, the GPL Version 2 or to extend the choice of license to
031: * its licensees as provided above. However, if you add GPL Version 2 code
032: * and therefore, elected the GPL Version 2 license, then the option applies
033: * only if the new code is made subject to such option by the copyright
034: * holder.
035: */
036:
037: /*
038: * OutboundSequence.java
039: *
040: * @author Mike Grogan
041: * Created on November 24, 2005, 9:40 AM
042: *
043: */
044:
045: package com.sun.xml.ws.rm.jaxws.runtime;
046:
047: import com.sun.xml.ws.api.SOAPVersion;
048: import com.sun.xml.ws.api.message.Headers;
049: import com.sun.xml.ws.rm.*;
050: import com.sun.xml.ws.rm.jaxws.util.ProcessingFilter;
051: import com.sun.xml.ws.rm.protocol.AbstractAckRequested;
052: import com.sun.xml.ws.rm.protocol.AbstractSequence;
053: import com.sun.xml.ws.rm.protocol.AbstractSequenceAcknowledgement;
054: import com.sun.xml.ws.rm.protocol.AcknowledgementHandler;
055: import com.sun.xml.ws.rm.v200502.AckRequestedElement;
056:
057: import javax.xml.bind.Marshaller;
058: import java.net.URI;
059: import java.util.logging.Logger;
060:
061: /**
062: *
063: */
064: public abstract class OutboundSequence extends Sequence {
065:
066: private static final Logger logger = Logger
067: .getLogger(OutboundSequence.class.getName());
068:
069: /**
070: * Common destination for all application messages in the sequence.
071: */
072: protected URI destination;
073:
074: /**
075: * Endpoint for protocol responses. May be the WS-Addressing anonymous endpoint.
076: * There are several variations depending on whether this EPR is the same as
077: * the one used by application messages in the companion <code>InboundSequence</code>
078: */
079: protected URI acksTo;
080:
081: /**
082: * Companion <code>InboundSequence</code>
083: */
084: protected InboundSequence inboundSequence;
085:
086: /**
087: * Sequence acknowledgement to be sent back to client on next
088: * available message to the AcksTo endpoint.
089: */
090: protected AbstractSequenceAcknowledgement sequenceAcknowledgement;
091:
092: /**
093: * Instance of helper class that processes SequnceAcknowledgement headers.
094: */
095: protected AcknowledgementHandler ackHandler;
096:
097: /**
098: * Flag determines whether messages will be saved. Will only be
099: * false in the case of companions to ServerInboundSequences for
100: * endpoints with no two-way operations.
101: */
102: public boolean saveMessages = true;
103:
104: /**
105: * Processing filter whose handleRequestHeaders method
106: * can access headers before they are marshalled.
107: */
108: private ProcessingFilter filter = null;
109:
110: /**
111: * Space available in receiving buffer at destination, if
112: * this can be determined.
113: */
114: protected int bufferRemaining;
115:
116: /**
117: * Accessor for the value of the Destination URI.
118: *
119: * @return The destination URI.
120: */
121: public URI getDestination() {
122:
123: return destination;
124: }
125:
126: /**
127: * Accessor for the value of the Destination URI.
128: *
129: * @return The destination String.
130: */
131: public URI getAcksTo() {
132:
133: return acksTo;
134: }
135:
136: /**
137: * Invoked by Incoming message processor to post Sequence Acknowledgement
138: * from companion Incoming Sequence for transmission on next OutboundMessage.l
139: */
140: public void setSequenceAcknowledgement(
141: AbstractSequenceAcknowledgement element) {
142: this .sequenceAcknowledgement = element;
143: }
144:
145: /**
146: * Accessor for the <code>inboundSequence</code> field.
147: *
148: * @return The <code>inboundSequence</code> field.
149: */
150: public InboundSequence getInboundSequence() {
151: return inboundSequence;
152: }
153:
154: /**
155: * Accessor for bufferRemaining field.
156: */
157: public int getBufferRemaining() {
158: return bufferRemaining;
159: }
160:
161: /**
162: * Mutator for bufferRemaining field.
163: */
164: public void setBufferRemaining(int value) {
165: bufferRemaining = value;
166: }
167:
168: /**
169: * Handles an <code>OutboundMessage</code>.
170: * <br>
171: * <ul>
172: * <li> Store the message </li>
173: * <li> If ackRequested flag is set, add an <code>AckRequestedElement</code>
174: * header to the message.</code>
175: * <li> If complanion <code>ClientInboundSequence</code> has queued an acknowledgement,
176: * add a <code>SequenceAcknowledgementElement</code> header to the
177: * message.</li>
178: * </ul>
179: *
180: * @param mess The OutboundMessage.
181: * @param marshaller The Marshaller to use
182: */
183:
184: public void processOutboundMessage(Message mess,
185: Marshaller marshaller)
186: throws InvalidMessageNumberException, BufferFullException,
187: DuplicateMessageException {
188:
189: if (saveMessages && !mess.isOneWayResponse) {
190: //Add the message to the sequence unless this has been done previously
191: int messageNumber = mess.getMessageNumber();
192:
193: if (messageNumber == 0) {
194: messageNumber = set(getNextIndex(), mess);
195: } else {
196: set(messageNumber, mess);
197: }
198:
199: AbstractSequence element = null;
200: if (config.getRMVersion() == RMVersion.WSRM10) {
201: element = new com.sun.xml.ws.rm.v200502.SequenceElement();
202: } else {
203: element = new com.sun.xml.ws.rm.v200702.SequenceElement();
204: }
205: element.setNumber(messageNumber);
206: element.setId(this .getId());
207:
208: //mess.addHeader(Headers.create(getVersion(),marshaller,element));
209:
210: mess.setSequenceElement(element);
211:
212: //if it is time to request an ack for this sequence, add AckRequestedHeader
213: if (isAckRequested()) {
214: AbstractAckRequested ack = null;
215: if (config.getRMVersion() == RMVersion.WSRM10) {
216: ack = new AckRequestedElement();
217: ack.setId(this .getId());
218: } else {
219: ack = new com.sun.xml.ws.rm.v200702.AckRequestedElement();
220: ack.setId(this .getId());
221: }
222:
223: mess.setAckRequestedElement(ack);
224: }
225: }
226:
227: //if companion Inbound sequence is returning an acknowledgement, add the
228: //SequenceAcknowledgement header
229: if (sequenceAcknowledgement != null) {
230: //mess.addHeader(Headers.create(getVersion(), marshaller,sequenceAcknowledgement));
231: if (sequenceAcknowledgement instanceof com.sun.xml.ws.rm.v200502.SequenceAcknowledgementElement) {
232: mess
233: .setSequenceAcknowledgementElement((com.sun.xml.ws.rm.v200502.SequenceAcknowledgementElement) sequenceAcknowledgement);
234: } else {
235: mess
236: .setSequenceAcknowledgementElement((com.sun.xml.ws.rm.v200702.SequenceAcknowledgementElement) sequenceAcknowledgement);
237: }
238:
239: sequenceAcknowledgement = null;
240: }
241:
242: if (filter != null) {
243: filter.handleOutboundHeaders(mess);
244: }
245:
246: if (mess.getSequenceElement() != null) {
247: /*
248: mess.addHeader(Headers.create(getVersion(),
249: marshaller,
250: mess.getSequenceElement()));
251: */
252: mess.addHeader(createHeader(mess.getSequenceElement()));
253: }
254:
255: if (mess.getAckRequestedElement() != null) {
256: /*
257: mess.addHeader(Headers.create(getVersion(),
258: marshaller,
259: mess.getAckRequestedElement()));
260: */
261: mess.addHeader(createHeader(mess.getAckRequestedElement()));
262: }
263:
264: if (mess.getSequenceAcknowledgementElement() != null) {
265: /*
266: mess.addHeader(Headers.create(getVersion(),
267: marshaller,
268: mess.getSequenceAcknowledgementElement()));
269: */
270: mess.addHeader(createHeader(mess
271: .getSequenceAcknowledgementElement()));
272: }
273: }
274:
275: /**
276: * Add a pending acknowledgement to a message without adding message to sequence. Used
277: * for sending final ack on a TerminateSequence message if necessary.
278: *
279: * @param mess The OutboundMessage.
280: * @param marshaller The Marshaller to use
281: */
282:
283: public synchronized void processAcknowledgement(Message mess,
284: Marshaller marshaller) throws RMException {
285: //if companion Inbound sequence is returning an acknowledgement, add the
286: //SequenceAcknowledgement header
287: if (sequenceAcknowledgement != null) {
288: //mess.addHeader(Headers.create(getVersion(), marshaller,sequenceAcknowledgement));
289: mess.addHeader(createHeader(sequenceAcknowledgement));
290:
291: sequenceAcknowledgement = null;
292: }
293: }
294:
295: /**
296: * Sets the state of the message at the specified index to complete, and discards
297: * a com.sun.xml.ws.api.message.Message.
298: *
299: * @param i Index to set.
300: */
301: public synchronized void acknowledge(int i)
302: throws InvalidMessageNumberException {
303:
304: Message mess;
305: if (i >= nextIndex || (null == (mess = get(i)))) {
306: throw new InvalidMessageNumberException();
307: }
308:
309: if (!mess.isComplete()) {
310:
311: --storedMessages;
312: if (storedMessages == 0) {
313: //A thread on which waitForAcks() has been called
314: //may be waiting for all the acks to arrive.
315: notifyAll();
316: }
317:
318: mess.complete();
319: }
320: }
321:
322: /**
323: * Removes acked messages from list.
324: *(For anonymous client, need to widen definition of acked to include the
325: * requirement that responses have arrived.)
326: *
327: * @param element The <code>SequenceAcknowledgementElement</code> containing the
328: * ranges of messages to be removed.
329: */
330: public void handleAckResponse(
331: AbstractSequenceAcknowledgement element)
332: throws InvalidMessageNumberException {
333:
334: if (ackHandler == null) {
335: ackHandler = new AcknowledgementHandler(config);
336: }
337: ackHandler.handleAcknowledgement(this , element);
338: }
339:
340: /**
341: *
342: * Called by disconnect before sending Last and Terminate sequence. Blocks until all messages
343: * have been acked. The notifyAll method is called by OutboundSequence.acknowledge when
344: * stored message count reaches 0.
345: */
346: public synchronized void waitForAcks() {
347:
348: while (storedMessages != 0) {
349: try {
350: //wait for the specified timeout or a notify(), which is called
351: //whenever a message is acked.
352: long timeout = config.getCloseTimeout();
353: wait(timeout);
354:
355: if (storedMessages > 0) {
356: logger
357: .severe(Messages.TIMEOUT_IN_WAITFORACKS_STRING
358: .format(timeout / 1000,
359: storedMessages));
360: break;
361: }
362: } catch (InterruptedException e) {
363: //allow preDestroy to continue
364: break;
365: }
366: }
367: }
368:
369: protected boolean isAckRequested() {
370: //For oneway messages it does not make sense to send
371: // AckRequestedElement on the ServerOutbound messages
372: //saveMessages will be true in case of two way messages
373: // for AckRequestedElement will be generated then
374: //otherwise it will return false
375: return saveMessages;
376: }
377:
378: protected boolean isResendDue() {
379: return true;
380: }
381:
382: private SOAPVersion getVersion() {
383: return config.getSoapVersion();
384: }
385:
386: public void setProcessingFilter(ProcessingFilter filter) {
387: this .filter = filter;
388: }
389:
390: /**
391: * Add AckRequested element to an existing message if one is not already
392: * present. This is used to ensure that an AckRequested header is included
393: * on every resend.
394: *
395: * @param mess The message
396: * @param marshaller
397: */
398: public void ensureAckRequested(Message mess, Marshaller marshaller) {
399: if (mess.getAckRequestedElement() == null) {
400:
401: AbstractAckRequested ack = null;
402: if (config.getRMVersion() == RMVersion.WSRM10) {
403: ack = new AckRequestedElement();
404: ack.setId(this .getId());
405: } else {
406: ack = new com.sun.xml.ws.rm.v200702.AckRequestedElement();
407: ack.setId(this .getId());
408: }
409:
410: mess.setAckRequestedElement(ack);
411: /*
412: mess.addHeader(Headers.create(getVersion(),
413: marshaller,
414: mess.getAckRequestedElement()));
415: */
416: mess.addHeader(createHeader(mess.getAckRequestedElement()));
417: }
418: }
419:
420: protected com.sun.xml.ws.api.message.Header createHeader(Object obj) {
421: return Headers.create(config.getRMVersion()
422: .getJAXBRIContextHeaders(), obj);
423: }
424:
425: public Message getUnacknowledgedMessage() {
426: for (int i = 0; i < nextIndex; i++) {
427: try {
428: Message mess = get(i);
429: if (mess != null && !mess.isComplete()) {
430: return mess;
431: }
432: } catch (InvalidMessageNumberException e) {
433: }
434: }
435: return null;
436: }
437:
438: }
|