001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Copyright (C) 2005-2006 Continuent, Inc.
007: * Contact: sequoia@continuent.org
008: *
009: * Licensed under the Apache License, Version 2.0 (the "License");
010: * you may not use this file except in compliance with the License.
011: * You may obtain a copy of the License at
012: *
013: * http://www.apache.org/licenses/LICENSE-2.0
014: *
015: * Unless required by applicable law or agreed to in writing, software
016: * distributed under the License is distributed on an "AS IS" BASIS,
017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018: * See the License for the specific language governing permissions and
019: * limitations under the License.
020: *
021: * Initial developer(s): Emmanuel Cecchet.
022: * Contributor(s): ______________________.
023: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
024:
025: import java.io.Serializable;
026: import java.sql.SQLException;
027: import java.util.LinkedList;
028:
029: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
030: import org.continuent.sequoia.common.i18n.Translate;
031: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
032: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
033: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
034: import org.continuent.sequoia.controller.requests.StoredProcedure;
035: import org.continuent.sequoia.controller.requests.StoredProcedureCallResult;
036:
037: /**
038: * This class defines a distributed call to CallableStatement.executeQuery()
039: *
040: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
041: * @version 1.0
042: */
043: public class DistributedCallableStatementExecuteQuery extends
044: DistributedRequest {
045: private static final long serialVersionUID = 8634424524848530342L;
046:
047: /**
048: * Creates a new <code>CallableStatementExecuteQuery</code> object to
049: * execute a read stored procedure on multiple controllers.
050: *
051: * @param proc the stored procedure to execute
052: */
053: public DistributedCallableStatementExecuteQuery(StoredProcedure proc) {
054: super (proc);
055: }
056:
057: /**
058: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest#scheduleRequest(org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager)
059: */
060: public Object scheduleRequest(DistributedRequestManager drm)
061: throws SQLException {
062: LinkedList totalOrderQueue = drm.getVirtualDatabase()
063: .getTotalOrderQueue();
064: synchronized (totalOrderQueue) {
065: totalOrderQueue.addLast(request);
066: }
067: return request;
068: }
069:
070: /**
071: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest#executeScheduledRequest(org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager)
072: */
073: public Serializable executeScheduledRequest(
074: DistributedRequestManager drm) throws SQLException {
075: Serializable result = null;
076: StoredProcedure proc = (StoredProcedure) request;
077: boolean hasBeenScheduled = false;
078: try {
079: drm.getLoadBalancer().waitForSuspendWritesToComplete(
080: request);
081:
082: // Even if we do not execute this query, we have to log the begin if any
083: drm.lazyTransactionStart(request);
084: drm.scheduleStoredProcedure((StoredProcedure) request);
085: hasBeenScheduled = true;
086:
087: if (drm.getLogger().isDebugEnabled())
088: drm
089: .getLogger()
090: .debug(
091: Translate
092: .get(
093: "requestmanager.read.stored.procedure",
094: new String[] {
095: String
096: .valueOf(request
097: .getId()),
098: request
099: .getSqlShortForm(drm
100: .getVirtualDatabase()
101: .getSqlShortFormLength()) }));
102:
103: DatabaseProcedureSemantic semantic = proc.getSemantic();
104: if (proc.isReadOnly()
105: || ((semantic != null) && (semantic.isReadOnly()))) {
106: // Need to remove the query from the total order queue because
107: // loadBalancer.readOnlyCallableStatementExecuteQuery does not do it.
108: // Fixes SEQUOIA-413
109: drm.getLoadBalancer()
110: .removeObjectFromAndNotifyTotalOrderQueue(
111: request);
112: }
113:
114: result = new StoredProcedureCallResult(proc, drm
115: .loadBalanceCallableStatementExecuteQuery(proc));
116:
117: if (drm.storeRequestResult(request, result))
118: result = DistributedRequestManager.SUCCESSFUL_COMPLETION;
119:
120: drm.updateRecoveryLogFlushCacheAndRefreshSchema(proc);
121:
122: // Notify scheduler of completion
123: drm.getScheduler().storedProcedureCompleted(proc);
124:
125: return result;
126: } catch (NoMoreBackendException e) {
127: if (drm.getLogger().isDebugEnabled())
128: drm
129: .getLogger()
130: .debug(
131: Translate
132: .get(
133: "virtualdatabase.distributed.read.procedure.logging.only",
134: request
135: .getSqlShortForm(drm
136: .getVirtualDatabase()
137: .getSqlShortFormLength())));
138:
139: DatabaseProcedureSemantic semantic = proc.getSemantic();
140: if (proc.isReadOnly()
141: || ((semantic != null) && (semantic.isReadOnly()))) {
142: // Need to remove the query from the total order queue because
143: // loadBalancer.readOnlyCallableStatementExecuteQuery does not do it.
144: // Fixes SEQUOIA-413
145: drm.getLoadBalancer()
146: .removeObjectFromAndNotifyTotalOrderQueue(
147: request);
148: }
149:
150: // Add to failed list, the scheduler will be notified when the response
151: // will be received from the other controllers.
152: drm.addFailedOnAllBackends(request, hasBeenScheduled);
153: throw e;
154: } catch (AllBackendsFailedException e) {
155: // Add to failed list, the scheduler will be notified when the response
156: // will be received from the other controllers.
157: drm.addFailedOnAllBackends(request, hasBeenScheduled);
158: if (drm.getLogger().isDebugEnabled())
159: drm
160: .getLogger()
161: .debug(
162: Translate
163: .get(
164: "virtualdatabase.distributed.read.procedure.all.backends.locally.failed",
165: request
166: .getSqlShortForm(drm
167: .getVirtualDatabase()
168: .getSqlShortFormLength())));
169: return e;
170: } catch (SQLException e) {
171: // Add to failed list, the scheduler will be notified when the response
172: // will be received from the other controllers.
173: drm.addFailedOnAllBackends(request, hasBeenScheduled);
174: drm
175: .getLogger()
176: .warn(
177: Translate
178: .get(
179: "virtualdatabase.distributed.read.procedure.sqlexception",
180: e.getMessage()), e);
181: return e;
182: } catch (RuntimeException re) {
183: // Something bad more likely happened during the notification.
184: // Add to failed list, the scheduler will be notified when the response
185: // will be received from the other controllers.
186: drm.addFailedOnAllBackends(request, hasBeenScheduled);
187: drm
188: .getLogger()
189: .warn(
190: Translate
191: .get(
192: "virtualdatabase.distributed.read.procedure.exception",
193: re.getMessage()), re);
194: return new SQLException(re.getMessage());
195: }
196: }
197:
198: /**
199: * @see java.lang.Object#toString()
200: */
201: public String toString() {
202: return "S " + request.getId() + " "
203: + request.getTransactionId() + " "
204: + request.getUniqueKey();
205: }
206:
207: }
|