001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.io.FilterInputStream;
030: import java.io.FilterOutputStream;
031: import java.io.InputStream;
032: import java.io.ObjectInput;
033: import java.io.ObjectOutput;
034: import java.io.OutputStream;
035:
036: import org.cougaar.core.mts.AttributeConstants;
037: import org.cougaar.core.mts.MessageAddress;
038: import org.cougaar.core.mts.MessageAttributes;
039: import org.cougaar.core.mts.MessageStatistics;
040: import org.cougaar.core.service.MessageStatisticsService;
041: import org.cougaar.mts.base.CommFailureException;
042: import org.cougaar.mts.base.DestinationLink;
043: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
044: import org.cougaar.mts.base.DestinationQueue;
045: import org.cougaar.mts.base.DestinationQueueDelegateImplBase;
046: import org.cougaar.mts.base.MessageDeliverer;
047: import org.cougaar.mts.base.MessageDelivererDelegateImplBase;
048: import org.cougaar.mts.base.MessageReader;
049: import org.cougaar.mts.base.MessageReaderDelegateImplBase;
050: import org.cougaar.mts.base.MessageWriter;
051: import org.cougaar.mts.base.MessageWriterDelegateImplBase;
052: import org.cougaar.mts.base.MisdeliveredMessageException;
053: import org.cougaar.mts.base.NameLookupException;
054: import org.cougaar.mts.base.StandardAspect;
055: import org.cougaar.mts.base.UnregisteredNameException;
056:
057: /**
058: * This Aspect gathers message size and count statistics using the
059: * InputStream and OutputStream of streamed protocols. The statistics
060: * are made accessible via the {@link MessageStatisticsService}, which
061: * this class implements. Per-message size statistics are also added
062: * to the AttributedMessage itself, as attributes.
063: */
064: public class StatisticsAspect extends StandardAspect implements
065: MessageStatistics, MessageStatisticsService, AttributeConstants
066:
067: {
068: // This variable holds the current size of ALL
069: // destination queues, so it behaves as it did in the original
070: // RMIMessageTransport (which had a single outgoing queue).
071: static int currentAllQueuesSize = 0;
072:
073: private long lastUpdate = System.currentTimeMillis();
074: private long totalElapsedTime = 0L;
075: private long totalQueueLength = 0L;
076: private long statisticsSentTotalBytes = 0L;
077: private long statisticsSentTotalMessages = 0L;
078: private long statisticsSentHeaderBytes = 0L;
079: private long statisticsSentAckBytes = 0L; //Acks sent for msgs received
080: private long statisticsRecvTotalBytes = 0L;
081: private long statisticsRecvTotalMessages = 0L;
082: private long statisticsRecvHeaderBytes = 0L;
083: private long statisticsRecvAckBytes = 0L; //Acks received for msgs sent
084: private long[] messageLengthHistogram = null;
085:
086: public StatisticsAspect() {
087: messageLengthHistogram = new long[MessageStatistics.NBINS];
088: }
089:
090: public synchronized MessageStatistics.Statistics getMessageStatistics(
091: boolean reset) {
092: MessageStatistics.Statistics result = new MessageStatistics.Statistics(
093: averageQueueLength(), statisticsSentTotalBytes,
094: statisticsSentHeaderBytes, statisticsSentAckBytes,
095: statisticsSentTotalMessages, statisticsRecvTotalBytes,
096: statisticsRecvHeaderBytes, statisticsRecvAckBytes,
097: statisticsRecvTotalMessages, messageLengthHistogram);
098: if (reset) {
099: totalElapsedTime = 0L;
100: totalQueueLength = 0L;
101: statisticsSentTotalBytes = 0L;
102: statisticsSentHeaderBytes = 0L;
103: statisticsSentAckBytes = 0L;
104: statisticsSentTotalMessages = 0L;
105: statisticsRecvTotalBytes = 0L;
106: statisticsRecvHeaderBytes = 0L;
107: statisticsRecvAckBytes = 0L;
108: statisticsRecvTotalMessages = 0L;
109:
110: for (int i = 0; i < messageLengthHistogram.length; i++) {
111: messageLengthHistogram[i] = 0;
112: }
113: }
114: return result;
115: }
116:
117: private int getIntAttribute(MessageAttributes attrs, String key) {
118: int result = 0;
119: Object attr = attrs.getAttribute(key);
120: if (attr != null && (attr instanceof Number))
121: result = ((Number) attr).intValue();
122: return result;
123: }
124:
125: private double averageQueueLength() {
126: if (totalElapsedTime == 0)
127: return 0;
128: return totalQueueLength / (0.0 + totalElapsedTime);
129: }
130:
131: private synchronized void updateQueueStatistics() {
132: long now = System.currentTimeMillis();
133: long elapsed = now - lastUpdate;
134: lastUpdate = now;
135: totalElapsedTime += elapsed;
136:
137: //Queuelength wieghted by time spent at this length.
138: totalQueueLength += (currentAllQueuesSize) * elapsed;
139: }
140:
141: private synchronized void updateMessageLengthStatistics(
142: int byteCount) {
143: int bin = 0;
144: int maxBin = MessageStatistics.NBINS - 1;
145: while (bin < maxBin
146: && byteCount > MessageStatistics.BIN_SIZES[bin]) {
147: bin++;
148: }
149: messageLengthHistogram[bin]++;
150: statisticsSentTotalBytes += byteCount;
151: }
152:
153: public Object getDelegate(Object delegatee, Class type) {
154: if (type == MessageWriter.class) {
155: MessageWriter wtr = (MessageWriter) delegatee;
156: return new StatisticsWriter(wtr);
157: } else if (type == MessageReader.class) {
158: MessageReader rdr = (MessageReader) delegatee;
159: return new StatisticsReader(rdr);
160: } else if (type == DestinationQueue.class) {
161: return new StatisticsDestinationQueue(
162: (DestinationQueue) delegatee);
163: } else if (type == DestinationLink.class) {
164: DestinationLink link = (DestinationLink) delegatee;
165: return new StatisticsLink(link);
166: } else if (type == MessageDeliverer.class) {
167: MessageDeliverer deliverer = (MessageDeliverer) delegatee;
168: return new StatisticsDeliverer(deliverer);
169: }
170:
171: return null;
172: }
173:
174: private class StatisticsLink extends
175: DestinationLinkDelegateImplBase {
176: StatisticsLink(DestinationLink delegatee) {
177: super (delegatee);
178: }
179:
180: public MessageAttributes forwardMessage(AttributedMessage msg)
181: throws NameLookupException, UnregisteredNameException,
182: CommFailureException, MisdeliveredMessageException {
183: // Register Aspect as a Message Streaming filter
184: msg.addFilter(StatisticsAspect.this );
185: MessageAttributes result = null;
186: try {
187: result = super .forwardMessage(msg);
188: // successful send
189: int msgBytes = getIntAttribute(msg,
190: MESSAGE_BYTES_ATTRIBUTE);
191: int hdrBytes = getIntAttribute(msg,
192: HEADER_BYTES_ATTRIBUTE);
193: int ackBytes = getIntAttribute(result,
194: HEADER_BYTES_ATTRIBUTE);
195: updateMessageLengthStatistics(msgBytes);
196: statisticsSentHeaderBytes += hdrBytes;
197: statisticsRecvAckBytes += ackBytes;
198: updateQueueStatistics();
199: --currentAllQueuesSize;
200: ++statisticsSentTotalMessages;
201: if (loggingService.isDebugEnabled()) {
202: MessageStatistics.Statistics stats = getMessageStatistics(false);
203: loggingService.debug("Send Count="
204: + stats.totalSentMessageCount + " Bytes="
205: + stats.totalSentMessageBytes
206: + " Average Message Queue Length="
207: + stats.averageMessageQueueLength);
208: }
209: return result;
210: } catch (NameLookupException ex1) {
211: throw ex1;
212: } catch (UnregisteredNameException ex2) {
213: throw ex2;
214: } catch (CommFailureException ex3) {
215: throw ex3;
216: } catch (MisdeliveredMessageException ex4) {
217: throw ex4;
218: }
219: }
220: }
221:
222: private class StatisticsDeliverer extends
223: MessageDelivererDelegateImplBase {
224: StatisticsDeliverer(MessageDeliverer deliverer) {
225: super (deliverer);
226: }
227:
228: public MessageAttributes deliverMessage(AttributedMessage msg,
229: MessageAddress dest)
230: throws MisdeliveredMessageException
231:
232: {
233: try {
234: MessageAttributes ack = super .deliverMessage(msg, dest);
235: // successful send
236: int msgBytes = getIntAttribute(msg,
237: MESSAGE_BYTES_ATTRIBUTE);
238: int hdrBytes = getIntAttribute(msg,
239: HEADER_BYTES_ATTRIBUTE);
240: int ackBytes = getIntAttribute(ack,
241: HEADER_BYTES_ATTRIBUTE);
242: statisticsRecvTotalBytes += msgBytes;
243: statisticsRecvHeaderBytes += hdrBytes;
244: statisticsSentAckBytes += ackBytes;
245: ++statisticsRecvTotalMessages;
246: if (loggingService.isDebugEnabled()) {
247: MessageStatistics.Statistics stats = getMessageStatistics(false);
248: loggingService.debug("Recv Count="
249: + stats.totalRecvMessageCount + " Bytes="
250: + stats.totalRecvMessageBytes);
251: }
252: return ack;
253: } catch (MisdeliveredMessageException ex) {
254: throw ex;
255: }
256: }
257:
258: }
259:
260: private class StatisticsDestinationQueue extends
261: DestinationQueueDelegateImplBase {
262:
263: StatisticsDestinationQueue(DestinationQueue queue) {
264: super (queue);
265: }
266:
267: public void holdMessage(AttributedMessage m) {
268: updateQueueStatistics();
269: ++currentAllQueuesSize;
270: super .holdMessage(m);
271: }
272:
273: }
274:
275: private static class StatisticsOutputStream extends
276: FilterOutputStream {
277: int byteCount = 0;
278:
279: public StatisticsOutputStream(OutputStream out) {
280: super (out);
281: }
282:
283: public void write(int b) throws java.io.IOException {
284: out.write(b);
285: byteCount += 1;
286: }
287:
288: public void write(byte[] b) throws java.io.IOException {
289: out.write(b);
290: byteCount += b.length;
291: }
292:
293: public void write(byte[] b, int o, int len)
294: throws java.io.IOException {
295: out.write(b, o, len);
296: byteCount += len;
297: }
298: }
299:
300: private class StatisticsWriter extends
301: MessageWriterDelegateImplBase {
302:
303: StatisticsOutputStream wrapper;
304: AttributedMessage msg;
305:
306: StatisticsWriter(MessageWriter delegatee) {
307: super (delegatee);
308: }
309:
310: public void finalizeAttributes(AttributedMessage msg) {
311: super .finalizeAttributes(msg);
312: this .msg = msg;
313: }
314:
315: // Create and return the byte-counting FilterOutputStream
316: public OutputStream getObjectOutputStream(ObjectOutput out)
317: throws java.io.IOException {
318: OutputStream raw_os = super .getObjectOutputStream(out);
319: wrapper = new StatisticsOutputStream(raw_os);
320: return wrapper;
321: }
322:
323: public void finishOutput() throws java.io.IOException {
324: super .finishOutput();
325: int msgBytes = wrapper.byteCount;
326:
327: msg.setLocalAttribute(MESSAGE_BYTES_ATTRIBUTE, new Integer(
328: msgBytes));
329: }
330:
331: }
332:
333: // MessageReader delegate. Counts the bytes.
334: private static class StatisticsInputStream extends
335: FilterInputStream {
336:
337: int byteCount = 0;
338:
339: private StatisticsInputStream(InputStream wrapped) {
340: super (wrapped);
341: }
342:
343: public int read() throws java.io.IOException {
344: byteCount++;
345: return in.read();
346: }
347:
348: public int read(byte[] b, int off, int len)
349: throws java.io.IOException {
350: int bytes_read = in.read(b, off, len);
351: byteCount += bytes_read;
352: return bytes_read;
353: }
354:
355: public int read(byte[] b) throws java.io.IOException {
356: int bytes_read = in.read(b);
357: byteCount += bytes_read;
358: return bytes_read;
359: }
360: }
361:
362: private static class StatisticsReader extends
363: MessageReaderDelegateImplBase {
364:
365: StatisticsInputStream wrapper;
366: AttributedMessage msg;
367:
368: StatisticsReader(MessageReader delegatee) {
369: super (delegatee);
370: }
371:
372: public InputStream getObjectInputStream(ObjectInput in)
373: throws java.io.IOException, ClassNotFoundException {
374: InputStream raw_is = super .getObjectInputStream(in);
375: wrapper = new StatisticsInputStream(raw_is);
376: return wrapper;
377: }
378:
379: public void finalizeAttributes(AttributedMessage msg) {
380: this .msg = msg;
381: super .finalizeAttributes(msg);
382: }
383:
384: public void finishInput() throws java.io.IOException {
385: super .finishInput();
386: int msgBytes = wrapper.byteCount;
387: msg.setLocalAttribute(MESSAGE_BYTES_ATTRIBUTE, new Integer(
388: msgBytes));
389: }
390:
391: }
392:
393: }
|