001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: */
016:
017: package org.apache.catalina.tribes.group.interceptors;
018:
019: import java.util.HashMap;
020:
021: import org.apache.catalina.tribes.ChannelException;
022: import org.apache.catalina.tribes.ChannelMessage;
023: import org.apache.catalina.tribes.Member;
024: import org.apache.catalina.tribes.group.ChannelInterceptorBase;
025: import org.apache.catalina.tribes.group.InterceptorPayload;
026: import org.apache.catalina.tribes.io.XByteBuffer;
027: import java.util.concurrent.atomic.AtomicInteger;
028: import java.util.concurrent.locks.ReentrantReadWriteLock;
029:
030: /**
031: *
032: * The order interceptor guarantees that messages are received in the same order they were
033: * sent.
034: * This interceptor works best with the ack=true setting. <br>
035: * There is no point in
036: * using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR>
037: * If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads,
038: * this interceptor can really slow you down, as many messages will be completely out of order
039: * and the queue might become rather large. If this is the case, then you might want to set
040: * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
041: * <br><b>Configuration Options</b><br>
042: * OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it <b>default=3000ms</b><br>
043: * OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering.
044: * This setting is useful to avoid OutOfMemoryErrors<b>default=Integer.MAX_VALUE</b><br>
045: * OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to
046: * do when a message has expired or the queue has grown larger than the maxQueue value.
047: * true means that the message is sent up the stack to the receiver that will receive and out of order message
048: * false means, forget the message and reset the message counter. <b>default=true</b>
049: *
050: *
051: * @author Filip Hanik
052: * @version 1.1
053: */
054: public class OrderInterceptor extends ChannelInterceptorBase {
055: private HashMap outcounter = new HashMap();
056: private HashMap incounter = new HashMap();
057: private HashMap incoming = new HashMap();
058: private long expire = 3000;
059: private boolean forwardExpired = true;
060: private int maxQueue = Integer.MAX_VALUE;
061:
062: ReentrantReadWriteLock inLock = new ReentrantReadWriteLock(true);
063: ReentrantReadWriteLock outLock = new ReentrantReadWriteLock(true);
064:
065: public void sendMessage(Member[] destination, ChannelMessage msg,
066: InterceptorPayload payload) throws ChannelException {
067: if (!okToProcess(msg.getOptions())) {
068: super .sendMessage(destination, msg, payload);
069: return;
070: }
071: ChannelException cx = null;
072: for (int i = 0; i < destination.length; i++) {
073: try {
074: int nr = 0;
075: try {
076: outLock.writeLock().lock();
077: nr = incCounter(destination[i]);
078: } finally {
079: outLock.writeLock().unlock();
080: }
081: //reduce byte copy
082: msg.getMessage().append(nr);
083: try {
084: getNext().sendMessage(
085: new Member[] { destination[i] }, msg,
086: payload);
087: } finally {
088: msg.getMessage().trim(4);
089: }
090: } catch (ChannelException x) {
091: if (cx == null)
092: cx = x;
093: cx.addFaultyMember(x.getFaultyMembers());
094: }
095: }//for
096: if (cx != null)
097: throw cx;
098: }
099:
100: public void messageReceived(ChannelMessage msg) {
101: if (!okToProcess(msg.getOptions())) {
102: super .messageReceived(msg);
103: return;
104: }
105: int msgnr = XByteBuffer.toInt(
106: msg.getMessage().getBytesDirect(), msg.getMessage()
107: .getLength() - 4);
108: msg.getMessage().trim(4);
109: MessageOrder order = new MessageOrder(msgnr,
110: (ChannelMessage) msg.deepclone());
111: try {
112: inLock.writeLock().lock();
113: if (processIncoming(order))
114: processLeftOvers(msg.getAddress(), false);
115: } finally {
116: inLock.writeLock().unlock();
117: }
118: }
119:
120: protected void processLeftOvers(Member member, boolean force) {
121: MessageOrder tmp = (MessageOrder) incoming.get(member);
122: if (force) {
123: Counter cnt = getInCounter(member);
124: cnt.setCounter(Integer.MAX_VALUE);
125: }
126: if (tmp != null)
127: processIncoming(tmp);
128: }
129:
130: /**
131: *
132: * @param order MessageOrder
133: * @return boolean - true if a message expired and was processed
134: */
135: protected boolean processIncoming(MessageOrder order) {
136: boolean result = false;
137: Member member = order.getMessage().getAddress();
138: Counter cnt = getInCounter(member);
139:
140: MessageOrder tmp = (MessageOrder) incoming.get(member);
141: if (tmp != null) {
142: order = MessageOrder.add(tmp, order);
143: }
144:
145: while ((order != null)
146: && (order.getMsgNr() <= cnt.getCounter())) {
147: //we are right on target. process orders
148: if (order.getMsgNr() == cnt.getCounter())
149: cnt.inc();
150: else if (order.getMsgNr() > cnt.getCounter())
151: cnt.setCounter(order.getMsgNr());
152: super .messageReceived(order.getMessage());
153: order.setMessage(null);
154: order = order.next;
155: }
156: MessageOrder head = order;
157: MessageOrder prev = null;
158: tmp = order;
159: //flag to empty out the queue when it larger than maxQueue
160: boolean empty = order != null ? order.getCount() >= maxQueue
161: : false;
162: while (tmp != null) {
163: //process expired messages or empty out the queue
164: if (tmp.isExpired(expire) || empty) {
165: //reset the head
166: if (tmp == head)
167: head = tmp.next;
168: cnt.setCounter(tmp.getMsgNr() + 1);
169: if (getForwardExpired())
170: super .messageReceived(tmp.getMessage());
171: tmp.setMessage(null);
172: tmp = tmp.next;
173: if (prev != null)
174: prev.next = tmp;
175: result = true;
176: } else {
177: prev = tmp;
178: tmp = tmp.next;
179: }
180: }
181: if (head == null)
182: incoming.remove(member);
183: else
184: incoming.put(member, head);
185: return result;
186: }
187:
188: public void memberAdded(Member member) {
189: //notify upwards
190: super .memberAdded(member);
191: }
192:
193: public void memberDisappeared(Member member) {
194: //reset counters - lock free
195: incounter.remove(member);
196: outcounter.remove(member);
197: //clear the remaining queue
198: processLeftOvers(member, true);
199: //notify upwards
200: super .memberDisappeared(member);
201: }
202:
203: protected int incCounter(Member mbr) {
204: Counter cnt = getOutCounter(mbr);
205: return cnt.inc();
206: }
207:
208: protected Counter getInCounter(Member mbr) {
209: Counter cnt = (Counter) incounter.get(mbr);
210: if (cnt == null) {
211: cnt = new Counter();
212: cnt.inc(); //always start at 1 for incoming
213: incounter.put(mbr, cnt);
214: }
215: return cnt;
216: }
217:
218: protected Counter getOutCounter(Member mbr) {
219: Counter cnt = (Counter) outcounter.get(mbr);
220: if (cnt == null) {
221: cnt = new Counter();
222: outcounter.put(mbr, cnt);
223: }
224: return cnt;
225: }
226:
227: protected static class Counter {
228: private AtomicInteger value = new AtomicInteger(0);
229:
230: public int getCounter() {
231: return value.get();
232: }
233:
234: public void setCounter(int counter) {
235: this .value.set(counter);
236: }
237:
238: public int inc() {
239: return value.addAndGet(1);
240: }
241: }
242:
243: protected static class MessageOrder {
244: private long received = System.currentTimeMillis();
245: private MessageOrder next;
246: private int msgNr;
247: private ChannelMessage msg = null;
248:
249: public MessageOrder(int msgNr, ChannelMessage msg) {
250: this .msgNr = msgNr;
251: this .msg = msg;
252: }
253:
254: public boolean isExpired(long expireTime) {
255: return (System.currentTimeMillis() - received) > expireTime;
256: }
257:
258: public ChannelMessage getMessage() {
259: return msg;
260: }
261:
262: public void setMessage(ChannelMessage msg) {
263: this .msg = msg;
264: }
265:
266: public void setNext(MessageOrder order) {
267: this .next = order;
268: }
269:
270: public MessageOrder getNext() {
271: return next;
272: }
273:
274: public int getCount() {
275: int counter = 1;
276: MessageOrder tmp = next;
277: while (tmp != null) {
278: counter++;
279: tmp = tmp.next;
280: }
281: return counter;
282: }
283:
284: public static MessageOrder add(MessageOrder head,
285: MessageOrder add) {
286: if (head == null)
287: return add;
288: if (add == null)
289: return head;
290: if (head == add)
291: return add;
292:
293: if (head.getMsgNr() > add.getMsgNr()) {
294: add.next = head;
295: return add;
296: }
297:
298: MessageOrder iter = head;
299: MessageOrder prev = null;
300: while (iter.getMsgNr() < add.getMsgNr()
301: && (iter.next != null)) {
302: prev = iter;
303: iter = iter.next;
304: }
305: if (iter.getMsgNr() < add.getMsgNr()) {
306: //add after
307: add.next = iter.next;
308: iter.next = add;
309: } else if (iter.getMsgNr() > add.getMsgNr()) {
310: //add before
311: prev.next = add;
312: add.next = iter;
313:
314: } else {
315: throw new ArithmeticException(
316: "Message added has the same counter, synchronization bug. Disable the order interceptor");
317: }
318:
319: return head;
320: }
321:
322: public int getMsgNr() {
323: return msgNr;
324: }
325:
326: }
327:
328: public void setExpire(long expire) {
329: this .expire = expire;
330: }
331:
332: public void setForwardExpired(boolean forwardExpired) {
333: this .forwardExpired = forwardExpired;
334: }
335:
336: public void setMaxQueue(int maxQueue) {
337: this .maxQueue = maxQueue;
338: }
339:
340: public long getExpire() {
341: return expire;
342: }
343:
344: public boolean getForwardExpired() {
345: return forwardExpired;
346: }
347:
348: public int getMaxQueue() {
349: return maxQueue;
350: }
351:
352: }
|