001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): Jean-Bernard van Zuylen.
022: */package org.continuent.sequoia.controller.scheduler.raidb2;
023:
024: import java.sql.SQLException;
025:
026: import org.continuent.sequoia.common.exceptions.RollbackException;
027: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
028: import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
029: import org.continuent.sequoia.controller.requests.AbstractRequest;
030: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
031: import org.continuent.sequoia.controller.requests.ParsingGranularities;
032: import org.continuent.sequoia.controller.requests.SelectRequest;
033: import org.continuent.sequoia.controller.requests.StoredProcedure;
034: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
035:
036: /**
037: * This scheduler provides query level scheduling for RAIDb-2 controllers. Reads
038: * can execute in parallel until a write comes in. Then the write waits for the
039: * completion of the reads. Any new read is stacked after the write and they are
040: * released together when the write has completed its execution.
041: *
042: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
043: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
044: * </a>
045: * @version 1.0
046: * @deprecated since Sequoia 2.2
047: */
048: public class RAIDb2QueryLevelScheduler extends AbstractScheduler {
049:
050: //
051: // How the code is organized ?
052: //
053: // 1. Member variables
054: // 2. Constructor
055: // 3. Request handling
056: // 4. Transaction management
057: // 5. Debug/Monitoring
058: //
059:
060: private int pendingReads;
061:
062: // We have to distinguish read and write to wake up only
063: // waiting reads or writes according to the situation
064: private Object readSync; // to synchronize on reads completion
065: private Object writeSync; // to synchronize on writes completion
066:
067: //
068: // Constructor
069: //
070:
071: /**
072: * Creates a new Query Level Scheduler
073: */
074: public RAIDb2QueryLevelScheduler() {
075: super (RAIDbLevels.RAIDb2, ParsingGranularities.NO_PARSING);
076: pendingReads = 0;
077: readSync = new Object();
078: writeSync = new Object();
079: }
080:
081: //
082: // Request Handling
083: //
084:
085: /**
086: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#scheduleNonSuspendedReadRequest(SelectRequest)
087: */
088: public void scheduleNonSuspendedReadRequest(SelectRequest request)
089: throws SQLException {
090: // Now deal with synchronization
091: synchronized (this .writeSync) {
092: if (getPendingWrites() == 0) { // No writes pending, go ahead !
093: synchronized (this .readSync) {
094: pendingReads++;
095: if (logger.isDebugEnabled())
096: logger.debug("Request " + request.getId()
097: + " scheduled for read ("
098: + pendingReads + " pending reads)");
099: return;
100: }
101: }
102:
103: // Wait for the writes completion
104: try {
105: if (logger.isDebugEnabled())
106: logger.debug("Request " + request.getId()
107: + " waiting for " + getPendingWrites()
108: + " pending writes)");
109:
110: int timeout = request.getTimeout();
111: if (timeout > 0) {
112: long start = System.currentTimeMillis();
113: // Convert seconds to milliseconds for wait call
114: long lTimeout = timeout * 1000L;
115: this .writeSync.wait(lTimeout);
116: long end = System.currentTimeMillis();
117: int remaining = (int) (lTimeout - (end - start));
118: if (remaining > 0)
119: request.setTimeout(remaining);
120: else {
121: String msg = "Timeout (" + request.getTimeout()
122: + ") for request: " + request.getId();
123: logger.warn(msg);
124: throw new SQLException(msg);
125: }
126: } else
127: this .writeSync.wait();
128:
129: synchronized (this .readSync) {
130: pendingReads++;
131: if (logger.isDebugEnabled())
132: logger.debug("Request " + request.getId()
133: + " scheduled for read ("
134: + pendingReads + " pending reads)");
135: return; // Ok, write completed before timeout
136: }
137: } catch (InterruptedException e) {
138: // Timeout
139: if (logger.isWarnEnabled())
140: logger.warn("Request " + request.getId()
141: + " timed out (" + request.getTimeout()
142: + " s)");
143: throw new SQLException("Timeout ("
144: + request.getTimeout() + ") for request: "
145: + request.getId());
146: }
147: }
148: }
149:
150: /**
151: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#readCompletedNotify(SelectRequest)
152: */
153: public final void readCompletedNotify(SelectRequest request) {
154: synchronized (this .readSync) {
155: pendingReads--;
156: if (logger.isDebugEnabled())
157: logger.debug("Request " + request.getId()
158: + " completed");
159: if (pendingReads == 0) {
160: if (logger.isDebugEnabled())
161: logger
162: .debug("Last read completed, notifying writes");
163: readSync.notifyAll(); // Wakes up any waiting write query
164: }
165: }
166: }
167:
168: /**
169: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#scheduleWriteRequest(AbstractWriteRequest)
170: */
171: public void scheduleNonSuspendedWriteRequest(
172: AbstractWriteRequest request) throws SQLException {
173: // We have to take the locks in the same order as reads else
174: // we could have a deadlock
175: synchronized (this .writeSync) {
176: synchronized (this .readSync) {
177: if (pendingReads == 0) { // No read pending, go ahead
178: if (logger.isDebugEnabled())
179: logger.debug("Request " + request.getId()
180: + " scheduled for write ("
181: + getPendingWrites()
182: + " pending writes)");
183: return;
184: }
185: }
186: }
187:
188: waitForReadCompletion(request);
189: scheduleNonSuspendedWriteRequest(request);
190: }
191:
192: /**
193: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#notifyWriteCompleted(AbstractWriteRequest)
194: */
195: public final synchronized void notifyWriteCompleted(
196: AbstractWriteRequest request) {
197: synchronized (this .writeSync) {
198: if (logger.isDebugEnabled())
199: logger.debug("Request " + request.getId()
200: + " completed");
201: if (getPendingWrites() == 0) {
202: if (logger.isDebugEnabled())
203: logger
204: .debug("Last write completed, notifying reads");
205: writeSync.notifyAll(); // Wakes up all waiting read queries
206: }
207: }
208: }
209:
210: /**
211: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#scheduleNonSuspendedStoredProcedure(org.continuent.sequoia.controller.requests.StoredProcedure)
212: */
213: public final synchronized void scheduleNonSuspendedStoredProcedure(
214: StoredProcedure proc) throws SQLException,
215: RollbackException {
216: // We have to take the locks in the same order as reads else
217: // we could have a deadlock
218: synchronized (this .writeSync) {
219: synchronized (this .readSync) {
220: if (pendingReads == 0) { // No read pending, go ahead
221: if (logger.isDebugEnabled())
222: logger
223: .debug("Stored procedure "
224: + proc.getId()
225: + (proc.isAutoCommit() ? ""
226: : " transaction "
227: + proc
228: .getTransactionId())
229: + " scheduled for write ("
230: + getPendingWrites()
231: + " pending writes)");
232: return;
233: }
234: }
235: }
236:
237: waitForReadCompletion(proc);
238: scheduleNonSuspendedStoredProcedure(proc);
239: }
240:
241: /**
242: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#notifyStoredProcedureCompleted(org.continuent.sequoia.controller.requests.StoredProcedure)
243: */
244: public final void notifyStoredProcedureCompleted(
245: StoredProcedure proc) {
246: synchronized (this .writeSync) {
247: if (logger.isDebugEnabled())
248: logger.debug("Stored procedure " + proc.getId()
249: + " completed - " + getPendingWrites()
250: + " pending writes");
251: if (getPendingWrites() == 0) {
252: if (logger.isDebugEnabled())
253: logger
254: .debug("Last write completed, notifying reads");
255: writeSync.notifyAll(); // Wakes up all waiting read queries
256: }
257: }
258: }
259:
260: /**
261: * Wait for the reads completion. Synchronizes on this.readSync.
262: *
263: * @param request the request that is being scheduled
264: * @throws SQLException if an error occurs
265: */
266: private void waitForReadCompletion(AbstractRequest request)
267: throws SQLException {
268: synchronized (this .readSync) {
269: // Wait for the reads completion
270: try {
271: if (logger.isDebugEnabled())
272: logger.debug("Request " + request.getId()
273: + " waiting for " + pendingReads
274: + " pending reads)");
275:
276: int timeout = request.getTimeout();
277: if (timeout > 0) {
278: long start = System.currentTimeMillis();
279: // Convert seconds to milliseconds for wait call
280: long lTimeout = timeout * 1000L;
281: this .readSync.wait(lTimeout);
282: long end = System.currentTimeMillis();
283: int remaining = (int) (lTimeout - (end - start));
284: if (remaining > 0)
285: request.setTimeout(remaining);
286: else {
287: String msg = "Timeout (" + request.getTimeout()
288: + ") for request: " + request.getId();
289: logger.warn(msg);
290: throw new SQLException(msg);
291: }
292: } else
293: this .readSync.wait();
294: } catch (InterruptedException e) {
295: // Timeout
296: if (logger.isWarnEnabled())
297: logger.warn("Request " + request.getId()
298: + " timed out (" + request.getTimeout()
299: + " s)");
300: throw new SQLException("Timeout ("
301: + request.getTimeout() + ") for request: "
302: + request.getId());
303: }
304: }
305: }
306:
307: //
308: // Transaction Management
309: //
310:
311: /**
312: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#commitTransaction(long)
313: */
314: protected final void commitTransaction(long transactionId) {
315: }
316:
317: /**
318: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#rollbackTransaction(long)
319: */
320: protected final void rollbackTransaction(long transactionId) {
321: }
322:
323: /**
324: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#rollbackTransaction(long,
325: * String)
326: */
327: protected final void rollbackTransaction(long transactionId,
328: String savepointName) {
329: }
330:
331: /**
332: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#setSavepointTransaction(long,
333: * String)
334: */
335: protected final void setSavepointTransaction(long transactionId,
336: String name) {
337: }
338:
339: /**
340: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#releaseSavepointTransaction(long,
341: * String)
342: */
343: protected final void releaseSavepointTransaction(
344: long transactionId, String name) {
345: }
346:
347: //
348: // Debug/Monitoring
349: //
350: /**
351: * @see org.continuent.sequoia.controller.scheduler.AbstractScheduler#getXmlImpl()
352: */
353: public String getXmlImpl() {
354: StringBuffer info = new StringBuffer();
355: info.append("<" + DatabasesXmlTags.ELT_RAIDb2Scheduler + " "
356: + DatabasesXmlTags.ATT_level + "=\""
357: + DatabasesXmlTags.VAL_query + "\"/>");
358: info.append(System.getProperty("line.separator"));
359: return info.toString();
360: }
361: }
|