001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2005-2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Emmanuel Cecchet.
019: * Contributor(s): ______________________.
020: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
021:
022: import java.io.Serializable;
023: import java.sql.SQLException;
024: import java.util.LinkedList;
025:
026: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
027: import org.continuent.sequoia.common.i18n.Translate;
028: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
029: import org.continuent.sequoia.controller.loadbalancer.AllBackendsFailedException;
030: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
031: import org.continuent.sequoia.controller.requests.StoredProcedure;
032: import org.continuent.sequoia.controller.requests.StoredProcedureCallResult;
033:
034: /**
035: * This class defines a distributed call to CallableStatement.execute()
036: *
037: * @author <a href="mailto:emmanuel.cecchet@continuent.com">Emmanuel Cecchet
038: * </a>
039: * @version 1.0
040: */
041: public class DistributedCallableStatementExecute extends
042: DistributedRequest {
043: private static final long serialVersionUID = 8634424524848530342L;
044:
045: /**
046: * Creates a new <code>DistributedCallableStatementExecute</code> object to
047: * execute a stored procedure on multiple controllers.
048: *
049: * @param proc the stored procedure to execute
050: */
051: public DistributedCallableStatementExecute(StoredProcedure proc) {
052: super (proc);
053: }
054:
055: /**
056: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest#scheduleRequest(org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager)
057: */
058: public Object scheduleRequest(DistributedRequestManager drm)
059: throws SQLException {
060: LinkedList totalOrderQueue = drm.getVirtualDatabase()
061: .getTotalOrderQueue();
062: synchronized (totalOrderQueue) {
063: totalOrderQueue.addLast(request);
064: }
065: return request;
066: }
067:
068: /**
069: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest#executeScheduledRequest(org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager)
070: */
071: public Serializable executeScheduledRequest(
072: DistributedRequestManager drm) throws SQLException {
073: Serializable result = null;
074: StoredProcedure proc = (StoredProcedure) request;
075: boolean hasBeenScheduled = false;
076: try {
077: drm.getLoadBalancer().waitForSuspendWritesToComplete(
078: request);
079:
080: // Even if we do not execute this query, we have to log the begin if any
081: drm.lazyTransactionStart(request);
082: drm.scheduleStoredProcedure(proc);
083: hasBeenScheduled = true;
084:
085: if (drm.getLogger().isDebugEnabled())
086: drm
087: .getLogger()
088: .debug(
089: Translate
090: .get(
091: "requestmanager.read.stored.procedure",
092: new String[] {
093: String
094: .valueOf(request
095: .getId()),
096: request
097: .getSqlShortForm(drm
098: .getVirtualDatabase()
099: .getSqlShortFormLength()) }));
100:
101: DatabaseProcedureSemantic semantic = proc.getSemantic();
102: if (proc.isReadOnly()
103: || ((semantic != null) && (semantic.isReadOnly()))) {
104: // Need to remove the query from the total order queue because
105: // loadBalancer.readOnlyCallableStatementExecute does not do it.
106: // Fixes SEQUOIA-413
107: drm.getLoadBalancer()
108: .removeObjectFromAndNotifyTotalOrderQueue(
109: request);
110: }
111:
112: result = new StoredProcedureCallResult(proc, drm
113: .loadBalanceCallableStatementExecute(proc));
114:
115: if (drm.storeRequestResult(request, result))
116: result = DistributedRequestManager.SUCCESSFUL_COMPLETION;
117:
118: drm.updateRecoveryLogFlushCacheAndRefreshSchema(proc);
119:
120: // Notify scheduler of completion
121: drm.getScheduler().storedProcedureCompleted(proc);
122:
123: return result;
124: } catch (NoMoreBackendException e) {
125: if (drm.getLogger().isDebugEnabled())
126: drm
127: .getLogger()
128: .debug(
129: Translate
130: .get(
131: "virtualdatabase.distributed.read.procedure.logging.only",
132: request
133: .getSqlShortForm(drm
134: .getVirtualDatabase()
135: .getSqlShortFormLength())));
136:
137: DatabaseProcedureSemantic semantic = proc.getSemantic();
138: if (proc.isReadOnly()
139: || ((semantic != null) && (semantic.isReadOnly()))) {
140: // Need to remove the query from the total order queue because
141: // loadBalancer.readOnlyCallableStatementExecute does not do it.
142: // Fixes SEQUOIA-413
143: drm.getLoadBalancer()
144: .removeObjectFromAndNotifyTotalOrderQueue(
145: request);
146: }
147:
148: // Add to failed list, the scheduler will be notified when the response
149: // will be received from the other controllers.
150: drm.addFailedOnAllBackends(request, hasBeenScheduled);
151: return e;
152: } catch (AllBackendsFailedException e) {
153: // Add to failed list, the scheduler will be notified when the response
154: // will be received from the other controllers.
155: drm.addFailedOnAllBackends(request, hasBeenScheduled);
156: if (drm.getLogger().isDebugEnabled())
157: drm
158: .getLogger()
159: .debug(
160: Translate
161: .get(
162: "virtualdatabase.distributed.read.procedure.all.backends.locally.failed",
163: request
164: .getSqlShortForm(drm
165: .getVirtualDatabase()
166: .getSqlShortFormLength())));
167: return e;
168: } catch (SQLException e) {
169: // Something bad more likely happened during the notification. Let's
170: // notify the scheduler (possibly again) to be safer.
171: drm.addFailedOnAllBackends(request, hasBeenScheduled);
172: drm
173: .getLogger()
174: .warn(
175: Translate
176: .get(
177: "virtualdatabase.distributed.read.procedure.sqlexception",
178: e.getMessage()), e);
179: return e;
180: } catch (RuntimeException re) {
181: // Something bad more likely happened during the notification. Let's
182: // notify the scheduler (possibly again) to be safer.
183: drm.addFailedOnAllBackends(request, hasBeenScheduled);
184: drm
185: .getLogger()
186: .warn(
187: Translate
188: .get(
189: "virtualdatabase.distributed.read.procedure.exception",
190: re.getMessage()), re);
191: return new SQLException(re.getMessage());
192: }
193: }
194:
195: /**
196: * @see java.lang.Object#toString()
197: */
198: public String toString() {
199: return "E " + request.getId() + " "
200: + request.getTransactionId() + " "
201: + request.getUniqueKey();
202: }
203:
204: }
|