001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Emmanuel Cecchet.
019: * Contributor(s): Damian Arregui.
020: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
021:
022: import java.io.Serializable;
023: import java.sql.SQLException;
024: import java.util.LinkedList;
025:
026: import org.continuent.hedera.common.Member;
027: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
028: import org.continuent.sequoia.common.i18n.Translate;
029: import org.continuent.sequoia.common.log.Trace;
030: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
031: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
032:
033: /**
034: * This class a SuspendActivity message which does the following:
035: * <ol>
036: * <li>Suspend new persistent connections, new transactions and new writes (in
037: * this order)
038: * <li>Wait for completion of current transactions and persistent connections.
039: * </ol>
040: * <p>
041: *
042: * @author <a href="mailto:ralph.hannus@continuent.com">Ralph Hannus</a>
043: * @version 1.0
044: */
045: public class SuspendActivity extends DistributedVirtualDatabaseMessage {
046: private static final long serialVersionUID = -8451114082404567986L;
047:
048: private transient LinkedList totalOrderQueue;
049:
050: /**
051: * Creates a new <code>DisableBackendsAndSetCheckpoint</code> object
052: */
053: public SuspendActivity() {
054: }
055:
056: /**
057: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageSingleThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
058: * org.continuent.hedera.common.Member)
059: */
060: public Object handleMessageSingleThreaded(
061: DistributedVirtualDatabase dvdb, Member sender) {
062: totalOrderQueue = dvdb.getTotalOrderQueue();
063: if (totalOrderQueue == null)
064: return new VirtualDatabaseException(Translate.get(
065: "virtualdatabase.no.total.order.queue", dvdb
066: .getVirtualDatabaseName()));
067:
068: synchronized (totalOrderQueue) {
069: SuspendWritesMessage request = new SuspendWritesMessage(
070: "Suspend activity");
071: totalOrderQueue.addLast(request);
072: return request;
073: }
074: }
075:
076: /**
077: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageMultiThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
078: * org.continuent.hedera.common.Member, java.lang.Object)
079: */
080: public Serializable handleMessageMultiThreaded(
081: DistributedVirtualDatabase dvdb, Member sender,
082: Object handleMessageSingleThreadedResult) {
083: Trace logger = dvdb.getLogger();
084: dvdb.addOngoingActivitySuspension(sender);
085:
086: // Wait for our turn to execute
087: boolean found = dvdb.waitForTotalOrder(
088: handleMessageSingleThreadedResult, false);
089:
090: AbstractScheduler scheduler = dvdb.getRequestManager()
091: .getScheduler();
092:
093: // Suspend new persistent connections
094: scheduler.suspendNewPersistentConnections();
095:
096: // Suspend new transactions and writes
097: scheduler.suspendNewTransactionsAndWrites();
098:
099: // Remove ourselves from the queue to allow others to complete if needed
100: if (!found)
101: logger
102: .error("Suspend activity was not found in total order queue, posting out of order");
103: else
104: synchronized (totalOrderQueue) {
105: totalOrderQueue.removeFirst();
106: totalOrderQueue.notifyAll();
107: }
108:
109: // Wait for writes to complete
110: try {
111: scheduler.waitForSuspendedWritesToComplete();
112: } catch (SQLException e) {
113: dvdb.getLogger().error(
114: "Failed to wait for writes to complete");
115: return e;
116: }
117:
118: // Wait for transactions to complete
119: try {
120: scheduler.waitForSuspendedTransactionsToComplete();
121: } catch (SQLException e) {
122: dvdb
123: .getLogger()
124: .error(
125: "Failed to wait for suspended transactions to complete");
126: return e;
127: }
128:
129: // Wait for opened persistent connections to be closed
130: try {
131: scheduler.waitForPersistentConnectionsToComplete();
132: } catch (SQLException e) {
133: dvdb
134: .getLogger()
135: .error(
136: "Failed to wait for persistent connections to close");
137: return e;
138: }
139:
140: if (!dvdb.hasRecoveryLog()) {
141: // Resume transactions, writes and persistent connections
142: scheduler
143: .resumeWritesTransactionsAndPersistentConnections();
144: return new VirtualDatabaseException(Translate
145: .get("virtualdatabase.no.recovery.log"));
146: }
147:
148: return null;
149: }
150:
151: /**
152: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#cancel(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase)
153: */
154: public void cancel(DistributedVirtualDatabase dvdb) {
155: if (dvdb.getLogger().isWarnEnabled())
156: dvdb
157: .getLogger()
158: .warn(
159: "Canceling SuspendActivity message: resuming writes, transactions and persistent connections");
160: AbstractScheduler scheduler = dvdb.getRequestManager()
161: .getScheduler();
162: scheduler.resumeWritesTransactionsAndPersistentConnections();
163: }
164: }
|