001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Contact: sequoia@continuent.org
006: *
007: * Licensed under the Apache License, Version 2.0 (the "License");
008: * you may not use this file except in compliance with the License.
009: * You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018: *
019: * Initial developer(s): Emmanuel Cecchet.
020: * Contributor(s): Julie Marguerite.
021: */package org.continuent.sequoia.controller.loadbalancer.raidb2;
022:
023: import java.sql.SQLException;
024: import java.util.ArrayList;
025: import java.util.HashMap;
026:
027: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
028: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
029: import org.continuent.sequoia.common.i18n.Translate;
030: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
031: import org.continuent.sequoia.controller.backend.DatabaseBackend;
032: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
033: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
034: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
035: import org.continuent.sequoia.controller.loadbalancer.WeightedBalancer;
036: import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
037: import org.continuent.sequoia.controller.loadbalancer.policies.createtable.CreateTablePolicy;
038: import org.continuent.sequoia.controller.requests.SelectRequest;
039: import org.continuent.sequoia.controller.requests.StoredProcedure;
040: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
041:
042: /**
043: * RAIDb-2 Weighted Round Robin load balancer.
044: * <p>
045: * The read requests coming from the request manager are sent to the backend
046: * nodes using a weighted round robin. Write requests are broadcasted to all
047: * backends.
048: * <p>
049: * The weighted round-robin works as follows. If the backend weight is set to 0,
050: * no read requests are sent to this backend unless it is the last one available
051: * on this controller. The load balancer maintains a current weight that is
052: * increased by one each time a new read request is executed. <br>
053: * If backend1 has a weight of 5 and backend2 a weight of 10, backend1 will
054: * receive the 5 first requests and backend2 the next 10 requests. Then we
055: * restart with backend1. Be careful that large weight values will heavily load
056: * backends in turn but will probably not balance the load in an effective way.
057: *
058: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
059: * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
060: * @version 1.0
061: */
062: public class RAIDb2_WRR extends RAIDb2 {
063: private HashMap weights;
064: private int currentWeight;
065:
066: /*
067: * Constructors
068: */
069:
070: /**
071: * Creates a new RAIDb-2 Weighted Round Robin request load balancer.
072: *
073: * @param vdb The virtual database this load balancer belongs to.
074: * @param waitForCompletionPolicy How many backends must complete before
075: * returning the result?
076: * @param createTablePolicy The policy defining how 'create table' statements
077: * should be handled
078: * @exception Exception if an error occurs
079: */
080: public RAIDb2_WRR(VirtualDatabase vdb,
081: WaitForCompletionPolicy waitForCompletionPolicy,
082: CreateTablePolicy createTablePolicy) throws Exception {
083: super (vdb, waitForCompletionPolicy, createTablePolicy);
084: currentWeight = -1;
085: }
086:
087: /*
088: * Request Handling
089: */
090:
091: /**
092: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#statementExecuteQuery(org.continuent.sequoia.controller.requests.SelectRequest,
093: * org.continuent.sequoia.controller.cache.metadata.MetadataCache)
094: */
095: public ControllerResultSet statementExecuteQuery(
096: SelectRequest request, MetadataCache metadataCache)
097: throws SQLException {
098: return null;
099: }
100:
101: /**
102: * Chooses the node to execute the stored procedure using a round-robin
103: * algorithm. If the next node has not the needed stored procedure, we try the
104: * next one and so on until a suitable backend is found.
105: *
106: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecuteQuery(StoredProcedure,
107: * MetadataCache)
108: */
109: public ControllerResultSet readOnlyCallableStatementExecuteQuery(
110: StoredProcedure proc, MetadataCache metadataCache)
111: throws SQLException {
112: return (ControllerResultSet) callStoredProcedure(proc,
113: CALLABLE_STATEMENT_EXECUTE_QUERY, metadataCache);
114: }
115:
116: /**
117: * Chooses the node to execute the stored procedure using a round-robin
118: * algorithm. If the next node has not the needed stored procedure, we try the
119: * next one and so on until a suitable backend is found.
120: *
121: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#readOnlyCallableStatementExecute(org.continuent.sequoia.controller.requests.StoredProcedure,
122: * org.continuent.sequoia.controller.cache.metadata.MetadataCache)
123: */
124: public ExecuteResult readOnlyCallableStatementExecute(
125: StoredProcedure proc, MetadataCache metadataCache)
126: throws SQLException {
127: return (ExecuteResult) callStoredProcedure(proc,
128: CALLABLE_STATEMENT_EXECUTE, metadataCache);
129: }
130:
131: /**
132: * Common code to execute a StoredProcedure using executeQuery or
133: * executeUpdate on a backend chosen using a LPRF algorithm.
134: *
135: * @param proc a <code>StoredProcedure</code>
136: * @param callType one of CALLABLE_STATEMENT_EXECUTE_QUERY or
137: * CALLABLE_STATEMENT_EXECUTE
138: * @param metadataCache the metadataCache if any or null
139: * @return a <code>ControllerResultSet</code> or an
140: * <code>ExecuteResult</code> object
141: * @throws SQLException if an error occurs
142: */
143: private Object callStoredProcedure(StoredProcedure proc,
144: int callType, MetadataCache metadataCache)
145: throws SQLException {
146: // Choose a backend
147: try {
148: vdb.acquireReadLockBackendLists();
149: } catch (InterruptedException e) {
150: String msg = Translate.get(
151: "loadbalancer.backendlist.acquire.readlock.failed",
152: e);
153: logger.error(msg);
154: throw new SQLException(msg);
155: }
156:
157: DatabaseBackend backend = null;
158:
159: // Note that vdb lock is released in the finally clause of this try/catch
160: // block
161: try {
162: ArrayList backends = vdb.getBackends();
163: int size = backends.size();
164:
165: if (size == 0)
166: throw new NoMoreBackendException(Translate.get(
167: "loadbalancer.execute.no.backend.available",
168: proc.getId()));
169:
170: // Choose the backend (WRR algorithm starts here)
171: int w = 0; // cumulative weight
172: for (int i = 0; i < size; i++) {
173: DatabaseBackend b = (DatabaseBackend) backends.get(i);
174: if (b.isReadEnabled()
175: && b.hasStoredProcedure(proc.getProcedureKey(),
176: proc.getNbOfParameters())) {
177: if (backend == null)
178: backend = b; // Fallback if no backend found
179:
180: // Add the weight of this backend
181: Integer weight = (Integer) weights.get(b.getName());
182: if (weight == null)
183: logger.error("No weight defined for backend "
184: + b.getName());
185: else {
186: int backendWeight = weight.intValue();
187: if (backendWeight == 0)
188: continue; // Weight of 0, avoid using that backend
189: else
190: w += backendWeight;
191: }
192:
193: // Ok we reached the needed weight, take this backend
194: if (currentWeight <= w) {
195: backend = b;
196: currentWeight++; // Next time take the next
197: break;
198: }
199: }
200: }
201:
202: if (backend == null)
203: throw new NoMoreBackendException(Translate.get(
204: "loadbalancer.execute.no.backend.enabled", proc
205: .getId()));
206:
207: // We are over the total weight and we are using the
208: // first available node. Let's reset the index to 1
209: // since we used this first node (0++).
210: if (currentWeight > w)
211: currentWeight = 1;
212: } catch (RuntimeException e) {
213: String msg = Translate.get(
214: "loadbalancer.execute.find.backend.failed",
215: new String[] {
216: proc.getSqlShortForm(vdb
217: .getSqlShortFormLength()),
218: e.getMessage() });
219: logger.error(msg, e);
220: throw new SQLException(msg);
221: } finally {
222: vdb.releaseReadLockBackendLists();
223: }
224:
225: // Execute the request on the chosen backend
226: try {
227: switch (callType) {
228: case CALLABLE_STATEMENT_EXECUTE_QUERY:
229: return executeStoredProcedureOnBackend(proc, true,
230: backend, metadataCache);
231: case CALLABLE_STATEMENT_EXECUTE:
232: return executeStoredProcedureOnBackend(proc, false,
233: backend, metadataCache);
234: default:
235: throw new RuntimeException("Unhandled call type "
236: + callType + " in executeRoundRobin");
237: }
238: } catch (UnreachableBackendException urbe) {
239: // Try to execute query on different backend
240: return callStoredProcedure(proc, callType, metadataCache);
241: } catch (SQLException se) {
242: String msg = Translate.get("loadbalancer.something.failed",
243: new String[] { "Stored procedure ",
244: String.valueOf(proc.getId()),
245: se.getMessage() });
246: if (logger.isInfoEnabled())
247: logger.info(msg);
248: throw se;
249: } catch (RuntimeException e) {
250: String msg = Translate.get(
251: "loadbalancer.something.failed.on", new String[] {
252: "Stored procedure ",
253: proc.getSqlShortForm(vdb
254: .getSqlShortFormLength()),
255: backend.getName(), e.getMessage() });
256: logger.error(msg, e);
257: throw new SQLException(msg);
258: }
259: }
260:
261: /*
262: * Backends management
263: */
264:
265: /**
266: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#setWeight(String,
267: * int)
268: */
269: public void setWeight(String name, int w) throws SQLException {
270: throw new SQLException(
271: "Weight is not supported with this load balancer");
272: }
273:
274: /*
275: * Debug/Monitoring
276: */
277:
278: /**
279: * Gets information about the request load balancer.
280: *
281: * @return <code>String</code> containing information
282: */
283: public String getInformation() {
284: if (weights == null)
285: return "RAIDb-2 Weighted Round Robin Request load balancer: !!!Warning!!! No backend nodes found\n";
286: else
287: return "RAIDb-2 Weighted Round Robin Request load balancer balancing over "
288: + weights.size() + " nodes\n";
289: }
290:
291: /**
292: * @see org.continuent.sequoia.controller.loadbalancer.raidb2.RAIDb2#getRaidb2Xml
293: */
294: public String getRaidb2Xml() {
295: return WeightedBalancer.getRaidbXml(weights,
296: DatabasesXmlTags.ELT_RAIDb_2_WeightedRoundRobin);
297: }
298: }
|