001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): Jean-Bernard van Zuylen.
022: */package org.continuent.sequoia.controller.scheduler.raidb1;
023:
024: import java.sql.SQLException;
025:
026: import org.continuent.sequoia.common.exceptions.RollbackException;
027: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
028: import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
029: import org.continuent.sequoia.controller.requests.AbstractRequest;
030: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
031: import org.continuent.sequoia.controller.requests.ParsingGranularities;
032: import org.continuent.sequoia.controller.requests.SelectRequest;
033: import org.continuent.sequoia.controller.requests.StoredProcedure;
034: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
035:
036: /**
037: * This scheduler provides query level scheduling for RAIDb-1 controllers. Reads
038: * can execute in parallel until a write comes in. Then the write waits for the
039: * completion of the reads. Any new read is stacked after the write and they are
040: * released together when the write has completed its execution.
041: *
042: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
043: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
044: * </a>
045: * @version 1.0
046: * @deprecated since Sequoia 2.2
047: */
048: public class RAIDb1QueryLevelScheduler extends AbstractScheduler {
049:
050: //
051: // How the code is organized ?
052: //
053: // 1. Member variables
054: // 2. Constructor
055: // 3. Request handling
056: // 4. Transaction management
057: // 5. Debug/Monitoring
058: //
059:
060: private int pendingReads;
061:
062: // We have to distinguish read and write to wake up only
063: // waiting reads or writes according to the situation
064: private Object readSync; // to synchronize on reads completion
065: private Object writeSync; // to synchronize on writes completion
066:
067: //
068: // Constructor
069: //
070:
071: /**
072: * Creates a new Query Level Scheduler
073: */
074: public RAIDb1QueryLevelScheduler() {
075: super (RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING);
076: pendingReads = 0;
077: readSync = new Object();
078: writeSync = new Object();
079: }
080:
081: //
082: // Request Handling
083: //
084:
085: /**
086: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#scheduleNonSuspendedReadRequest(SelectRequest)
087: */
088: public void scheduleNonSuspendedReadRequest(SelectRequest request)
089: throws SQLException {
090: // Now deal with synchronization
091: synchronized (this .writeSync) {
092: if (getPendingWrites() == 0) { // No writes pending, go ahead !
093: synchronized (this .readSync) {
094: pendingReads++;
095: if (logger.isDebugEnabled())
096: logger
097: .debug("Request "
098: + request.getId()
099: + (request.isAutoCommit() ? ""
100: : " transaction "
101: + request
102: .getTransactionId())
103: + " scheduled for read ("
104: + pendingReads
105: + " pending reads)");
106: return;
107: }
108: }
109:
110: // Wait for the writes completion
111: try {
112: if (logger.isDebugEnabled())
113: logger.debug("Request " + request.getId()
114: + " waiting for " + getPendingWrites()
115: + " pending writes)");
116:
117: int timeout = request.getTimeout();
118: if (timeout > 0) {
119: long start = System.currentTimeMillis();
120: // Convert seconds to milliseconds for wait call
121: long lTimeout = timeout * 1000L;
122: this .writeSync.wait(lTimeout);
123: long end = System.currentTimeMillis();
124: int remaining = (int) (lTimeout - (end - start));
125: if (remaining > 0)
126: request.setTimeout(remaining);
127: else {
128: String msg = "Timeout (" + request.getTimeout()
129: + ") for request: " + request.getId();
130: logger.warn(msg);
131: throw new SQLException(msg);
132: }
133: } else
134: this .writeSync.wait();
135:
136: synchronized (this .readSync) {
137: pendingReads++;
138: if (logger.isDebugEnabled())
139: logger.debug("Request " + request.getId()
140: + " scheduled for read ("
141: + pendingReads + " pending reads)");
142: return; // Ok, write completed before timeout
143: }
144: } catch (InterruptedException e) {
145: // Timeout
146: if (logger.isWarnEnabled())
147: logger.warn("Request " + request.getId()
148: + " timed out (" + request.getTimeout()
149: + " s)");
150: throw new SQLException("Timeout ("
151: + request.getTimeout() + ") for request: "
152: + request.getId());
153: }
154: }
155: }
156:
157: /**
158: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#readCompletedNotify(SelectRequest)
159: */
160: public final void readCompletedNotify(SelectRequest request) {
161: synchronized (this .readSync) {
162: pendingReads--;
163: if (logger.isDebugEnabled())
164: logger.debug("Read request " + request.getId()
165: + " completed - " + pendingReads
166: + " pending reads");
167: if (pendingReads == 0) {
168: if (logger.isDebugEnabled())
169: logger
170: .debug("Last read completed, notifying writes");
171: readSync.notifyAll(); // Wakes up any waiting write query
172: }
173: }
174: }
175:
176: /**
177: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#scheduleWriteRequest(AbstractWriteRequest)
178: */
179: public void scheduleNonSuspendedWriteRequest(
180: AbstractWriteRequest request) throws SQLException {
181: // We have to take the locks in the same order as reads else
182: // we could have a deadlock
183: synchronized (this .writeSync) {
184: synchronized (this .readSync) {
185: if (pendingReads == 0) { // No read pending, go ahead
186: if (logger.isDebugEnabled())
187: logger
188: .debug("Request "
189: + request.getId()
190: + (request.isAutoCommit() ? ""
191: : " transaction "
192: + request
193: .getTransactionId())
194: + " scheduled for write ("
195: + getPendingWrites()
196: + " pending writes)");
197: return;
198: }
199: }
200: }
201:
202: waitForReadCompletion(request);
203: scheduleNonSuspendedWriteRequest(request);
204: }
205:
206: /**
207: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#notifyWriteCompleted(AbstractWriteRequest)
208: */
209: public final synchronized void notifyWriteCompleted(
210: AbstractWriteRequest request) {
211: synchronized (this .writeSync) {
212: if (logger.isDebugEnabled())
213: logger.debug("Request " + request.getId()
214: + " completed - " + getPendingWrites()
215: + " pending writes");
216: if (getPendingWrites() == 0) {
217: if (logger.isDebugEnabled())
218: logger
219: .debug("Last write completed, notifying reads");
220: writeSync.notifyAll(); // Wakes up all waiting read queries
221: }
222: }
223: }
224:
225: /**
226: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#scheduleNonSuspendedStoredProcedure(org.continuent.sequoia.controller.requests.StoredProcedure)
227: */
228: public final synchronized void scheduleNonSuspendedStoredProcedure(
229: StoredProcedure proc) throws SQLException,
230: RollbackException {
231: // We have to take the locks in the same order as reads else
232: // we could have a deadlock
233: synchronized (this .writeSync) {
234: synchronized (this .readSync) {
235: if (pendingReads == 0) { // No read pending, go ahead
236: if (logger.isDebugEnabled())
237: logger
238: .debug("Stored procedure "
239: + proc.getId()
240: + (proc.isAutoCommit() ? ""
241: : " transaction "
242: + proc
243: .getTransactionId())
244: + " scheduled for write ("
245: + getPendingWrites()
246: + " pending writes)");
247: return;
248: }
249: }
250: }
251:
252: waitForReadCompletion(proc);
253: scheduleNonSuspendedStoredProcedure(proc);
254: }
255:
256: /**
257: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#notifyStoredProcedureCompleted(org.continuent.sequoia.controller.requests.StoredProcedure)
258: */
259: public final void notifyStoredProcedureCompleted(
260: StoredProcedure proc) {
261: synchronized (this .writeSync) {
262: if (logger.isDebugEnabled())
263: logger.debug("Stored procedure " + proc.getId()
264: + " completed - " + getPendingWrites()
265: + " pending writes");
266: if (getPendingWrites() == 0) {
267: if (logger.isDebugEnabled())
268: logger
269: .debug("Last write completed, notifying reads");
270: writeSync.notifyAll(); // Wakes up all waiting read queries
271: }
272: }
273: }
274:
275: /**
276: * Wait for the reads completion. Synchronizes on this.readSync.
277: *
278: * @param request the request that is being scheduled
279: * @throws SQLException if an error occurs
280: */
281: private void waitForReadCompletion(AbstractRequest request)
282: throws SQLException {
283: synchronized (this .readSync) {
284: // Wait for the reads completion
285: try {
286: if (logger.isDebugEnabled())
287: logger.debug("Request " + request.getId()
288: + " waiting for " + pendingReads
289: + " pending reads)");
290:
291: int timeout = request.getTimeout();
292: if (timeout > 0) {
293: long start = System.currentTimeMillis();
294: // Convert seconds to milliseconds for wait call
295: long lTimeout = timeout * 1000L;
296: this .readSync.wait(lTimeout);
297: long end = System.currentTimeMillis();
298: int remaining = (int) (lTimeout - (end - start));
299: if (remaining > 0)
300: request.setTimeout(remaining);
301: else {
302: String msg = "Timeout (" + request.getTimeout()
303: + ") for request: " + request.getId();
304: logger.warn(msg);
305: throw new SQLException(msg);
306: }
307: } else
308: this .readSync.wait();
309: } catch (InterruptedException e) {
310: // Timeout
311: if (logger.isWarnEnabled())
312: logger.warn("Request " + request.getId()
313: + " timed out (" + request.getTimeout()
314: + " s)");
315: throw new SQLException("Timeout ("
316: + request.getTimeout() + ") for request: "
317: + request.getId());
318: }
319: }
320: }
321:
322: //
323: // Transaction Management
324: //
325:
326: /**
327: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#commitTransaction(long)
328: */
329: protected final void commitTransaction(long transactionId) {
330: }
331:
332: /**
333: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#rollbackTransaction(long)
334: */
335: protected final void rollbackTransaction(long transactionId) {
336: }
337:
338: /**
339: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#rollbackTransaction(long,
340: * String)
341: */
342: protected final void rollbackTransaction(long transactionId,
343: String savepointName) {
344: }
345:
346: /**
347: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#setSavepointTransaction(long,
348: * String)
349: */
350: protected final void setSavepointTransaction(long transactionId,
351: String name) {
352: }
353:
354: /**
355: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#releaseSavepointTransaction(long,
356: * String)
357: */
358: protected final void releaseSavepointTransaction(
359: long transactionId, String name) {
360: }
361:
362: //
363: // Debug/Monitoring
364: //
365: /**
366: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#getXmlImpl()
367: */
368: public String getXmlImpl() {
369: return "<" + DatabasesXmlTags.ELT_RAIDb1Scheduler + " "
370: + DatabasesXmlTags.ATT_level + "=\""
371: + DatabasesXmlTags.VAL_query + "\"/>";
372: }
373: }
|