001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common Development
008: * and Distribution License("CDDL") (collectively, the "License"). You
009: * may not use this file except in compliance with the License. You can obtain
010: * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
011: * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
012: * language governing permissions and limitations under the License.
013: *
014: * When distributing the software, include this License Header Notice in each
015: * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
016: * Sun designates this particular file as subject to the "Classpath" exception
017: * as provided by Sun in the GPL Version 2 section of the License file that
018: * accompanied this code. If applicable, add the following below the License
019: * Header, with the fields enclosed by brackets [] replaced by your own
020: * identifying information: "Portions Copyrighted [year]
021: * [name of copyright owner]"
022: *
023: * Contributor(s):
024: *
025: * If you wish your version of this file to be governed by only the CDDL or
026: * only the GPL Version 2, indicate your decision by adding "[Contributor]
027: * elects to include this software in this distribution under the [CDDL or GPL
028: * Version 2] license." If you don't indicate a single choice of license, a
029: * recipient has the option to distribute your version of this file under
030: * either the CDDL, the GPL Version 2 or to extend the choice of license to
031: * its licensees as provided above. However, if you add GPL Version 2 code
032: * and therefore, elected the GPL Version 2 license, then the option applies
033: * only if the new code is made subject to such option by the copyright
034: * holder.
035: */
036: package com.sun.xml.ws.tx.at;
037:
038: import com.sun.enterprise.transaction.TransactionImport;
039: import com.sun.xml.ws.api.tx.Participant;
040: import com.sun.xml.ws.api.tx.Protocol;
041: import com.sun.xml.ws.api.tx.TXException;
042: import com.sun.xml.ws.tx.common.TransactionImportManager;
043: import com.sun.xml.ws.tx.common.TransactionManagerImpl;
044: import com.sun.xml.ws.tx.common.TxLogger;
045: import com.sun.xml.ws.tx.coordinator.CoordinationContextInterface;
046: import com.sun.xml.ws.tx.coordinator.Registrant;
047: import com.sun.xml.ws.tx.webservice.member.coord.CreateCoordinationContextType;
048:
049: import javax.resource.spi.XATerminator;
050: import javax.transaction.SystemException;
051: import javax.transaction.Transaction;
052: import javax.transaction.TransactionManager;
053: import javax.transaction.xa.XAException;
054: import javax.transaction.xa.XAResource;
055: import javax.transaction.xa.Xid;
056: import javax.xml.ws.WebServiceContext;
057: import java.util.logging.Level;
058: import javax.xml.ws.WebServiceException;
059:
060: /**
061: * @author jf39279
062: */
063: public class ATSubCoordinator extends ATCoordinator {
064: static private TxLogger logger = TxLogger
065: .getATLogger(ATCoordinator.class);
066:
067: final TransactionImport importTm = TransactionImportManager
068: .getInstance();
069:
070: // This Subordinate coordinator is a volatile participant of parent coordinator.
071: private ATParticipant rootVolatileParticipant = null;
072: // This subordinate coordinator is also a durable participant of its parent coordinator.
073: private ATParticipant rootDurableParticipant = null;
074:
075: private boolean guardTimeout = false;
076:
077: private boolean forgotten = false;
078:
079: /**
080: * Creates a new instance of ATSubCoordinator
081: */
082: public ATSubCoordinator(CoordinationContextInterface context,
083: CreateCoordinationContextType request) {
084: super (context, request);
085: assert getContext().getRootRegistrationService() != null;
086: }
087:
088: public ATSubCoordinator(CoordinationContextInterface context) {
089: super (context);
090: assert getContext().getRootRegistrationService() != null;
091: }
092:
093: public void setTransaction(final Transaction txn) {
094: super .setTransaction(txn);
095: if (txn == null) {
096: xaTerminator = null;
097: } else if (xaTerminator == null) {
098: xaTerminator = importTm.getXATerminator();
099: }
100: }
101:
102: protected void registerWithVolatileParent() {
103: if (rootVolatileParticipant == null) {
104: if (logger.isLogging(Level.FINE)) {
105: logger.fine(
106: "ATSubCoordinator.registerWithVolatileParent",
107: "register volatile2PC with coordId:"
108: + getIdValue());
109: }
110: rootVolatileParticipant = new ATParticipant(this ,
111: new VolatileParticipant());
112: rootVolatileParticipant.register();
113: }
114: }
115:
116: protected boolean registerWithDurableParent() {
117: boolean result = false;
118: if (rootDurableParticipant == null) {
119: if (logger.isLogging(Level.FINE)) {
120: logger.fine(
121: "ATSubCoordinator.registerWithDurableParent",
122: "register durable2PC with coordId:"
123: + getIdValue());
124: }
125: rootDurableParticipant = new ATParticipant(this ,
126: new DurableParticipant());
127: try {
128: rootDurableParticipant.register();
129: result = true;
130: } catch (Exception e) {
131: logger
132: .severe(
133: "registerWithDurableParent",
134: LocalizationMessages
135: .REG_WITH_DURABLE_FAILED_0022(getCoordIdPartId(rootDurableParticipant)));
136: throw new WebServiceException(
137: LocalizationMessages
138: .REG_WITH_DURABLE_FAILED_0022(getCoordIdPartId(rootDurableParticipant)),
139: e);
140:
141: }
142: }
143: return result;
144: }
145:
146: public boolean isSubordinateCoordinator() {
147: return true;
148: }
149:
150: private Xid xid = null;
151:
152: public Xid getCoordinationXid() {
153: if (xid == null) {
154: xid = CoordinationXid.lookupOrCreate(getContext()
155: .getIdentifier());
156: }
157: return xid;
158: }
159:
160: public class VolatileParticipant implements Participant {
161: public Protocol getProtocol() {
162: return Protocol.VOLATILE;
163: }
164:
165: public Participant.STATE prepare() throws TXException {
166: synchronized (this ) {
167: initiateVolatilePrepare();
168: waitForVolatilePrepareResponse();
169: if (isAborting()) {
170: rootVolatileParticipant.aborted();
171: throw new TXException(
172: "VolatileParticipant.prepare aborted");
173: } else if (getVolatileParticipants().size() == 0) {
174: rootVolatileParticipant.readonly();
175: return Participant.STATE.P_READONLY;
176: } else {
177: rootVolatileParticipant.prepared();
178: guardTimeout = true;
179: }
180: }
181: return Participant.STATE.P_OK;
182: }
183:
184: public void commit() {
185: initiateVolatileCommit();
186: rootVolatileParticipant.committed();
187: }
188:
189: public void abort() {
190: initiateVolatileRollback();
191: // TODO: is waitForVolatileRollbackResponse needed.
192: rootVolatileParticipant.aborted();
193: }
194: }
195:
196: public class DurableParticipant implements Participant {
197: int xaResult = XAResource.XA_OK;
198:
199: public Protocol getProtocol() {
200: return Protocol.DURABLE;
201: }
202:
203: public Participant.STATE prepare() throws TXException {
204: Participant.STATE result = Participant.STATE.P_OK;
205: final String METHOD = "durableParticipant";
206: if (logger.isLogging(Level.FINER)) {
207: logger.entering(METHOD,
208: getCoordIdPartId(rootDurableParticipant));
209: }
210:
211: // synchronize to avoid race conditions between Coordinator time out and potentially
212: // multiple WS-AT prepare messages. (Coordinator can resend prepare after some interval
213: // has passed with no reply.
214: synchronized (this ) {
215: initiateDurablePrepare();
216: if (getXATerminator() != null) {
217: try {
218: if (logger.isLogging(Level.FINER)) {
219: logger.entering("XATerminator.prepare()");
220: }
221: xaResult = getXATerminator().prepare(
222: getCoordinationXid());
223: if (logger.isLogging(Level.FINER)) {
224: logger.exiting("XATerminator.prepare()",
225: xaResult);
226: }
227: } catch (XAException ex) {
228: setAborting();
229: if (logger.isLogging(Level.FINEST)) {
230: logger.finest(METHOD, LocalizationMessages
231: .XATERM_THREW_0023(ex.getClass()
232: .getName()), ex);
233: } else {
234: logger.info(METHOD, LocalizationMessages
235: .XATERM_THREW_0023(ex.getClass()
236: .getName()));
237: }
238: // TODO: set linked exception ex to TxException
239: throw new TXException(ex.getClass().getName());
240: }
241:
242: // Next line required to support remote participants in this coordinators durable participants.
243: waitForDurablePrepareResponse();
244: if (isAborting()) {
245: abort();
246: } else if (xaResult == XAResource.XA_RDONLY
247: && getDurableParticipants().size() == 0) {
248: guardTimeout = true;
249: rootDurableParticipant.readonly();
250: result = Participant.STATE.P_READONLY;
251: } else if (xaResult == XAResource.XA_OK) { //implied no durable participants are aborting since isAborting() is false
252: // Participant Subordinator coordinator must not timeout after sending
253: // prepared to superior coordinator.
254: guardTimeout = true;
255: rootDurableParticipant.prepared();
256: }
257: }
258: }
259: if (logger.isLogging(Level.FINER)) {
260: logger.exiting("ATSubCoordinator.durableParticipant",
261: getCoordIdPartId(rootDurableParticipant));
262: }
263:
264: return result;
265: }
266:
267: public void commit() {
268: final String METHOD = "ATSubCoordinator.DurableParticipant.commit";
269: boolean xaCommitFailed = false;
270: synchronized (this ) {
271: initiateDurableCommit();
272: if (getXATerminator() != null && xaResult == XA_OK) {
273: try {
274: getXATerminator().commit(getCoordinationXid(),
275: false);
276: } catch (XAException ex) {
277: xaCommitFailed = true;
278:
279: logger.severe(METHOD, LocalizationMessages
280: .XATERM_THREW_0023(ex
281: .getLocalizedMessage()));
282:
283: }
284: }
285: if (xaCommitFailed || isAborting()) {
286: logger.severe(METHOD, LocalizationMessages
287: .ABORT_DURING_COMMIT_0024(getIdValue()));
288: rootDurableParticipant.aborted();
289: } else {
290: logger.info(METHOD, LocalizationMessages
291: .COMMITTED_SUB_COOR_0025(getIdValue()));
292: rootDurableParticipant.committed();
293: }
294: }
295: }
296:
297: public void abort() {
298: final String METHOD = "ATSubCoordinator.DurableParticipant.abort";
299: synchronized (this ) {
300: if (getXATerminator() != null && xaResult == XA_OK) {
301: try {
302: logger.severe(METHOD, LocalizationMessages
303: .XATERM_ABORT_0026(getIdValue()));
304: getXATerminator()
305: .rollback(getCoordinationXid());
306: } catch (XAException ex) {
307: logger.severe(METHOD, LocalizationMessages
308: .CAUGHT_XAEX_0027(ex.getMessage()));
309: }
310: }
311: initiateDurableRollback();
312: waitForCommitOrRollbackResponse(Protocol.DURABLE);
313: rootDurableParticipant.aborted();
314: }
315: }
316: }
317:
318: private XATerminator xaTerminator = null;
319:
320: private XATerminator getXATerminator() {
321: if (xaTerminator == null) {
322: xaTerminator = importTm.getXATerminator();
323: }
324: return xaTerminator;
325: }
326:
327: // Synchronization
328:
329: /**
330: * Subordinate Coordinator does not use this mechanism. Ensure it is not called.
331: */
332: public void beforeCompletion() {
333: // Ensure that beforeCompletion disabled for subordinate coordinator
334: // Subordinate coordinator should not be registered with TransactionSynchronizationRegistry.
335: throw new UnsupportedOperationException(
336: "No beforeCompletion for subordinate coordinator");
337: }
338:
339: public void afterCompletion(final int i) {
340: // Ensure that afterCompletion disabled for subordinate coordinator
341: throw new UnsupportedOperationException(
342: "No afterCompletion for subordinate coordinator");
343: }
344:
345: // XAResource Not implemented yet
346:
347: /**
348: * Synchronous prepare request from subordinate coordinator.
349: * <p/>
350: * <p>Prepare this coordinator and return result of preparation.
351: */
352: public int prepare(final Xid xid) throws XAException {
353: if (logger.isLogging(Level.FINER)) {
354: logger.entering("XAResource_prepare(xid=" + xid + ")");
355: }
356: int result = 0;
357: result = XA_OK;
358: // TODO: Prepare AT Subordinate Coordinator state so it can be recovered in case of failure.
359: // Only durable participants need to be recoverable.
360:
361: // This coordinator prepares when root ws-at coordinator sends a prepare to this subordinate
362: // coordinator that is a participant to that coordinator.
363: if (logger.isLogging(Level.FINER)) {
364: logger.exiting("XAResource_prepare", result);
365: }
366: return result;
367: }
368:
369: public void commit(final Xid xid, final boolean onePhase)
370: throws XAException {
371: if (logger.isLogging(Level.FINER)) {
372: logger.entering("XAResource_commit(xid=" + xid
373: + " ,onePhase=" + onePhase + ")");
374: }
375: // TODO: Commit AT Subordinate Coordinator state so it can be recovered in case of failure.
376: // Only durable participants need to be recoverable.
377:
378: // This coordinator commits when root ws-at coordinator sends a prepare to this subordinate
379: // coordinator that is a participant to that coordinator.
380: if (logger.isLogging(Level.FINER)) {
381: logger.exiting("XAResource_commit");
382: }
383: }
384:
385: public void rollback(final Xid xid) throws XAException {
386: if (logger.isLogging(Level.FINER)) {
387: logger.entering("XAResource_rollback(xid=" + xid + ")");
388: }
389: // TODO: Rollack AT Subordinate Coordinator state.
390: // Only durable participants require to be rolled back.
391:
392: // This coordinator rolls back when root ws-at coordinator sends a abort to this subordinate
393: // coordinator that is a participant to that coordinator.
394: if (logger.isLogging(Level.FINER)) {
395: logger.exiting("XAResource_rollback");
396: }
397: }
398:
399: @Override
400: public void addRegistrant(final Registrant registrant,
401: final WebServiceContext wsContext) {
402: //
403: if (registerWithRootRegistrationService(registrant)) {
404: // registrant is either volatile or durable participant with subordinate coordinator's parent coordinator.
405: // do not add this participant to ATSubCoordinator list of participants it is managing.
406: return;
407: }
408: super .addRegistrant(registrant, wsContext);
409: switch (registrant.getProtocol()) {
410: case VOLATILE:
411: registerWithVolatileParent();
412: break;
413:
414: case DURABLE:
415: registerWithDurableParent();
416: break;
417:
418: case COMPLETION:
419: throw new UnsupportedOperationException(
420: "can not register for completion with subordinate coordinator");
421: }
422: }
423:
424: /**
425: * Get the registrant with the specified id or null if it does not exist.
426: *
427: * @param id the registrant id
428: * @return the Registrant object or null if the id does not exist
429: */
430: public Registrant getRegistrant(final String id) {
431: Registrant result = super .getRegistrant(id);
432:
433: // check subordinate participants
434: if (result == null && rootVolatileParticipant != null
435: && rootVolatileParticipant.getIdValue().equals(id)) {
436: result = rootVolatileParticipant;
437: }
438: if (result == null && rootDurableParticipant != null
439: && rootDurableParticipant.getIdValue().equals(id)) {
440: result = rootDurableParticipant;
441: }
442:
443: return result;
444: }
445:
446: public void removeRegistrant(final String id) {
447: forget(id);
448: }
449:
450: /**
451: * Represent all volatile and durable participants for this subordinate coordinator through
452: * two special participants of SubordinateCoordinator. They are the only participants that parent
453: * coordinator is aware of.
454: */
455: public boolean registerWithRootRegistrationService(
456: final Registrant participant) {
457: // Note: intended instance comparision, not object comparison
458: if (participant == rootVolatileParticipant
459: || participant == rootDurableParticipant) {
460: return true;
461: } else {
462: return super
463: .registerWithRootRegistrationService(participant);
464: }
465: }
466:
467: @Override
468: public boolean hasOutstandingParticipants() {
469: return rootDurableParticipant != null
470: || rootVolatileParticipant != null
471: || super .hasOutstandingParticipants();
472: }
473:
474: public void forget(final String partId) {
475: if (rootVolatileParticipant != null
476: && rootVolatileParticipant.getIdValue().equals(partId)) {
477: rootVolatileParticipant = null;
478: if (logger.isLogging(Level.FINE)) {
479: logger.fine("forget",
480: "forgot volatile participant link to parent "
481: + getCoordIdPartId(partId));
482: }
483: if (!hasOutstandingParticipants()) {
484: forget();
485: }
486: return;
487: }
488: if (rootDurableParticipant != null
489: && rootDurableParticipant.getIdValue().equals(partId)) {
490: rootDurableParticipant = null;
491: if (logger.isLogging(Level.FINE)) {
492: logger.fine("forget",
493: "forgot durable participant link to parent "
494: + getCoordIdPartId(partId));
495: }
496: if (!hasOutstandingParticipants()) {
497: forget();
498: }
499: return;
500: }
501:
502: // see if just a regular participant of this coordinator
503: super .forget(partId);
504: }
505:
506: @Override
507: public boolean expirationGuard() {
508: synchronized (this ) {
509: return guardTimeout;
510: }
511: }
512:
513: @Override
514: public void forget() {
515: synchronized (this ) {
516: if (forgotten) {
517: return;
518: } else {
519: forgotten = true;
520: }
521: if (rootVolatileParticipant != null) {
522: rootVolatileParticipant.forget();
523: rootVolatileParticipant = null;
524: }
525: if (rootDurableParticipant != null) {
526: rootDurableParticipant.forget();
527: rootDurableParticipant = null;
528: }
529: CoordinationXid.forget(this .getIdValue());
530: super .forget();
531: }
532: }
533:
534: /**
535: * Import a transactional context from an external transaction manager via WS-AT Coordination Context
536: * that was propagated in a SOAP request message.
537: *
538: * @see #endImportTransaction()
539: */
540: public void beginImportTransaction() {
541: Transaction currentTxn = null;
542:
543: try {
544: importTm.recreate(getCoordinationXid(), getExpires());
545: } catch (IllegalStateException ex) {
546: String message = LocalizationMessages
547: .IMPORT_TRANSACTION_FAILED_0028(getIdValue(),
548: getCoordinationXid().toString());
549: logger.warning("beginImportTransaction", message, ex);
550: throw new WebServiceException(message, ex);
551: }
552: try {
553: currentTxn = tm.getTransaction();
554: } catch (SystemException ex) {
555: String message = LocalizationMessages
556: .IMPORT_TXN_GET_TXN_FAILED_0030(getIdValue());
557: logger.warning("beginImportTransaction", message, ex);
558: throw new WebServiceException(message, ex);
559: }
560: assert currentTxn != null;
561: setTransaction(currentTxn);
562: tm.setCoordinationContext(getContext());
563: }
564:
565: /**
566: * Ends the importing of an external transaction.
567: * <p/>
568: * <p> Post-condition: terminates beginImportTransaction.
569: *
570: * @see #beginImportTransaction()
571: */
572: public void endImportTransaction() {
573: if (transaction != null) {
574: try {
575: importTm.release(getCoordinationXid());
576: } catch (Error e) {
577: logger
578: .warning(
579: "endImportTransaction",
580: LocalizationMessages
581: .EXCEPTION_RELEASING_IMPORTED_TRANSACTION_0029(),
582: e);
583: }
584: setTransaction(null);
585: }
586: }
587:
588: }
|