001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.io.FilterInputStream;
030: import java.io.FilterOutputStream;
031: import java.io.InputStream;
032: import java.io.ObjectInput;
033: import java.io.ObjectOutput;
034: import java.io.OutputStream;
035:
036: import org.cougaar.core.mts.MessageAttributes;
037: import org.cougaar.mts.base.CommFailureException;
038: import org.cougaar.mts.base.DestinationLink;
039: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
040: import org.cougaar.mts.base.MessageReader;
041: import org.cougaar.mts.base.MessageReaderDelegateImplBase;
042: import org.cougaar.mts.base.MessageWriter;
043: import org.cougaar.mts.base.MessageWriterDelegateImplBase;
044: import org.cougaar.mts.base.MisdeliveredMessageException;
045: import org.cougaar.mts.base.NameLookupException;
046: import org.cougaar.mts.base.RPCLinkProtocol;
047: import org.cougaar.mts.base.StandardAspect;
048: import org.cougaar.mts.base.UnregisteredNameException;
049:
050: /**
051: * This Aspect logs large messages, where large is defined by the
052: * <code>MaxMsgLen</code> parameter as a threshold number of bytes.
053: * The default "large" threshold is 100MB.
054: */
055: public class DetectBigMessageAspect extends StandardAspect {
056:
057: private static final int MAX_DEFAULT = 100000000; // 100MB, sort of
058: private int threshold = MAX_DEFAULT;
059:
060: // Return delegates for MessageReader, MessageWriter and
061: // DestinationLink.
062: public Object getDelegate(Object delegatee, Class type) {
063: if (type == MessageWriter.class) {
064: MessageWriter wtr = (MessageWriter) delegatee;
065: return new CountingMessageWriter(wtr);
066: } else if (type == MessageReader.class) {
067: MessageReader rdr = (MessageReader) delegatee;
068: return new CountingMessageReader(rdr);
069: } else if (type == DestinationLink.class) {
070: DestinationLink link = (DestinationLink) delegatee;
071: // Only RPC is relevant here
072: Class cls = link.getProtocolClass();
073: if (RPCLinkProtocol.class.isAssignableFrom(cls))
074: return new AddHookDestinationLink(link);
075: }
076:
077: return null;
078: }
079:
080: public void load() {
081: super .load();
082: threshold = (int) getParameter("MaxMsgLen", MAX_DEFAULT);
083: if (loggingService.isWarnEnabled()) {
084: loggingService.warn("Loaded DetectBigMessageAspect");
085: }
086: }
087:
088: public static class MessageTooBigException extends
089: java.io.IOException {
090: }
091:
092: private void checkCount(int count, String context,
093: AttributedMessage msg) throws java.io.IOException {
094: if (count >= threshold) {
095: loggingService.error("Message " + context + ": bytes = "
096: + count + " for message " + msg + " exceeds max = "
097: + threshold);
098: throw new MessageTooBigException();
099: }
100: }
101:
102: // The DestinationLink delegate
103: private class AddHookDestinationLink extends
104: DestinationLinkDelegateImplBase {
105: AddHookDestinationLink(DestinationLink delegatee) {
106: super (delegatee);
107: }
108:
109: public MessageAttributes forwardMessage(
110: AttributedMessage message) throws NameLookupException,
111: UnregisteredNameException, CommFailureException,
112: MisdeliveredMessageException {
113: // Register Aspect as a Message Streaming filter
114: message.addFilter(DetectBigMessageAspect.this );
115: return super .forwardMessage(message);
116: }
117: }
118:
119: // The MessageWriter delegate. This will do the byte-counting by
120: // creating a simple FilterOutputStream that watches all the bytes
121: // go past,
122: private class CountingMessageWriter extends
123: MessageWriterDelegateImplBase {
124:
125: private AttributedMessage msg;
126: private int count = 0;
127:
128: private class CountingOutputStream extends FilterOutputStream {
129:
130: private CountingOutputStream(OutputStream wrapped) {
131: super (wrapped);
132: }
133:
134: // Count the bytes, whichever method is used to write
135: // them. Pass the byte or bytes to 'out' rather than
136: // using super, since the default FilterOutputStream
137: // methods aren't very efficient.
138:
139: public void write(int b) throws java.io.IOException {
140: out.write(b);
141: ++count;
142: checkCount(count, "write", msg);
143: }
144:
145: public void write(byte[] b, int off, int len)
146: throws java.io.IOException {
147: out.write(b, off, len);
148: count += len;
149: checkCount(count, "write", msg);
150: }
151:
152: public void write(byte[] b) throws java.io.IOException {
153: out.write(b);
154: count += b.length;
155: checkCount(count, "write", msg);
156: }
157:
158: }
159:
160: CountingMessageWriter(MessageWriter delegatee) {
161: super (delegatee);
162: }
163:
164: // Create and return the byte-counting FilterOutputStream
165: public OutputStream getObjectOutputStream(ObjectOutput out)
166: throws java.io.IOException {
167: OutputStream raw_os = super .getObjectOutputStream(out);
168: return new CountingOutputStream(raw_os);
169: }
170:
171: // Save the message, since we'll need it later (in
172: // postProcess).
173: public void finalizeAttributes(AttributedMessage msg) {
174: super .finalizeAttributes(msg);
175: this .msg = msg;
176: }
177:
178: }
179:
180: // MessageReader delegate. In this case it does nothing.
181: // Nonetheless it has to be here, since for reasons we don't yet
182: // understand, the filtered streams have to match exactly on the
183: // reader and writer.
184: private class CountingMessageReader extends
185: MessageReaderDelegateImplBase {
186: private AttributedMessage msg;
187:
188: // Does absolutely nothing but has to be here.
189: private class CountingInputStream extends FilterInputStream {
190: private int count = 0;
191:
192: private CountingInputStream(InputStream wrapped) {
193: super (wrapped);
194: }
195:
196: public int read() throws java.io.IOException {
197: ++count;
198: checkCount(count, "read", msg);
199: return in.read();
200: }
201:
202: public int read(byte[] b, int off, int len)
203: throws java.io.IOException {
204: int inc = in.read(b, off, len);
205: count += inc;
206: checkCount(count, "read", msg);
207: return inc;
208: }
209:
210: public int read(byte[] b) throws java.io.IOException {
211: int inc = in.read(b);
212: count += inc;
213: checkCount(count, "read", msg);
214: return inc;
215: }
216: }
217:
218: CountingMessageReader(MessageReader delegatee) {
219: super (delegatee);
220: }
221:
222: public InputStream getObjectInputStream(ObjectInput in)
223: throws java.io.IOException, ClassNotFoundException {
224: InputStream raw_is = super .getObjectInputStream(in);
225: return new CountingInputStream(raw_is);
226: }
227:
228: public void finalizeAttributes(AttributedMessage msg) {
229: super.finalizeAttributes(msg);
230: this.msg = msg;
231: }
232:
233: }
234: }
|