001: /*
002: * Copyright 2005 Joe Walker
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.directwebremoting.contrib;
017:
018: import org.apache.commons.logging.LogFactory;
019: import org.apache.commons.logging.Log;
020: import org.directwebremoting.extend.Call;
021: import org.directwebremoting.extend.Calls;
022: import org.directwebremoting.extend.Replies;
023: import org.directwebremoting.extend.Reply;
024: import org.directwebremoting.impl.DefaultRemoter;
025:
026: import java.util.concurrent.Callable;
027: import java.util.concurrent.ExecutionException;
028: import java.util.concurrent.Executors;
029: import java.util.concurrent.Future;
030: import java.util.concurrent.ThreadPoolExecutor;
031: import java.util.concurrent.TimeUnit;
032: import java.util.concurrent.TimeoutException;
033:
034: /**
035: * This implementation is not officially supported, and may be removed
036: * at some point in the future.
037: * Remoter implementation executing in parallel a group of remote calls.
038: * @author <a href="mailto:chussenet@yahoo.com">Claude Hussenet</a>
039: */
040: public class ParallelDefaultRemoter extends DefaultRemoter {
041: /**
042: * Initialize thread pool with :
043: * Core pool size : 10;
044: * Maximum pool size = 100;
045: * Keep alive time = 5000(ms);
046: * Timeout = 10000(ms);
047: */
048: public ParallelDefaultRemoter() {
049: executorService = (ThreadPoolExecutor) Executors
050: .newCachedThreadPool();
051: executorService.setCorePoolSize(corePoolsize);
052: executorService.setMaximumPoolSize(maximumPoolsize);
053: executorService.setKeepAliveTime(keepAliveTime,
054: TimeUnit.MILLISECONDS);
055: }
056:
057: /**
058: * Sets the maximum time to wait in (ms)
059: * @param timeout Time in (ms)
060: */
061: public void setParallelDefaultRemoterTimeout(long timeout) {
062: this .timeout = timeout;
063: }
064:
065: /**
066: * Sets the core number of threads.
067: * @param corePoolsize How many threads do we use
068: */
069: public void setParallelDefaultRemoterCorePoolsize(int corePoolsize) {
070: this .corePoolsize = corePoolsize;
071: executorService.setCorePoolSize(corePoolsize);
072: }
073:
074: /**
075: * Sets the maximum allowed number of threads.
076: * @param maximumPoolsize Maximum of threads
077: */
078: public void setParallelDefaultRemoterMaximumPoolsize(
079: int maximumPoolsize) {
080: this .maximumPoolsize = maximumPoolsize;
081: executorService.setMaximumPoolSize(maximumPoolsize);
082: }
083:
084: /**
085: * Sets the time limit in (ms) for which threads may remain idle before being
086: * terminated.
087: * @param keepAliveTime Time in (ms)
088: */
089: public void setParallelDefaultRemoterKeepAliveTime(
090: long keepAliveTime) {
091: this .keepAliveTime = keepAliveTime;
092: executorService.setKeepAliveTime(keepAliveTime,
093: TimeUnit.MILLISECONDS);
094: }
095:
096: /**
097: * Execute a set of remote calls in parallel and generate set of reply data
098: * for later conversion to whatever wire protocol we are using today.
099: * @param calls The set of calls to execute in parallel
100: * @return A set of reply data objects
101: */
102: @Override
103: public Replies execute(Calls calls) {
104: Replies replies = new Replies(calls.getBatchId());
105: Future<?>[] future = new Future<?>[calls.getCallCount()];
106:
107: if (calls.getCallCount() == 1) {
108: return super .execute(calls);
109: } else {
110: for (int callNum = 0; callNum < calls.getCallCount(); callNum++) {
111: Call call = calls.getCall(callNum);
112: future[callNum] = executorService
113: .submit(new CallCallable(call));
114: }
115: for (int callNum = 0; callNum < calls.getCallCount(); callNum++) {
116: try {
117: Reply reply = (Reply) future[callNum].get(timeout,
118: TimeUnit.MILLISECONDS);
119: replies.addReply(reply);
120: } catch (InterruptedException ex) {
121: log.warn("Method execution failed: ", ex);
122: replies.addReply(new Reply(calls.getCall(callNum)
123: .getCallId(), null, ex));
124: } catch (ExecutionException ex) {
125: log.warn("Method execution failed: ", ex);
126: replies.addReply(new Reply(calls.getCall(callNum)
127: .getCallId(), null, ex));
128: } catch (TimeoutException ex) {
129: log.warn("Method execution failed: ", ex);
130: replies.addReply(new Reply(calls.getCall(callNum)
131: .getCallId(), null, ex));
132: }
133: }
134: return replies;
135: }
136: }
137:
138: /**
139: * An implementation of Callable that uses a DWR Call object.
140: */
141: private class CallCallable implements Callable<Reply> {
142: /**
143: * @param call The call to execute
144: */
145: public CallCallable(Call call) {
146: this .call = call;
147: }
148:
149: public Reply call() {
150: return execute(call);
151: }
152:
153: private final Call call;
154: }
155:
156: private static final Log log = LogFactory
157: .getLog(ParallelDefaultRemoter.class);
158:
159: private int corePoolsize = 10;
160:
161: private int maximumPoolsize = 100;
162:
163: private long keepAliveTime = 5000;
164:
165: private long timeout = 10000;
166:
167: private final ThreadPoolExecutor executorService;
168: }
|