001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Copyright (C) 2005-2006 Continuent, Inc.
007: * Contact: sequoia@continuent.org
008: *
009: * Licensed under the Apache License, Version 2.0 (the "License");
010: * you may not use this file except in compliance with the License.
011: * You may obtain a copy of the License at
012: *
013: * http://www.apache.org/licenses/LICENSE-2.0
014: *
015: * Unless required by applicable law or agreed to in writing, software
016: * distributed under the License is distributed on an "AS IS" BASIS,
017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018: * See the License for the specific language governing permissions and
019: * limitations under the License.
020: *
021: * Initial developer(s): Jean-Bernard van Zuylen.
022: * Contributor(s): Emmanuel Cecchet.
023: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
024:
025: import java.io.Serializable;
026: import java.sql.SQLException;
027: import java.util.LinkedList;
028:
029: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
030: import org.continuent.sequoia.common.i18n.Translate;
031: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
032: import org.continuent.sequoia.controller.requestmanager.TransactionMetaData;
033: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
034: import org.continuent.sequoia.controller.requests.AbstractRequest;
035: import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
036:
037: /**
038: * Execute a distributed rollback to savepoint
039: *
040: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
041: * </a>
042: * @author <a href="mailto:emmanuel.cecchet@continuent.com">Emmanuel Cecchet
043: * </a>
044: * @version 1.0
045: */
046: public class DistributedRollbackToSavepoint extends
047: DistributedTransactionMarker {
048: private static final long serialVersionUID = -8670132997537808225L;
049:
050: private String savepointName;
051:
052: /**
053: * Creates a new <code>RollbackToSavepoint</code> message
054: *
055: * @param transactionId the transaction identifier
056: * @param savepointName the savepoint name
057: */
058: public DistributedRollbackToSavepoint(long transactionId,
059: String savepointName) {
060: super (transactionId);
061: this .savepointName = savepointName;
062: }
063:
064: /**
065: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedTransactionMarker#scheduleCommand(org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager)
066: */
067: public Object scheduleCommand(DistributedRequestManager drm)
068: throws SQLException {
069: LinkedList totalOrderQueue = drm.getVirtualDatabase()
070: .getTotalOrderQueue();
071: if (totalOrderQueue != null) {
072: synchronized (totalOrderQueue) {
073: totalOrderQueue.addLast(this );
074: }
075: }
076: return this ;
077: }
078:
079: /**
080: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedTransactionMarker#executeCommand(org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager)
081: */
082: public Serializable executeCommand(DistributedRequestManager drm)
083: throws SQLException {
084: boolean hasBeenScheduled = false;
085:
086: // Let's find the transaction marker since it will be used even for
087: // logging purposes
088: Long tid = new Long(transactionId);
089: TransactionMetaData tm;
090: try {
091: tm = drm.getTransactionMetaData(tid);
092: } catch (SQLException e) {
093: // Remove the rollback from the total order queue
094: drm.getLoadBalancer()
095: .removeObjectFromAndNotifyTotalOrderQueue(this );
096: throw e;
097: }
098:
099: // Check that a savepoint with given name has been set
100: if (!drm.hasSavepoint(tid, savepointName)) {
101: // Remove the rollback from the total order queue
102: drm.getLoadBalancer()
103: .removeObjectFromAndNotifyTotalOrderQueue(this );
104: addRollbackFailureOnAllBackends(drm, hasBeenScheduled, tm);
105: throw new SQLException(Translate.get(
106: "transaction.savepoint.not.found", new String[] {
107: savepointName,
108: String.valueOf(transactionId) }));
109: }
110:
111: try {
112: // Wait for the scheduler to give us the authorization to execute
113: drm.getScheduler().rollback(tm, savepointName, this );
114: hasBeenScheduled = true;
115:
116: if (drm.getLogger().isDebugEnabled())
117: drm
118: .getLogger()
119: .debug(
120: Translate
121: .get(
122: "transaction.rollbacksavepoint",
123: new String[] {
124: savepointName,
125: String
126: .valueOf(transactionId) }));
127:
128: // Send to load balancer
129: drm.getLoadBalancer()
130: .rollbackToSavepoint(tm, savepointName);
131:
132: // Update recovery log
133: drm.getRecoveryLog().logRequestCompletion(tm.getLogId(),
134: true, 0);
135:
136: // Notify scheduler for completion
137: drm.getScheduler().savepointCompleted(transactionId);
138: } catch (NoMoreBackendException e) {
139: if (drm.getLogger().isDebugEnabled())
140: drm
141: .getLogger()
142: .debug(
143: Translate
144: .get(
145: "virtualdatabase.distributed.rollbacksavepoint.logging.only",
146: new String[] {
147: savepointName,
148: String
149: .valueOf(transactionId) }));
150:
151: addRollbackFailureOnAllBackends(drm, hasBeenScheduled, tm);
152: throw e;
153: } catch (SQLException e) {
154: addRollbackFailureOnAllBackends(drm, hasBeenScheduled, tm);
155: drm
156: .getLogger()
157: .warn(
158: Translate
159: .get("virtualdatabase.distributed.rollbacksavepoint.sqlexception"),
160: e);
161: return e;
162: } catch (RuntimeException re) {
163: addRollbackFailureOnAllBackends(drm, hasBeenScheduled, tm);
164: drm
165: .getLogger()
166: .warn(
167: Translate
168: .get("virtualdatabase.distributed.rollbacksavepoint.exception"),
169: re);
170: throw new SQLException(re.getMessage());
171: } catch (AllBackendsFailedException e) {
172: addRollbackFailureOnAllBackends(drm, hasBeenScheduled, tm);
173: if (drm.getLogger().isDebugEnabled())
174: drm
175: .getLogger()
176: .debug(
177: Translate
178: .get(
179: "virtualdatabase.distributed.rollbacksavepoint.all.backends.locally.failed",
180: new String[] {
181: savepointName,
182: String
183: .valueOf(transactionId) }));
184: return e;
185: } finally {
186: // Remove all the savepoints set after the savepoint we rollback to
187: drm.removeSavepoints(tid, savepointName);
188: }
189: return Boolean.TRUE;
190: }
191:
192: private void addRollbackFailureOnAllBackends(
193: DistributedRequestManager drm, boolean hasBeenScheduled,
194: TransactionMetaData tm) {
195: AbstractRequest request = new UnknownWriteRequest("rollback "
196: + savepointName, false, 0, "\n");
197: request.setTransactionId(transactionId);
198: request.setLogId(tm.getLogId());
199: drm.addFailedOnAllBackends(request, hasBeenScheduled);
200: }
201:
202: /**
203: * Returns the savepointName value.
204: *
205: * @return Returns the savepointName.
206: */
207: public String getSavepointName() {
208: return savepointName;
209: }
210:
211: /**
212: * @see java.lang.Object#equals(java.lang.Object)
213: */
214: public boolean equals(Object obj) {
215: if (super .equals(obj))
216: return savepointName
217: .equals(((DistributedRollbackToSavepoint) obj)
218: .getSavepointName());
219: else
220: return false;
221: }
222:
223: /**
224: * @see java.lang.Object#toString()
225: */
226: public String toString() {
227: return "Rollback transaction " + transactionId
228: + " to savepoint " + savepointName;
229: }
230: }
|