001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2007 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
019:
020: import java.io.Serializable;
021: import java.sql.SQLException;
022: import java.util.LinkedList;
023:
024: import org.continuent.hedera.common.Member;
025: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
026: import org.continuent.sequoia.common.i18n.Translate;
027: import org.continuent.sequoia.common.log.Trace;
028: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
029: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
030:
031: /**
032: * This class defines a BlockActivity message.
033: * <ol>
034: * <li>Suspend new persistent connections, new transactions and new writes (in
035: * this order)</li>
036: * <li><strong>Do not wait for completion of current transactions and
037: * persistent connections</strong> (as it is the case in the
038: * {@link SuspendActivity} message).</li>
039: * </ol>
040: * Resuming the activity is done in the same way as for the
041: * {@link SuspendActivity} message.
042: */
043: public class BlockActivity extends DistributedVirtualDatabaseMessage {
044: private static final long serialVersionUID = -8451114082404567986L;
045:
046: private transient LinkedList totalOrderQueue;
047:
048: /**
049: * Public constructor
050: */
051: public BlockActivity() {
052: }
053:
054: /**
055: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageSingleThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
056: * org.continuent.hedera.common.Member)
057: */
058: public Object handleMessageSingleThreaded(
059: DistributedVirtualDatabase dvdb, Member sender) {
060: totalOrderQueue = dvdb.getTotalOrderQueue();
061: if (totalOrderQueue == null)
062: return new VirtualDatabaseException(Translate.get(
063: "virtualdatabase.no.total.order.queue", dvdb
064: .getVirtualDatabaseName()));
065:
066: synchronized (totalOrderQueue) {
067: SuspendWritesMessage request = new SuspendWritesMessage(
068: "Block activity");
069: totalOrderQueue.addLast(request);
070: return request;
071: }
072: }
073:
074: /**
075: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageMultiThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
076: * org.continuent.hedera.common.Member, java.lang.Object)
077: */
078: public Serializable handleMessageMultiThreaded(
079: DistributedVirtualDatabase dvdb, Member sender,
080: Object handleMessageSingleThreadedResult) {
081: Trace logger = dvdb.getLogger();
082: dvdb.addOngoingActivitySuspension(sender);
083:
084: // Wait for our turn to execute
085: boolean found = dvdb.waitForTotalOrder(
086: handleMessageSingleThreadedResult, false);
087:
088: AbstractScheduler scheduler = dvdb.getRequestManager()
089: .getScheduler();
090:
091: // Suspend new persistent connections
092: scheduler.suspendNewPersistentConnections();
093:
094: // Suspend new transactions and writes
095: scheduler.suspendNewTransactionsAndWrites();
096:
097: // Remove ourselves from the queue to allow others to complete if needed
098: if (!found)
099: logger
100: .error("block activity was not found in total order queue, posting out of order");
101: else
102: synchronized (totalOrderQueue) {
103: totalOrderQueue.removeFirst();
104: totalOrderQueue.notifyAll();
105: }
106:
107: // Wait for writes to complete
108: try {
109: scheduler.waitForSuspendedWritesToComplete();
110: } catch (SQLException e) {
111: dvdb.getLogger().error(
112: "Failed to wait for writes to complete");
113: return e;
114: }
115: return null;
116: }
117:
118: /**
119: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#cancel(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase)
120: */
121: public void cancel(DistributedVirtualDatabase dvdb) {
122: if (dvdb.getLogger().isWarnEnabled())
123: dvdb
124: .getLogger()
125: .warn(
126: "Canceling BlockActivity message: resuming writes, transactions and persistent connections");
127: AbstractScheduler scheduler = dvdb.getRequestManager()
128: .getScheduler();
129: scheduler.resumeWritesTransactionsAndPersistentConnections();
130: }
131: }
|