001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and limitations under the License.
015: */
016:
017: package org.apache.catalina.tribes.group.interceptors;
018:
019: import org.apache.catalina.tribes.Channel;
020: import org.apache.catalina.tribes.ChannelException;
021: import org.apache.catalina.tribes.ChannelMessage;
022: import org.apache.catalina.tribes.Member;
023: import org.apache.catalina.tribes.group.ChannelInterceptorBase;
024: import org.apache.catalina.tribes.group.InterceptorPayload;
025: import org.apache.catalina.tribes.transport.bio.util.FastQueue;
026: import org.apache.catalina.tribes.transport.bio.util.LinkObject;
027: import org.apache.catalina.tribes.UniqueId;
028:
029: /**
030: *
031: * The message dispatcher is a way to enable asynchronous communication
032: * through a channel. The dispatcher will look for the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code>
033: * flag to be set, if it is, it will queue the message for delivery and immediately return to the sender.
034: *
035: *
036: *
037: * @author Filip Hanik
038: * @version 1.0
039: */
040: public class MessageDispatchInterceptor extends ChannelInterceptorBase
041: implements Runnable {
042: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
043: .getLog(MessageDispatchInterceptor.class);
044:
045: protected long maxQueueSize = 1024 * 1024 * 64; //64MB
046: protected FastQueue queue = new FastQueue();
047: protected boolean run = false;
048: protected Thread msgDispatchThread = null;
049: protected long currentSize = 0;
050: protected boolean useDeepClone = true;
051: protected boolean alwaysSend = true;
052:
053: public MessageDispatchInterceptor() {
054: setOptionFlag(Channel.SEND_OPTIONS_ASYNCHRONOUS);
055: }
056:
057: public void sendMessage(Member[] destination, ChannelMessage msg,
058: InterceptorPayload payload) throws ChannelException {
059: boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS;
060: if (async && run) {
061: if ((getCurrentSize() + msg.getMessage().getLength()) > maxQueueSize) {
062: if (alwaysSend) {
063: super .sendMessage(destination, msg, payload);
064: return;
065: } else {
066: throw new ChannelException(
067: "Asynchronous queue is full, reached its limit of "
068: + maxQueueSize + " bytes, current:"
069: + getCurrentSize() + " bytes.");
070: }//end if
071: }//end if
072: //add to queue
073: if (useDeepClone)
074: msg = (ChannelMessage) msg.deepclone();
075: if (!addToQueue(msg, destination, payload)) {
076: throw new ChannelException(
077: "Unable to add the message to the async queue, queue bug?");
078: }
079: addAndGetCurrentSize(msg.getMessage().getLength());
080: } else {
081: super .sendMessage(destination, msg, payload);
082: }
083: }
084:
085: public boolean addToQueue(ChannelMessage msg, Member[] destination,
086: InterceptorPayload payload) {
087: return queue.add(msg, destination, payload);
088: }
089:
090: public LinkObject removeFromQueue() {
091: return queue.remove();
092: }
093:
094: public void startQueue() {
095: msgDispatchThread = new Thread(this );
096: msgDispatchThread
097: .setName("MessageDispatchInterceptor.MessageDispatchThread");
098: msgDispatchThread.setDaemon(true);
099: msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
100: queue.setEnabled(true);
101: run = true;
102: msgDispatchThread.start();
103: }
104:
105: public void stopQueue() {
106: run = false;
107: msgDispatchThread.interrupt();
108: queue.setEnabled(false);
109: setAndGetCurrentSize(0);
110: }
111:
112: public void setOptionFlag(int flag) {
113: if (flag != Channel.SEND_OPTIONS_ASYNCHRONOUS)
114: log
115: .warn("Warning, you are overriding the asynchronous option flag, this will disable the Channel.SEND_OPTIONS_ASYNCHRONOUS that other apps might use.");
116: super .setOptionFlag(flag);
117: }
118:
119: public void setMaxQueueSize(long maxQueueSize) {
120: this .maxQueueSize = maxQueueSize;
121: }
122:
123: public void setUseDeepClone(boolean useDeepClone) {
124: this .useDeepClone = useDeepClone;
125: }
126:
127: public long getMaxQueueSize() {
128: return maxQueueSize;
129: }
130:
131: public boolean getUseDeepClone() {
132: return useDeepClone;
133: }
134:
135: public long getCurrentSize() {
136: return currentSize;
137: }
138:
139: public synchronized long addAndGetCurrentSize(long inc) {
140: currentSize += inc;
141: return currentSize;
142: }
143:
144: public synchronized long setAndGetCurrentSize(long value) {
145: currentSize = value;
146: return value;
147: }
148:
149: public void start(int svc) throws ChannelException {
150: //start the thread
151: if (!run) {
152: synchronized (this ) {
153: if (!run
154: && ((svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ)) {//only start with the sender
155: startQueue();
156: }//end if
157: }//sync
158: }//end if
159: super .start(svc);
160: }
161:
162: public void stop(int svc) throws ChannelException {
163: //stop the thread
164: if (run) {
165: synchronized (this ) {
166: if (run
167: && ((svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ)) {
168: stopQueue();
169: }//end if
170: }//sync
171: }//end if
172:
173: super .stop(svc);
174: }
175:
176: public void run() {
177: while (run) {
178: LinkObject link = removeFromQueue();
179: if (link == null)
180: continue; //should not happen unless we exceed wait time
181: while (link != null && run) {
182: link = sendAsyncData(link);
183: }//while
184: }//while
185: }//run
186:
187: protected LinkObject sendAsyncData(LinkObject link) {
188: ChannelMessage msg = link.data();
189: Member[] destination = link.getDestination();
190: try {
191: super .sendMessage(destination, msg, null);
192: try {
193: if (link.getHandler() != null)
194: link.getHandler().handleCompletion(
195: new UniqueId(msg.getUniqueId()));
196: } catch (Exception ex) {
197: log.error("Unable to report back completed message.",
198: ex);
199: }
200: } catch (Exception x) {
201: ChannelException cx = null;
202: if (x instanceof ChannelException)
203: cx = (ChannelException) x;
204: else
205: cx = new ChannelException(x);
206: if (log.isDebugEnabled())
207: log.debug("Error while processing async message.", x);
208: try {
209: if (link.getHandler() != null)
210: link.getHandler().handleError(cx,
211: new UniqueId(msg.getUniqueId()));
212: } catch (Exception ex) {
213: log.error("Unable to report back error message.", ex);
214: }
215: } finally {
216: addAndGetCurrentSize(-msg.getMessage().getLength());
217: link = link.next();
218: }//try
219: return link;
220: }
221:
222: }
|