001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: */
016: package org.apache.catalina.tribes.group.interceptors;
017:
018: import java.util.concurrent.LinkedBlockingQueue;
019: import java.util.concurrent.ThreadPoolExecutor;
020: import java.util.concurrent.atomic.AtomicLong;
021:
022: import org.apache.catalina.tribes.ChannelMessage;
023: import org.apache.catalina.tribes.Member;
024: import org.apache.catalina.tribes.group.InterceptorPayload;
025: import org.apache.catalina.tribes.transport.bio.util.LinkObject;
026: import java.util.concurrent.TimeUnit;
027:
028: /**
029: *
030: * Same implementation as the MessageDispatchInterceptor
031: * except is ues an atomic long for the currentSize calculation
032: * and uses a thread pool for message sending.
033: *
034: * @author Filip Hanik
035: * @version 1.0
036: */
037:
038: public class MessageDispatch15Interceptor extends
039: MessageDispatchInterceptor {
040:
041: protected AtomicLong currentSize = new AtomicLong(0);
042: protected ThreadPoolExecutor executor = null;
043: protected int maxThreads = 10;
044: protected int maxSpareThreads = 2;
045: protected long keepAliveTime = 5000;
046: protected LinkedBlockingQueue<Runnable> runnablequeue = new LinkedBlockingQueue<Runnable>();
047:
048: public long getCurrentSize() {
049: return currentSize.get();
050: }
051:
052: public long addAndGetCurrentSize(long inc) {
053: return currentSize.addAndGet(inc);
054: }
055:
056: public long setAndGetCurrentSize(long value) {
057: currentSize.set(value);
058: return value;
059: }
060:
061: public boolean addToQueue(ChannelMessage msg, Member[] destination,
062: InterceptorPayload payload) {
063: final LinkObject obj = new LinkObject(msg, destination, payload);
064: Runnable r = new Runnable() {
065: public void run() {
066: sendAsyncData(obj);
067: }
068: };
069: executor.execute(r);
070: return true;
071: }
072:
073: public LinkObject removeFromQueue() {
074: return null; //not used, thread pool contains its own queue.
075: }
076:
077: public void startQueue() {
078: if (run)
079: return;
080: executor = new ThreadPoolExecutor(maxSpareThreads, maxThreads,
081: keepAliveTime, TimeUnit.MILLISECONDS, runnablequeue);
082: run = true;
083: }
084:
085: public void stopQueue() {
086: run = false;
087: executor.shutdownNow();
088: setAndGetCurrentSize(0);
089: runnablequeue.clear();
090: }
091:
092: public long getKeepAliveTime() {
093: return keepAliveTime;
094: }
095:
096: public int getMaxSpareThreads() {
097: return maxSpareThreads;
098: }
099:
100: public int getMaxThreads() {
101: return maxThreads;
102: }
103:
104: public void setKeepAliveTime(long keepAliveTime) {
105: this .keepAliveTime = keepAliveTime;
106: }
107:
108: public void setMaxSpareThreads(int maxSpareThreads) {
109: this .maxSpareThreads = maxSpareThreads;
110: }
111:
112: public void setMaxThreads(int maxThreads) {
113: this.maxThreads = maxThreads;
114: }
115:
116: }
|