001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Contact: sequoia@continuent.org
006: *
007: * Licensed under the Apache License, Version 2.0 (the "License");
008: * you may not use this file except in compliance with the License.
009: * You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018:
019:
020: * Free Software Foundation; either version 2.1 of the License, or any later
021: * version.
022: *
023: * This library is distributed in the hope that it will be useful, but WITHOUT
024: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
025: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
026: * for more details.
027: *
028: * You should have received a copy of the GNU Lesser General Public License
029: * along with this library; if not, write to the Free Software Foundation,
030: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
031: *
032: * Initial developer(s): Emmanuel Cecchet.
033: * Contributor(s): Julie Marguerite, Jaco Swart.
034: */package org.continuent.sequoia.controller.loadbalancer.tasks;
035:
036: import java.sql.Connection;
037: import java.sql.SQLException;
038:
039: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
040: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
041: import org.continuent.sequoia.common.i18n.Translate;
042: import org.continuent.sequoia.common.log.Trace;
043: import org.continuent.sequoia.controller.backend.DatabaseBackend;
044: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
045: import org.continuent.sequoia.controller.cache.metadata.MetadataCache;
046: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
047: import org.continuent.sequoia.controller.connection.PooledConnection;
048: import org.continuent.sequoia.controller.core.ControllerConstants;
049: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
050: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
051: import org.continuent.sequoia.controller.requests.AbstractRequest;
052: import org.continuent.sequoia.controller.requests.SelectRequest;
053:
054: /**
055: * Executes a <code>SELECT</code> statement.
056: *
057: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
058: * @author <a href="mailto:Julie.Marguerite@inria.fr">Julie Marguerite </a>
059: * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
060: * @version 1.0
061: */
062: public class StatementExecuteQueryTask extends AbstractTask {
063: private SelectRequest request;
064: private MetadataCache metadataCache;
065: private ControllerResultSet result = null;
066:
067: static Trace endUserLogger = Trace
068: .getLogger("org.continuent.sequoia.enduser");
069:
070: /**
071: * Creates a new <code>StatementExecuteUpdateTask</code> instance.
072: *
073: * @param nbToComplete number of threads that must succeed before returning
074: * @param totalNb total number of threads
075: * @param request an <code>AbstractWriteRequest</code>
076: * @param metadataCache MetadataCache (null if none)
077: */
078: public StatementExecuteQueryTask(int nbToComplete, int totalNb,
079: SelectRequest request, MetadataCache metadataCache) {
080: super (nbToComplete, totalNb, request.isPersistentConnection(),
081: request.getPersistentConnectionId());
082: this .request = request;
083: this .metadataCache = metadataCache;
084: }
085:
086: /**
087: * Executes a write request with the given backend thread
088: *
089: * @param backendThread the backend thread that will execute the task
090: * @throws SQLException if an error occurs
091: */
092: public void executeTask(BackendWorkerThread backendThread)
093: throws SQLException {
094: DatabaseBackend backend = backendThread.getBackend();
095:
096: try {
097: AbstractConnectionManager cm = backend
098: .getConnectionManager(request.getLogin());
099: if (cm == null) {
100: SQLException se = new SQLException(
101: "No Connection Manager for Virtual Login:"
102: + request.getLogin());
103: try {
104: notifyFailure(backendThread, -1, se);
105: } catch (SQLException ignore) {
106:
107: }
108: throw se;
109: }
110:
111: Trace logger = backendThread.getLogger();
112: if (request.isAutoCommit())
113: executeInAutoCommit(backendThread, backend, cm, logger);
114: else
115: executeInTransaction(backendThread, backend, cm, logger);
116:
117: if (result != null)
118: notifySuccess(backendThread);
119: } finally {
120: backend.getTaskQueues().completeWriteRequestExecution(this );
121: }
122: }
123:
124: private void executeInAutoCommit(BackendWorkerThread backendThread,
125: DatabaseBackend backend, AbstractConnectionManager cm,
126: Trace logger) throws SQLException {
127: if (!backend.canAcceptTasks(request)) {
128: // Backend is disabling, we do not execute queries except the one in the
129: // transaction we already started. Just notify the completion for the
130: // others.
131: notifyCompletion(backendThread);
132: return;
133: }
134:
135: // Use a connection just for this request
136: PooledConnection c = null;
137: try {
138: c = cm.retrieveConnectionInAutoCommit(request);
139: } catch (UnreachableBackendException e1) {
140: SQLException se = new SQLException("Backend "
141: + backend.getName() + " is no more reachable.");
142: try {
143: notifyFailure(backendThread, -1, se);
144: } catch (SQLException ignore) {
145: }
146: // Disable this backend (it is no more in sync) by killing the backend
147: // thread
148: backendThread.getLoadBalancer().disableBackend(backend,
149: true);
150: String msg = Translate.get(
151: "loadbalancer.backend.disabling.unreachable",
152: backend.getName());
153: logger.error(msg);
154: endUserLogger.error(msg);
155: throw se;
156: }
157:
158: // Sanity check
159: if (c == null) {
160: SQLException se = new SQLException("No more connections");
161: try { // All backends failed, just ignore
162: if (!notifyFailure(backendThread,
163: request.getTimeout() * 1000L, se))
164: return;
165: } catch (SQLException ignore) {
166: }
167: // Disable this backend (it is no more in sync) by killing the backend
168: // thread
169: backendThread.getLoadBalancer().disableBackend(backend,
170: true);
171: endUserLogger.error(Translate
172: .get("loadbalancer.backend.disabling", backend
173: .getName()));
174: throw new SQLException(
175: "Request '"
176: + request
177: .getSqlShortForm(ControllerConstants.SQL_SHORT_FORM_LENGTH)
178: + "' failed on backend "
179: + backend.getName() + " (" + se + ")");
180: }
181:
182: // Execute Query
183: try {
184: result = AbstractLoadBalancer
185: .executeStatementExecuteQueryOnBackend(request,
186: backend, backendThread, c.getConnection(),
187: metadataCache);
188: } catch (Throwable e) {
189: try { // All backends failed, just ignore
190: if (!notifyFailure(backendThread,
191: request.getTimeout() * 1000L, e)) {
192: result = null;
193: return;
194: }
195: } catch (SQLException ignore) {
196: }
197: throw new SQLException(
198: "Request '"
199: + request
200: .getSqlShortForm(ControllerConstants.SQL_SHORT_FORM_LENGTH)
201: + "' failed on backend "
202: + backend.getName() + " (" + e + ")");
203: } finally {
204: cm.releaseConnectionInAutoCommit(request, c);
205: }
206: }
207:
208: private void executeInTransaction(
209: BackendWorkerThread backendThread, DatabaseBackend backend,
210: AbstractConnectionManager cm, Trace logger)
211: throws SQLException {
212: Connection c;
213: long tid = request.getTransactionId();
214:
215: try {
216: c = backend
217: .getConnectionForTransactionAndLazyBeginIfNeeded(
218: request, cm);
219: } catch (UnreachableBackendException ube) {
220: SQLException se = new SQLException("Backend "
221: + backend.getName() + " is no more reachable.");
222: try {
223: notifyFailure(backendThread, -1, se);
224: } catch (SQLException ignore) {
225: }
226: // Disable this backend (it is no more in sync) by killing the backend
227: // thread
228: backendThread.getLoadBalancer().disableBackend(backend,
229: true);
230: String msg = Translate.get(
231: "loadbalancer.backend.disabling.unreachable",
232: backend.getName());
233: logger.error(msg);
234: endUserLogger.error(msg);
235: throw se;
236: } catch (NoTransactionStartWhenDisablingException e) {
237: // Backend is disabling, we do not execute queries except the one in the
238: // transaction we already started. Just notify the completion for the
239: // others.
240: notifyCompletion(backendThread);
241: return;
242: } catch (SQLException e1) {
243: SQLException se = new SQLException(
244: "Unable to get connection for transaction " + tid);
245: try { // All backends failed, just ignore
246: if (!notifyFailure(backendThread,
247: request.getTimeout() * 1000L, se))
248: return;
249: } catch (SQLException ignore) {
250: }
251: // Disable this backend (it is no more in sync) by killing the
252: // backend thread
253: backendThread.getLoadBalancer().disableBackend(backend,
254: true);
255: String msg = "Request '"
256: + request.getSqlShortForm(backend
257: .getSqlShortFormLength())
258: + "' failed on backend " + backend.getName()
259: + " but " + getSuccess() + " succeeded (" + se
260: + ")";
261: logger.error(msg);
262: endUserLogger.error(Translate
263: .get("loadbalancer.backend.disabling", backend
264: .getName()));
265: throw new SQLException(msg);
266: }
267:
268: // Sanity check
269: if (c == null) { // Bad connection
270: SQLException se = new SQLException(
271: "Unable to retrieve connection for transaction "
272: + tid);
273: try { // All backends failed, just ignore
274: if (!notifyFailure(backendThread,
275: request.getTimeout() * 1000L, se))
276: return;
277: } catch (SQLException ignore) {
278: }
279: // Disable this backend (it is no more in sync) by killing the
280: // backend thread
281: backendThread.getLoadBalancer().disableBackend(backend,
282: true);
283: String msg = "Request '"
284: + request.getSqlShortForm(backend
285: .getSqlShortFormLength())
286: + "' failed on backend " + backend.getName()
287: + " but " + getSuccess() + " succeeded (" + se
288: + ")";
289: logger.error(msg);
290: endUserLogger.error(Translate
291: .get("loadbalancer.backend.disabling", backend
292: .getName()));
293: throw new SQLException(msg);
294: }
295:
296: // Execute Query
297: try {
298: result = AbstractLoadBalancer
299: .executeStatementExecuteQueryOnBackend(request,
300: backend, backendThread, c, metadataCache);
301: } catch (Throwable e) {
302: try { // All backends failed, just ignore
303: if (!notifyFailure(backendThread,
304: request.getTimeout() * 1000L, e)) {
305: result = null;
306: return;
307: }
308: } catch (SQLException ignore) {
309: }
310: throw new SQLException(
311: "Request '"
312: + request
313: .getSqlShortForm(ControllerConstants.SQL_SHORT_FORM_LENGTH)
314: + "' failed on backend "
315: + backend.getName() + " (" + e + ")");
316: }
317: }
318:
319: /**
320: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getRequest()
321: */
322: public AbstractRequest getRequest() {
323: return request;
324: }
325:
326: /**
327: * Returns the result.
328: *
329: * @return a <code>ResultSet</code>
330: */
331: public ControllerResultSet getResult() {
332: return result;
333: }
334:
335: /**
336: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#getTransactionId()
337: */
338: public long getTransactionId() {
339: return request.getTransactionId();
340: }
341:
342: /**
343: * @see org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask#isAutoCommit()
344: */
345: public boolean isAutoCommit() {
346: return request.isAutoCommit();
347: }
348:
349: /**
350: * @see java.lang.Object#equals(java.lang.Object)
351: */
352: public boolean equals(Object other) {
353: if ((other == null)
354: || !(other instanceof StatementExecuteQueryTask))
355: return false;
356:
357: StatementExecuteQueryTask seqt = (StatementExecuteQueryTask) other;
358: return this .request.equals(seqt.getRequest());
359: }
360:
361: /**
362: * @see java.lang.Object#hashCode()
363: */
364: public int hashCode() {
365: return (int) request.getId();
366: }
367:
368: /**
369: * @see java.lang.Object#toString()
370: */
371: public String toString() {
372: if (request.isAutoCommit())
373: return "Autocommit StatementExecuteQueryTask ("
374: + request.getUniqueKey() + ")";
375: else
376: return "StatementExecuteQueryTask from transaction "
377: + request.getTransactionId() + " ("
378: + request.getUniqueKey() + ")";
379: }
380: }
|