001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.io.FilterInputStream;
030: import java.io.FilterOutputStream;
031: import java.io.InputStream;
032: import java.io.ObjectInput;
033: import java.io.ObjectOutput;
034: import java.io.OutputStream;
035:
036: import org.cougaar.core.mts.MessageAttributes;
037: import org.cougaar.mts.base.CommFailureException;
038: import org.cougaar.mts.base.DestinationLink;
039: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
040: import org.cougaar.mts.base.MessageReader;
041: import org.cougaar.mts.base.MessageReaderDelegateImplBase;
042: import org.cougaar.mts.base.MessageWriter;
043: import org.cougaar.mts.base.MessageWriterDelegateImplBase;
044: import org.cougaar.mts.base.MisdeliveredMessageException;
045: import org.cougaar.mts.base.NameLookupException;
046: import org.cougaar.mts.base.RPCLinkProtocol;
047: import org.cougaar.mts.base.StandardAspect;
048: import org.cougaar.mts.base.UnregisteredNameException;
049:
050: /**
051: * This Aspect counts the bytes of an outgoing message flowing by and
052: * sets a local attribute <code>org.cougaar.core.message.count</code>
053: * with the final value.
054: */
055: public class CountBytesStreamsAspect extends StandardAspect {
056:
057: // The name of a local attribute which will be used to store the
058: // count.
059: private static final String COUNT_ATTR = "org.cougaar.core.message.count";
060:
061: // Return delegates for MessageReader, MessageWriter and
062: // DestinationLink.
063: public Object getDelegate(Object delegatee, Class type) {
064: if (type == MessageWriter.class) {
065: MessageWriter wtr = (MessageWriter) delegatee;
066: return new CountingMessageWriter(wtr);
067: } else if (type == MessageReader.class) {
068: MessageReader rdr = (MessageReader) delegatee;
069: return new CountingMessageReader(rdr);
070: } else if (type == DestinationLink.class) {
071: DestinationLink link = (DestinationLink) delegatee;
072: // Only RPC is relevant here
073: Class cls = link.getProtocolClass();
074: if (RPCLinkProtocol.class.isAssignableFrom(cls))
075: return new BandwidthDestinationLink(link);
076: }
077:
078: return null;
079: }
080:
081: // The DestinationLink delegate
082: private class BandwidthDestinationLink extends
083: DestinationLinkDelegateImplBase {
084: BandwidthDestinationLink(DestinationLink delegatee) {
085: super (delegatee);
086: }
087:
088: public MessageAttributes forwardMessage(
089: AttributedMessage message) throws NameLookupException,
090: UnregisteredNameException, CommFailureException,
091: MisdeliveredMessageException {
092: // Register Aspect as a Message Streaming filter
093: message.addFilter(CountBytesStreamsAspect.this );
094:
095: // Compute the latency and print it along with the cached
096: // byte count (the MessageWriter will do the actual
097: // counting).
098: long start = System.currentTimeMillis();
099: MessageAttributes reply = super .forwardMessage(message);
100: long elapsed = System.currentTimeMillis() - start;
101: Integer Count = (Integer) message.getAttribute(COUNT_ATTR);
102: if (Count != null) {
103: System.out.println(" Message from "
104: + message.getOriginator() + " to "
105: + message.getTarget() + " has " + Count
106: + " bytes and took " + elapsed + " ms");
107: }
108:
109: return reply;
110: }
111: }
112:
113: // The MessageWriter delegate. This will do the byte-counting by
114: // creating a simple FilterOutputStream that watches all the bytes
115: // go past,
116: private class CountingMessageWriter extends
117: MessageWriterDelegateImplBase {
118:
119: private AttributedMessage msg;
120: private int count = 0;
121:
122: private class CountingOutputStream extends FilterOutputStream {
123:
124: private CountingOutputStream(OutputStream wrapped) {
125: super (wrapped);
126: }
127:
128: // Count the bytes, whichever method is used to write
129: // them. Pass the byte or bytes to 'out' rather than
130: // using super, since the default FilterOutputStream
131: // methods aren't very efficient.
132:
133: public void write(int b) throws java.io.IOException {
134: out.write(b);
135: ++count;
136: }
137:
138: public void write(byte[] b, int off, int len)
139: throws java.io.IOException {
140: out.write(b, off, len);
141: count += len;
142: }
143:
144: public void write(byte[] b) throws java.io.IOException {
145: out.write(b);
146: count += b.length;
147: }
148:
149: }
150:
151: CountingMessageWriter(MessageWriter delegatee) {
152: super (delegatee);
153: }
154:
155: // Create and return the byte-counting FilterOutputStream
156: public OutputStream getObjectOutputStream(ObjectOutput out)
157: throws java.io.IOException {
158: OutputStream raw_os = super .getObjectOutputStream(out);
159: return new CountingOutputStream(raw_os);
160: }
161:
162: // Save the message, since we'll need it later (in
163: // postProcess).
164: public void finalizeAttributes(AttributedMessage msg) {
165: super .finalizeAttributes(msg);
166: this .msg = msg;
167: }
168:
169: // Stash the count in the saved message's attributes. Note
170: // that we're doing this after the message has been sent.
171: // Even if it weren't a local attribute, the receive would
172: // never see it. But other aspect delegates can get at it. In
173: // fact the DestinationLink delegate above does so.
174: public void postProcess() {
175: super .postProcess();
176: if (msg != null)
177: msg.setLocalAttribute(COUNT_ATTR, new Integer(count));
178: }
179: }
180:
181: // MessageReader delegate. In this case it does nothing.
182: // Nonetheless it has to be here, since for reasons we don't yet
183: // understand, the filtered streams have to match exactly on the
184: // reader and writer.
185: private class CountingMessageReader extends
186: MessageReaderDelegateImplBase {
187:
188: // Does absolutely nothing but has to be here.
189: private class CountingInputStream extends FilterInputStream {
190:
191: private CountingInputStream(InputStream wrapped) {
192: super (wrapped);
193: }
194:
195: public int read() throws java.io.IOException {
196: return in.read();
197: }
198:
199: public int read(byte[] b, int off, int len)
200: throws java.io.IOException {
201: return in.read(b, off, len);
202: }
203:
204: public int read(byte[] b) throws java.io.IOException {
205: return in.read(b);
206: }
207: }
208:
209: CountingMessageReader(MessageReader delegatee) {
210: super (delegatee);
211: }
212:
213: public InputStream getObjectInputStream(ObjectInput in)
214: throws java.io.IOException, ClassNotFoundException {
215: InputStream raw_is = super .getObjectInputStream(in);
216: return new CountingInputStream(raw_is);
217: }
218: }
219:
220: }
|