001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Contact: sequoia@continuent.org
006: *
007: * Licensed under the Apache License, Version 2.0 (the "License");
008: * you may not use this file except in compliance with the License.
009: * You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018: *
019: * Initial developer(s): Emmanuel Cecchet.
020: * Contributor(s):
021: */package org.continuent.sequoia.controller.loadbalancer;
022:
023: import java.sql.SQLException;
024: import java.sql.Statement;
025:
026: import org.continuent.sequoia.common.i18n.Translate;
027: import org.continuent.sequoia.common.log.Trace;
028: import org.continuent.sequoia.controller.backend.DatabaseBackend;
029: import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask;
030: import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
031: import org.continuent.sequoia.controller.virtualdatabase.activity.ActivityService;
032:
033: /**
034: * Process sequentially a set of tasks and send them to a backend.
035: *
036: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
037: * @version 1.0
038: */
039: public class BackendWorkerThread extends Thread {
040: //
041: // How the code is organized ?
042: // 1. Member variables
043: // 2. Constructor(s)
044: // 3. Task management
045: // 4. Getter/Setters
046: //
047:
048: private AbstractLoadBalancer loadBalancer;
049: private DatabaseBackend backend;
050: private boolean isKilled = false;
051:
052: private Trace logger = null;
053: private BackendTaskQueues queues;
054: private Statement currentStatement;
055:
056: /** True if this thread only plays commit/rollback tasks */
057: private boolean playingCommitRollbackOnly = false;
058:
059: /*
060: * Constructor
061: */
062:
063: /**
064: * Creates a new <code>BackendWorkerThread</code>.
065: *
066: * @param backend the backend this thread is associated to.
067: * @param loadBalancer the load balancer instanciating this thread
068: */
069: public BackendWorkerThread(DatabaseBackend backend,
070: AbstractLoadBalancer loadBalancer) {
071: this (loadBalancer.vdb.getVirtualDatabaseName()
072: + " - BackendWorkerThread for backend '"
073: + backend.getName() + "' with RAIDb level:"
074: + loadBalancer.getRAIDbLevel(), backend, loadBalancer);
075: }
076:
077: /**
078: * Creates a new <code>BackendWorkerThread</code>.
079: *
080: * @param name the name to give to the thread
081: * @param backend the backend this thread is associated to.
082: * @param loadBalancer the load balancer instanciating this thread
083: */
084: public BackendWorkerThread(String name, DatabaseBackend backend,
085: AbstractLoadBalancer loadBalancer) {
086: super (name);
087: // Sanity checks
088: if (backend == null) {
089: String msg = Translate
090: .get("backendworkerthread.null.backend");
091: logger = Trace
092: .getLogger("org.continuent.sequoia.controller.backend.DatabaseBackend");
093: logger.error(msg);
094: throw new NullPointerException(msg);
095: }
096:
097: logger = Trace
098: .getLogger("org.continuent.sequoia.controller.backend.DatabaseBackend."
099: + backend.getName());
100:
101: if (loadBalancer == null) {
102: String msg = Translate
103: .get("backendworkerthread.null.loadbalancer");
104: logger.error(msg);
105: throw new NullPointerException(msg);
106: }
107:
108: this .backend = backend;
109: this .loadBalancer = loadBalancer;
110: this .queues = backend.getTaskQueues();
111: }
112:
113: /**
114: * Kills this thread after the next task processing and forces the load
115: * balancer to disable the backend. It also marks all remaining tasks in the
116: * task list as failed.
117: */
118: public void killAndForceDisable() {
119: kill(true);
120: }
121:
122: /**
123: * Kills this thread after the next task processing. It also disables the
124: * backend.
125: */
126: public void killWithoutDisablingBackend() {
127: kill(false);
128: }
129:
130: /**
131: * Kills this thread after the next task processing. It also marks all
132: * remaining tasks in the task list as failed.
133: *
134: * @param forceDisable true if the task must call the load balancer to disable
135: * the backend
136: */
137: private void kill(boolean forceDisable) {
138: synchronized (this ) {
139: if (logger.isDebugEnabled())
140: logger.debug(this .getName() + " is shutting down");
141:
142: isKilled = true;
143: notify(); // Wake up thread
144: }
145: if (forceDisable) {
146: try {
147: // This ensure that all worker threads get removed from the load
148: // balancer list and that the backend state is set to disable.
149: loadBalancer.disableBackend(backend, true);
150: } catch (SQLException ignore) {
151: }
152: }
153: }
154:
155: /**
156: * Process the tasklist and call <code>wait()</code> (on itself) when the
157: * tasklist becomes empty.
158: */
159: public void run() {
160: BackendTaskQueueEntry currentlyProcessingEntry = null;
161:
162: while (!isKilled) {
163: try {
164: // Take the next available task
165: if (isPlayingCommitRollbackOnly())
166: currentlyProcessingEntry = queues
167: .getNextCommitRollbackToExecute(this );
168: else
169: currentlyProcessingEntry = queues
170: .getNextEntryToExecute(this );
171:
172: if (currentlyProcessingEntry == null) {
173: logger.warn("Null task in BackendWorkerThread");
174: continue;
175: }
176:
177: // Execute the tasks
178: if (logger.isDebugEnabled())
179: logger.debug(Translate.get(
180: "backendworkerthread.execute.task",
181: currentlyProcessingEntry.getTask()
182: .toString()));
183: currentlyProcessingEntry.getTask().execute(this );
184: } catch (SQLException e) {
185: // Task should have notified of failure
186: logger.warn(Translate.get(
187: "backendworkerthread.task.failed", e));
188: } catch (Throwable re) {
189: logger
190: .fatal(
191: Translate
192: .get(
193: "backendworkerthread.task.unexpected.throwable",
194: currentlyProcessingEntry == null ? "null task"
195: : currentlyProcessingEntry
196: .toString()),
197: re);
198:
199: // We can't know for sure if the task has notified the failure or not.
200: // To prevent a deadlock, we force the failure notification here.
201: try {
202: currentlyProcessingEntry.getTask().notifyFailure(
203: this , 1, new SQLException(re.getMessage()));
204: } catch (SQLException e1) {
205: // just notify
206: }
207: } finally {
208: // do not treat "rollback" or "kill thread" tasks
209: // as activity reaching the backend
210: if (currentlyProcessingEntry != null
211: && !(currentlyProcessingEntry.getTask() instanceof RollbackTask)
212: && !(currentlyProcessingEntry.getTask() instanceof KillThreadTask)) {
213: ActivityService.getInstance().notifyActivityFor(
214: backend.getVirtualDatabaseName());
215: }
216: setCurrentStatement(null);
217: try {
218: if (currentlyProcessingEntry != null) {
219: queues
220: .completedEntryExecution(currentlyProcessingEntry);
221: if (logger.isDebugEnabled())
222: logger
223: .debug(Translate
224: .get(
225: "backendworkerthread.execute.task.completed",
226: currentlyProcessingEntry
227: .getTask()
228: .toString()));
229: }
230: } catch (RuntimeException e) {
231: logger
232: .warn(
233: Translate
234: .get(
235: "backendworkerthread.remove.task.error",
236: e), e);
237: }
238: // Trying to speed-up GC on potentially large request objects
239: // referenced by this task (think BLOBs).
240: currentlyProcessingEntry = null;
241: }
242: } // end while (!isKilled)
243: }
244:
245: /*
246: * Getter/Setter
247: */
248:
249: /**
250: * Returns the backend.
251: *
252: * @return a <code>DatabaseBackend</code> instance
253: */
254: public DatabaseBackend getBackend() {
255: return backend;
256: }
257:
258: /**
259: * Returns the currentStatement value.
260: *
261: * @return Returns the currentStatement.
262: */
263: public final Statement getCurrentStatement() {
264: return currentStatement;
265: }
266:
267: /**
268: * Sets the currentStatement value.
269: *
270: * @param currentStatement The currentStatement to set.
271: */
272: public final void setCurrentStatement(Statement currentStatement) {
273: this .currentStatement = currentStatement;
274: }
275:
276: /**
277: * Returns the loadBalancer value.
278: *
279: * @return Returns the loadBalancer.
280: */
281: public final AbstractLoadBalancer getLoadBalancer() {
282: return loadBalancer;
283: }
284:
285: /**
286: * Returns the logger for tracing.
287: *
288: * @return a <code>Trace</code> instance
289: */
290: public Trace getLogger() {
291: return logger;
292: }
293:
294: /**
295: * Returns the playCommitRollbackOnly value.
296: *
297: * @return Returns the playCommitRollbackOnly.
298: */
299: public boolean isPlayingCommitRollbackOnly() {
300: return playingCommitRollbackOnly;
301: }
302:
303: /**
304: * Sets the playCommitRollbackOnly value.
305: *
306: * @param playCommitRollbackOnly The playCommitRollbackOnly to set.
307: */
308: public void setPlayCommitRollbackOnly(boolean playCommitRollbackOnly) {
309: this .playingCommitRollbackOnly = playCommitRollbackOnly;
310: }
311:
312: /**
313: * @see java.lang.Object#equals(java.lang.Object)
314: */
315: public boolean equals(Object obj) {
316: if (obj instanceof BackendWorkerThread) {
317: return backend.equals(((BackendWorkerThread) obj)
318: .getBackend());
319: }
320: return false;
321: }
322:
323: /**
324: * @see java.lang.Object#hashCode()
325: */
326: public int hashCode() {
327: return backend.hashCode();
328: }
329:
330: }
|