001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Contact: sequoia@continuent.org
006: *
007: * Licensed under the Apache License, Version 2.0 (the "License");
008: * you may not use this file except in compliance with the License.
009: * You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018: *
019: * Initial developer(s): Emmanuel Cecchet.
020: * Contributor(s): Julie Marguerite, Gaetano Mazzeo.
021: */package org.continuent.sequoia.controller.loadbalancer.raidb1;
022:
023: import java.sql.SQLException;
024: import java.util.ArrayList;
025: import java.util.HashMap;
026:
027: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
028: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
029: import org.continuent.sequoia.common.i18n.Translate;
030: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
031: import org.continuent.sequoia.controller.backend.DatabaseBackend;
032: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
033: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
034: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
035: import org.continuent.sequoia.controller.loadbalancer.WeightedBalancer;
036: import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
037: import org.continuent.sequoia.controller.requests.AbstractRequest;
038: import org.continuent.sequoia.controller.requests.SelectRequest;
039: import org.continuent.sequoia.controller.requests.StoredProcedure;
040: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
041:
042: /**
043: * RAIDb-1 Weighted Round Robin load balancer
044: * <p>
045: * The read requests coming from the request manager are sent to the backend
046: * nodes using a weighted round robin. Write requests are broadcasted to all
047: * backends.
048: * <p>
049: * The weighted round-robin works as follows. If the backend weight is set to 0,
050: * no read requests are sent to this backend unless it is the last one available
051: * on this controller. The load balancer maintains a current weight that is
052: * increased by one each time a new read request is executed. <br>
053: * If backend1 has a weight of 5 and backend2 a weight of 10, backend1 will
054: * receive the 5 first requests and backend2 the next 10 requests. Then we
055: * restart with backend1. Be careful that large weight values will heavily load
056: * backends in turn but will probably not balance the load in an effective way.
057: *
058: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
059: * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
060: * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
061: * @version 1.0
062: */
063: public class RAIDb1_WRR extends RAIDb1 {
064: //
065: // How the code is organized ?
066: //
067: // 1. Member variables
068: // 2. Constructor(s)
069: // 3. Request handling
070: // 4. Debug/Monitoring
071: //
072:
073: private HashMap weights = new HashMap();
074: private int currentWeight;
075:
076: /*
077: * Constructors
078: */
079:
080: /**
081: * Creates a new RAIDb-1 Weighted Round Robin request load balancer.
082: *
083: * @param vdb the virtual database this load balancer belongs to.
084: * @param waitForCompletionPolicy How many backends must complete before
085: * returning the result?
086: * @throws Exception if an error occurs
087: */
088: public RAIDb1_WRR(VirtualDatabase vdb,
089: WaitForCompletionPolicy waitForCompletionPolicy)
090: throws Exception {
091: super (vdb, waitForCompletionPolicy);
092: currentWeight = 0;
093: }
094:
095: /*
096: * Request Handling
097: */
098:
099: /**
100: * Selects the backend using a weighted round-robin algorithm and executes the
101: * read request.
102: *
103: * @see org.continuent.sequoia.controller.loadbalancer.raidb1.RAIDb1#statementExecuteQuery(SelectRequest,
104: * MetadataCache)
105: */
106: public ControllerResultSet execSingleBackendReadRequest(
107: SelectRequest request, MetadataCache metadataCache)
108: throws SQLException {
109: return (ControllerResultSet) executeWRR(request,
110: STATEMENT_EXECUTE_QUERY, "Request ", metadataCache);
111: }
112:
113: /**
114: * Selects the backend using a weighted round-robin algorithm. The backend
115: * that has the shortest queue of currently executing queries is chosen to
116: * execute this stored procedure.
117: *
118: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecuteQuery(StoredProcedure,
119: * MetadataCache)
120: */
121: public ControllerResultSet readOnlyCallableStatementExecuteQuery(
122: StoredProcedure proc, MetadataCache metadataCache)
123: throws SQLException {
124: return (ControllerResultSet) executeWRR(proc,
125: CALLABLE_STATEMENT_EXECUTE_QUERY, "Stored procedure ",
126: metadataCache);
127: }
128:
129: /**
130: * Selects the backend using a weighted round-robin algorithm. The backend
131: * that has the shortest queue of currently executing queries is chosen to
132: * execute this stored procedure.
133: *
134: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecute(StoredProcedure,
135: * MetadataCache)
136: */
137: public ExecuteResult readOnlyCallableStatementExecute(
138: StoredProcedure proc, MetadataCache metadataCache)
139: throws SQLException {
140: return (ExecuteResult) executeWRR(proc,
141: CALLABLE_STATEMENT_EXECUTE, "Stored procedure ",
142: metadataCache);
143: }
144:
145: /**
146: * Common code to execute a SelectRequest or a StoredProcedure on a backend
147: * chosen using a weighted round-robin algorithm.
148: *
149: * @param request a <code>SelectRequest</code> or
150: * <code>StoredProcedure</code>
151: * @param callType one of STATEMENT_EXECUTE_QUERY,
152: * CALLABLE_STATEMENT_EXECUTE_QUERY or CALLABLE_STATEMENT_EXECUTE
153: * @param errorMsgPrefix the error message prefix, usually "Request " or
154: * "Stored procedure " ... failed because ...
155: * @param metadataCache a metadataCache if any or null
156: * @return a <code>ResultSet</code>
157: * @throws SQLException if an error occurs
158: */
159: private Object executeWRR(AbstractRequest request, int callType,
160: String errorMsgPrefix, MetadataCache metadataCache)
161: throws SQLException {
162: // Choose a backend
163: try {
164: vdb.acquireReadLockBackendLists();
165: } catch (InterruptedException e) {
166: String msg = Translate.get(
167: "loadbalancer.backendlist.acquire.readlock.failed",
168: e);
169: logger.error(msg);
170: throw new SQLException(msg);
171: }
172:
173: DatabaseBackend backend = null;
174:
175: // Note that vdb lock is released in the finally clause of this try/catch
176: // block
177: try {
178: ArrayList backends = vdb.getBackends();
179: int size = backends.size();
180:
181: if (size == 0)
182: throw new NoMoreBackendException(Translate.get(
183: "loadbalancer.execute.no.backend.available",
184: request.getId()));
185:
186: // Choose the backend (WRR algorithm starts here)
187: int w = 0; // cumulative weight
188: boolean backendFound = false;
189: for (int i = 0; i < size; i++) {
190: DatabaseBackend b = (DatabaseBackend) backends.get(i);
191: if (b.isReadEnabled()) {
192: if (backend == null)
193: backend = b; // Fallback if no backend found
194:
195: // Add the weight of this backend
196: Integer weight = (Integer) weights.get(b.getName());
197: if (weight == null)
198: logger.error("No weight defined for backend "
199: + b.getName());
200: else {
201: int backendWeight = weight.intValue();
202: if (backendWeight == 0)
203: continue; // Weight of 0, avoid using that backend
204: else
205: w += backendWeight;
206: }
207:
208: // Ok we reached the needed weight, take this backend
209: if (currentWeight < w) {
210: backend = b;
211: backendFound = true;
212: break;
213: }
214: }
215: }
216:
217: if (backend == null)
218: throw new NoMoreBackendException(Translate.get(
219: "loadbalancer.execute.no.backend.enabled",
220: request.getId()));
221:
222: // We are over the total weight and we are using the
223: // first available node. Let's reset the index to 1
224: // since we used this first node (0++).
225: if (backendFound)
226: currentWeight++; // Next time take the next
227: else
228: currentWeight = 1;
229: } catch (RuntimeException e) {
230: String msg = Translate.get(
231: "loadbalancer.execute.find.backend.failed",
232: new String[] {
233: request.getSqlShortForm(vdb
234: .getSqlShortFormLength()),
235: e.getMessage() });
236: logger.error(msg, e);
237: throw new SQLException(msg);
238: } finally {
239: vdb.releaseReadLockBackendLists();
240: }
241:
242: // Execute the request on the chosen backend
243: try {
244: switch (callType) {
245: case STATEMENT_EXECUTE_QUERY:
246: return executeRequestOnBackend((SelectRequest) request,
247: backend, metadataCache);
248: case CALLABLE_STATEMENT_EXECUTE_QUERY:
249: return executeStoredProcedureOnBackend(
250: (StoredProcedure) request, true, backend,
251: metadataCache);
252: case CALLABLE_STATEMENT_EXECUTE:
253: return executeStoredProcedureOnBackend(
254: (StoredProcedure) request, false, backend,
255: metadataCache);
256: default:
257: throw new RuntimeException("Unhandled call type "
258: + callType + " in executeRoundRobin");
259: }
260: } catch (UnreachableBackendException urbe) {
261: // Try to execute query on different backend
262: return executeWRR(request, callType, errorMsgPrefix,
263: metadataCache);
264: } catch (SQLException se) {
265: String msg = Translate.get("loadbalancer.something.failed",
266: new String[] { errorMsgPrefix,
267: String.valueOf(request.getId()),
268: se.getMessage() });
269: if (logger.isInfoEnabled())
270: logger.info(msg);
271: throw se;
272: } catch (RuntimeException e) {
273: String msg = Translate.get(
274: "loadbalancer.something.failed.on", new String[] {
275: errorMsgPrefix,
276: request.getSqlShortForm(vdb
277: .getSqlShortFormLength()),
278: backend.getName(), e.getMessage() });
279: logger.error(msg, e);
280: throw new SQLException(msg);
281: }
282: }
283:
284: /*
285: * Backends management
286: */
287:
288: /**
289: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#setWeight(String,
290: * int)
291: */
292: public void setWeight(String name, int w) throws SQLException {
293: if (logger.isDebugEnabled())
294: logger.debug(Translate.get("loadbalancer.weight.set",
295: new String[] { String.valueOf(w), name }));
296:
297: weights.put(name, new Integer(w));
298: }
299:
300: /*
301: * Debug/Monitoring
302: */
303:
304: /**
305: * Gets information about the request load balancer.
306: *
307: * @return <code>String</code> containing information
308: */
309: public String getInformation() {
310: // We don't lock since we don't need a top accurate value
311: int size = vdb.getBackends().size();
312:
313: if (size == 0)
314: return "RAIDb-1 with Weighted Round Robin Request load balancer: !!!Warning!!! No backend nodes found\n";
315: else
316: return "RAIDb-1 Weighted Round-Robin Request load balancer ("
317: + size + " backends)\n";
318: }
319:
320: /**
321: * @see org.continuent.sequoia.controller.loadbalancer.raidb1.RAIDb1#getRaidb1Xml
322: */
323: public String getRaidb1Xml() {
324: return WeightedBalancer.getRaidbXml(weights,
325: DatabasesXmlTags.ELT_RAIDb_1_WeightedRoundRobin);
326: }
327:
328: }
|