001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: */
016:
017: package org.apache.catalina.tribes.group.interceptors;
018:
019: import java.util.Arrays;
020: import java.util.HashMap;
021: import java.util.Set;
022:
023: import org.apache.catalina.tribes.ChannelException;
024: import org.apache.catalina.tribes.ChannelMessage;
025: import org.apache.catalina.tribes.Member;
026: import org.apache.catalina.tribes.group.ChannelInterceptorBase;
027: import org.apache.catalina.tribes.group.InterceptorPayload;
028: import org.apache.catalina.tribes.io.XByteBuffer;
029:
030: /**
031: *
032: * The fragmentation interceptor splits up large messages into smaller messages and assembles them on the other end.
033: * This is very useful when you don't want large messages hogging the sending sockets
034: * and smaller messages can make it through.
035: *
036: * <br><b>Configuration Options</b><br>
037: * OrderInteceptor.expire=<milliseconds> - how long do we keep the fragments in memory and wait for the rest to arrive<b>default=60,000ms -> 60seconds</b>
038: * This setting is useful to avoid OutOfMemoryErrors<br>
039: * OrderInteceptor.maxSize=<max message size> - message size in bytes <b>default=1024*100 (around a tenth of a MB)</b><br>
040: * @author Filip Hanik
041: * @version 1.0
042: */
043: public class FragmentationInterceptor extends ChannelInterceptorBase {
044: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
045: .getLog(FragmentationInterceptor.class);
046:
047: protected HashMap fragpieces = new HashMap();
048: private int maxSize = 1024 * 100;
049: private long expire = 1000 * 60; //one minute expiration
050: protected boolean deepclone = true;
051:
052: public void sendMessage(Member[] destination, ChannelMessage msg,
053: InterceptorPayload payload) throws ChannelException {
054: int size = msg.getMessage().getLength();
055: boolean frag = (size > maxSize)
056: && okToProcess(msg.getOptions());
057: if (frag) {
058: frag(destination, msg, payload);
059: } else {
060: msg.getMessage().append(frag);
061: super .sendMessage(destination, msg, payload);
062: }
063: }
064:
065: public void messageReceived(ChannelMessage msg) {
066: boolean isFrag = XByteBuffer.toBoolean(msg.getMessage()
067: .getBytesDirect(), msg.getMessage().getLength() - 1);
068: msg.getMessage().trim(1);
069: if (isFrag) {
070: defrag(msg);
071: } else {
072: super .messageReceived(msg);
073: }
074: }
075:
076: public FragCollection getFragCollection(FragKey key,
077: ChannelMessage msg) {
078: FragCollection coll = (FragCollection) fragpieces.get(key);
079: if (coll == null) {
080: synchronized (fragpieces) {
081: coll = (FragCollection) fragpieces.get(key);
082: if (coll == null) {
083: coll = new FragCollection(msg);
084: fragpieces.put(key, coll);
085: }
086: }
087: }
088: return coll;
089: }
090:
091: public void removeFragCollection(FragKey key) {
092: fragpieces.remove(key);
093: }
094:
095: public void defrag(ChannelMessage msg) {
096: FragKey key = new FragKey(msg.getUniqueId());
097: FragCollection coll = getFragCollection(key, msg);
098: coll.addMessage((ChannelMessage) msg.deepclone());
099:
100: if (coll.complete()) {
101: removeFragCollection(key);
102: ChannelMessage complete = coll.assemble();
103: super .messageReceived(complete);
104:
105: }
106: }
107:
108: public void frag(Member[] destination, ChannelMessage msg,
109: InterceptorPayload payload) throws ChannelException {
110: int size = msg.getMessage().getLength();
111:
112: int count = ((size / maxSize) + (size % maxSize == 0 ? 0 : 1));
113: ChannelMessage[] messages = new ChannelMessage[count];
114: int remaining = size;
115: for (int i = 0; i < count; i++) {
116: ChannelMessage tmp = (ChannelMessage) msg.clone();
117: int offset = (i * maxSize);
118: int length = Math.min(remaining, maxSize);
119: tmp.getMessage().clear();
120: tmp.getMessage().append(msg.getMessage().getBytesDirect(),
121: offset, length);
122: //add the msg nr
123: //tmp.getMessage().append(XByteBuffer.toBytes(i),0,4);
124: tmp.getMessage().append(i);
125: //add the total nr of messages
126: //tmp.getMessage().append(XByteBuffer.toBytes(count),0,4);
127: tmp.getMessage().append(count);
128: //add true as the frag flag
129: //byte[] flag = XByteBuffer.toBytes(true);
130: //tmp.getMessage().append(flag,0,flag.length);
131: tmp.getMessage().append(true);
132: messages[i] = tmp;
133: remaining -= length;
134:
135: }
136: for (int i = 0; i < messages.length; i++) {
137: super .sendMessage(destination, messages[i], payload);
138: }
139: }
140:
141: public void heartbeat() {
142: try {
143: Set set = fragpieces.keySet();
144: Object[] keys = set.toArray();
145: for (int i = 0; i < keys.length; i++) {
146: FragKey key = (FragKey) keys[i];
147: if (key != null && key.expired(getExpire()))
148: removeFragCollection(key);
149: }
150: } catch (Exception x) {
151: if (log.isErrorEnabled()) {
152: log
153: .error(
154: "Unable to perform heartbeat clean up in the frag interceptor",
155: x);
156: }
157: }
158: super .heartbeat();
159: }
160:
161: public int getMaxSize() {
162: return maxSize;
163: }
164:
165: public long getExpire() {
166: return expire;
167: }
168:
169: public void setMaxSize(int maxSize) {
170: this .maxSize = maxSize;
171: }
172:
173: public void setExpire(long expire) {
174: this .expire = expire;
175: }
176:
177: public static class FragCollection {
178: private long received = System.currentTimeMillis();
179: private ChannelMessage msg;
180: private XByteBuffer[] frags;
181:
182: public FragCollection(ChannelMessage msg) {
183: //get the total messages
184: int count = XByteBuffer
185: .toInt(msg.getMessage().getBytesDirect(), msg
186: .getMessage().getLength() - 4);
187: frags = new XByteBuffer[count];
188: this .msg = msg;
189: }
190:
191: public void addMessage(ChannelMessage msg) {
192: //remove the total messages
193: msg.getMessage().trim(4);
194: //get the msg nr
195: int nr = XByteBuffer
196: .toInt(msg.getMessage().getBytesDirect(), msg
197: .getMessage().getLength() - 4);
198: //remove the msg nr
199: msg.getMessage().trim(4);
200: frags[nr] = msg.getMessage();
201:
202: }
203:
204: public boolean complete() {
205: boolean result = true;
206: for (int i = 0; (i < frags.length) && (result); i++)
207: result = (frags[i] != null);
208: return result;
209: }
210:
211: public ChannelMessage assemble() {
212: if (!complete())
213: throw new IllegalStateException(
214: "Fragments are missing.");
215: int buffersize = 0;
216: for (int i = 0; i < frags.length; i++)
217: buffersize += frags[i].getLength();
218: XByteBuffer buf = new XByteBuffer(buffersize, false);
219: msg.setMessage(buf);
220: for (int i = 0; i < frags.length; i++) {
221: msg.getMessage().append(frags[i].getBytesDirect(), 0,
222: frags[i].getLength());
223: }
224: return msg;
225: }
226:
227: public boolean expired(long expire) {
228: return (System.currentTimeMillis() - received) > expire;
229: }
230:
231: }
232:
233: public static class FragKey {
234: private byte[] uniqueId;
235: private long received = System.currentTimeMillis();
236:
237: public FragKey(byte[] id) {
238: this .uniqueId = id;
239: }
240:
241: public int hashCode() {
242: return XByteBuffer.toInt(uniqueId, 0);
243: }
244:
245: public boolean equals(Object o) {
246: if (o instanceof FragKey) {
247: return Arrays.equals(uniqueId, ((FragKey) o).uniqueId);
248: } else
249: return false;
250:
251: }
252:
253: public boolean expired(long expire) {
254: return (System.currentTimeMillis() - received) > expire;
255: }
256:
257: }
258:
259: }
|