0001: /*
0002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
0003: *
0004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
0005: *
0006: * The contents of this file are subject to the terms of either the GNU
0007: * General Public License Version 2 only ("GPL") or the Common Development
0008: * and Distribution License("CDDL") (collectively, the "License"). You
0009: * may not use this file except in compliance with the License. You can obtain
0010: * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
0011: * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
0012: * language governing permissions and limitations under the License.
0013: *
0014: * When distributing the software, include this License Header Notice in each
0015: * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
0016: * Sun designates this particular file as subject to the "Classpath" exception
0017: * as provided by Sun in the GPL Version 2 section of the License file that
0018: * accompanied this code. If applicable, add the following below the License
0019: * Header, with the fields enclosed by brackets [] replaced by your own
0020: * identifying information: "Portions Copyrighted [year]
0021: * [name of copyright owner]"
0022: *
0023: * Contributor(s):
0024: *
0025: * If you wish your version of this file to be governed by only the CDDL or
0026: * only the GPL Version 2, indicate your decision by adding "[Contributor]
0027: * elects to include this software in this distribution under the [CDDL or GPL
0028: * Version 2] license." If you don't indicate a single choice of license, a
0029: * recipient has the option to distribute your version of this file under
0030: * either the CDDL, the GPL Version 2 or to extend the choice of license to
0031: * its licensees as provided above. However, if you add GPL Version 2 code
0032: * and therefore, elected the GPL Version 2 license, then the option applies
0033: * only if the new code is made subject to such option by the copyright
0034: * holder.
0035: */
0036: package com.sun.xml.ws.tx.at;
0037:
0038: import com.sun.xml.ws.api.SOAPVersion;
0039: import com.sun.xml.ws.api.tx.Protocol;
0040: import com.sun.xml.ws.api.tx.TXException;
0041: import com.sun.xml.ws.developer.MemberSubmissionEndpointReference;
0042: import static com.sun.xml.ws.tx.at.ATParticipant.STATE.*;
0043: import com.sun.xml.ws.tx.common.AT_2PC_State;
0044: import static com.sun.xml.ws.tx.common.AT_2PC_State.ABORTING;
0045: import static com.sun.xml.ws.tx.common.AT_2PC_State.ACTIVE;
0046: import static com.sun.xml.ws.tx.common.AT_2PC_State.COMMITTING;
0047: import static com.sun.xml.ws.tx.common.AT_2PC_State.PREPARED_SUCCESS;
0048: import static com.sun.xml.ws.tx.common.AT_2PC_State.PREPARING;
0049: import com.sun.xml.ws.tx.common.AddressManager;
0050: import com.sun.xml.ws.tx.common.TransactionManagerImpl;
0051: import com.sun.xml.ws.tx.common.TxFault;
0052: import com.sun.xml.ws.tx.common.TxLogger;
0053: import com.sun.xml.ws.tx.common.WsaHelper;
0054: import com.sun.xml.ws.tx.coordinator.CoordinationContextInterface;
0055: import com.sun.xml.ws.tx.coordinator.Coordinator;
0056: import com.sun.xml.ws.tx.coordinator.Registrant;
0057: import com.sun.xml.ws.tx.webservice.member.at.CoordinatorPortType;
0058: import com.sun.xml.ws.tx.webservice.member.at.WSATCoordinator;
0059: import com.sun.xml.ws.tx.webservice.member.coord.CreateCoordinationContextType;
0060:
0061: import javax.transaction.RollbackException;
0062: import javax.transaction.Synchronization;
0063: import javax.transaction.SystemException;
0064: import javax.transaction.Transaction;
0065: import javax.transaction.xa.XAException;
0066: import javax.transaction.xa.XAResource;
0067: import javax.transaction.xa.Xid;
0068: import javax.xml.ws.EndpointReference;
0069: import javax.xml.ws.WebServiceContext;
0070: import javax.xml.ws.WebServiceException;
0071: import java.net.URI;
0072: import java.util.ArrayList;
0073: import java.util.Collection;
0074: import java.util.Collections;
0075: import java.util.Iterator;
0076: import java.util.LinkedHashMap;
0077: import java.util.List;
0078: import java.util.Map;
0079: import java.util.logging.Level;
0080:
0081: /**
0082: * Atomic Transaction Coordinator
0083: * <p/>
0084: * <p/>
0085: * Coordinator States: NONE, ACTIVE, Volatile2PCPrepare, Durable2PCPrepare, Committing, Aborting
0086: * <p/>
0087: * <p/>
0088: * <p/>
0089: * <p/>
0090: * <p/>
0091: * Relationship between ATCoordinator and Java Transaction Manager.
0092: * ATCoordinator is registered as an XAResource with Java Transaction Manager.
 * This enables the Java Transaction Manager to act as the root transaction manager
 * for ATCoordinator's durable 2PC participants. ATCoordinator registers for
0095: * Transaction Synchronization if it has volatile participants. This enables
0096: * volatile participants to be prepared BEFORE durable 2pc participants are prepared.
0097: * <p/>
0098: * <p/>
0099: * <p/>
0100: * Coordination Context expires specifies the period, measured from
0101: * the point in time at which the context was first created or received, after which a
0102: * transaction MAY be terminated solely due to its length of operation.
0103: * From that point forward, the coordinator MAY elect to unilaterally roll back the transaction,
0104: * so long as it has not made a commit decision.
0105: *
0106: * @author Ryan.Shoemaker@Sun.COM
0107: * @author Joe.Fialli@Sun.COM
0108: * @version $Revision: 1.19 $
0109: * @since 1.0
0110: */
0111: public class ATCoordinator extends Coordinator implements
0112: Synchronization, XAResource {
0113:
0114: public static final URI localCoordinationProtocolServiceURI = AddressManager
0115: .getPreferredAddress(CoordinatorPortType.class);
0116:
0117: // TODO: short term solution so waitFor* do not hang. Remove when implement transaction timeout.
0118: static private final int MAX_WAIT_ITERATION = 300;
0119: static private final long WAIT_SLEEP = 2000;
0120:
0121: static private TxLogger logger = TxLogger
0122: .getATLogger(ATCoordinator.class);
0123: static final protected TransactionManagerImpl tm = TransactionManagerImpl
0124: .getInstance();
0125:
/** Operation that {@code actionForAllParticipants} applies to each participant. */
enum ACTION {
    PREPARE,
    COMMIT,
    ROLLBACK
}
0129:
/** Participant category used to label the output of {@code dumpParticipantsState}. */
enum KIND {
    VOLATILE,
    DURABLE
}
0133:
0134: /* map <Registrant.getId(), Registrant> of volatile 2pc participants */
0135: private final Map<String, ATParticipant> volatileParticipants = new LinkedHashMap<String, ATParticipant>(
0136: 4);
0137: private AT_2PC_State volatileParticipantsState = ACTIVE;
0138:
0139: /* map <Registrant.getId(), Registrant> of durable 2pc participants */
0140: private final Map<String, ATParticipant> durableParticipants = new LinkedHashMap<String, ATParticipant>(
0141: 4);
0142: private AT_2PC_State durableParticipantsState = ACTIVE;
0143:
0144: /* the completion registrant - only allowed on root ATCoordinator
0145: */
0146: private ATCompletion completionRegistrant;
0147:
0148: private boolean guardTimeout = false;
0149: private boolean forgotten = false;
0150:
0151: /**
0152: * From WSAT 2004 S3.1.1, new participants are not allowed once coordinator has responses from all volatile 2PC participants.
0153: */
0154: private boolean allowNewParticipants = true;
0155:
0156: // associated JTA transaction
0157: // since JTA provides no portable way to look up a transaction by id, cache Java Transaction with
0158: // ATCoordinator.
0159: protected Transaction transaction = null;
0160:
/**
 * Creates a coordinator from a coordination context and the originating soap request.
 * <p/>
 * Entry point for the Activation service's createCoordinationContext operation, which is
 * optional in the WS-Coordination spec and may therefore see little or no use.
 *
 * @param context The coordination context
 * @param request The soap request
 */
public ATCoordinator(CoordinationContextInterface context,
                     CreateCoordinationContextType request) {
    super(context, request);
}
0175:
/**
 * Creates a coordinator from a coordination context only.
 * <p/>
 * This is the main entry point for activity within the AppServer.
 *
 * @param context The coordination context
 */
public ATCoordinator(CoordinationContextInterface context) {
    super(context);
}
0187:
0188: /**
0189: * Set once field.
0190: */
0191: public void setTransaction(final Transaction txn) {
0192: transaction = txn;
0193: if (txn == null) {
0194: return;
0195: }
0196:
0197: try {
0198: if (!this .isSubordinateCoordinator()) {
0199: // see #beforeCompletion and #afterCompletion for what this does.
0200: // NEVER to be used for subordinate coordiator.
0201: registerSynchronization();
0202: }
0203:
0204: // MUST register synchronization BEFORE next line that
0205: // causes local transaction to upgrade to JTS txn in glassfish.
0206: // (Otherwise registerSynchronization with local txn even though JTS transaction exists.
0207: // Bug appears as beforeCompletion and afterCompletion never get called due to
0208: // mis-registration.)
0209: registerWithDurableParent();
0210: } catch (SystemException ex) {
0211: logger.severe("setTransaction", LocalizationMessages
0212: .XA_REGISTER_0004(ex.getLocalizedMessage()));
0213: // TODO: link and rethrow
0214: } catch (IllegalStateException ex) {
0215: logger.severe("setTransaction", LocalizationMessages
0216: .XA_REGISTER_0004(ex.getLocalizedMessage()));
0217: // TODO: link and rethrow
0218: } catch (RollbackException ex) {
0219: logger.severe("setTransaction", LocalizationMessages
0220: .XA_REGISTER_0004(ex.getLocalizedMessage()));
0221: // TODO: link and rethrow
0222: }
0223: }
0224:
0225: public Transaction getTransaction() {
0226: return transaction;
0227: }
0228:
0229: void setAborting() {
0230: volatileParticipantsState = ABORTING;
0231: durableParticipantsState = ABORTING;
0232: }
0233:
0234: boolean isAborting() {
0235: return volatileParticipantsState == ABORTING
0236: || durableParticipantsState == ABORTING;
0237: }
0238:
0239: /**
0240: * Get the list of {@link com.sun.xml.ws.tx.coordinator.Registrant}s for this coordinated activity.
0241: * <p/>
0242: * The returned list is unmodifiable (read-only). Add new Registrants
0243: * with the {@link #addRegistrant(com.sun.xml.ws.tx.coordinator.Registrant, javax.xml.ws.WebServiceContext)} api instead.
0244: * <p/>
0245: *
0246: * @return the list of Registrant objects
0247: */
0248: public List<Registrant> getRegistrants() {
0249: final ArrayList<Registrant> list;
0250: if (completionRegistrant != null) {
0251: list = new ArrayList<Registrant>(volatileParticipants
0252: .size()
0253: + durableParticipants.size() + 1);
0254: } else {
0255: list = new ArrayList<Registrant>(volatileParticipants
0256: .size()
0257: + durableParticipants.size());
0258: }
0259: list.addAll(volatileParticipants.values());
0260: list.addAll(durableParticipants.values());
0261: if (completionRegistrant != null) {
0262: list.add(completionRegistrant);
0263: }
0264: return Collections.unmodifiableList(list);
0265: }
0266:
0267: protected void registerWithVolatileParent() {
0268: registerSynchronization();
0269: }
0270:
0271: /**
0272: * Enlist with parent of ATCoordinator which is JTA transaction manager.
0273: */
0274: protected boolean registerWithDurableParent()
0275: throws RollbackException, SystemException {
0276: boolean result = false;
0277: if (getTransaction() != null) {
0278: result = getTransaction().enlistResource(this );
0279: }
0280: return result;
0281: }
0282:
0283: /**
0284: * Add the specified Registrant to the list of registrants for this
0285: * coordinated activity.
0286: *
0287: * @param registrant The {@link Registrant}
0288: */
0289: @Override
0290: public void addRegistrant(final Registrant registrant,
0291: final WebServiceContext wsContext) {
0292: if (!allowNewParticipants) {
0293: // send fault S4.1 ws:coor Invalid State
0294: if (wsContext != null) {
0295: WsaHelper
0296: .sendFault(
0297: wsContext,
0298: SOAPVersion.SOAP_11,
0299: TxFault.InvalidState,
0300: "Invalid to register a new participant after the first durable participant is prepared. Registrant id: "
0301: + // no I18N - spec requires xml:lang="en"
0302: registrant.getIdValue());
0303: }
0304: throw new IllegalStateException(LocalizationMessages
0305: .LATE_PARTICIPANT_REGISTRATION_0002());
0306: }
0307: // TODO: check for duplicate registration and send fault S4.6 ws:coor Already Registered
0308: switch (registrant.getProtocol()) {
0309: case COMPLETION:
0310: // Unimplemented OPTIONAL functionality.
0311: // TODO: do we need to see if this field is already set?
0312: // TODO: disallow if subordinate coordinator
0313: completionRegistrant = (ATCompletion) registrant;
0314: break;
0315:
0316: case DURABLE:
0317: logger.fine("ATCoordinator.addRegistrant",
0318: getCoordIdPartId(registrant));
0319: durableParticipants.put(registrant.getIdValue(),
0320: (ATParticipant) registrant);
0321: break;
0322:
0323: case VOLATILE:
0324: volatileParticipants.put(registrant.getIdValue(),
0325: (ATParticipant) registrant);
0326: break;
0327:
0328: default:
0329: throw new UnsupportedOperationException(
0330: LocalizationMessages
0331: .UNKNOWN_PROTOCOL_0003((registrant
0332: .getProtocol().getUri())));
0333: }
0334: }
0335:
0336: /**
0337: * Get the registrant with the specified id or null if it does not exist.
0338: *
0339: * @param id the registrant id
0340: * @return the Registrant object or null if the id does not exist
0341: */
0342: public Registrant getRegistrant(final String id) {
0343: Registrant r = volatileParticipants.get(id);
0344:
0345: if (r == null) {
0346: r = durableParticipants.get(id);
0347: }
0348:
0349: if ((r == null) && (completionRegistrant != null)
0350: && (completionRegistrant.getId().getValue().equals(id))) {
0351: r = completionRegistrant;
0352: }
0353:
0354: return r;
0355: }
0356:
0357: public void removeRegistrant(final String id) {
0358: forget(id);
0359: }
0360:
0361: /**
0362: * Return a Collection of volatile 2pc participants.
0363: * <p/>
0364: * This Collection is modifiable.
0365: *
0366: * @return A modifiable Collection of the volatile 2pc participants
0367: */
0368: public Collection<ATParticipant> getVolatileParticipants() {
0369: return volatileParticipants.values();
0370: }
0371:
0372: public Collection<ATParticipant> getVolatileParticipantsSnapshot() {
0373: return new ArrayList<ATParticipant>(volatileParticipants
0374: .values());
0375: }
0376:
0377: /**
0378: * Return a Collection of durable 2pc participants.
0379: * <p/>
0380: * This Collection is modifiable.
0381: *
0382: * @return A modifiable Collection of the durable 2pc participants
0383: */
0384: public Collection<ATParticipant> getDurableParticipants() {
0385: return durableParticipants.values();
0386: }
0387:
0388: public Collection<ATParticipant> getDurableParticipantsSnapshot() {
0389: return new ArrayList<ATParticipant>(durableParticipants
0390: .values());
0391: }
0392:
0393: /**
0394: * Get the completion registrant.
0395: *
0396: * @return The completion registrant
0397: */
0398: public ATCompletion getCompletionRegistrant() {
0399: return completionRegistrant;
0400: }
0401:
0402: /**
0403: * Send 2PC prepare to all volatile participants
0404: * <p/>
0405: * Volatile 2PC prepare constraint from 2004 WS-AT, section 3.3.1
0406: * the root coordinator begins the prepare phase of all participants registered for the Volatile 2PC protocol.
0407: * All participants registered for this protocol must respond before a Prepare is issued to a
0408: * participant registered for Durable 2PC. Further participants may register with the coordinator until the
0409: * coordinator issues a Prepare to any durable participant.
0410: */
0411: public void initiateVolatilePrepare() {
0412: // send prepare to all volatile participants before durable participants.
0413: // Section 3.3.1
0414: if (isAborting()) {
0415: initiateRollback();
0416: return;
0417: }
0418: volatileParticipantsState = PREPARING;
0419: actionForAllParticipants(getVolatileParticipantsSnapshot(),
0420: ACTION.PREPARE);
0421: }
0422:
0423: private void actionForAllParticipants(
0424: final Collection<ATParticipant> particpants,
0425: final ACTION action) {
0426: for (ATParticipant participant : particpants) {
0427: switch (action) {
0428: case PREPARE:
0429: try {
0430: participant.prepare();
0431: } catch (TXException ex) {
0432: setAborting();
0433: return;
0434: }
0435: break;
0436:
0437: case COMMIT:
0438: try {
0439: participant.commit();
0440: } catch (TXException ex) {
0441: setAborting();
0442: return;
0443: }
0444: break;
0445:
0446: case ROLLBACK:
0447: participant.abort();
0448: break;
0449:
0450: default:
0451: break;
0452: }
0453: }
0454: }
0455:
0456: /**
0457: * Wait for all volatile participants to respond to prepare.
0458: * <p/>
0459: * <p/>
0460: * Volatile participant state is set before this method returns.
0461: */
0462: protected void waitForVolatilePrepareResponse() {
0463: final String METHOD_NAME = "waitForVolatilePrepareResponse";
0464: final int numParticipants = getVolatileParticipants().size();
0465: if (volatileParticipantsState == PREPARED_SUCCESS
0466: || numParticipants == 0) {
0467: if (logger.isLogging(Level.FINER)) {
0468: logger.exiting(METHOD_NAME, "prepared coordId="
0469: + getIdValue() + " state="
0470: + volatileParticipantsState
0471: + " numParticipants=" + numParticipants);
0472: }
0473: return;
0474: }
0475:
0476: boolean communicationTimeout = false; // TODO: resend prepare due to communication timeout. Assume msg lost.
0477: boolean allPrepared;
0478: for (int i = 0; i < MAX_WAIT_ITERATION; i++) {
0479: allPrepared = true; // assume all prepared until encounter participant is not
0480: final Iterator<ATParticipant> iter = getVolatileParticipantsSnapshot()
0481: .iterator();
0482: while (iter.hasNext()) {
0483: final ATParticipant participant = iter.next();
0484: if (isAborting()) {
0485: return;
0486: }
0487: switch (participant.getState()) {
0488: case ACTIVE:
0489: // accomodate late registration: volatile 2PC prepare can register new volatile or durable participant.
0490: allPrepared = false;
0491: try {
0492: participant.prepare();
0493: } catch (TXException ex) {
0494: logger
0495: .warning(
0496: METHOD_NAME,
0497: LocalizationMessages
0498: .CAUGHT_TX_EX_DURING_PREPARE_0005(ex
0499: .getLocalizedMessage()));
0500: setAborting();
0501: }
0502: break;
0503:
0504: case PREPARING:
0505: allPrepared = false;
0506: if (communicationTimeout) {
0507: try {
0508: participant.prepare();
0509: } catch (TXException ex) {
0510: logger
0511: .warning(
0512: METHOD_NAME,
0513: LocalizationMessages
0514: .CAUGHT_TX_EX_DURING_PREPARE_0005(ex
0515: .getLocalizedMessage()));
0516: setAborting();
0517: }
0518: }
0519: break;
0520:
0521: case ABORTING:
0522: setAborting();
0523: return;
0524:
0525: // these states indicate a response to prepare request
0526: case PREPARED:
0527: case PREPARED_SUCCESS:
0528: case COMMITTING:
0529: break;
0530:
0531: case NONE:
0532: case COMMITTED:
0533: case ABORTED:
0534: case READONLY:
0535: forget(participant);
0536: break;
0537: }
0538: }
0539: if (isAborting()) {
0540: return;
0541: } else if (allPrepared) {
0542: volatileParticipantsState = PREPARED_SUCCESS;
0543: if (logger.isLogging(Level.FINER)) {
0544: logger.exiting(METHOD_NAME, "prepared coordId="
0545: + getIdValue() + " state="
0546: + volatileParticipantsState);
0547: }
0548: return;
0549: } else { //wait some before checking again
0550: try {
0551: if (logger.isLogging(Level.FINEST)) {
0552: logger.finest(METHOD_NAME, "checking...");
0553: }
0554: Thread.sleep(WAIT_SLEEP);
0555: } catch (InterruptedException ex) {
0556: logger.warning(METHOD_NAME, ex
0557: .getLocalizedMessage());
0558: }
0559: }
0560: }
0561: if (logger.isLogging(Level.FINE)) {
0562: dumpParticipantsState(getVolatileParticipantsSnapshot(),
0563: KIND.VOLATILE);
0564: }
0565: setAborting();
0566: if (logger.isLogging(Level.FINER)) {
0567: logger.warning(METHOD_NAME, LocalizationMessages
0568: .TIMEOUT_0006(getIdValue(),
0569: volatileParticipantsState));
0570: }
0571: }
0572:
0573: /**
0574: * TODO: Each PREPARED/READONLY Volatile ATParticipant should check if it is time to start
0575: * the durable 2PC phase by calling this method.
0576: */
0577: public void initiateDurablePrepare() {
0578: final Collection<ATParticipant> ps = getDurableParticipants();
0579: final int numParticipants = ps == null ? 0 : ps.size();
0580: if (logger.isLogging(Level.FINEST)) {
0581: logger.finest("initializeDurablePrepare", " coordId="
0582: + getIdValue() + " numDurableParticipants="
0583: + numParticipants + " volatile participant state="
0584: + volatileParticipantsState
0585: + " numVolatileParticipants"
0586: + getVolatileParticipants().size());
0587: }
0588:
0589: // PRE-CONDITION: volatileParticipantState is PREPARED
0590: if (isAborting()) {
0591: initiateRollback();
0592: return;
0593: }
0594:
0595: assert volatileParticipantsState == PREPARED_SUCCESS
0596: || getVolatileParticipants().size() == 0;
0597:
0598: // No outstanding volatile participants at this point.
0599:
0600: // No new participants allowed as soon as durable 2PC begins.
0601: allowNewParticipants = false;
0602: durableParticipantsState = PREPARING;
0603: actionForAllParticipants(getDurableParticipantsSnapshot(),
0604: ACTION.PREPARE);
0605: }
0606:
0607: /**
0608: * Wait for all Durable participants to respond to prepare.
0609: * <p/>
0610: * <p/>
0611: * Durable participant state is set before this method returns.
0612: */
0613: protected void waitForDurablePrepareResponse() {
0614: // TODO: implement logic to resend prepare due to communication timeout.
0615: // Assumes prepare request was lost on way to participant OR the participant's response was lost/delayed.
0616: // boolean communicationTimeout = false;
0617:
0618: boolean allPrepared = false;
0619: for (int i = 0; i < MAX_WAIT_ITERATION; i++) {
0620: if (isAborting()) {
0621: break;
0622: }
0623: allPrepared = true; // assume true until find at least one participant that is not prepared yet.
0624: for (ATParticipant participant : getDurableParticipantsSnapshot()) {
0625: switch (participant.getState()) {
0626: case PREPARING:
0627: allPrepared = false;
0628: if (logger.isLogging(Level.FINEST)) {
0629: logger.finest("intitatedurableParticipant",
0630: "not prepared, readonly or aborted "
0631: + getCoordIdPartId(participant)
0632: + " state="
0633: + participant.getState());
0634: }
0635: break;
0636:
0637: case ACTIVE:
0638: case NONE:
0639: logger
0640: .warning(
0641: "waitForDurablePrepareResponse",
0642: LocalizationMessages
0643: .INITIATE_ROLLBACK_0007(
0644: this
0645: .getCoordIdPartId(participant),
0646: participant
0647: .getState()));
0648: allPrepared = false;
0649: setAborting();
0650:
0651: // TODO: throw illegal state exception
0652: assert false;
0653:
0654: case ABORTING:
0655: setAborting();
0656: return;
0657:
0658: // these states indicate a response to prepare request
0659: case PREPARED:
0660: case PREPARED_SUCCESS:
0661: case COMMITTING:
0662: break;
0663:
0664: case ABORTED:
0665: setAborting();
0666: participant.forget();
0667: break;
0668:
0669: case COMMITTED:
0670: case READONLY:
0671: participant.forget();
0672: break;
0673: }
0674: }
0675: if (isAborting()) {
0676: return;
0677: } else if (allPrepared) {
0678: durableParticipantsState = PREPARED_SUCCESS;
0679: if (logger.isLogging(Level.FINER)) {
0680: logger.exiting("waitForDurablePrepare", "coordId="
0681: + getIdValue() + "state:"
0682: + durableParticipantsState);
0683: }
0684: return;
0685: } else { //wait some before checking again
0686: try {
0687: if (logger.isLogging(Level.FINEST)) {
0688: logger.finest("waitForDurablePrepare",
0689: "checking...");
0690: }
0691: Thread.sleep(WAIT_SLEEP);
0692: } catch (InterruptedException ex) {
0693: }
0694: }
0695: }
0696: if (logger.isLogging(Level.FINE)) {
0697: dumpParticipantsState(getDurableParticipantsSnapshot(),
0698: KIND.DURABLE);
0699: }
0700: // some participants not prepared still, timing out
0701: setAborting();
0702: logger.warning("waitForDurablePrepare", LocalizationMessages
0703: .TIMEOUT_0006(getIdValue(), durableParticipantsState));
0704: }
0705:
0706: private void dumpParticipantsState(
0707: final Collection<ATParticipant> lst, final KIND kind) {
0708: final StringBuffer str = new StringBuffer(100);
0709: str.append(" " + kind.toString() + " ");
0710: for (ATParticipant p : lst) {
0711: str.append("Part: " + p.getIdValue() + " state:"
0712: + p.getState());
0713: }
0714: if (logger.isLogging(Level.FINE)) {
0715: logger.fine("dumpParticipantState", "coordId="
0716: + getIdValue() + str);
0717: }
0718: }
0719:
0720: public void initiateCommit() {
0721: initiateVolatileCommit();
0722: initiateDurableCommit();
0723: }
0724:
0725: public void initiateDurableCommit() {
0726: // assert all participants must be in PREPARED, PREPARED_SUCCESS, COMMITTING, READONLY
0727:
0728: // PRE-CONDITION: durableParticipantState is PREPARED
0729: if (isAborting()) {
0730: initiateRollback();
0731: return;
0732: }
0733: if (durableParticipantsState != PREPARED_SUCCESS) {
0734: logger
0735: .warning(
0736: "durableVolatileCommit",
0737: LocalizationMessages
0738: .UNEXPECTED_STATE_0008(durableParticipantsState));
0739: }
0740: durableParticipantsState = COMMITTING;
0741: guardTimeout = true;
0742: actionForAllParticipants(getDurableParticipantsSnapshot(),
0743: ACTION.COMMIT);
0744: }
0745:
0746: public void initiateVolatileCommit() {
0747: // assert all participants must be in PREPARED, PREPARED_SUCCESS, COMMITTING, READONLY
0748:
0749: // PRE-CONDITION: durableParticipantState is PREPARED
0750: if (isAborting()) {
0751: initiateRollback();
0752: return;
0753: }
0754: if (volatileParticipantsState != PREPARED_SUCCESS
0755: && getVolatileParticipants().size() != 0) {
0756: logger
0757: .warning(
0758: "initateVolatileCommit",
0759: LocalizationMessages
0760: .UNEXPECTED_STATE_0008(volatileParticipantsState));
0761: }
0762:
0763: // No new participants allowed as soon as durable 2PC begins.
0764: volatileParticipantsState = COMMITTING;
0765: actionForAllParticipants(getVolatileParticipantsSnapshot(),
0766: ACTION.COMMIT);
0767: }
0768:
0769: public void initiateRollback() {
0770: initiateVolatileRollback();
0771: initiateDurableRollback();
0772: }
0773:
0774: public void initiateDurableRollback() {
0775: durableParticipantsState = ABORTING;
0776: for (ATParticipant durableP : getDurableParticipantsSnapshot()) {
0777: durableP.abort();
0778: }
0779: }
0780:
0781: public void initiateVolatileRollback() {
0782: volatileParticipantsState = ABORTING;
0783: for (ATParticipant volatileP : getVolatileParticipantsSnapshot()) {
0784: volatileP.abort();
0785: }
0786: }
0787:
0788: /**
0789: * Register this with TransactionSynchronizationRegistery. This should get called by JTS
0790: * transaction system before 2PC Participants and XAResources are prepared.
0791: */
0792: public void beforeCompletion() {
0793: logger.finest("beforeCompletion",
0794: "beforeCompletion called for coordId=" + getIdValue());
0795: if (volatileParticipants.size() != 0) {
0796: initiateVolatilePrepare();
0797: waitForVolatilePrepareResponse();
0798: }
0799: }
0800:
0801: public void afterCompletion(final int i) {
0802: logger.finest("afterCompletion",
0803: "afterCompletion called for coordId=" + getIdValue());
0804: forget();
0805: }
0806:
0807: protected void waitForCommitOrRollbackResponse(
0808: final Protocol protocol) {
0809: // all participants have been committed or rolled back.
0810: // wait for all outstanding participants to send final notification of wsat COMMITTED or ABORTED.
0811: // boolean communicationTimeout = false; // TODO: resend prepare due to communication timeout. Assume msg lost.
0812: boolean allProcessed;
0813:
0814: for (int i = 0; i < MAX_WAIT_ITERATION; i++) {
0815: allProcessed = true; // assume all committed/aborted until encounter participant is not.
0816: if (protocol == Protocol.DURABLE) {
0817:
0818: for (ATParticipant participant : getDurableParticipantsSnapshot()) {
0819: if (participant.getState() == COMMITTED
0820: || participant.getState() == ABORTED
0821: || participant.getState() == READONLY) {
0822: participant.forget();
0823: } else {
0824: allProcessed = false;
0825: if (logger.isLogging(Level.FINEST)) {
0826: logger.finest("waitForCommitRollback",
0827: getCoordIdPartId(participant)
0828: + " state:"
0829: + participant.getState());
0830: }
0831: /* Don't retry aggressively. Have to put communication timeout to retry.
0832: if (isAborting()){
0833: participant.abort();
0834: } else {
0835: try {
0836: participant.commit();
0837: } catch (TXException ex) {
0838: logger.warning("waitForCommitOrRollbackResponse", ex.getLocalizedMessage());
0839: }
0840: }
0841: **/
0842: }
0843: }
0844: } else if (protocol == Protocol.VOLATILE) {
0845: // best effort to receive committed/aborted from volatile participants. But do not wait for them to send committed.
0846: allProcessed = true; // assume all committed/aborted until encounter participant is not.
0847: for (ATParticipant participant : getVolatileParticipantsSnapshot()) {
0848: if (participant.getState() != COMMITTED
0849: && participant.getState() != ABORTED
0850: || participant.getState() != READONLY) {
0851: logger.warning(
0852: "waitForCommitOrRollbackResponse",
0853: LocalizationMessages.FORGETTING_0009(
0854: participant.getState(),
0855: getCoordIdPartId(participant)));
0856: participant.forget();
0857: }
0858: }
0859: }
0860: if (allProcessed) {
0861: guardTimeout = false;
0862: if (logger.isLogging(Level.FINER)) {
0863: logger.exiting("waitForCommitRollback", "coordId="
0864: + getIdValue());
0865: }
0866: return;
0867: } else { //wait some before checking again
0868: try {
0869: if (logger.isLogging(Level.FINEST)) {
0870: logger.finest("waitForCommitRollback",
0871: "checking...");
0872: }
0873: Thread.sleep(WAIT_SLEEP);
0874: } catch (InterruptedException ex) {
0875: }
0876: }
0877: }
0878: }
0879:
0880: /**
0881: * Synchronous prepare request invoked by JTS coordinator as part of its 2PC protocol.
0882: * <p/>
0883: * <p>Prepare this coordinator and return result of preparation.
0884: */
0885: public int prepare(final Xid xid) throws XAException {
0886: if (logger.isLogging(Level.FINER)) {
0887: logger.entering("XAResource_prepare(xid=" + xid + ")");
0888: }
0889: int result = 0;
0890:
0891: synchronized (this ) {
0892: initiateDurablePrepare();
0893:
0894: // Map asynchonous WS-AT 2PC protocol to XAResource synchronous protocol.
0895: // Wait for all possible pending responses to prepare message.
0896: waitForDurablePrepareResponse(); // result in durableParticipantsState: PREPARED, COMMITTED, ABORTING
0897: }
0898:
0899: // check if volatile or durable WS-AT participants aborted
0900: if (isAborting()) {
0901: // TODO: be more specific on XAException error code for why rollback occurred. Using generic code now.
0902: throw new XAException(XAException.XA_RBROLLBACK);
0903: } else if (getDurableParticipants().size() == 0
0904: && this .getVolatileParticipants().size() == 0) {
0905: result = XAResource.XA_RDONLY;
0906: } else {
0907: result = XAResource.XA_OK;
0908: }
0909:
0910: if (logger.isLogging(Level.FINER)) {
0911: logger.exiting("XAResource_prepare", result);
0912: }
0913: return result;
0914: }
0915:
0916: public void commit(final Xid xid, final boolean onePhase)
0917: throws XAException {
0918: if (logger.isLogging(Level.FINER)) {
0919: logger.entering("XAResource_commit(xid=" + xid
0920: + " ,onePhase=" + onePhase + ")");
0921: }
0922:
0923: int result = 0;
0924: if (onePhase) {
0925: // if one phase commit, need to do prepare here
0926: try {
0927: result = prepare(xid);
0928: } catch (XAException e) {
0929: logger.warning("commit(1PC)", LocalizationMessages
0930: .PREPARE_FAILED_0010(e.toString()));
0931: initiateRollback();
0932: waitForCommitOrRollbackResponse(Protocol.DURABLE);
0933: if (logger.isLogging(Level.FINER)) {
0934: logger.exiting("XAResource_commit", e);
0935: }
0936: throw e;
0937: }
0938: }
0939:
0940: // Commit volatile and durable 2PC participants. No ordering required.
0941: if (result != XAResource.XA_RDONLY) {
0942: initiateCommit();
0943: waitForCommitOrRollbackResponse(Protocol.DURABLE);
0944: waitForCommitOrRollbackResponse(Protocol.VOLATILE);
0945: }
0946:
0947: if (logger.isLogging(Level.FINER)) {
0948: logger.exiting("XAResource_commit");
0949: }
0950: }
0951:
0952: public void rollback(final Xid xid) throws XAException {
0953: if (logger.isLogging(Level.FINER)) {
0954: logger.entering("XAResource_rollback(xid=" + xid + ")");
0955: }
0956: // Commit volatile and durable 2PC participants. No ordering required.
0957: initiateRollback();
0958: waitForCommitOrRollbackResponse(Protocol.DURABLE);
0959: waitForCommitOrRollbackResponse(Protocol.VOLATILE);
0960: guardTimeout = false;
0961: if (logger.isLogging(Level.FINER)) {
0962: logger.exiting("XAResource_rollback");
0963: }
0964: }
0965:
0966: public Xid[] recover(final int i) throws XAException {
0967: throw new UnsupportedOperationException("Not yet implemented");
0968: }
0969:
0970: public boolean setTransactionTimeout(final int i)
0971: throws XAException {
0972: setExpires(i * 1000L);
0973: return true;
0974: }
0975:
0976: public void start(final Xid xid, final int flags)
0977: throws XAException {
0978: // Start transaction hook
0979: }
0980:
0981: /**
0982: */
0983: public void end(final Xid xid, final int flags) throws XAException {
0984: switch (flags) {
0985: case TMSUCCESS:
0986: //TODO
0987: break;
0988: case TMSUSPEND:
0989: // TODO
0990: break;
0991: case TMFAIL:
0992: setAborting();
0993: break;
0994: }
0995: }
0996:
0997: /**
0998: * forget everything about this transaction.
0999: * <p/>
1000: * <p/>
1001: * Recovers resources held by a transaction. After a transaction is committed or aborted, it is forgotten.
1002: */
1003: public void forget(final Xid xid) throws XAException {
1004: logger.finest("forget",
1005: "XAResource.forget(XID) called for coordId="
1006: + getIdValue());
1007: forget();
1008: }
1009:
1010: public int getTransactionTimeout() throws XAException {
1011: return (int) (getExpires() / 1000L);
1012: }
1013:
1014: public boolean isSameRM(final XAResource xAResource)
1015: throws XAException {
1016: return false;
1017: }
1018:
1019: public void prepared(final String participantId) {
1020: prepared(participantId, null);
1021: }
1022:
1023: public void prepared(final String participantId,
1024: final EndpointReference unknownParticipantReplyEPR) {
1025: if (logger.isLogging(Level.FINER)) {
1026: logger
1027: .entering("prepared",
1028: getCoordIdPartId(participantId));
1029: }
1030: final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
1031: if (participant == null) {
1032: if (unknownParticipantReplyEPR != null) {
1033: logger.warning("prepared", LocalizationMessages
1034: .UNKNOWN_CORD_OR_PART_0011(
1035: getCoordIdPartId(participantId),
1036: unknownParticipantReplyEPR));
1037:
1038: ATParticipant.getATParticipantWS(
1039: unknownParticipantReplyEPR, null, false)
1040: .rollbackOperation(null);
1041: }
1042: } else {
1043: participant.prepared();
1044: }
1045: if (logger.isLogging(Level.FINER)) {
1046: logger.exiting("prepared", getCoordIdPartId(participantId));
1047: }
1048: }
1049:
1050: public void committed(final String participantId) {
1051: if (logger.isLogging(Level.FINER)) {
1052: logger.entering("committed",
1053: getCoordIdPartId(participantId));
1054: }
1055: final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
1056: if (participant != null) {
1057: participant.committed();
1058: participant.forget();
1059: } else {
1060: logger
1061: .warning(
1062: "committed",
1063: LocalizationMessages
1064: .UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
1065: }
1066: if (logger.isLogging(Level.FINER)) {
1067: logger
1068: .exiting("committed",
1069: getCoordIdPartId(participantId));
1070: }
1071: }
1072:
1073: public void readonly(final String participantId) {
1074: if (logger.isLogging(Level.FINER)) {
1075: logger
1076: .entering("readonly",
1077: getCoordIdPartId(participantId));
1078: }
1079: final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
1080: if (participant == null) {
1081: logger
1082: .warning(
1083: "readonly",
1084: LocalizationMessages
1085: .UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
1086: } else {
1087: participant.readonly();
1088: participant.forget();
1089: if (logger.isLogging(Level.FINER)) {
1090: logger.exiting("readonly",
1091: getCoordIdPartId(participantId));
1092: }
1093: }
1094: }
1095:
1096: public void aborted(final String participantId) {
1097: if (logger.isLogging(Level.FINER)) {
1098: logger.entering("aborted", getCoordIdPartId(participantId));
1099: }
1100: setAborting();
1101: final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
1102: if (participant == null) {
1103: if (logger.isLogging(Level.WARNING)) {
1104: logger
1105: .warning(
1106: "aborted",
1107: LocalizationMessages
1108: .UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
1109: }
1110: } else {
1111: participant.aborted();
1112: participant.forget();
1113: if (logger.isLogging(Level.FINER)) {
1114: logger.exiting("aborted",
1115: getCoordIdPartId(participantId));
1116: }
1117: }
1118: }
1119:
1120: /**
1121: * Implement inbound event <i>replay</i> for Atomic Transaction 2PC Protocol(Coordinator View).
1122: */
1123: public void replay(final String participantId) {
1124: final String METHOD_NAME = "replay";
1125: if (logger.isLogging(Level.FINER)) {
1126: logger.entering(METHOD_NAME,
1127: getCoordIdPartId(participantId));
1128: }
1129:
1130: final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
1131: final ATParticipant.STATE state = participant.getState();
1132: switch (state) {
1133: case NONE:
1134: if (participant.isDurable()) {
1135: participant.abort();
1136: } else { // participant.isVolatile()
1137: // TODO: Invalid State. Send back an invalid state fault.
1138: logger
1139: .severe(
1140: METHOD_NAME,
1141: LocalizationMessages
1142: .INVALID_STATE_0013(getCoordIdPartId(participantId)));
1143: }
1144: break;
1145:
1146: case ACTIVE:
1147: case PREPARING:
1148: case ABORTING:
1149: participant.abort();
1150: break;
1151:
1152: case COMMITTING:
1153: try {
1154: participant.commit();
1155: } catch (TXException ex) {
1156: logger.warning(METHOD_NAME, ex.getLocalizedMessage());
1157: }
1158: break;
1159: case PREPARED:
1160: case PREPARED_SUCCESS:
1161: // nothing to do for all other cases.
1162: }
1163: if (logger.isLogging(Level.FINER)) {
1164: logger
1165: .exiting(METHOD_NAME,
1166: getCoordIdPartId(participantId));
1167: }
1168: }
1169:
1170: volatile private boolean registeredSynchronization = false;
1171:
1172: /**
1173: * Register interposed synchronization for this instance.
1174: * <p/>
1175: * Initial volatile participant registration triggers this registration.
1176: */
1177: private void registerSynchronization() {
1178: if (!registeredSynchronization) {
1179: registeredSynchronization = true;
1180: TransactionManagerImpl.getInstance()
1181: .registerSynchronization(this );
1182: if (logger.isLogging(Level.FINEST)) {
1183: logger.finest("registerSynchronization",
1184: "Synchronization registered for WS-AT coordinated activity "
1185: + this .getIdValue());
1186: }
1187: }
1188: }
1189:
1190: public boolean isSubordinateCoordinator() {
1191: return false;
1192: }
1193:
1194: public EndpointReference getParentCoordinatorRegistrationEPR() {
1195: if (getContext() == null) {
1196: return null;
1197: } else {
1198: return getContext().getRootRegistrationService();
1199: }
1200: }
1201:
1202: static private WSATCoordinator wsatCoordinatorService = new WSATCoordinator();
1203:
1204: public static WSATCoordinator getWSATCoordinatorService() {
1205: return wsatCoordinatorService;
1206: }
1207:
1208: protected String getCoordIdPartId(final Registrant registrant) {
1209: return getCoordIdPartId(registrant.getIdValue());
1210: }
1211:
1212: protected String getCoordIdPartId(final String participantId) {
1213: return " coordId=" + getIdValue() + " partId:" + participantId
1214: + " ";
1215: }
1216:
1217: public void forget(final ATParticipant part) {
1218: forget(part.getIdValue());
1219: }
1220:
1221: public void forget(final String partId) {
1222: ATParticipant removed = volatileParticipants.remove(partId);
1223: if (removed != null) {
1224: if (logger.isLogging(Level.FINE)) {
1225: logger.fine("forget", "forgot volatile participant "
1226: + getCoordIdPartId(partId));
1227: }
1228: if (!hasOutstandingParticipants()) {
1229: forget();
1230: }
1231: return;
1232: }
1233: removed = durableParticipants.remove(partId);
1234: if (removed != null) {
1235: if (logger.isLogging(Level.FINE)) {
1236: logger.fine("forget", "forgot durable participant "
1237: + getCoordIdPartId(partId));
1238: }
1239: if (!hasOutstandingParticipants()) {
1240: forget();
1241: }
1242: return;
1243: }
1244: /*
1245: * TODO: implement if optional completion is ever supported.
1246: if ((completionRegistrant != null) && (completionRegistrant.getId().equals(id))) {
1247: r = completionRegistrant;
1248: }
1249: */
1250: }
1251:
/** Endpoint reference whose address URI is {@code localCoordinationProtocolServiceURI}. */
public static final EndpointReference localCoordinatorProtocolService;

static {
    // Build a member-submission EPR that carries only an address.
    final MemberSubmissionEndpointReference reference = new MemberSubmissionEndpointReference();
    final MemberSubmissionEndpointReference.Address address =
            new MemberSubmissionEndpointReference.Address();
    reference.addr = address;
    address.uri = localCoordinationProtocolServiceURI.toString();
    localCoordinatorProtocolService = reference;
}
1260:
1261: public EndpointReference getCoordinatorProtocolServiceForRegistrant(
1262: final Registrant r) {
1263: return localCoordinatorProtocolService;
1264: }
1265:
1266: /**
1267: * Return false if it is okay to rollback the transaction.
1268: * Do not allow transaction expiration after Phase 2 begins.
1269: */
1270: public boolean expirationGuard() {
1271: synchronized (this ) {
1272: return guardTimeout;
1273: }
1274: }
1275:
1276: @Override
1277: public void expire() {
1278: if (!expirationGuard()) {
1279: setAborting();
1280: }
1281: super .expire();
1282: }
1283:
1284: @Override
1285: public void forget() {
1286: synchronized (this ) {
1287: if (forgotten) {
1288: return;
1289: } else {
1290: forgotten = true;
1291: }
1292: for (ATParticipant participant : getDurableParticipantsSnapshot()) {
1293: participant.forget();
1294: }
1295: for (ATParticipant participant : getVolatileParticipantsSnapshot()) {
1296: participant.forget();
1297: }
1298: super .forget();
1299: }
1300: }
1301:
1302: public void resumeTransaction() throws WebServiceException {
1303: if (transaction != null) {
1304: try {
1305: tm.resume(transaction);
1306: logger.finest("resumeTransaction",
1307: "successfully resumed txn " + transaction);
1308: } catch (Exception ex) {
1309: String handlerMsg = LocalizationMessages
1310: .TXN_MGR_RESUME_FAILED_0032(transaction
1311: .toString());
1312: logger.warning("resumeTransaction", handlerMsg, ex);
1313: throw new WebServiceException(handlerMsg, ex);
1314: }
1315: }
1316: }
1317:
1318: public Transaction suspendTransaction() {
1319: Transaction tx = null;
1320: try {
1321: tx = tm.suspend();
1322: logger.finest("suspendTransation",
1323: tx == null ? "no txn to suspend" : "suspended txn "
1324: + tx.toString());
1325: return tx;
1326: } catch (SystemException ex) {
1327: String handlerMsg = LocalizationMessages
1328: .TXN_MGR_OPERATION_FAILED_0031("suspend");
1329: logger.warning("suspendTransaction", handlerMsg, ex);
1330: return tx;
1331: }
1332: }
1333:
1334: public boolean hasOutstandingParticipants() {
1335: return getDurableParticipants().size() != 0
1336: || getVolatileParticipants().size() != 0;
1337: }
1338: }
|