001: /*
002: * Distributed as part of c3p0 v.0.9.1.2
003: *
004: * Copyright (C) 2005 Machinery For Change, Inc.
005: *
006: * Author: Steve Waldman <swaldman@mchange.com>
007: *
008: * This library is free software; you can redistribute it and/or modify
009: * it under the terms of the GNU Lesser General Public License version 2.1, as
010: * published by the Free Software Foundation.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: * GNU Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public License
018: * along with this software; see the file LICENSE. If not, write to the
019: * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
020: * Boston, MA 02111-1307, USA.
021: */
022:
023: package com.mchange.v2.async;
024:
025: import com.mchange.v2.log.*;
026: import com.mchange.v2.util.ResourceClosedException;
027:
028: /**
029: * A class that provides for effecient asynchronous execution
030: * of multiple tasks that may block, but that do not contend
031: * for the same locks. The order in which tasks will be executed
032: * is not guaranteed.
033: */
034: public class RoundRobinAsynchronousRunner implements
035: AsynchronousRunner, Queuable {
036: private final static MLogger logger = MLog
037: .getLogger(RoundRobinAsynchronousRunner.class);
038:
039: //MT: unchanging, individual elements are thread-safe
040: final RunnableQueue[] rqs;
041:
042: //MT: protected by this' lock
043: int task_turn = 0;
044:
045: //MT: protected by this' lock
046: int view_turn = 0;
047:
048: public RoundRobinAsynchronousRunner(int num_threads, boolean daemon) {
049: this .rqs = new RunnableQueue[num_threads];
050: for (int i = 0; i < num_threads; ++i)
051: rqs[i] = new CarefulRunnableQueue(daemon, false);
052: }
053:
054: public synchronized void postRunnable(Runnable r) {
055: try {
056: int index = task_turn;
057: task_turn = (task_turn + 1) % rqs.length;
058: rqs[index].postRunnable(r);
059:
060: /* we do this "long-hand" to avoid bad fragility if an exception */
061: /* occurs in postRunnable, causing the mod step of the original */
062: /* concise code to get skipped, and leading (if */
063: /* task_turn == rqs.length - 1 when the exception occurs) to an */
064: /* endless cascade of ArrayIndexOutOfBoundsExceptions. */
065: /* we might alternatively have just put the mod step into a */
066: /* finally block, but that's too fancy. */
067: /* thanks to Travis Reeder for reporting this problem. */
068:
069: //rqs[task_turn++].postRunnable( r );
070: //task_turn %= rqs.length;
071: } catch (NullPointerException e) {
072: //e.printStackTrace();
073: if (Debug.DEBUG) {
074: if (logger.isLoggable(MLevel.FINE))
075: logger
076: .log(
077: MLevel.FINE,
078: "NullPointerException while posting Runnable -- Probably we're closed.",
079: e);
080: }
081: this .close(true);
082: throw new ResourceClosedException(
083: "Attempted to use a RoundRobinAsynchronousRunner in a closed or broken state.");
084: }
085: }
086:
087: public synchronized RunnableQueue asRunnableQueue() {
088: try {
089: int index = view_turn;
090: view_turn = (view_turn + 1) % rqs.length;
091: return new RunnableQueueView(index);
092:
093: /* same explanation as above */
094:
095: //RunnableQueue out = new RunnableQueueView( view_turn++ );
096: //view_turn %= rqs.length;
097: //return out;
098: } catch (NullPointerException e) {
099: //e.printStackTrace();
100: if (Debug.DEBUG) {
101: if (logger.isLoggable(MLevel.FINE))
102: logger
103: .log(
104: MLevel.FINE,
105: "NullPointerException in asRunnableQueue() -- Probably we're closed.",
106: e);
107: }
108: this .close(true);
109: throw new ResourceClosedException(
110: "Attempted to use a RoundRobinAsynchronousRunner in a closed or broken state.");
111: }
112: }
113:
114: public synchronized void close(boolean skip_remaining_tasks) {
115: for (int i = 0, len = rqs.length; i < len; ++i) {
116: attemptClose(rqs[i], skip_remaining_tasks);
117: rqs[i] = null;
118: }
119: }
120:
121: public void close() {
122: close(true);
123: }
124:
125: static void attemptClose(RunnableQueue rq,
126: boolean skip_remaining_tasks) {
127: try {
128: rq.close(skip_remaining_tasks);
129: } catch (Exception e) {
130: //e.printStackTrace();
131: if (logger.isLoggable(MLevel.WARNING))
132: logger.log(MLevel.WARNING,
133: "RunnableQueue close FAILED.", e);
134: }
135: }
136:
137: class RunnableQueueView implements RunnableQueue {
138: final int rq_num;
139:
140: RunnableQueueView(int rq_num) {
141: this .rq_num = rq_num;
142: }
143:
144: public void postRunnable(Runnable r) {
145: rqs[rq_num].postRunnable(r);
146: }
147:
148: public void close(boolean skip_remaining_tasks) {
149: }
150:
151: public void close() { /* ignore */
152: }
153: }
154: }
|