001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Contact: sequoia@continuent.org
006: *
007: * Licensed under the Apache License, Version 2.0 (the "License");
008: * you may not use this file except in compliance with the License.
009: * You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018: *
019: * Initial developer(s): Emmanuel Cecchet.
020: * Contributor(s): ______________________.
021: */package org.continuent.sequoia.controller.loadbalancer.raidb2;
022:
023: import java.sql.SQLException;
024: import java.util.ArrayList;
025: import java.util.Collection;
026:
027: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
028: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
029: import org.continuent.sequoia.common.i18n.Translate;
030: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
031: import org.continuent.sequoia.controller.backend.DatabaseBackend;
032: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
033: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
034: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
035: import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
036: import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTablePolicy;
037: import org.continuent.sequoia.controller.requests.SelectRequest;
038: import org.continuent.sequoia.controller.requests.StoredProcedure;
039: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
040:
041: /**
042: * RAIDb-2 Round Robin load balancer featuring (Least Pending Requests First
043: * load balancing algorithm).
044: * <p>
045: * The read requests coming from the request manager are sent to the node that
046: * has the Least pending read requests among the nodes that can execute the
047: * request.
048: *
049: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
050: * @version 1.0
051: */
052: public class RAIDb2_LPRF extends RAIDb2 {
053:
054: /*
055: * How the code is organized ? 1. Member variables 2. Constructor(s) 3.
056: * Request handling 4. Debug/Monitoring
057: */
058:
059: /*
060: * Constructors
061: */
062:
063: /**
064: * Creates a new RAIDb-2 Round Robin request load balancer.
065: *
066: * @param vdb the virtual database this load balancer belongs to.
067: * @param waitForCompletionPolicy how many backends must complete before
068: * returning the result?
069: * @param createTablePolicy the policy defining how 'create table' statements
070: * should be handled
071: * @exception Exception if an error occurs
072: */
073: public RAIDb2_LPRF(VirtualDatabase vdb,
074: WaitForCompletionPolicy waitForCompletionPolicy,
075: CreateTablePolicy createTablePolicy) throws Exception {
076: super (vdb, waitForCompletionPolicy, createTablePolicy);
077: }
078:
079: /*
080: * Request Handling
081: */
082:
083: /**
084: * Chooses the node to execute the request using a round-robin algorithm. If
085: * the next node has not the tables needed to execute the requests, we try the
086: * next one and so on until a suitable backend is found.
087: *
088: * @param request an <code>SelectRequest</code>
089: * @param metadataCache cached metadata to use to construct the result set
090: * @return the corresponding <code>java.sql.ResultSet</code>
091: * @exception SQLException if an error occurs
092: * @see org.continuent.sequoia.controller.loadbalancer.raidb2.RAIDb2#statementExecuteQuery(SelectRequest,
093: * MetadataCache)
094: */
095: public ControllerResultSet statementExecuteQuery(
096: SelectRequest request, MetadataCache metadataCache)
097: throws SQLException {
098: // Choose a backend
099: try {
100: vdb.acquireReadLockBackendLists();
101: } catch (InterruptedException e) {
102: String msg = Translate.get(
103: "loadbalancer.backendlist.acquire.readlock.failed",
104: e);
105: logger.error(msg);
106: throw new SQLException(msg);
107: }
108:
109: DatabaseBackend backend = null; // The backend that will execute the query
110:
111: // Note that vdb lock is released in the finally clause of this try/catch
112: // block
113: try {
114: ArrayList backends = vdb.getBackends();
115: int size = backends.size();
116:
117: if (size == 0)
118: throw new NoMoreBackendException(Translate.get(
119: "loadbalancer.execute.no.backend.available",
120: request.getId()));
121:
122: // Choose the backend that has the least pending requests
123: int leastRequests = 0;
124: int enabledBackends = 0;
125: Collection tables = request.getFrom();
126: if (tables == null)
127: throw new SQLException(Translate.get(
128: "loadbalancer.from.not.found", request
129: .getSqlShortForm(vdb
130: .getSqlShortFormLength())));
131:
132: for (int i = 0; i < size; i++) {
133: DatabaseBackend b = (DatabaseBackend) backends.get(i);
134: if (b.isReadEnabled()) {
135: enabledBackends++;
136: if (b.hasTables(tables)) {
137: int pending = b.getPendingRequests().size();
138: if (((backend == null) || (pending < leastRequests))) {
139: backend = b;
140: if (pending == 0)
141: break; // Stop here we will never find a less loaded node
142: else
143: leastRequests = pending;
144: }
145: }
146: }
147: }
148:
149: if (backend == null) {
150: if (enabledBackends == 0)
151: throw new NoMoreBackendException(Translate.get(
152: "loadbalancer.execute.no.backend.enabled",
153: request.getId()));
154: else
155: throw new SQLException(Translate.get(
156: "loadbalancer.backend.no.required.tables",
157: tables.toString()));
158: }
159:
160: } catch (RuntimeException e) {
161: String msg = Translate.get(
162: "loadbalancer.request.failed.on.backend",
163: new String[] {
164: request.getSqlShortForm(vdb
165: .getSqlShortFormLength()),
166: backend.getName(), e.getMessage() });
167: logger.error(msg, e);
168: throw new SQLException(msg);
169: } finally {
170: vdb.releaseReadLockBackendLists();
171: }
172:
173: // Execute the request on the chosen backend
174: ControllerResultSet rs = null;
175: try {
176: rs = executeReadRequestOnBackend(request, backend,
177: metadataCache);
178: } catch (UnreachableBackendException se) {
179: // Try on another backend
180: return statementExecuteQuery(request, metadataCache);
181: } catch (SQLException se) {
182: String msg = Translate.get("loadbalancer.request.failed",
183: new String[] { String.valueOf(request.getId()),
184: se.getMessage() });
185: if (logger.isInfoEnabled())
186: logger.info(msg);
187: throw new SQLException(msg);
188: } catch (Throwable e) {
189: String msg = Translate.get(
190: "loadbalancer.request.failed.on.backend",
191: new String[] {
192: request.getSqlShortForm(vdb
193: .getSqlShortFormLength()),
194: backend.getName(), e.getMessage() });
195: logger.error(msg, e);
196: throw new SQLException(msg);
197: }
198:
199: return rs;
200: }
201:
202: /**
203: * Chooses the node to execute the stored procedure using a LPRF algorithm. If
204: * the next node has not the needed stored procedure, we try the next one and
205: * so on until a suitable backend is found.
206: *
207: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecuteQuery(StoredProcedure,
208: * MetadataCache)
209: */
210: public ControllerResultSet readOnlyCallableStatementExecuteQuery(
211: StoredProcedure proc, MetadataCache metadataCache)
212: throws SQLException {
213: return (ControllerResultSet) callStoredProcedure(proc,
214: CALLABLE_STATEMENT_EXECUTE_QUERY, metadataCache);
215: }
216:
217: /**
218: * Chooses the node to execute the stored procedure using a LPRF algorithm. If
219: * the next node has not the needed stored procedure, we try the next one and
220: * so on until a suitable backend is found.
221: *
222: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecute(org.continuent.sequoia.controller.requests.StoredProcedure,
223: * org.continuent.sequoia.controller.cache.metadata.MetadataCache)
224: */
225: public ExecuteResult readOnlyCallableStatementExecute(
226: StoredProcedure proc, MetadataCache metadataCache)
227: throws SQLException {
228: return (ExecuteResult) callStoredProcedure(proc,
229: CALLABLE_STATEMENT_EXECUTE, metadataCache);
230: }
231:
232: /**
233: * Common code to execute a StoredProcedure using executeQuery or
234: * executeUpdate on a backend chosen using a LPRF algorithm.
235: *
236: * @param proc a <code>StoredProcedure</code>
237: * @param callType one of CALLABLE_STATEMENT_EXECUTE_QUERY or
238: * CALLABLE_STATEMENT_EXECUTE
239: * @param metadataCache the metadataCache if any or null
240: * @return a <code>ControllerResultSet</code> or an
241: * <code>ExecuteResult</code> object
242: * @throws SQLException if an error occurs
243: */
244: private Object callStoredProcedure(StoredProcedure proc,
245: int callType, MetadataCache metadataCache)
246: throws SQLException {
247: // Choose a backend
248: try {
249: vdb.acquireReadLockBackendLists();
250: } catch (InterruptedException e) {
251: String msg = Translate.get(
252: "loadbalancer.backendlist.acquire.readlock.failed",
253: e);
254: logger.error(msg);
255: throw new SQLException(msg);
256: }
257:
258: DatabaseBackend backend = null; // The backend that will execute the query
259:
260: // Note that vdb lock is released in the finally clause of this try/catch
261: // block
262: try {
263: DatabaseBackend failedBackend = null;
264: SQLException failedException = null;
265: Object result = null;
266: do {
267: ArrayList backends = vdb.getBackends();
268: int size = backends.size();
269:
270: if (size == 0)
271: throw new NoMoreBackendException(
272: Translate
273: .get(
274: "loadbalancer.execute.no.backend.available",
275: proc.getId()));
276:
277: // Choose the backend that has the least pending requests
278: int leastRequests = 0;
279: int enabledBackends = 0;
280:
281: for (int i = 0; i < size; i++) {
282: DatabaseBackend b = (DatabaseBackend) backends
283: .get(i);
284: if (b.isReadEnabled()) {
285: enabledBackends++;
286: if (b.hasStoredProcedure(
287: proc.getProcedureKey(), proc
288: .getNbOfParameters())) {
289: int pending = b.getPendingRequests().size();
290: if ((b != failedBackend)
291: && ((backend == null) || (pending < leastRequests))) {
292: backend = b;
293: if (pending == 0)
294: break; // Stop here we will never find a less loaded node
295: else
296: leastRequests = pending;
297: }
298: }
299: }
300: }
301:
302: if (backend == null) {
303: if (enabledBackends == 0)
304: throw new SQLException(
305: Translate
306: .get(
307: "loadbalancer.storedprocedure.backend.no.enabled",
308: proc.getId()));
309: else if (failedBackend == null)
310: throw new SQLException(
311: Translate
312: .get(
313: "loadbalancer.backend.no.required.storedprocedure",
314: proc.getProcedureKey()));
315: else
316: // Bad query, the only backend that could execute it has failed
317: throw failedException;
318: }
319:
320: // Execute the request on the chosen backend
321: boolean toDisable = false;
322: try {
323: switch (callType) {
324: case CALLABLE_STATEMENT_EXECUTE_QUERY:
325: result = executeStoredProcedureOnBackend(proc,
326: true, backend, metadataCache);
327: break;
328: case CALLABLE_STATEMENT_EXECUTE:
329: result = executeStoredProcedureOnBackend(proc,
330: false, backend, metadataCache);
331: break;
332: default:
333: throw new RuntimeException(
334: "Unhandled call type " + callType
335: + " in executeLPRF");
336: }
337: if (failedBackend != null) { // Previous backend failed
338: if (logger.isWarnEnabled())
339: logger
340: .warn(Translate
341: .get(
342: "loadbalancer.storedprocedure.status",
343: new String[] {
344: String
345: .valueOf(proc
346: .getId()),
347: backend
348: .getName(),
349: failedBackend
350: .getName() }));
351: toDisable = true;
352: }
353: } catch (UnreachableBackendException se) {
354: // Retry on an other backend.
355: continue;
356: } catch (SQLException se) {
357: if (failedBackend != null) { // Bad query, no backend can execute it
358: String msg = Translate
359: .get(
360: "loadbalancer.storedprocedure.failed.twice",
361: new String[] {
362: String.valueOf(proc
363: .getId()),
364: se.getMessage() });
365: if (logger.isInfoEnabled())
366: logger.info(msg);
367: throw new SQLException(msg);
368: } else { // We are the first to fail on this query
369: failedBackend = backend;
370: failedException = se;
371: if (logger.isInfoEnabled())
372: logger
373: .info(Translate
374: .get(
375: "loadbalancer.storedprocedure.failed.on.backend",
376: new String[] {
377: proc
378: .getSqlShortForm(vdb
379: .getSqlShortFormLength()),
380: backend
381: .getName(),
382: se
383: .getMessage() }));
384: continue;
385: }
386: }
387:
388: if (toDisable) { // retry has succeeded and we need to disable the first node that
389: // failed
390: try {
391: if (logger.isWarnEnabled())
392: logger.warn(Translate.get(
393: "loadbalancer.backend.disabling",
394: failedBackend.getName()));
395: disableBackend(failedBackend, true);
396: } catch (SQLException ignore) {
397: } finally {
398: failedBackend = null; // to exit the do{}while
399: }
400: }
401: } while (failedBackend != null);
402: return result;
403: } catch (RuntimeException e) {
404: String msg = Translate.get(
405: "loadbalancer.storedprocedure.failed.on.backend",
406: new String[] {
407: proc.getSqlShortForm(vdb
408: .getSqlShortFormLength()),
409: backend.getName(), e.getMessage() });
410: logger.fatal(msg, e);
411: throw new SQLException(msg);
412: } finally {
413: vdb.releaseReadLockBackendLists();
414: }
415: }
416:
417: /*
418: * Debug/Monitoring
419: */
420:
421: /**
422: * Gets information about the request load balancer.
423: *
424: * @return <code>String</code> containing information
425: */
426: public String getInformation() {
427: // We don't lock since we don't need a completely accurate value
428: int size = vdb.getBackends().size();
429:
430: if (size == 0)
431: return "RAIDb-2 Least Pending Requests First load balancer: !!!Warning!!! No backend nodes found\n";
432: else
433: return "RAIDb-2 Least Pending Requests First load balancer ("
434: + size + " backends)\n";
435: }
436:
437: /**
438: * @see org.continuent.sequoia.controller.loadbalancer.raidb2.RAIDb2#getRaidb2Xml
439: */
440: public String getRaidb2Xml() {
441: return "<"
442: + DatabasesXmlTags.ELT_RAIDb_2_LeastPendingRequestsFirst
443: + "/>";
444: }
445: }
|